# Source code of pylnlib.Scrollkeeper

# pylnlib : a package to communicate with a model railroad controller using the LocoNet® protocol
#
# (c) 2022 Michel Anders (varkenvarken)
#
# License: GPL 3, see file LICENSE
#
# Version: 20220728172843

from datetime import datetime
from threading import Lock
from time import sleep

from .Message import (
    FunctionGroup1,
    FunctionGroup2,
    FunctionGroup3,
    FunctionGroupSound,
    MoveSlots,
    RequestLocAddress,
    RequestSlotData,
    RequestSwitchFunction,
    RequestSwitchState,
    SensorState,
    SlotDataReturn,
    SlotSpeed,
    SwitchState,
)
from .Sensor import Sensor
from .Slot import Slot
from .Switch import Switch
from .Throttle import Throttle


class Scrollkeeper:
    """
    Keeps track of the state of slots, sensors and switches.

    State is updated from the messages handed to messageListener and can be
    queried with the get... methods. Information that is not yet known is
    requested through the interface passed to the constructor.
    """

    # Names of the attributes that are copied from a message to its slot,
    # listed per message type in the order in which they are applied.
    _SLOT_DATA_ATTRS = (
        "address",
        "dir",
        "speed",
        "f0",
        "f1",
        "f2",
        "f3",
        "f4",
        "f5",
        "f6",
        "f7",
        "f8",
        "status",
        "ss2",
        "trk",
        "id1",
        "id2",
    )
    _FUNCTION_GROUP1_ATTRS = ("dir", "f0", "f1", "f2", "f3", "f4")
    _FUNCTION_GROUP_SOUND_ATTRS = ("f5", "f6", "f7", "f8")
    _FUNCTION_GROUP2_ATTRS = ("f9", "f10", "f11", "f12")
    # FunctionGroup3 messages carry a different set of functions per fiegroup value.
    _FUNCTION_GROUP3_ATTRS = {
        0x5: ("f12", "f20", "f28"),
        0x8: ("f13", "f14", "f15", "f16", "f17", "f18", "f19"),
        0x9: ("f21", "f22", "f23", "f24", "f25", "f26", "f27"),
    }
    _SLOT_SPEED_ATTRS = ("speed",)

    # Seconds between two checks while waiting for requested information.
    _POLL_INTERVAL = 0.25

    def __init__(self, interface, slottrace: bool = False):
        """
        A Scrollkeeper instance keeps track of the state of slots, sensors and switches.

        A Scrollkeeper instance uses information sent to its messageListener method to keep track of changes to slots, sensors and switches.
        If it receives messages for which it does not have a Slot, Sensor or Switch instance, it will send requests to get this information.

        Args:
            interface (Interface): used to send a message if no information on a particular item is present.
            slottrace (bool, optional): log every internal update to the console. Defaults to False.
        """
        self.interface = interface
        self.slottrace = slottrace
        self.slots = {}
        self.slotlock = Lock()
        self.switches = {}
        self.switchlock = Lock()
        self.sensors = {}
        self.sensorlock = Lock()
        # When True, getLocoSlot returns a placeholder Slot for unknown loc
        # addresses instead of sending a request.
        self.dummy = False

    def messageListener(self, msg) -> None:
        """
        Handles incoming messages and updates internal state.

        If information referring to an unknown slot comes in, it will issue a slot status request
        instead of updating. A SlotDataReturn message always updates (and if necessary creates) its slot.
        Messages of any other type than the ones tested below are ignored.

        Args:
            msg (Message): An instance of a (subclass of a) Message.
        """
        # The order of the isinstance tests is kept exactly as it was, in case
        # some of the message classes are subclasses of each other.
        if isinstance(msg, SlotDataReturn):
            self.updateSlot(
                msg.slot,
                **{name: getattr(msg, name) for name in self._SLOT_DATA_ATTRS},
            )
        elif isinstance(msg, FunctionGroup1):
            self._updateKnownSlot(msg, self._FUNCTION_GROUP1_ATTRS)
        elif isinstance(msg, FunctionGroupSound):
            self._updateKnownSlot(msg, self._FUNCTION_GROUP_SOUND_ATTRS)
        elif isinstance(msg, FunctionGroup2):
            self._updateKnownSlot(msg, self._FUNCTION_GROUP2_ATTRS)
        elif isinstance(msg, FunctionGroup3):
            # An unrecognised fiegroup yields no attributes, hence no update.
            self._updateKnownSlot(
                msg, self._FUNCTION_GROUP3_ATTRS.get(msg.fiegroup, ())
            )
        elif isinstance(msg, SlotSpeed):
            self._updateKnownSlot(msg, self._SLOT_SPEED_ATTRS)
        elif isinstance(msg, SensorState):
            self.updateSensor(msg.address, msg.level)
        elif isinstance(msg, (SwitchState, RequestSwitchFunction)):
            self.updateSwitch(msg.address, msg.thrown, msg.engage)

    def _updateKnownSlot(self, msg, names) -> None:
        """
        Copy the attributes listed in names from msg to the slot msg.slot.

        If the slot is not known yet, slot data is requested and nothing is
        copied. The attributes are only read from msg when the slot is known.
        """
        if msg.slot not in self.slots:
            self.sendMessage(RequestSlotData(msg.slot))
        elif names:
            self.updateSlot(msg.slot, **{name: getattr(msg, name) for name in names})

    def updateSlot(self, id, **kwargs) -> None:
        """
        Update attributes of a slot, creating the slot if it is unknown.

        The update is performed while holding the slot lock.

        Args:
            id (int): The slot id.
            **kwargs: attribute names and the values to assign to them on the slot.
        """
        with self.slotlock:
            if id not in self.slots:
                self.slots[id] = Slot(id)

            slot = self.slots[id]
            slot.slot = id

            for attr, val in kwargs.items():
                setattr(slot, attr, val)
            if self.slottrace:
                print(self)

    def updateSensor(self, address: int, level=None) -> None:
        """
        Update the attributes of a sensor, creating the sensor if it is unknown.

        The update is performed while holding the sensor lock.

        Args:
            address: The address of the sensor. This is zero based.
            level (bool, optional): Either True (on) or False (off). None leaves the state unchanged. Defaults to None.
        """
        with self.sensorlock:
            if address not in self.sensors:
                self.sensors[address] = Sensor(address)
            if level is not None:
                self.sensors[address].state = level
            if self.slottrace:
                print(self)

    def updateSwitch(self, address, thrown=None, engage=None):
        """
        Update the status of a switch, creating the switch if it is unknown.

        The update is performed while holding the switch lock.

        Args:
            address (int): The address of the switch. This is zero based.
            thrown (bool, optional): direction of the switch. True (thrown, aka Open) or False (closed). None leaves it unchanged. Defaults to None.
            engage (bool, optional): whether the servo is engaged. None leaves it unchanged. Defaults to None.
        """
        with self.switchlock:
            if address not in self.switches:
                self.switches[address] = Switch(address)
            if thrown is not None:
                self.switches[address].thrown = thrown
            if engage is not None:
                self.switches[address].engage = engage
            if self.slottrace:
                print(self)

    def _findSlotByAddress(self, address):
        """
        Return the first known Slot whose address equals address, or None.
        """
        # Search a snapshot: updateSlot may add a slot to the dict while we
        # are looking, which would invalidate an iteration over the dict itself.
        for slot in tuple(self.slots.values()):
            if slot.address == address:
                return slot
        return None

    def getLocoSlot(self, address):
        """
        Return the slot associated with the loc address.

        If there is no slot known for this loc, request slot data and wait for it to arrive.
        If the dummy attribute is set, a placeholder Slot is returned instead of sending a request.

        Args:
            address (int): loc address

        Raises:
            ValueError: if no slot data is available for this loc address

        Returns:
            Slot: The Slot instance associated with this loc address.
        """
        slot = self._findSlotByAddress(address)
        if slot is not None:
            return slot
        if self.dummy:
            return Slot(id=100, dir=0, speed=0, status=0, address=address)
        self.sendMessage(RequestLocAddress(address))
        if self.waitUntilLocAddressKnown(address):
            # Look at the Slot instances (the dict values), not at the slot ids.
            slot = self._findSlotByAddress(address)
            if slot is not None:
                return slot
        raise ValueError(f"Loc address {address} unknown")

    def getSwitchState(self, id):
        """
        Return the state of the switch.

        If the switch is unknown, request the status and wait for it to arrive.

        Args:
            id (int): The address of the switch.

        Raises:
            TypeError: if id is not an int.
            ValueError: if the switch is still unknown when the wait times out.

        Returns:
            The thrown attribute of the switch.
        """
        if not isinstance(id, int):
            raise TypeError("Switch id must be an int")
        if id not in self.switches:
            self.sendMessage(RequestSwitchState(id))
            if not self.waitUntilSwitchKnown(id):
                raise ValueError(f"Switch id {id} unknown")
        return self.switches[id].thrown

    def getSensorState(self, id):
        """
        Return the state of the sensor.

        If the sensor is unknown, request the status and wait for it to arrive.

        Args:
            id (int): The address of the sensor.

        Raises:
            TypeError: if id is not an int.
            ValueError: if the sensor is still unknown when the wait times out.

        Returns:
            The state attribute of the sensor.
        """
        if not isinstance(id, int):
            raise TypeError("Sensor id must be an int")
        if id not in self.sensors:
            # A request for the sensor state is the same as a sensor state report.
            self.sendMessage(SensorState(id))
            if not self.waitUntilSensorKnown(id):
                raise ValueError(f"Sensor id {id} unknown")
        return self.sensors[id].state

    def sendMessage(self, message):
        """
        Pass a message to the sendMessage method of the interface.
        """
        self.interface.sendMessage(message)

    def _waitUntil(self, condition, timeout) -> bool:
        """
        Check condition() every _POLL_INTERVAL seconds until it is true.

        The elapsed time is the sum of the sleep intervals, not wall clock time.

        Args:
            condition (callable): called without arguments, returns a truth value.
            timeout (float): maximum number of seconds to spend sleeping.

        Returns:
            bool: True as soon as condition() is true, False once more than timeout seconds were slept.
        """
        time_elapsed = 0
        while not condition():
            sleep(self._POLL_INTERVAL)
            time_elapsed += self._POLL_INTERVAL
            if time_elapsed > timeout:
                return False
        return True

    def waitUntilSwitchKnown(self, id, timeout=30):
        """
        Wait until a switch with this id is known.

        Returns:
            bool: True if the switch is known, False if the timeout (in seconds) expired first.
        """
        return self._waitUntil(lambda: id in self.switches, timeout)

    def waitUntilSensorKnown(self, id, timeout=30):
        """
        Wait until a sensor with this id is known.

        Returns:
            bool: True if the sensor is known, False if the timeout (in seconds) expired first.
        """
        return self._waitUntil(lambda: id in self.sensors, timeout)

    def waitUntilLocAddressKnown(self, address, timeout=30):
        """
        Wait until a slot with this loc address is known.

        Returns:
            bool: True if such a slot is known, False if the timeout (in seconds) expired first.
        """
        return self._waitUntil(
            lambda: self._findSlotByAddress(address) is not None, timeout
        )

    def acquireSlot(self, slot):
        """
        Send a MoveSlots message with the id of the slot as both source and destination.

        Args:
            slot (Slot): the slot to acquire.
        """
        self.sendMessage(MoveSlots(src=slot.id, dst=slot.id))
        # TODO: ? should we wait for slot data ?

    def getThrottle(self, locaddress: int) -> "Throttle":
        """
        Return a Throttle for the loc address after acquiring its slot.

        Raises:
            ValueError: if no slot data is available for this loc address (see getLocoSlot).
        """
        slot = self.getLocoSlot(locaddress)
        self.acquireSlot(slot)
        return Throttle(self, locaddress)

    def getSlot(self, id: int) -> "Slot":
        """
        Return the known Slot with this id; raises KeyError if it is unknown.
        """
        return self.slots[id]

    def getSensor(self, id: int) -> "Sensor":
        """
        Return the known Sensor with this id; raises KeyError if it is unknown.
        """
        return self.sensors[id]

    def getSwitch(self, id: int) -> "Switch":
        """
        Return the known Switch with this id; raises KeyError if it is unknown.
        """
        return self.switches[id]

    def getSlotIds(self):
        """
        Return a list with the ids of all known slots.
        """
        return list(self.slots)

    def getSensorIds(self):
        """
        Return a list with the ids of all known sensors.
        """
        return list(self.sensors)

    def getSwitchIds(self):
        """
        Return a list with the ids of all known switches.
        """
        return list(self.switches)

    def getAllStatusInfo(self):
        """
        Return a dict with the current time and the toJSON() result of every known item.

        The dict has the keys "time", "slots", "switches" and "sensors"; the
        three lists are ordered by id.
        """
        return {
            "time": f"{datetime.now()}",
            "slots": [self.slots[s].toJSON() for s in sorted(self.slots)],
            "switches": [self.switches[s].toJSON() for s in sorted(self.switches)],
            "sensors": [self.sensors[s].toJSON() for s in sorted(self.sensors)],
        }

    @staticmethod
    def _listing(items) -> str:
        """
        Return one tab indented line per item, ordered by id, or a tab indented '<none>'.
        """
        if not items:
            return "\t<none>"
        return "\n".join("\t" + str(items[key]) for key in sorted(items))

    def __str__(self):
        """
        Return an overview of all known slots, switches and sensors.

        The text starts with the escape sequences ESC[2J and ESC[H so that
        printing it redraws the overview from the top of the terminal.
        """
        return (
            "\033[2J\033[H\n"
            f"Scrollkeeper [{datetime.now():%H:%M:%S}]\n"
            "\n"
            "Slots:\n"
            f"{self._listing(self.slots)}\n"
            "\n"
            "Switches:\n"
            f"{self._listing(self.switches)}\n"
            "\n"
            "Sensors:\n"
            f"{self._listing(self.sensors)}\n"
        )