Source code for dotbot.sailbot_simulator

# SPDX-FileCopyrightText: 2024-present Inria
# SPDX-FileCopyrightText: 2024-present Diego Badillo <diego.badillo@sansano.usm.cl>
# SPDX-FileCopyrightText: 2024-present Mališa Vučinić <malisa.vucinic@inria.fr>
# SPDX-FileCopyrightText: 2024-present Alexandre Abadie <alexandre.abadie@inria.fr>
#
# SPDX-License-Identifier: BSD-3-Clause

"""Sailbot simulator for the DotBot project."""

import math
import threading
import time
from dataclasses import dataclass
from enum import Enum
from typing import Callable

from dotbot_utils.protocol import Frame, Header, Packet
from numpy import clip

from dotbot import GATEWAY_ADDRESS_DEFAULT
from dotbot.logger import LOGGER
from dotbot.protocol import (
    ApplicationType,
    PayloadAdvertisement,
    PayloadSailBotData,
    PayloadType,
)

SIM_DELTA_T = 0.01  # second
CONTROL_DELTA_T = 1  # second

RUDDER_ANGLE_MAX = (-math.pi / 6, math.pi / 6)
SAIL_ANGLE_MAX = (0, math.pi / 2)

# constants
EARTH_RADIUS_KM = 6371
ORIGIN_COORD_LAT = 48.825908
ORIGIN_COORD_LON = 2.406433
COS_PHI_0 = 0.658139837


[docs] def cartesian2geographical(x, y): """Converts cartesian coordinates to geographical coordinates.""" latitude = (((y * 0.180) / math.pi) / EARTH_RADIUS_KM) + ORIGIN_COORD_LAT longitude = ( ((x * 0.180) / math.pi / COS_PHI_0) / EARTH_RADIUS_KM ) + ORIGIN_COORD_LON return latitude, longitude
[docs] def geographical2cartesian(latitude, longitude): """Converts geographical coordinates to cartesian coordinates.""" x = ((longitude - ORIGIN_COORD_LON) * EARTH_RADIUS_KM * COS_PHI_0 * math.pi) / 0.180 y = ((latitude - ORIGIN_COORD_LAT) * EARTH_RADIUS_KM * math.pi) / 0.180 return (x, y)
[docs] class SailBotSimulatorMode(Enum): """Operation mode of the sailbot simulator.""" MANUAL = "MANUAL" AUTOMATIC = "AUTOMATIC"
[docs] @dataclass class Point: """Point class for the sailbot simulator.""" x: float y: float
[docs] class SailBotSimulatorLineClass: """Simulator zig-zag routine helper class.""" def __init__(self, point: Point, angle_radians): # calculate coefficients for Ax + By + C = 0 self.A = -math.tan(angle_radians) self.B = 1 self.C = (math.tan(angle_radians) * point.x) - point.y
[docs] def distance2point(self, point: Point): """Returns the distance from the point to the line.""" return abs((self.A * point.x) + (self.B * point.y) + self.C) / math.sqrt( self.A**2 + self.B**2 )
[docs] def line_side(self, point: Point): """Am I on the left or the right?.""" d = (self.A * point.x) + (self.B * point.y) + self.C return d >= 0
[docs] class SailBotSimulator: """Simulator class for the sailbot.""" def __init__(self, address): self.address = address self.true_wind_speed = 4 # [m/s] self.true_wind_angle = 0.34906 # [rad] (20 degrees) # initializations self.latitude = 48.832313 self.longitude = 2.412689 self.app_wind_angle = 0.0 # [rad] self.app_wind_speed = 0.0 # [m/s] self.direction = math.pi / 2 # [rad] self.x, self.y = geographical2cartesian(self.latitude, self.longitude) self.v = 0.0 # speed [m/s] self.w = 0.0 # angular velocity [rad/s] # inputs received by controller self.rudder_slider = 0 # rudder slider self.sail_slider = 0 # sail slider self.operation_mode: SailBotSimulatorMode = SailBotSimulatorMode.MANUAL # autonomous mode initialisations self.waypoint_threshold = 0 self.waypoints = [] self.next_waypoint = 0 self.zigzag_flag = False self.logger = LOGGER.bind(context=__name__) @property def header(self): return Header( destination=int(GATEWAY_ADDRESS_DEFAULT, 16), source=int(self.address, 16), ) def _update_state_space_model(self, rudder_in_rad, sail_length_in_rad): # define model parameters p1 = 0.03 # drift coefficient [-] p2 = 40 # tangential friction [kgs^−1] p3 = 6000 # angular friction [kgm] p4 = 200 # sail lift [kgs^−1] p5 = 1500 # rudder lift [kgs^−1] p6 = 0.5 # distance to sail CoE [m] p7 = 0.5 # distance to mast [m] p8 = 2 # distance to rudder [m] p9 = 300 # mass of boat [kg] p10 = 400 # moment of inertia [kgm^2] p11 = 0.2 # rudder break coefficient [-] # get apparent wind speed and angle, f(v,heading,true_wind) self._true2apparent_wind() # map mainsheet length to actual sail angle sail_in_rad = self._mainsheet2sail_angle( sail_length_in_rad, self.app_wind_angle ) # rudder and sail forces g_r = p5 * self.v**2 * math.sin(rudder_in_rad) g_s = p4 * self.app_wind_speed * math.sin(sail_in_rad - self.app_wind_angle) # state-space model x_dot = self.v * math.cos( self.direction ) + p1 * self.true_wind_speed * math.cos(self.true_wind_angle) y_dot = self.v * math.sin( self.direction ) + p1 * self.true_wind_speed * math.sin(self.true_wind_angle) direction_dot = self.w v_dot = ( g_s * math.sin(sail_in_rad) - g_r * p11 * math.sin(rudder_in_rad) - p2 * self.v**2 ) / p9 w_dot = ( g_s * (p6 - p7 * math.cos(sail_in_rad)) - g_r * p8 * math.cos(rudder_in_rad) - p3 * self.w * self.v ) / p10 # update state-space variables and apparent wind angle using Euler's method self.x += x_dot * SIM_DELTA_T self.y += y_dot * SIM_DELTA_T self.direction += direction_dot * SIM_DELTA_T self.v += v_dot * SIM_DELTA_T self.w += w_dot * SIM_DELTA_T # get latitude and longitude from cartesian coordinates self.latitude, self.longitude = cartesian2geographical(self.x, self.y) def _true2apparent_wind(self): Wc_aw = ( self.true_wind_speed * math.cos(self.true_wind_angle - self.direction) - self.v, self.true_wind_speed * math.sin(self.true_wind_angle - self.direction), ) self.app_wind_speed = math.sqrt(Wc_aw[0] ** 2 + Wc_aw[1] ** 2) self.app_wind_angle = math.atan2(Wc_aw[1], Wc_aw[0]) def _map_slider(self, value, target_min, target_max): slider_min, slider_max = -128, 128 return target_min + (target_max - target_min) * (value - slider_min) / ( slider_max - slider_min ) def _mainsheet2sail_angle(self, sail_in_length_rad, app_wind_angle): if math.cos(app_wind_angle) + math.cos(sail_in_length_rad) > 0: # sail is tight sail_out_rad = -math.copysign(sail_in_length_rad, math.sin(app_wind_angle)) else: # sail is loose sail_out_rad = math.pi + app_wind_angle sail_out_rad = (sail_out_rad + math.pi) % (2 * math.pi) - math.pi return sail_out_rad
[docs] def simulation_update(self): """Updates the state-space model every SIM_DELTA_T seconds convert to radians.""" rudder_in_rad = self._map_slider( self.rudder_slider, RUDDER_ANGLE_MAX[0], RUDDER_ANGLE_MAX[1] ) sail_length_in_rad = self._map_slider( self.sail_slider, SAIL_ANGLE_MAX[0], SAIL_ANGLE_MAX[1] ) self._update_state_space_model(rudder_in_rad, sail_length_in_rad) return self.encode_serial_output()
[docs] def control_loop_update(self): """Control loop for the sailbot in automatic mode.""" if not self.waypoints: return self.logger.debug("Loop update") logger = self.logger.bind( next_waypoint=self.next_waypoint, waypoint_threshold=self.waypoint_threshold, waypoints=self.waypoints, ) logger.debug("Loop start") # update control inputs if in automatic mode if self.next_waypoint >= len(self.waypoints): logger.debug("Finished!") self.waypoints = [] self.next_waypoint = 0 self.zigzag_flag = False self.operation_mode = SailBotSimulatorMode.MANUAL return # convert current position and next waypoint to cartesian current_x, current_y = geographical2cartesian(self.latitude, self.longitude) next_geo = self.waypoints[self.next_waypoint] next_x, next_y = geographical2cartesian( next_geo.latitude / 1e6, next_geo.longitude / 1e6 ) # check if current position is within the threshold distance2target = math.sqrt( (next_x - current_x) ** 2 + (next_y - current_y) ** 2 ) if distance2target < self.waypoint_threshold: self.next_waypoint += 1 self.zigzag_flag = False return # compute angle between current position and target angle2target = math.atan2(next_y - current_y, next_x - current_x) angle2target %= 2 * math.pi true_wind = self.true_wind_angle % (2 * math.pi) # check if to get there, we have to sail against the wind in_irons_angle = 0.8 # radians upper_bound_irons = (true_wind + math.pi + in_irons_angle) % (2 * math.pi) lower_bound_irons = (true_wind + math.pi - in_irons_angle) % (2 * math.pi) detected_irons = False if lower_bound_irons > upper_bound_irons: if angle2target < upper_bound_irons or angle2target > lower_bound_irons: detected_irons = True else: if angle2target < upper_bound_irons and angle2target > lower_bound_irons: detected_irons = True if detected_irons: if self.zigzag_flag is False: logger.debug("In irons! Starting zig-zag routine") self.zigzag_flag = True # initialise zig zag parameters self.zigzag_width = 15 # meters self.zigzag_dir = False # where to start zig-zagging (can be improved by choosing the shortest path) self.line2target = SailBotSimulatorLineClass( Point(current_x, current_y), angle2target ) # already zig-zagging! if ( self.line2target.distance2point(Point(current_x, current_y)) > self.zigzag_width ): self.zigzag_dir = self.line2target.line_side( Point(current_x, current_y) ) # redefine reference heading angle2target = upper_bound_irons if self.zigzag_dir else lower_bound_irons else: # return to normal mode self.zigzag_flag = False # calculate error between angle2target and current heading error_heading2target = angle2target - self.direction error_heading2target = (error_heading2target + math.pi) % ( 2 * math.pi ) - math.pi # map error to rudder angle (proportional controller) Kp = -256 / math.pi # control proportional constant self.rudder_slider = int(clip(error_heading2target * Kp, -128, 127)) # linear map apparent wind angle to sail opening app_wind_angle = (self.app_wind_angle + math.pi) % (2 * math.pi) - math.pi sail_length = ((SAIL_ANGLE_MAX[0] - SAIL_ANGLE_MAX[1]) / math.pi) * abs( app_wind_angle ) + SAIL_ANGLE_MAX[1] self.sail_slider = int(clip(sail_length * 512 / math.pi - 128, -128, 127)) logger = self.logger.bind( rudder_slider=self.rudder_slider, sail_slider=self.sail_slider, zigzag_flag=self.zigzag_flag, ) logger.debug("Loop end")
[docs] def decode_serial_input(self, bytes_): """Decode the serial input received from the gateway.""" if bytes_[1] in [0xFF, 0xFE]: return frame = Frame.from_bytes(bytes_) if self.address == hex(frame.header.destination)[2:]: if frame.payload_type == PayloadType.CMD_MOVE_RAW: self.rudder_slider = ( frame.packet.payload.left_x - 256 if frame.packet.payload.left_x > 127 else frame.packet.payload.left_x ) self.sail_slider = ( frame.packet.payload.right_y - 256 if frame.packet.payload.right_y > 127 else frame.packet.payload.right_y ) if frame.payload_type == PayloadType.GPS_WAYPOINTS: self.operation_mode = SailBotSimulatorMode.MANUAL self.waypoint_threshold = frame.packet.payload.threshold self.waypoints = frame.packet.payload.waypoints if self.waypoints: self.operation_mode = SailBotSimulatorMode.AUTOMATIC
[docs] def encode_serial_output(self): """Encode the sailbot data to be sent to the gateway.""" payload = PayloadSailBotData( direction=(90 - int(math.degrees(self.direction))) % 360, latitude=int(self.latitude * 1e6), longitude=int(self.longitude * 1e6), wind_angle=(-int(math.degrees(self.app_wind_angle))) % 360, rudder_angle=int( math.degrees( self._map_slider( self.rudder_slider, RUDDER_ANGLE_MAX[0], RUDDER_ANGLE_MAX[1] ) ) ), sail_angle=int( math.degrees( self._map_slider( self.sail_slider, SAIL_ANGLE_MAX[0], SAIL_ANGLE_MAX[1] ) ) ), ) return Frame(header=self.header, packet=Packet().from_payload(payload))
[docs] def advertise(self): """Send an advertisement message to the gateway.""" frame = Frame( header=self.header, packet=Packet().from_payload( PayloadAdvertisement(application=ApplicationType.SailBot) ), ) return frame
[docs] class SailBotSimulatorCommunicationInterface(threading.Thread): """Bidirectional serial interface to control simulated robots.""" def __init__(self, on_frame_received: Callable): self.on_frame_received = on_frame_received self.running = True self.sailbots = [ SailBotSimulator("1234567890123456"), ] super().__init__(daemon=True) # automatically close when the main program exits self.logger = LOGGER.bind(context=__name__) self.logger.info("SailBot simulation started")
[docs] def run(self): """Listen continuously at each byte received on the fake serial interface.""" for sailbot in self.sailbots: self.on_frame_received(sailbot.advertise()) next_sim_time = time.time() + SIM_DELTA_T next_control_time = time.time() + CONTROL_DELTA_T updates = [bytearray()] * len(self.sailbots) updates_interval = 0 while self.running: current_time = time.time() # update simulation every SIM_DELTA_T seconds if current_time >= next_sim_time: for idx, sailbot in enumerate(self.sailbots): updates[idx] = sailbot.simulation_update() next_sim_time = current_time + SIM_DELTA_T if updates_interval >= 10: for update in updates: self.on_frame_received(update) updates_interval = 0 updates_interval += 1 # update control inputs every CONTROL_DELTA_T seconds if current_time >= next_control_time: for sailbot in self.sailbots: if sailbot.operation_mode == SailBotSimulatorMode.AUTOMATIC: sailbot.control_loop_update() next_control_time = current_time + CONTROL_DELTA_T time.sleep(0.02)
[docs] def stop(self): self.logger.info("Stopping Sailbot Simulation...") self.running = False self.join()
[docs] def write(self, bytes_): """Write bytes on the fake serial, similar to the real gateway.""" for sailbot in self.sailbots: sailbot.decode_serial_input(bytes_)