Source code for dotbot.rest

# SPDX-FileCopyrightText: 2022-present Inria
# SPDX-FileCopyrightText: 2022-present Alexandre Abadie <alexandre.abadie@inria.fr>
#
# SPDX-License-Identifier: BSD-3-Clause

"""Module containing client code to interact with the controller REST API."""

import urllib.parse
from contextlib import asynccontextmanager
from typing import List, Optional

import httpx

from dotbot.logger import LOGGER, setup_logging
from dotbot.models import DotBotMapSizeModel, DotBotModel, DotBotQueryModel
from dotbot.protocol import ApplicationType


[docs] class RestClient: """Client to interact with the controller REST API.""" def __init__(self, hostname, port, https): self.hostname = hostname self.port = port self.https = https setup_logging(None, "info", ["console"]) self._logger = LOGGER.bind(context=__name__) self._client = httpx.AsyncClient() @property def base_url(self): """Returns the base URL of the controller REST API.""" return f"{'https' if self.https else 'http'}://{self.hostname}:{self.port}/controller"
[docs] async def close(self): await self._client.aclose()
[docs] async def fetch_dotbots( self, query: Optional[DotBotQueryModel] = None ) -> List[DotBotModel]: """Fetch DotBots matching the query.""" try: url = f"{self.base_url}/dotbots" if query is not None: url += f"?{urllib.parse.urlencode(query.model_dump(exclude_none=True))}" response = await self._client.get( url, headers={ "Accept": "application/json", }, ) except httpx.ConnectError as exc: self._logger.warning(f"Failed to fetch dotbots: {exc}") else: if response.status_code != 200: self._logger.warning( f"Failed to fetch dotbots: {response} {response.text}" ) else: return [DotBotModel(**dotbot) for dotbot in response.json()] return []
[docs] async def fetch_map_size(self) -> DotBotMapSizeModel: """Fetch DotBot area map size.""" try: response = await self._client.get( f"{self.base_url}/map_size", headers={ "Accept": "application/json", }, ) except httpx.ConnectError as exc: self._logger.warning(f"Failed to fetch map size: {exc}") else: if response.status_code != 200: self._logger.warning( f"Failed to fetch map size: {response} {response.text}" ) raise RuntimeError("Failed to fetch map size") return DotBotMapSizeModel(**response.json())
async def _send_command(self, address, application, resource, command): self._logger.info( "Sending command", address=address, application=application, resource=resource, command=command.__class__.__name__, ) try: response = await self._client.put( f"{self.base_url}/dotbots/{address}/{application.value}/{resource}", headers={ "Accept": "application/json", "Content-Type": "application/json", }, content=command.model_dump_json(), ) except httpx.ConnectError as exc: self._logger.warning(f"Failed to send command: {exc}") return if response.status_code != 200: self._logger.error( "Cannot send command", response=str(response), status_code=response.status_code, content=str(response.text), )
[docs] async def send_move_raw_command(self, address, application, command): """Send a move raw command to a DotBot.""" await self._send_command(address, application, "move_raw", command)
[docs] async def send_rgb_led_command(self, address, command): """Send an RGB LED command to a DotBot.""" await self._send_command(address, ApplicationType.SailBot, "rgb_led", command)
[docs] async def send_waypoint_command(self, address, application, command): """Send an waypoint command to a DotBot.""" await self._send_command(address, application, "waypoints", command)
[docs] async def clear_position_history(self, address): """Clear the position history of a DotBot.""" try: response = await self._client.put( f"{self.base_url}/dotbots" f"/{address}/positions", headers={ "Accept": "application/json", "Content-Type": "application/json", }, ) except httpx.ConnectError as exc: self._logger.warning(f"Failed to clear positions: {exc}") return if response.status_code != 200: self._logger.error( "Cannot clear positions", response=str(response), status_code=response.status_code, content=str(response.text), )
[docs] @asynccontextmanager async def rest_client(host, port, https): client = RestClient(host, port, https) try: yield client finally: await client.close()