# SPDX-FileCopyrightText: 2023-present Inria
# SPDX-FileCopyrightText: 2023-present Filip Maksimovic <filip.maksimovic@inria.fr>
# SPDX-FileCopyrightText: 2024-present Alexandre Abadie <alexandre.abadie@inria.fr>
#
# SPDX-License-Identifier: BSD-3-Clause
"""Dotbot simulator for the DotBot project."""
import random
import threading
import time
from binascii import hexlify
from dataclasses import dataclass
from enum import Enum
from math import atan2, cos, pi, sin, sqrt
from typing import Callable, List
import toml
from dotbot_utils.protocol import Frame, Header, Packet
from pydantic import BaseModel
from dotbot import GATEWAY_ADDRESS_DEFAULT
from dotbot.logger import LOGGER
from dotbot.protocol import PayloadDotBotAdvertisement, PayloadType
R = 1
L = 2
SIMULATOR_STEP_DELTA_T = 0.002
INITIAL_BATTERY_VOLTAGE = 3000 # mV
MAX_BATTERY_DURATION = 300 # 5 minutes
[docs]
def battery_discharge_model(time_elapsed_s: float) -> int:
"""Simple battery discharge model."""
t = min(time_elapsed_s / MAX_BATTERY_DURATION, 1.0)
nonlinear_t = t**1.3
voltage = int(INITIAL_BATTERY_VOLTAGE * (1 - nonlinear_t))
if voltage < 0:
voltage = 0
return voltage
[docs]
class DotBotSimulatorMode(Enum):
"""Operation mode of the dotbot simulator."""
MANUAL = "MANUAL"
AUTOMATIC = "AUTOMATIC"
[docs]
@dataclass
class Waypoint:
"""Waypoint class for the dotbot simulator."""
x: int
y: int
[docs]
class SimulatedDotBotSettings(BaseModel):
address: str
pos_x: int
pos_y: int
theta: float
calibrated: bool = True
motor_left_error: float = 0.5
motor_right_error: float = 0
[docs]
class SimulatedNetworkSettings(BaseModel):
pdr: int = 100
[docs]
class InitStateToml(BaseModel):
dotbots: List[SimulatedDotBotSettings]
network: SimulatedNetworkSettings = SimulatedNetworkSettings()
[docs]
class DotBotSimulator(threading.Thread):
"""Simulator class for the dotbot."""
def __init__(self, settings: SimulatedDotBotSettings):
super().__init__(daemon=True)
self.address = settings.address.lower()
self.pos_x = settings.pos_x
self.pos_y = settings.pos_y
self.theta = settings.theta
self.motor_left_error = settings.motor_left_error
self.motor_right_error = settings.motor_right_error
self.time_elapsed_s = 0
self.v_left = 0
self.v_right = 0
self.calibrated = settings.calibrated
self.waypoint_threshold = 0
self.waypoints = []
self.waypoint_index = 0
self.controller_mode: DotBotSimulatorMode = DotBotSimulatorMode.MANUAL
self.logger = LOGGER.bind(context=__name__, address=self.address)
self.running = True
self.start()
@property
def header(self):
return Header(
destination=int(GATEWAY_ADDRESS_DEFAULT, 16),
source=int(self.address, 16),
)
def _diff_drive_bot(self, x_pos_old, y_pos_old, theta_old, v_right, v_left):
"""Execute state space model of a rigid differential drive robot."""
v_right_real = v_right * (1 - self.motor_right_error)
v_left_real = v_left * (1 - self.motor_left_error)
x_dot = R / 2 * (v_right_real + v_left_real) * cos(theta_old - pi) * 50000
y_dot = R / 2 * (v_right_real + v_left_real) * sin(theta_old - pi) * 50000
theta_dot = R / L * (-v_right_real + v_left_real)
x_pos = x_pos_old + x_dot * SIMULATOR_STEP_DELTA_T
y_pos = y_pos_old + y_dot * SIMULATOR_STEP_DELTA_T
theta = (theta_old + theta_dot * SIMULATOR_STEP_DELTA_T) % (2 * pi)
return x_pos, y_pos, theta
[docs]
def update(self, dt: float):
"""State space model update."""
pos_x_old = self.pos_x
pos_y_old = self.pos_y
theta_old = self.theta
if self.controller_mode == DotBotSimulatorMode.MANUAL:
self.pos_x, self.pos_y, self.theta = self._diff_drive_bot(
pos_x_old, pos_y_old, theta_old, self.v_right, self.v_left
)
elif self.controller_mode == DotBotSimulatorMode.AUTOMATIC:
delta_x = self.pos_x - self.waypoints[self.waypoint_index].pos_x
delta_y = self.pos_y - self.waypoints[self.waypoint_index].pos_y
distance_to_target = sqrt(delta_x**2 + delta_y**2)
# check if we are close enough to the "next" waypoint
if distance_to_target < self.waypoint_threshold:
self.logger.info("Waypoint reached", waypoint_index=self.waypoint_index)
self.waypoint_index += 1
# check if there are no more waypoints:
if self.waypoint_index >= len(self.waypoints):
self.logger.info(
"Last waypoint reached", waypoint_index=self.waypoint_index
)
self.v_left = 0
self.v_right = 0
self.waypoint_index = 0
self.controller_mode = DotBotSimulatorMode.MANUAL
else:
robot_angle = self.theta
angle_to_target = atan2(delta_y, delta_x)
if robot_angle >= pi:
robot_angle = robot_angle - 2 * pi
# if (angle_to_target < 0):
# angle_to_target = 2*pi + angle_to_target
error_angle = ((angle_to_target - robot_angle + pi) % (2 * pi)) - pi
self.logger.debug(
"Moving to waypoint",
robot_angle=robot_angle,
angle_to_target=angle_to_target,
error_angle=error_angle,
)
angular_speed = error_angle * 200
self.v_left = 100 + angular_speed
self.v_right = 100 - angular_speed
if self.v_left > 100:
self.v_left = 100
if self.v_right > 100:
self.v_right = 100
if self.v_left < 0:
self.v_left = 0
if self.v_right < 0:
self.v_right = 0
self.pos_x, self.pos_y, self.theta = self._diff_drive_bot(
self.pos_x, self.pos_y, self.theta, self.v_right, self.v_left
)
self.logger.debug(
"DotBot simulator update", x=self.pos_x, y=self.pos_y, theta=self.theta
)
self.time_elapsed_s += dt
time.sleep(dt)
[docs]
def advertise(self):
"""Send an advertisement message to the gateway."""
payload = Frame(
header=self.header,
packet=Packet.from_payload(
PayloadDotBotAdvertisement(
calibrated=self.calibrated,
direction=int(self.theta * 180 / pi + 90),
pos_x=int(self.pos_x) if self.pos_x >= 0 else 0,
pos_y=int(self.pos_y) if self.pos_y >= 0 else 0,
pos_z=0,
battery=battery_discharge_model(self.time_elapsed_s),
)
),
)
return payload
[docs]
def handle_frame(self, frame: Frame):
"""Decode the serial input received from the gateway."""
if self.address == hex(frame.header.destination)[2:]:
if frame.payload_type == PayloadType.CMD_MOVE_RAW:
self.controller_mode = DotBotSimulatorMode.MANUAL
self.v_left = frame.packet.payload.left_y
self.v_right = frame.packet.payload.right_y
if self.v_left > 127:
self.v_left = self.v_left - 256
if self.v_right > 127:
self.v_right = self.v_right - 256
elif frame.payload_type == PayloadType.LH2_WAYPOINTS:
self.v_left = 0
self.v_right = 0
self.controller_mode = DotBotSimulatorMode.MANUAL
self.waypoint_threshold = frame.packet.payload.threshold * 1000
self.waypoints = frame.packet.payload.waypoints
if self.waypoints:
self.controller_mode = DotBotSimulatorMode.AUTOMATIC
[docs]
def stop(self):
self.logger.info("Stopping DotBot simulator...")
self.running = False
self.join()
[docs]
def run(self):
"""Update the state of the dotbot simulator."""
while self.running is True:
self.update(0.1)
[docs]
class DotBotSimulatorCommunicationInterface(threading.Thread):
"""Bidirectional serial interface to control simulated robots"""
def __init__(self, on_frame_received: Callable, simulator_init_state_path: str):
self.on_frame_received = on_frame_received
self.running = True
super().__init__(daemon=True)
init_state = InitStateToml(**toml.load(simulator_init_state_path))
self.network_pdr = init_state.network.pdr
self.dotbots = [
DotBotSimulator(
settings=dotbot_settings,
)
for dotbot_settings in init_state.dotbots
]
self.start()
self.logger = LOGGER.bind(context=__name__)
self.logger.info("DotBot Simulation Started")
[docs]
def run(self):
"""Listen continuously at each byte received on the fake serial interface."""
for dotbot in self.dotbots:
self.on_frame_received(dotbot.advertise())
time.sleep(0.1)
while self.running:
for dotbot in self.dotbots:
self.handle_dotbot_frame(dotbot.advertise())
time.sleep(
0.5 - PayloadDotBotAdvertisement().size * len(self.dotbots) * 0.000001
)
[docs]
def stop(self):
self.logger.info("Stopping DotBot Simulation...")
self.running = False
for dotbot in self.dotbots:
dotbot.stop()
self.join()
[docs]
def flush(self):
"""Flush fake serial output."""
pass
def _packet_delivered(self):
return random.randint(0, 100) < self.network_pdr
[docs]
def handle_dotbot_frame(self, frame):
"""Send bytes to the fake serial, similar to the real gateway."""
if self._packet_delivered() is False:
self.logger.info(
f"Packet from DotBot {hexlify(int(frame.header.source).to_bytes(8, 'big')).decode()} lost in simulation"
)
return
self.on_frame_received(frame)
[docs]
def write(self, bytes_):
"""Write bytes on the fake serial."""
for dotbot in self.dotbots:
if self._packet_delivered() is False:
self.logger.info(
f"Packet to DotBot {dotbot.address} lost in simulation"
)
continue
dotbot.handle_frame(Frame.from_bytes(bytes_))