# SPDX-FileCopyrightText: 2024-present Inria
# SPDX-FileCopyrightText: 2024-present Diego Badillo <diego.badillo@sansano.usm.cl>
# SPDX-FileCopyrightText: 2024-present Mališa Vučinić <malisa.vucinic@inria.fr>
# SPDX-FileCopyrightText: 2024-present Alexandre Abadie <alexandre.abadie@inria.fr>
#
# SPDX-License-Identifier: BSD-3-Clause
"""Sailbot simulator for the DotBot project."""
import math
import threading
import time
from dataclasses import dataclass
from enum import Enum
from typing import Callable
from dotbot_utils.protocol import Frame, Header, Packet
from numpy import clip
from dotbot import GATEWAY_ADDRESS_DEFAULT
from dotbot.logger import LOGGER
from dotbot.protocol import (
ApplicationType,
PayloadAdvertisement,
PayloadSailBotData,
PayloadType,
)
SIM_DELTA_T = 0.01 # second
CONTROL_DELTA_T = 1 # second
RUDDER_ANGLE_MAX = (-math.pi / 6, math.pi / 6)
SAIL_ANGLE_MAX = (0, math.pi / 2)
# constants
EARTH_RADIUS_KM = 6371
ORIGIN_COORD_LAT = 48.825908
ORIGIN_COORD_LON = 2.406433
COS_PHI_0 = 0.658139837
[docs]
def cartesian2geographical(x, y):
"""Converts cartesian coordinates to geographical coordinates."""
latitude = (((y * 0.180) / math.pi) / EARTH_RADIUS_KM) + ORIGIN_COORD_LAT
longitude = (
((x * 0.180) / math.pi / COS_PHI_0) / EARTH_RADIUS_KM
) + ORIGIN_COORD_LON
return latitude, longitude
[docs]
def geographical2cartesian(latitude, longitude):
"""Converts geographical coordinates to cartesian coordinates."""
x = ((longitude - ORIGIN_COORD_LON) * EARTH_RADIUS_KM * COS_PHI_0 * math.pi) / 0.180
y = ((latitude - ORIGIN_COORD_LAT) * EARTH_RADIUS_KM * math.pi) / 0.180
return (x, y)
[docs]
class SailBotSimulatorMode(Enum):
"""Operation mode of the sailbot simulator."""
MANUAL = "MANUAL"
AUTOMATIC = "AUTOMATIC"
[docs]
@dataclass
class Point:
"""Point class for the sailbot simulator."""
x: float
y: float
[docs]
class SailBotSimulatorLineClass:
"""Simulator zig-zag routine helper class."""
def __init__(self, point: Point, angle_radians):
# calculate coefficients for Ax + By + C = 0
self.A = -math.tan(angle_radians)
self.B = 1
self.C = (math.tan(angle_radians) * point.x) - point.y
[docs]
def distance2point(self, point: Point):
"""Returns the distance from the point to the line."""
return abs((self.A * point.x) + (self.B * point.y) + self.C) / math.sqrt(
self.A**2 + self.B**2
)
[docs]
def line_side(self, point: Point):
"""Am I on the left or the right?."""
d = (self.A * point.x) + (self.B * point.y) + self.C
return d >= 0
[docs]
class SailBotSimulator:
"""Simulator class for the sailbot."""
def __init__(self, address):
self.address = address
self.true_wind_speed = 4 # [m/s]
self.true_wind_angle = 0.34906 # [rad] (20 degrees)
# initializations
self.latitude = 48.832313
self.longitude = 2.412689
self.app_wind_angle = 0.0 # [rad]
self.app_wind_speed = 0.0 # [m/s]
self.direction = math.pi / 2 # [rad]
self.x, self.y = geographical2cartesian(self.latitude, self.longitude)
self.v = 0.0 # speed [m/s]
self.w = 0.0 # angular velocity [rad/s]
# inputs received by controller
self.rudder_slider = 0 # rudder slider
self.sail_slider = 0 # sail slider
self.operation_mode: SailBotSimulatorMode = SailBotSimulatorMode.MANUAL
# autonomous mode initialisations
self.waypoint_threshold = 0
self.waypoints = []
self.next_waypoint = 0
self.zigzag_flag = False
self.logger = LOGGER.bind(context=__name__)
@property
def header(self):
return Header(
destination=int(GATEWAY_ADDRESS_DEFAULT, 16),
source=int(self.address, 16),
)
def _update_state_space_model(self, rudder_in_rad, sail_length_in_rad):
# define model parameters
p1 = 0.03 # drift coefficient [-]
p2 = 40 # tangential friction [kgs^−1]
p3 = 6000 # angular friction [kgm]
p4 = 200 # sail lift [kgs^−1]
p5 = 1500 # rudder lift [kgs^−1]
p6 = 0.5 # distance to sail CoE [m]
p7 = 0.5 # distance to mast [m]
p8 = 2 # distance to rudder [m]
p9 = 300 # mass of boat [kg]
p10 = 400 # moment of inertia [kgm^2]
p11 = 0.2 # rudder break coefficient [-]
# get apparent wind speed and angle, f(v,heading,true_wind)
self._true2apparent_wind()
# map mainsheet length to actual sail angle
sail_in_rad = self._mainsheet2sail_angle(
sail_length_in_rad, self.app_wind_angle
)
# rudder and sail forces
g_r = p5 * self.v**2 * math.sin(rudder_in_rad)
g_s = p4 * self.app_wind_speed * math.sin(sail_in_rad - self.app_wind_angle)
# state-space model
x_dot = self.v * math.cos(
self.direction
) + p1 * self.true_wind_speed * math.cos(self.true_wind_angle)
y_dot = self.v * math.sin(
self.direction
) + p1 * self.true_wind_speed * math.sin(self.true_wind_angle)
direction_dot = self.w
v_dot = (
g_s * math.sin(sail_in_rad)
- g_r * p11 * math.sin(rudder_in_rad)
- p2 * self.v**2
) / p9
w_dot = (
g_s * (p6 - p7 * math.cos(sail_in_rad))
- g_r * p8 * math.cos(rudder_in_rad)
- p3 * self.w * self.v
) / p10
# update state-space variables and apparent wind angle using Euler's method
self.x += x_dot * SIM_DELTA_T
self.y += y_dot * SIM_DELTA_T
self.direction += direction_dot * SIM_DELTA_T
self.v += v_dot * SIM_DELTA_T
self.w += w_dot * SIM_DELTA_T
# get latitude and longitude from cartesian coordinates
self.latitude, self.longitude = cartesian2geographical(self.x, self.y)
def _true2apparent_wind(self):
Wc_aw = (
self.true_wind_speed * math.cos(self.true_wind_angle - self.direction)
- self.v,
self.true_wind_speed * math.sin(self.true_wind_angle - self.direction),
)
self.app_wind_speed = math.sqrt(Wc_aw[0] ** 2 + Wc_aw[1] ** 2)
self.app_wind_angle = math.atan2(Wc_aw[1], Wc_aw[0])
def _map_slider(self, value, target_min, target_max):
slider_min, slider_max = -128, 128
return target_min + (target_max - target_min) * (value - slider_min) / (
slider_max - slider_min
)
def _mainsheet2sail_angle(self, sail_in_length_rad, app_wind_angle):
if math.cos(app_wind_angle) + math.cos(sail_in_length_rad) > 0:
# sail is tight
sail_out_rad = -math.copysign(sail_in_length_rad, math.sin(app_wind_angle))
else:
# sail is loose
sail_out_rad = math.pi + app_wind_angle
sail_out_rad = (sail_out_rad + math.pi) % (2 * math.pi) - math.pi
return sail_out_rad
[docs]
def simulation_update(self):
"""Updates the state-space model every SIM_DELTA_T seconds convert to radians."""
rudder_in_rad = self._map_slider(
self.rudder_slider, RUDDER_ANGLE_MAX[0], RUDDER_ANGLE_MAX[1]
)
sail_length_in_rad = self._map_slider(
self.sail_slider, SAIL_ANGLE_MAX[0], SAIL_ANGLE_MAX[1]
)
self._update_state_space_model(rudder_in_rad, sail_length_in_rad)
return self.encode_serial_output()
[docs]
def control_loop_update(self):
"""Control loop for the sailbot in automatic mode."""
if not self.waypoints:
return
self.logger.debug("Loop update")
logger = self.logger.bind(
next_waypoint=self.next_waypoint,
waypoint_threshold=self.waypoint_threshold,
waypoints=self.waypoints,
)
logger.debug("Loop start")
# update control inputs if in automatic mode
if self.next_waypoint >= len(self.waypoints):
logger.debug("Finished!")
self.waypoints = []
self.next_waypoint = 0
self.zigzag_flag = False
self.operation_mode = SailBotSimulatorMode.MANUAL
return
# convert current position and next waypoint to cartesian
current_x, current_y = geographical2cartesian(self.latitude, self.longitude)
next_geo = self.waypoints[self.next_waypoint]
next_x, next_y = geographical2cartesian(
next_geo.latitude / 1e6, next_geo.longitude / 1e6
)
# check if current position is within the threshold
distance2target = math.sqrt(
(next_x - current_x) ** 2 + (next_y - current_y) ** 2
)
if distance2target < self.waypoint_threshold:
self.next_waypoint += 1
self.zigzag_flag = False
return
# compute angle between current position and target
angle2target = math.atan2(next_y - current_y, next_x - current_x)
angle2target %= 2 * math.pi
true_wind = self.true_wind_angle % (2 * math.pi)
# check if to get there, we have to sail against the wind
in_irons_angle = 0.8 # radians
upper_bound_irons = (true_wind + math.pi + in_irons_angle) % (2 * math.pi)
lower_bound_irons = (true_wind + math.pi - in_irons_angle) % (2 * math.pi)
detected_irons = False
if lower_bound_irons > upper_bound_irons:
if angle2target < upper_bound_irons or angle2target > lower_bound_irons:
detected_irons = True
else:
if angle2target < upper_bound_irons and angle2target > lower_bound_irons:
detected_irons = True
if detected_irons:
if self.zigzag_flag is False:
logger.debug("In irons! Starting zig-zag routine")
self.zigzag_flag = True
# initialise zig zag parameters
self.zigzag_width = 15 # meters
self.zigzag_dir = False # where to start zig-zagging (can be improved by choosing the shortest path)
self.line2target = SailBotSimulatorLineClass(
Point(current_x, current_y), angle2target
)
# already zig-zagging!
if (
self.line2target.distance2point(Point(current_x, current_y))
> self.zigzag_width
):
self.zigzag_dir = self.line2target.line_side(
Point(current_x, current_y)
)
# redefine reference heading
angle2target = upper_bound_irons if self.zigzag_dir else lower_bound_irons
else:
# return to normal mode
self.zigzag_flag = False
# calculate error between angle2target and current heading
error_heading2target = angle2target - self.direction
error_heading2target = (error_heading2target + math.pi) % (
2 * math.pi
) - math.pi
# map error to rudder angle (proportional controller)
Kp = -256 / math.pi # control proportional constant
self.rudder_slider = int(clip(error_heading2target * Kp, -128, 127))
# linear map apparent wind angle to sail opening
app_wind_angle = (self.app_wind_angle + math.pi) % (2 * math.pi) - math.pi
sail_length = ((SAIL_ANGLE_MAX[0] - SAIL_ANGLE_MAX[1]) / math.pi) * abs(
app_wind_angle
) + SAIL_ANGLE_MAX[1]
self.sail_slider = int(clip(sail_length * 512 / math.pi - 128, -128, 127))
logger = self.logger.bind(
rudder_slider=self.rudder_slider,
sail_slider=self.sail_slider,
zigzag_flag=self.zigzag_flag,
)
logger.debug("Loop end")
[docs]
def encode_serial_output(self):
"""Encode the sailbot data to be sent to the gateway."""
payload = PayloadSailBotData(
direction=(90 - int(math.degrees(self.direction))) % 360,
latitude=int(self.latitude * 1e6),
longitude=int(self.longitude * 1e6),
wind_angle=(-int(math.degrees(self.app_wind_angle))) % 360,
rudder_angle=int(
math.degrees(
self._map_slider(
self.rudder_slider, RUDDER_ANGLE_MAX[0], RUDDER_ANGLE_MAX[1]
)
)
),
sail_angle=int(
math.degrees(
self._map_slider(
self.sail_slider, SAIL_ANGLE_MAX[0], SAIL_ANGLE_MAX[1]
)
)
),
)
return Frame(header=self.header, packet=Packet().from_payload(payload))
[docs]
def advertise(self):
"""Send an advertisement message to the gateway."""
frame = Frame(
header=self.header,
packet=Packet().from_payload(
PayloadAdvertisement(application=ApplicationType.SailBot)
),
)
return frame
[docs]
class SailBotSimulatorCommunicationInterface(threading.Thread):
"""Bidirectional serial interface to control simulated robots."""
def __init__(self, on_frame_received: Callable):
self.on_frame_received = on_frame_received
self.running = True
self.sailbots = [
SailBotSimulator("1234567890123456"),
]
super().__init__(daemon=True) # automatically close when the main program exits
self.start()
self.logger = LOGGER.bind(context=__name__)
self.logger.info("SailBot simulation started")
[docs]
def run(self):
"""Listen continuously at each byte received on the fake serial interface."""
for sailbot in self.sailbots:
self.on_frame_received(sailbot.advertise())
next_sim_time = time.time() + SIM_DELTA_T
next_control_time = time.time() + CONTROL_DELTA_T
updates = [bytearray()] * len(self.sailbots)
updates_interval = 0
while self.running:
current_time = time.time()
# update simulation every SIM_DELTA_T seconds
if current_time >= next_sim_time:
for idx, sailbot in enumerate(self.sailbots):
updates[idx] = sailbot.simulation_update()
next_sim_time = current_time + SIM_DELTA_T
if updates_interval >= 10:
for update in updates:
self.on_frame_received(update)
updates_interval = 0
updates_interval += 1
# update control inputs every CONTROL_DELTA_T seconds
if current_time >= next_control_time:
for sailbot in self.sailbots:
if sailbot.operation_mode == SailBotSimulatorMode.AUTOMATIC:
sailbot.control_loop_update()
next_control_time = current_time + CONTROL_DELTA_T
time.sleep(0.02)
[docs]
def stop(self):
self.logger.info("Stopping Sailbot Simulation...")
self.running = False
self.join()
[docs]
def write(self, bytes_):
"""Write bytes on the fake serial, similar to the real gateway."""
for sailbot in self.sailbots:
sailbot.decode_serial_input(bytes_)