Source code for dotbot.adapter

"""Module containing classes for interfacing with the DotBot gateway."""

import asyncio
from abc import ABC, abstractmethod

from dotbot_utils.hdlc import HDLCHandler, HDLCState, hdlc_encode
from dotbot_utils.protocol import (
    Frame,
    Header,
    Packet,
    Payload,
    ProtocolPayloadParserException,
)
from dotbot_utils.serial_interface import SerialInterface
from marilib.communication_adapter import MQTTAdapter as MarilibMQTTAdapter
from marilib.communication_adapter import SerialAdapter as MarilibSerialAdapter
from marilib.mari_protocol import Frame as MariFrame
from marilib.mari_protocol import NextProto
from marilib.marilib_cloud import MarilibCloud
from marilib.marilib_edge import MarilibEdge
from marilib.model import EdgeEvent, MariNode

from dotbot import SIMULATOR_INIT_STATE_DEFAULT
from dotbot.dotbot_simulator import DotBotSimulatorCommunicationInterface
from dotbot.logger import LOGGER
from dotbot.sailbot_simulator import SailBotSimulatorCommunicationInterface


[docs] class GatewayAdapterBase(ABC): """Base class for interface adapters."""
[docs] @abstractmethod async def start(self, on_frame_received: callable): """Initialize the interface."""
[docs] @abstractmethod def close(self): """Close the interface."""
[docs] @abstractmethod def send_payload(self, destination: int, payload: Payload): """Send payload to the interface."""
[docs] class SerialAdapter(GatewayAdapterBase): """Raw (non-Mari) serial gateway interface. Deprecated: the `--conn` CLI no longer selects this — a device-path connection maps to the Mari `edge` adapter, since bare DotBot apps now emit Mari-shaped frames and the gateway speaks Mari-shaped UART packets (so the edge adapter handles both sandbox and bare). Kept for now as no CLI path constructs it; likely removed in a future cleanup. """ def __init__( self, port: str, baudrate: int, ): self.port = port self.baudrate = baudrate self.hdlc_handler = HDLCHandler()
[docs] def on_byte_received(self, byte: bytes): self.hdlc_handler.handle_byte(byte) if self.hdlc_handler.state == HDLCState.READY: try: data = self.hdlc_handler.payload try: frame = Frame.from_bytes(data) except (ValueError, ProtocolPayloadParserException) as exc: LOGGER.debug(f"Error parsing frame: {exc}") return except Exception as _: return self.on_frame_received(frame)
[docs] async def start(self, on_frame_received: callable): self.on_frame_received = on_frame_received queue = asyncio.Queue() event_loop = asyncio.get_event_loop() def _byte_received(byte): """Callback called on byte received.""" event_loop.call_soon_threadsafe(queue.put_nowait, byte) self.serial = SerialInterface(self.port, self.baudrate, _byte_received) await asyncio.sleep(1) LOGGER.info("Connected to gateway over serial") while 1: byte = await queue.get() self.on_byte_received(byte)
[docs] def close(self): LOGGER.info("Disconnect from gateway...") self.serial.stop()
[docs] def send_payload(self, destination: int, payload: Payload): frame = Frame( header=Header(destination=destination), packet=Packet.from_payload(payload), ) self.serial.write(hdlc_encode(frame.to_bytes())) self.serial.flush()
[docs] class MarilibEdgeAdapter(GatewayAdapterBase): """Class used to interface with Marilib.""" def __init__(self, port: str, baudrate: int, verbose: bool = False): self.port = port self.baudrate = baudrate
[docs] async def start(self, on_frame_received: callable): self.on_frame_received = on_frame_received queue = asyncio.Queue() event_loop = asyncio.get_event_loop() def _on_mari_event(event: EdgeEvent, event_data: MariNode | MariFrame): if event == EdgeEvent.NODE_JOINED: LOGGER.debug(f"Node joined: {event_data.address:016x}") elif event == EdgeEvent.NODE_LEFT: LOGGER.debug(f"Node left: {event_data.address:016x}") elif event == EdgeEvent.NODE_DATA: if event_data.header.next_proto != NextProto.DOTBOT_APP: return try: packet = Packet.from_bytes(event_data.payload) except (ValueError, ProtocolPayloadParserException) as exc: LOGGER.debug(f"Error parsing packet: {exc}") return if not hasattr(self, "on_frame_received"): return event_loop.call_soon_threadsafe( queue.put_nowait, Frame(header=event_data.header, packet=packet) ) self.mari = MarilibEdge( _on_mari_event, MarilibSerialAdapter(self.port, self.baudrate) ) await asyncio.sleep(3) LOGGER.info("Connected to mari edge") while 1: frame = await queue.get() self.on_frame_received(frame)
[docs] def close(self): self.mari.close()
[docs] def send_payload(self, destination: int, payload: Payload): self.mari.send_frame( dst=destination, payload=Packet.from_payload(payload).to_bytes(), next_proto=NextProto.DOTBOT_APP, )
[docs] class MarilibCloudAdapter(GatewayAdapterBase): """Class used to interface with Marilib.""" def __init__( self, host: str, port: int, use_tls: bool, network_id: int, username: str | None = None, password: str | None = None, ): self.host = host self.port = port self.use_tls = use_tls self.network_id = network_id self.username = username self.password = password
[docs] async def start(self, on_frame_received: callable): self.on_frame_received = on_frame_received queue = asyncio.Queue() event_loop = asyncio.get_event_loop() def _on_mari_event(event: EdgeEvent, event_data: MariNode | MariFrame): if event == EdgeEvent.NODE_JOINED: LOGGER.debug(f"Node joined: {event_data.address:016x}") elif event == EdgeEvent.NODE_LEFT: LOGGER.debug(f"Node left: {event_data.address:016x}") elif event == EdgeEvent.NODE_DATA: if event_data.header.next_proto != NextProto.DOTBOT_APP: return try: packet = Packet.from_bytes(event_data.payload) except (ValueError, ProtocolPayloadParserException) as exc: LOGGER.debug(f"Error parsing packet: {exc}") return if not hasattr(self, "on_frame_received"): return event_loop.call_soon_threadsafe( queue.put_nowait, Frame(header=event_data.header, packet=packet) ) # Broker credentials (from DOTBOT_MQTT_USER / DOTBOT_MQTT_PASS, # threaded down by controller_app) are passed only when set. # NOTE: requires the marilib companion that adds username/password # to MarilibMQTTAdapter; until that lands, set credentials are a # no-op (anonymous connect), which matches today's behaviour. mqtt_kwargs = {} if self.username is not None: mqtt_kwargs["username"] = self.username if self.password is not None: mqtt_kwargs["password"] = self.password self.mari = MarilibCloud( _on_mari_event, MarilibMQTTAdapter( self.host, self.port, use_tls=self.use_tls, is_edge=False, **mqtt_kwargs, ), self.network_id, ) await asyncio.sleep(3) while 1: frame = await queue.get() self.on_frame_received(frame) LOGGER.info("Connected to mari edge")
[docs] def close(self): pass
[docs] def send_payload(self, destination: int, payload: Payload): self.mari.send_frame( dst=destination, payload=Packet.from_payload(payload).to_bytes(), next_proto=NextProto.DOTBOT_APP, )
[docs] class SimulatorAdapterBase(GatewayAdapterBase): """Base class used to interface with the simulator.""" # Assigned in start(); stays None if start() failed before the # simulator was constructed, so close() can no-op instead of raising. simulator = None
[docs] @abstractmethod def create_simulator(self, _byte_received: callable): """Create the simulator instance."""
[docs] async def start(self, on_frame_received: callable): self.on_frame_received = on_frame_received queue = asyncio.Queue() event_loop = asyncio.get_event_loop() def _frame_received(frame): """Callback called on byte received.""" event_loop.call_soon_threadsafe(queue.put_nowait, frame) self.simulator = self.create_simulator(_frame_received) self.simulator.start() LOGGER.info("Connected to simulator") while 1: frame = await queue.get() self.on_frame_received(frame)
[docs] def close(self): if self.simulator is None: return LOGGER.info("Disconnect from simulator...") self.simulator.stop()
[docs] def send_payload(self, destination: int, payload: Payload): frame = Frame( header=Header(destination=destination), packet=Packet.from_payload(payload), ) self.simulator.write(frame.to_bytes())
[docs] class DotBotSimulatorAdapter(SimulatorAdapterBase): """Class used to interface with the dotbot simulator.""" def __init__( self, simulator_init_state: str = SIMULATOR_INIT_STATE_DEFAULT, ): self.simulator_init_state = simulator_init_state
[docs] def create_simulator(self, on_frame_received: callable): return DotBotSimulatorCommunicationInterface( on_frame_received, self.simulator_init_state )
[docs] class SailBotSimulatorAdapter(SimulatorAdapterBase): """Class used to interface with the sailbot simulator."""
[docs] def create_simulator(self, on_frame_received: callable): return SailBotSimulatorCommunicationInterface(on_frame_received)