Source code for dotbot.server

# SPDX-FileCopyrightText: 2022-present Inria
# SPDX-FileCopyrightText: 2022-present Alexandre Abadie <alexandre.abadie@inria.fr>
#
# SPDX-License-Identifier: BSD-3-Clause

"""Module for the web server application."""

import os
from typing import List

import httpx
from fastapi import Depends, FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import Response
from fastapi.staticfiles import StaticFiles
from pydantic import TypeAdapter, ValidationError
from starlette.middleware.base import BaseHTTPMiddleware

from dotbot import pydotbot_version
from dotbot.logger import LOGGER
from dotbot.models import (
    DotBotModel,
    DotBotMoveRawCommandModel,
    DotBotNotificationCommand,
    DotBotNotificationModel,
    DotBotQueryModel,
    DotBotRgbLedCommandModel,
    DotBotWaypoints,
    WSMessage,
    WSMoveRaw,
    WSRgbLed,
    WSWaypoints,
)
from dotbot.protocol import (
    ApplicationType,
    PayloadCommandMoveRaw,
    PayloadCommandRgbLed,
    PayloadGPSPosition,
    PayloadGPSWaypoints,
    PayloadLH2Location,
    PayloadLH2Waypoints,
)

PYDOTBOT_FRONTEND_BASE_URL = os.getenv(
    "PYDOTBOT_FRONTEND_BASE_URL", "https://dotbots.github.io/PyDotBot"
)

ws_adapter = TypeAdapter(WSMessage)


[docs] class ReverseProxyMiddleware(BaseHTTPMiddleware):
[docs] async def dispatch(self, request, call_next): if request.url.path.startswith("/pin"): headers = {k: v for k, v in request.headers.items()} url = f"http://localhost:8080{request.url.path}" async with httpx.AsyncClient() as client: try: response = await client.get( url, headers=headers, ) except httpx.ConnectError as exc: LOGGER.warning(exc) return Response(status_code=502, content=b"Proxy connection failed") return Response( content=response.content, status_code=response.status_code, headers=response.headers, ) response = await call_next(request) return response
api = FastAPI( debug=0, title="DotBot controller API", description="This is the DotBot controller API", version=pydotbot_version(), docs_url="/api", redoc_url=None, ) api.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) api.add_middleware(ReverseProxyMiddleware)
[docs] @api.put( path="/controller/dotbots/{address}/{application}/move_raw", summary="Move the dotbot", tags=["dotbots"], ) async def dotbots_move_raw( address: str, application: int, command: DotBotMoveRawCommandModel ): """Set the current active DotBot.""" if address not in api.controller.dotbots: raise HTTPException(status_code=404, detail="No matching dotbot found") _dotbots_move_raw(address=address, command=command)
def _dotbots_move_raw(address: str, command: DotBotMoveRawCommandModel): payload = PayloadCommandMoveRaw( left_x=command.left_x, left_y=command.left_y, right_x=command.right_x, right_y=command.right_y, ) api.controller.send_payload(int(address, 16), payload) api.controller.dotbots[address].move_raw = command
[docs] @api.put( path="/controller/dotbots/{address}/{application}/rgb_led", summary="Set the dotbot RGB LED color", tags=["dotbots"], ) async def dotbots_rgb_led( address: str, application: int, command: DotBotRgbLedCommandModel ): """Set the current active DotBot.""" if address not in api.controller.dotbots: raise HTTPException(status_code=404, detail="No matching dotbot found") _dotbots_rgb_led(address=address, command=command)
def _dotbots_rgb_led(address: str, command: DotBotRgbLedCommandModel): payload = PayloadCommandRgbLed( red=command.red, green=command.green, blue=command.blue ) api.controller.send_payload(int(address, 16), payload) api.controller.dotbots[address].rgb_led = command
[docs] @api.put( path="/controller/dotbots/{address}/{application}/waypoints", summary="Set the dotbot control mode", tags=["dotbots"], ) async def dotbots_waypoints( address: str, application: int, waypoints: DotBotWaypoints, ): """Set the waypoints of a DotBot.""" if address not in api.controller.dotbots: raise HTTPException(status_code=404, detail="No matching dotbot found") await _dotbots_waypoints( address=address, application=application, waypoints=waypoints )
async def _dotbots_waypoints( address: str, application: int, waypoints: DotBotWaypoints, ): waypoints_list = waypoints.waypoints if application == ApplicationType.SailBot.value: if api.controller.dotbots[address].gps_position is not None: waypoints_list = [ api.controller.dotbots[address].gps_position ] + waypoints.waypoints payload = PayloadGPSWaypoints( threshold=waypoints.threshold, count=len(waypoints.waypoints), waypoints=[ PayloadGPSPosition( latitude=int(waypoint.latitude * 1e6), longitude=int(waypoint.longitude * 1e6), ) for waypoint in waypoints.waypoints ], ) else: # DotBot application if api.controller.dotbots[address].lh2_position is not None: waypoints_list = [ api.controller.dotbots[address].lh2_position ] + waypoints.waypoints payload = PayloadLH2Waypoints( threshold=waypoints.threshold, count=len(waypoints.waypoints), waypoints=[ PayloadLH2Location( pos_x=int(waypoint.x * 1e6), pos_y=int(waypoint.y * 1e6), pos_z=int(waypoint.z * 1e6), ) for waypoint in waypoints.waypoints ], ) api.controller.dotbots[address].waypoints = waypoints_list api.controller.dotbots[address].waypoints_threshold = waypoints.threshold api.controller.send_payload(int(address, 16), payload) await api.controller.notify_clients( DotBotNotificationModel(cmd=DotBotNotificationCommand.RELOAD) )
[docs] @api.delete( path="/controller/dotbots/{address}/positions", summary="Clear the history of positions of a DotBot", tags=["dotbots"], ) async def dotbot_positions_history_clear(address: str): """Clear the history of positions of a dotbot.""" if address not in api.controller.dotbots: raise HTTPException(status_code=404, detail="No matching dotbot found") api.controller.dotbots[address].position_history = []
[docs] @api.get( path="/controller/dotbots/{address}", response_model=DotBotModel, response_model_exclude_none=True, summary="Return information about a dotbot given its address", tags=["dotbots"], ) async def dotbot(address: str, query: DotBotQueryModel = Depends()): """Dotbot HTTP GET handler.""" if address not in api.controller.dotbots: raise HTTPException(status_code=404, detail="No matching dotbot found") _dotbot = DotBotModel(**api.controller.dotbots[address].model_dump()) _dotbot.position_history = _dotbot.position_history[: query.max_positions] return _dotbot
[docs] @api.get( path="/controller/dotbots", response_model=List[DotBotModel], response_model_exclude_none=True, summary="Return the list of available dotbots", tags=["dotbots"], ) async def dotbots(query: DotBotQueryModel = Depends()): """Dotbots HTTP GET handler.""" return api.controller.get_dotbots(query)
[docs] @api.websocket("/controller/ws/status") async def websocket_endpoint(websocket: WebSocket): """Websocket server endpoint.""" await websocket.accept() api.controller.websockets.append(websocket) try: while True: _ = await websocket.receive_text() except WebSocketDisconnect: if websocket in api.controller.websockets: api.controller.websockets.remove(websocket)
[docs] @api.websocket("/controller/ws/dotbots") async def ws_dotbots(websocket: WebSocket): await websocket.accept() try: while True: raw = await websocket.receive_json() try: msg = ws_adapter.validate_python(raw) except ValidationError as e: await websocket.send_json( { "error": "invalid_message", "details": e.errors(), } ) continue if msg.address not in api.controller.dotbots: # ignore messages where address doesn't exist continue if isinstance(msg, WSRgbLed): _dotbots_rgb_led( address=msg.address, command=msg.data, ) elif isinstance(msg, WSMoveRaw): _dotbots_move_raw( address=msg.address, command=msg.data, ) elif isinstance(msg, WSWaypoints): await _dotbots_waypoints( address=msg.address, application=msg.application, waypoints=msg.data, ) except WebSocketDisconnect: LOGGER.debug("WebSocket client disconnected")
# Mount static files after all routes are defined FRONTEND_DIR = os.path.join(os.path.dirname(__file__), "frontend", "build") api.mount("/PyDotBot", StaticFiles(directory=FRONTEND_DIR, html=True), name="PyDotBot")