Source code for dotbot.dotbot_simulator

# SPDX-FileCopyrightText: 2023-present Inria
# SPDX-FileCopyrightText: 2023-present Filip Maksimovic <filip.maksimovic@inria.fr>
# SPDX-FileCopyrightText: 2024-present Alexandre Abadie <alexandre.abadie@inria.fr>
#
# SPDX-License-Identifier: BSD-3-Clause

"""Dotbot simulator for the DotBot project."""

import random
import threading
import time
from binascii import hexlify
from dataclasses import dataclass
from enum import Enum
from math import atan2, cos, pi, sin, sqrt
from typing import Callable, List

import toml
from dotbot_utils.protocol import Frame, Header, Packet
from pydantic import BaseModel

from dotbot import GATEWAY_ADDRESS_DEFAULT
from dotbot.logger import LOGGER
from dotbot.protocol import PayloadDotBotAdvertisement, PayloadType

R = 1
L = 2
SIMULATOR_STEP_DELTA_T = 0.002

INITIAL_BATTERY_VOLTAGE = 3000  # mV
MAX_BATTERY_DURATION = 300  # 5 minutes


[docs] def battery_discharge_model(time_elapsed_s: float) -> int: """Simple battery discharge model.""" t = min(time_elapsed_s / MAX_BATTERY_DURATION, 1.0) nonlinear_t = t**1.3 voltage = int(INITIAL_BATTERY_VOLTAGE * (1 - nonlinear_t)) if voltage < 0: voltage = 0 return voltage
[docs] class DotBotSimulatorMode(Enum): """Operation mode of the dotbot simulator.""" MANUAL = "MANUAL" AUTOMATIC = "AUTOMATIC"
[docs] @dataclass class Waypoint: """Waypoint class for the dotbot simulator.""" x: int y: int
[docs] class SimulatedDotBotSettings(BaseModel): address: str pos_x: int pos_y: int theta: float calibrated: bool = True motor_left_error: float = 0.5 motor_right_error: float = 0
[docs] class SimulatedNetworkSettings(BaseModel): pdr: int = 100
[docs] class InitStateToml(BaseModel): dotbots: List[SimulatedDotBotSettings] network: SimulatedNetworkSettings = SimulatedNetworkSettings()
[docs] class DotBotSimulator(threading.Thread): """Simulator class for the dotbot.""" def __init__(self, settings: SimulatedDotBotSettings): super().__init__(daemon=True) self.address = settings.address.lower() self.pos_x = settings.pos_x self.pos_y = settings.pos_y self.theta = settings.theta self.motor_left_error = settings.motor_left_error self.motor_right_error = settings.motor_right_error self.time_elapsed_s = 0 self.v_left = 0 self.v_right = 0 self.calibrated = settings.calibrated self.waypoint_threshold = 0 self.waypoints = [] self.waypoint_index = 0 self.controller_mode: DotBotSimulatorMode = DotBotSimulatorMode.MANUAL self.logger = LOGGER.bind(context=__name__, address=self.address) self.running = True self.start() @property def header(self): return Header( destination=int(GATEWAY_ADDRESS_DEFAULT, 16), source=int(self.address, 16), ) def _diff_drive_bot(self, x_pos_old, y_pos_old, theta_old, v_right, v_left): """Execute state space model of a rigid differential drive robot.""" v_right_real = v_right * (1 - self.motor_right_error) v_left_real = v_left * (1 - self.motor_left_error) x_dot = R / 2 * (v_right_real + v_left_real) * cos(theta_old - pi) * 50000 y_dot = R / 2 * (v_right_real + v_left_real) * sin(theta_old - pi) * 50000 theta_dot = R / L * (-v_right_real + v_left_real) x_pos = x_pos_old + x_dot * SIMULATOR_STEP_DELTA_T y_pos = y_pos_old + y_dot * SIMULATOR_STEP_DELTA_T theta = (theta_old + theta_dot * SIMULATOR_STEP_DELTA_T) % (2 * pi) return x_pos, y_pos, theta
[docs] def update(self, dt: float): """State space model update.""" pos_x_old = self.pos_x pos_y_old = self.pos_y theta_old = self.theta if self.controller_mode == DotBotSimulatorMode.MANUAL: self.pos_x, self.pos_y, self.theta = self._diff_drive_bot( pos_x_old, pos_y_old, theta_old, self.v_right, self.v_left ) elif self.controller_mode == DotBotSimulatorMode.AUTOMATIC: delta_x = self.pos_x - self.waypoints[self.waypoint_index].pos_x delta_y = self.pos_y - self.waypoints[self.waypoint_index].pos_y distance_to_target = sqrt(delta_x**2 + delta_y**2) # check if we are close enough to the "next" waypoint if distance_to_target < self.waypoint_threshold: self.logger.info("Waypoint reached", waypoint_index=self.waypoint_index) self.waypoint_index += 1 # check if there are no more waypoints: if self.waypoint_index >= len(self.waypoints): self.logger.info( "Last waypoint reached", waypoint_index=self.waypoint_index ) self.v_left = 0 self.v_right = 0 self.waypoint_index = 0 self.controller_mode = DotBotSimulatorMode.MANUAL else: robot_angle = self.theta angle_to_target = atan2(delta_y, delta_x) if robot_angle >= pi: robot_angle = robot_angle - 2 * pi # if (angle_to_target < 0): # angle_to_target = 2*pi + angle_to_target error_angle = ((angle_to_target - robot_angle + pi) % (2 * pi)) - pi self.logger.debug( "Moving to waypoint", robot_angle=robot_angle, angle_to_target=angle_to_target, error_angle=error_angle, ) angular_speed = error_angle * 200 self.v_left = 100 + angular_speed self.v_right = 100 - angular_speed if self.v_left > 100: self.v_left = 100 if self.v_right > 100: self.v_right = 100 if self.v_left < 0: self.v_left = 0 if self.v_right < 0: self.v_right = 0 self.pos_x, self.pos_y, self.theta = self._diff_drive_bot( self.pos_x, self.pos_y, self.theta, self.v_right, self.v_left ) self.logger.debug( "DotBot simulator update", x=self.pos_x, y=self.pos_y, theta=self.theta ) self.time_elapsed_s += dt time.sleep(dt)
[docs] def advertise(self): """Send an advertisement message to the gateway.""" payload = Frame( header=self.header, packet=Packet.from_payload( PayloadDotBotAdvertisement( calibrated=self.calibrated, direction=int(self.theta * 180 / pi + 90), pos_x=int(self.pos_x) if self.pos_x >= 0 else 0, pos_y=int(self.pos_y) if self.pos_y >= 0 else 0, pos_z=0, battery=battery_discharge_model(self.time_elapsed_s), ) ), ) return payload
[docs] def handle_frame(self, frame: Frame): """Decode the serial input received from the gateway.""" if self.address == hex(frame.header.destination)[2:]: if frame.payload_type == PayloadType.CMD_MOVE_RAW: self.controller_mode = DotBotSimulatorMode.MANUAL self.v_left = frame.packet.payload.left_y self.v_right = frame.packet.payload.right_y if self.v_left > 127: self.v_left = self.v_left - 256 if self.v_right > 127: self.v_right = self.v_right - 256 elif frame.payload_type == PayloadType.LH2_WAYPOINTS: self.v_left = 0 self.v_right = 0 self.controller_mode = DotBotSimulatorMode.MANUAL self.waypoint_threshold = frame.packet.payload.threshold * 1000 self.waypoints = frame.packet.payload.waypoints if self.waypoints: self.controller_mode = DotBotSimulatorMode.AUTOMATIC
[docs] def stop(self): self.logger.info("Stopping DotBot simulator...") self.running = False self.join()
[docs] def run(self): """Update the state of the dotbot simulator.""" while self.running is True: self.update(0.1)
[docs] class DotBotSimulatorCommunicationInterface(threading.Thread): """Bidirectional serial interface to control simulated robots""" def __init__(self, on_frame_received: Callable, simulator_init_state_path: str): self.on_frame_received = on_frame_received self.running = True super().__init__(daemon=True) init_state = InitStateToml(**toml.load(simulator_init_state_path)) self.network_pdr = init_state.network.pdr self.dotbots = [ DotBotSimulator( settings=dotbot_settings, ) for dotbot_settings in init_state.dotbots ] self.start() self.logger = LOGGER.bind(context=__name__) self.logger.info("DotBot Simulation Started")
[docs] def run(self): """Listen continuously at each byte received on the fake serial interface.""" for dotbot in self.dotbots: self.on_frame_received(dotbot.advertise()) time.sleep(0.1) while self.running: for dotbot in self.dotbots: self.handle_dotbot_frame(dotbot.advertise()) time.sleep( 0.5 - PayloadDotBotAdvertisement().size * len(self.dotbots) * 0.000001 )
[docs] def stop(self): self.logger.info("Stopping DotBot Simulation...") self.running = False for dotbot in self.dotbots: dotbot.stop() self.join()
[docs] def flush(self): """Flush fake serial output.""" pass
def _packet_delivered(self): return random.randint(0, 100) < self.network_pdr
[docs] def handle_dotbot_frame(self, frame): """Send bytes to the fake serial, similar to the real gateway.""" if self._packet_delivered() is False: self.logger.info( f"Packet from DotBot {hexlify(int(frame.header.source).to_bytes(8, 'big')).decode()} lost in simulation" ) return self.on_frame_received(frame)
[docs] def write(self, bytes_): """Write bytes on the fake serial.""" for dotbot in self.dotbots: if self._packet_delivered() is False: self.logger.info( f"Packet to DotBot {dotbot.address} lost in simulation" ) continue dotbot.handle_frame(Frame.from_bytes(bytes_))