Source code for dotbot.server

# SPDX-FileCopyrightText: 2022-present Inria
# SPDX-FileCopyrightText: 2022-present Alexandre Abadie <alexandre.abadie@inria.fr>
#
# SPDX-License-Identifier: BSD-3-Clause

"""Module for the web server application."""

import base64
import os
from typing import Annotated, List

import httpx
from fastapi import (
    FastAPI,
    HTTPException,
    Query,
    WebSocket,
    WebSocketDisconnect,
)
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import Response
from fastapi.staticfiles import StaticFiles
from pydantic import TypeAdapter, ValidationError
from starlette.middleware.base import BaseHTTPMiddleware

from dotbot import pydotbot_version
from dotbot.logger import LOGGER
from dotbot.models import (
    MAX_POSITION_HISTORY_SIZE,
    DotBotBackgroundMapModel,
    DotBotMapSizeModel,
    DotBotModel,
    DotBotMoveRawCommandModel,
    DotBotNotificationCommand,
    DotBotNotificationModel,
    DotBotNotificationUpdate,
    DotBotQueryModel,
    DotBotRgbLedCommandModel,
    DotBotWaypoints,
    WSMessage,
    WSMoveRaw,
    WSRgbLed,
    WSWaypoints,
)
from dotbot.protocol import (
    ApplicationType,
    PayloadCommandMoveRaw,
    PayloadCommandRgbLed,
    PayloadGPSPosition,
    PayloadGPSWaypoints,
    PayloadLH2Location,
    PayloadLH2Waypoints,
)

PYDOTBOT_FRONTEND_BASE_URL = os.getenv(
    "PYDOTBOT_FRONTEND_BASE_URL", "https://dotbots.github.io/PyDotBot"
)

ws_adapter = TypeAdapter(WSMessage)


[docs] class ReverseProxyMiddleware(BaseHTTPMiddleware):
[docs] async def dispatch(self, request, call_next): if request.url.path.startswith("/pin"): headers = {k: v for k, v in request.headers.items()} url = f"http://localhost:8080{request.url.path}" async with httpx.AsyncClient() as client: try: response = await client.get( url, headers=headers, ) except httpx.ConnectError as exc: LOGGER.warning(exc) return Response(status_code=502, content=b"Proxy connection failed") return Response( content=response.content, status_code=response.status_code, headers=response.headers, ) response = await call_next(request) return response
api = FastAPI( debug=0, title="DotBot controller API", description="This is the DotBot controller API", version=pydotbot_version(), docs_url="/api", redoc_url=None, ) api.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) api.add_middleware(ReverseProxyMiddleware)
[docs] @api.put( path="/controller/dotbots/{address}/{application}/move_raw", summary="Move the dotbot", tags=["dotbots"], ) async def dotbots_move_raw( address: str, application: int, command: DotBotMoveRawCommandModel ): """Set the current active DotBot.""" if address not in api.controller.dotbots: raise HTTPException(status_code=404, detail="No matching dotbot found") _dotbots_move_raw(address=address, command=command)
def _dotbots_move_raw(address: str, command: DotBotMoveRawCommandModel): payload = PayloadCommandMoveRaw( left_x=command.left_x, left_y=command.left_y, right_x=command.right_x, right_y=command.right_y, ) api.controller.send_payload(int(address, 16), payload) api.controller.dotbots[address].move_raw = command
[docs] @api.put( path="/controller/dotbots/{address}/{application}/rgb_led", summary="Set the dotbot RGB LED color", tags=["dotbots"], ) async def dotbots_rgb_led( address: str, application: int, command: DotBotRgbLedCommandModel ): """Set the current active DotBot.""" if address not in api.controller.dotbots: raise HTTPException(status_code=404, detail="No matching dotbot found") await _dotbots_rgb_led(address=address, command=command)
async def _dotbots_rgb_led(address: str, command: DotBotRgbLedCommandModel): payload = PayloadCommandRgbLed( red=command.red, green=command.green, blue=command.blue ) api.controller.send_payload(int(address, 16), payload) api.controller.dotbots[address].rgb_led = command notification = DotBotNotificationModel( cmd=DotBotNotificationCommand.UPDATE, data=DotBotNotificationUpdate(address=address, rgb_led=command), ) await api.controller.notify_clients(notification)
[docs] @api.put( path="/controller/dotbots/{address}/{application}/waypoints", summary="Set the dotbot control mode", tags=["dotbots"], ) async def dotbots_waypoints( address: str, application: int, waypoints: DotBotWaypoints, ): """Set the waypoints of a DotBot.""" if address not in api.controller.dotbots: raise HTTPException(status_code=404, detail="No matching dotbot found") await _dotbots_waypoints( address=address, application=application, waypoints=waypoints )
async def _dotbots_waypoints( address: str, application: int, waypoints: DotBotWaypoints, ): waypoints_list = waypoints.waypoints if application == ApplicationType.SailBot.value: if api.controller.dotbots[address].gps_position is not None: waypoints_list = [ api.controller.dotbots[address].gps_position ] + waypoints.waypoints payload = PayloadGPSWaypoints( threshold=waypoints.threshold, count=len(waypoints.waypoints), waypoints=[ PayloadGPSPosition( latitude=int(waypoint.latitude * 1e6), longitude=int(waypoint.longitude * 1e6), ) for waypoint in waypoints.waypoints ], ) update_data = DotBotNotificationUpdate( address=address, gps_waypoints=waypoints_list, waypoints_threshold=waypoints.threshold, ) else: # DotBot application if api.controller.dotbots[address].lh2_position is not None: waypoints_list = [ api.controller.dotbots[address].lh2_position ] + waypoints.waypoints payload = PayloadLH2Waypoints( threshold=waypoints.threshold, count=len(waypoints.waypoints), waypoints=[ PayloadLH2Location( pos_x=int(waypoint.x), pos_y=int(waypoint.y), ) for waypoint in waypoints.waypoints ], ) update_data = DotBotNotificationUpdate( address=address, lh2_waypoints=waypoints_list, waypoints_threshold=waypoints.threshold, ) api.controller.dotbots[address].waypoints = waypoints_list api.controller.dotbots[address].waypoints_threshold = waypoints.threshold api.controller.send_payload(int(address, 16), payload) notification = DotBotNotificationModel( cmd=DotBotNotificationCommand.UPDATE, data=update_data ) await api.controller.notify_clients(notification)
[docs] @api.delete( path="/controller/dotbots/{address}/positions", summary="Clear the history of positions of a DotBot", tags=["dotbots"], ) async def dotbot_positions_history_clear(address: str): """Clear the history of positions of a dotbot.""" if address not in api.controller.dotbots: raise HTTPException(status_code=404, detail="No matching dotbot found") api.controller.dotbots[address].position_history = [] await api.controller.notify_clients( DotBotNotificationModel( cmd=DotBotNotificationCommand.UPDATE, data=DotBotNotificationUpdate(address=address, position_history=[]), ) )
[docs] @api.get( path="/controller/dotbots/{address}", response_model=DotBotModel, response_model_exclude_none=True, summary="Return information about a dotbot given its address", tags=["dotbots"], ) async def dotbot(address: str, max_positions: int = MAX_POSITION_HISTORY_SIZE): """Dotbot HTTP GET handler.""" if address not in api.controller.dotbots: raise HTTPException(status_code=404, detail="No matching dotbot found") _dotbot = DotBotModel(**api.controller.dotbots[address].model_dump()) _dotbot.position_history = _dotbot.position_history[:max_positions] return _dotbot
[docs] @api.get( path="/controller/dotbots", response_model=List[DotBotModel], response_model_exclude_none=True, summary="Return the list of available dotbots", tags=["dotbots"], ) async def dotbots(query: Annotated[DotBotQueryModel, Query()]): """Dotbots HTTP GET handler.""" return api.controller.get_dotbots(query)
[docs] @api.get( path="/controller/map_size", response_model=DotBotMapSizeModel, response_model_exclude_none=True, summary="Return the map size of the controller", tags=["controller"], ) async def map_size(): """Map size HTTP GET handler.""" return api.controller.map_size
[docs] @api.get( path="/controller/background_map", response_model=DotBotBackgroundMapModel, summary="Return the background map of the controller", tags=["controller"], ) async def background_map(): """Background map HTTP GET handler.""" if not api.controller.settings.background_map: return DotBotBackgroundMapModel(data="") with open(api.controller.settings.background_map, "rb") as f: encoded_string = base64.b64encode(f.read()).decode("utf-8") return DotBotBackgroundMapModel(data=encoded_string)
[docs] @api.websocket("/controller/ws/status") async def websocket_endpoint(websocket: WebSocket): """Websocket server endpoint.""" await websocket.accept() api.controller.websockets.append(websocket) try: while True: _ = await websocket.receive_text() except WebSocketDisconnect: if websocket in api.controller.websockets: api.controller.websockets.remove(websocket)
[docs] @api.websocket("/controller/ws/dotbots") async def ws_dotbots(websocket: WebSocket): await websocket.accept() try: while True: raw = await websocket.receive_json() try: msg = ws_adapter.validate_python(raw) except ValidationError as e: await websocket.send_json( { "error": "invalid_message", "details": e.errors(), } ) continue if msg.address not in api.controller.dotbots: # ignore messages where address doesn't exist continue if isinstance(msg, WSRgbLed): await _dotbots_rgb_led( address=msg.address, command=msg.data, ) elif isinstance(msg, WSMoveRaw): _dotbots_move_raw( address=msg.address, command=msg.data, ) elif isinstance(msg, WSWaypoints): await _dotbots_waypoints( address=msg.address, application=msg.application, waypoints=msg.data, ) except WebSocketDisconnect: LOGGER.debug("WebSocket client disconnected")
# Mount static files after all routes are defined FRONTEND_DIR = os.path.join(os.path.dirname(__file__), "frontend", "build") if os.path.isdir(FRONTEND_DIR): api.mount( "/PyDotBot", StaticFiles(directory=FRONTEND_DIR, html=True), name="PyDotBot" ) else: LOGGER.warning( "Frontend build not found at %s; the web UI will be unavailable. " "Install the published wheel (pip install --pre pydotbot) or build the " "frontend: cd dotbot/frontend && npm install && npm run build", FRONTEND_DIR, )