Skip to content

SOURCE CODE pylnlib.Utils DOCS

# pylnlib : a package to communicate with a model railroad controller using the LocoNetĀ® protocol
#
# (c) 2022 Michel Anders (varkenvarken)
#
# License: GPL 3, see file LICENSE
#
# Version: 20220717141935

import argparse
import sys
import time
from atexit import register
from datetime import datetime
from os import environ

from .Interface import Interface
from .Message import CaptureTimeStamp
from .Scrollkeeper import Scrollkeeper

# default name of capture file
CAPTUREFILE = "pylnlib.capture"


class Args:DOCS
    """
    Common command line argument parsing.
    """

    def __init__(self):
        cmdline = argparse.ArgumentParser()
        cmdline.add_argument(
            "-p", "--port", help="path to serial port", default="/dev/ttyACM0"
        )
        cmdline.add_argument(
            "-b", "--baud", help="baudrate of serial port", default=57600, type=int
        )
        cmdline.add_argument(
            "-i",
            "--reportinterval",
            help="interval between scrollkeeper reports, or 0 to suppress",
            default=0,
            type=float,
        )
        cmdline.add_argument(
            "-c",
            "--capture",
            help=f"capture all traffic to {CAPTUREFILE}",
            action="store_true",
        )
        cmdline.add_argument(
            "-d",
            "--dummy",
            help=f"do not write to serial device",
            action="store_true",
        )
        cmdline.add_argument(
            "-t",
            "--timestamp",
            help=f"add timestamps when writing to a capture file",
            action="store_true",
        )
        cmdline.add_argument(
            "-l",
            "--log",
            help=f"log received message to stderr",
            action="store_true",
        )
        cmdline.add_argument(
            "-r",
            "--replay",
            help=f"replay all captured traffic from {CAPTUREFILE}",
            action="store_true",
        )
        cmdline.add_argument(
            "-F",
            "--fast",
            help=f"ignore timestamps when in replay",
            action="store_true",
        )
        cmdline.add_argument(
            "-f",
            "--capturefile",
            help="name of capture file",
            default=CAPTUREFILE,
            type=str,
        )
        cmdline.add_argument(
            "-s",
            "--slottrace",
            help=f"show scrollkeeper report after every slot update",
            action="store_true",
        )

        self.args = cmdline.parse_args()


class EnvArgs:
    def __init__(self):
        self.port = environ.get("PYLNLIB_PORT", "/dev/ttyACM0")
        self.baud = int(environ.get("PYLNLIB_BAUD", "57600"))
        self.capture = environ.get("PYLNLIB_CAPTURE", "False") in {
            "true",
            "True",
            "TRUE",
            "1",
            "Y",
            "y",
        }
        self.dummy = environ.get("PYLNLIB_DUMMY", "False") in {
            "true",
            "True",
            "TRUE",
            "1",
            "Y",
            "y",
        }
        self.timestamp = environ.get("PYLNLIB_TIMESTAMP", "False") in {
            "true",
            "True",
            "TRUE",
            "1",
            "Y",
            "y",
        }
        self.log = environ.get("PYLNLIB_LOG", "False") in {
            "true",
            "True",
            "TRUE",
            "1",
            "Y",
            "y",
        }
        self.replay = environ.get("PYLNLIB_REPLAY", "False") in {
            "true",
            "True",
            "TRUE",
            "1",
            "Y",
            "y",
        }
        self.fast = environ.get("PYLNLIB_FAST", "False") in {
            "true",
            "True",
            "TRUE",
            "1",
            "Y",
            "y",
        }
        self.capturefile = environ.get("PYLNLIB_CAPTUREFILE", "pylnlib.capture")
        self.port = environ.get("PYLNLIB_PORT", "/dev/ttyACM0")
        self.slottrace = False


def logger(msg):DOCS
    """
    Write a message with a timestamp to stderr.
    """
    print(time.strftime("%H:%M:%S"), msg, file=sys.stderr, flush=True)


def dumper(handle, timestamp=False):DOCS
    """
    return a function that writes raw message data to a file.

    If timestamp is true, it will prefix each message with a [CaptureTimeStamp](pylnlib.Message.CaptureTimeStamp)
    """

    def dumpmsg(msg):
        if timestamp:
            handle.write(CaptureTimeStamp(datetime.today().time()).data)
        handle.write(msg.data)

    return dumpmsg


def reporter(scrollkeeper, interval=30):DOCS
    """
    return a function that prints the contents of a Scrollkeeper instance at regular intervals.
    """

    def dump():
        while True:
            print(scrollkeeper)
            time.sleep(interval)

    return dump


def createInterface(args):DOCS
    """
    create an [Interface](pylnlib.Interface) object, possibly pointing to a file with previously captured input.
    """
    capturefile = None
    if args.replay:
        capturefile = open(args.capturefile, "rb")
        interface = Interface(capturefile, fast=args.fast)
    else:
        interface = Interface(args.port, args.baud, dummy=args.dummy)
    if args.log:
        interface.receiver_handler.append(logger)
    # open a file to write raw captured bytes to
    if args.capture and not args.replay:
        capturefile = open(args.capturefile, "wb", buffering=0)
        interface.receiver_handler.append(dumper(capturefile, timestamp=args.timestamp))
        register(lambda f: f.close(), capturefile)
    return interface


def createScrollkeeper(interface, args):DOCS
    """
    Create a [Scrollkeeper](pylnlib.Scrollkeeper) instance that receives and sends messages via interface.

    Args:
        interface (Interface): The [Interface](pylnlib.Interface) object that the Scrollkeeper will register a receiver_handler with
        args (Namespace): A Namespace (as returned by Argparser.argse_parse() ). Should have slottrace and dummy attributes.

    Returns:
        Scrollkeeper: a [Scrollkeeper](pylnlib.Scrollkeeper) instance.

    See Also:
        [Args](pylnlib.Utils.Args)
    """
    scrollkeeper = Scrollkeeper(interface, slottrace=args.slottrace)
    interface.receiver_handler.append(scrollkeeper.messageListener)
    scrollkeeper.dummy = args.dummy
    return scrollkeeper