# Source code for pylnlib.Interface

# pylnlib : a package to communicate with a model railroad controller using the LocoNet® protocol
#
# (c) 2022 Michel Anders (varkenvarken)
#
# License: GPL 3, see file LICENSE
#
# Version: 20220801144046

import signal
import sys
import threading
from datetime import datetime
from queue import Queue
from time import sleep

import serial

from .Message import CaptureTimeStamp, Message


class Interface:
    """
    Handles thread safe sending and receiving LocoNet messages on a serial interface.

    Args:
        port (str): serial port, on Linux typically something like `/dev/ttyACM0`.
            Any non-string value is treated as an already opened capture stream:
            it is stored as-is and read with `read(n)` (replay mode).
        baud (int, optional): the baudrate. Defaults to 57600.
        fast (bool, optional): if false it uses timestamp message in the input data to replay a realistic speed. Defaults to False.
        dummy (bool, optional): if true the serial port is not opened and nothing is written to it;
            outgoing messages are looped back to the receive handlers instead. Defaults to False.

    See Also:
        [Capture and replay](capture_and_replay)

    Example:
        ```python
        interface = Interface("/dev/ttyACM0")
        scrollkeeper = Scrollkeeper(interface)
        interface.receiver_handler.append(scrollkeeper.messageListener)
        Thread(target=interface.run, daemon=True).start()
        ```

    It will gracefully exit if it receives a SIGINT or SIGTERM signal,
    i.e. terminate the thread that handle the input and output queues.

    !!! note
        The constructor installs the SIGTERM and SIGINT handlers with `signal.signal`,
        which Python only allows from the main thread of the main interpreter.
    """

    def __init__(
        self, port: str, baud: int = 57600, fast: bool = False, dummy: bool = False
    ):
        self.running = False
        self.time = None
        self.fast = fast
        self.dummy = dummy

        signal.signal(signal.SIGTERM, self._on_interrupt)
        signal.signal(signal.SIGINT, self._on_interrupt)

        self.exit = False
        self.capture_finished = False

        # daemon=True replaces the deprecated Thread.setDaemon(True)
        self.inputThread = threading.Thread(
            name="receiver", target=self._receiver_thread, daemon=True
        )

        # callables invoked with every received message, see _on_receive()
        self.receiver_handler = []

        # set by the worker threads whenever something was put on inputqueue
        self.rd_event = threading.Event()

        self.inputqueue: Queue = Queue()

        self.outputThread = threading.Thread(
            name="sender", target=self._sender_thread, daemon=True
        )
        self.outputqueue: Queue = Queue()

        # In dummy mode with a port name no device is opened; keep the
        # attribute defined so run() can shut down without an AttributeError.
        self.com = None

        self.input = "com"
        if isinstance(port, str):
            if not dummy:
                try:
                    # 8 bits, 1 stop bit, 1 start bit
                    self.com = serial.Serial(
                        port=port,
                        baudrate=baud,
                        stopbits=serial.STOPBITS_ONE,
                        bytesize=serial.EIGHTBITS,
                        timeout=None,
                    )
                except serial.SerialException as e:
                    # sys.exit is always available, the bare exit() builtin
                    # is only injected by the site module
                    sys.exit(e)
        else:
            self.input = "file"
            self.com = port

    def _on_interrupt(self, signum, frame):
        """
        Signal handler, sets self.exit to True.

        Args:
            signum: not used
            frame : not used
        """
        self.exit = True

    def _on_receive(self, msg):
        """
        Dispatch a message object to registered handlers.

        Args:
            msg (Message): A LocoNet [Message](pylnlib.Message.md)
        """
        if self.receiver_handler is not None:
            for handler in self.receiver_handler:
                handler(msg)

    def _processTimeStamp(self, msg):
        """
        Act on a [CaptureTimeStamp](pylnlib.Message.md#pylnlibmessagecapturetimestamp) messages.

        Args:
            msg (CaptureTimeStamp): a LocoNet message representing a timestamp

        If the `fast` attribute is False, this message will sleep to synchronize the replay of messages.

        !!! note
            CaptureTimeStamp is not defined in the LocoNet specification. It is defined in the pylnlib package
            and these message are added to captured datastream to allow for realistic replay of messages.
        """
        if not self.fast and self.time is not None:
            sleep(timeDiff(self.time, msg.time))
        self.time = msg.time

    def _enqueue_received(self, msg):
        """
        Put a message on the input queue and wake up the main loop in run().

        Args:
            msg (Message): the message to hand over to the main loop
        """
        self.inputqueue.put(msg)
        self.rd_event.set()

    def run(self, delay=0.0):
        """
        Start processing input and output.

        This will continue until an external SIGINT or SIGTERM signal is received
        (or until the `exit` attribute is set to True by other means).

        Args:
            delay (float, optional): Time in seconds to wait between incoming messages. Defaults to 0.

        Returns:
            only returns upon receiving an external signal.
        """
        self.running = True
        self.time = None

        self.inputThread.start()
        self.outputThread.start()

        # main loop that pulls messages from the input queue; the 1 second
        # timeout makes sure the exit flag is checked regularly
        while not self.exit:
            if self.rd_event.wait(1):
                self.rd_event.clear()

                while not self.inputqueue.empty():
                    if delay > 0:
                        sleep(delay)
                    msg = self.inputqueue.get()
                    if isinstance(msg, CaptureTimeStamp):
                        self._processTimeStamp(msg)
                    else:
                        self._on_receive(msg)

        self.inputThread.join()
        self.outputThread.join()

        # there is nothing to close in dummy mode with a port name
        if self.com is not None:
            self.com.close()

        print("Done...", file=sys.stderr)
        self.running = False

    def run_in_background(self, delay=0.0):
        """
        Start processing input and output and return immediately.

        Processing will continue until an external SIGINT or SIGTERM signal is received.

        Args:
            delay (float, optional): Time in seconds to wait between incoming messages. Defaults to 0.

        Returns:
            immediately.
        """
        threading.Thread(
            name="interface", target=self.run, daemon=True, args=(delay,)
        ).start()

    def _receiver_thread(self):
        """
        Read data from the interface and fill the internal input queue with messages.

        Raises:
            IOError: in replay mode, if the captured stream ends in the middle of a message.
        """
        while not self.exit:
            if self.dummy:
                sleep(0.1)
            elif self.input == "com":
                n = self.com.inWaiting()
                if n > 1:  # all messages are at least 2 bytes
                    data = self.com.read(2)
                    length = Message.length(data[0], data[1])
                    if length == 2:
                        msg = Message.from_data(data)
                    else:
                        data2 = self.com.read(
                            length - 2
                        )  # this is blocking, might want to change that
                        msg = Message.from_data(data + data2)
                    self._enqueue_received(msg)
                else:
                    sleep(0.1)
            elif not self.capture_finished:
                data = self.com.read(2)
                if len(data) == 0:
                    self.capture_finished = True
                elif len(data) < 2:
                    raise IOError("captured stream ended prematurely")
                else:
                    length = Message.length(data[0], data[1])
                    if length == 2:
                        msg = Message.from_data(data)
                    else:
                        data2 = self.com.read(length - 2)
                        if len(data2) < length - 2:
                            raise IOError("captured stream ended prematurely")
                        msg = Message.from_data(data + data2)
                    self._enqueue_received(msg)
            else:
                # the capture is exhausted: idle until exit is requested
                # instead of spinning at full speed
                sleep(0.1)

    def _sender_thread(self):
        """
        Retrieve messages in the internal output queue and send them to the serial interface.
        """
        while not self.exit:
            if self.outputqueue.empty():
                # nothing to send: yield the CPU instead of busy waiting
                sleep(0.05)
                continue
            msg = self.outputqueue.get()
            if self.input == "com" and not self.dummy:
                self.com.write(msg.data)
            else:  # on replay or dummy output we simply shunt back the output message
                self._enqueue_received(msg)
            # pause after every message that was sent
            sleep(0.25)

    def sendMessage(self, msg):
        """
        Put a message on the internal output queue.

        Args:
            msg (Message): A LocoNet [Message](pylnlib.Message.md)
        """
        self.outputqueue.put(msg)


def timeDiff(a, b):
    """
    Return the total number of seconds between a and b.

    b MUST be later than a, so the difference between a = 23:55:49 and b = 00:05:49 will be correctly reported as 10 minutes.

    Args:
        a (datetime.time) : earlier time of day
        b (datetime.time) : later time of day

    Returns:
        (float) : the total number of seconds between a and b
    """
    # both times are anchored on the same date so they can be subtracted
    T = datetime.today()
    A = datetime.combine(T, a)
    B = datetime.combine(T, b)
    s = (B - A).total_seconds()
    if s < 0:
        # b is on the next day: s is negative here, so a full day has to be
        # added (subtracting s from a day would yield more than 24 hours)
        s += 24 * 3600
    return s