Source code for dotbot.rest
# SPDX-FileCopyrightText: 2022-present Inria
# SPDX-FileCopyrightText: 2022-present Alexandre Abadie <alexandre.abadie@inria.fr>
#
# SPDX-License-Identifier: BSD-3-Clause
"""Module containing client code to interact with the controller REST API."""
from contextlib import asynccontextmanager
from typing import List
import httpx
from dotbot.logger import LOGGER, setup_logging
from dotbot.models import DotBotModel, DotBotStatus
from dotbot.protocol import ApplicationType
[docs]
@asynccontextmanager
async def rest_client(host, port, https):
client = RestClient(host, port, https)
try:
yield client
finally:
await client.close()
[docs]
class RestClient:
"""Client to interact with the controller REST API."""
def __init__(self, hostname, port, https):
self.hostname = hostname
self.port = port
self.https = https
setup_logging(None, "info", ["console"])
self._logger = LOGGER.bind(context=__name__)
self._client = httpx.AsyncClient()
@property
def base_url(self):
"""Returns the base URL of the controller REST API."""
return f"{'https' if self.https else 'http'}://{self.hostname}:{self.port}/controller"
[docs]
async def close(self):
await self._client.aclose()
[docs]
async def fetch_active_dotbots(self) -> List[DotBotModel]:
"""Fetch active DotBots."""
try:
response = await self._client.get(
f"{self.base_url}/dotbots",
headers={
"Accept": "application/json",
},
)
except httpx.ConnectError as exc:
self._logger.warning(f"Failed to fetch dotbots: {exc}")
else:
if response.status_code != 200:
self._logger.warning(
f"Failed to fetch dotbots: {response} {response.text}"
)
else:
return [
DotBotModel(**dotbot)
for dotbot in response.json()
if dotbot["status"] == DotBotStatus.ACTIVE.value
]
return []
async def _send_command(self, address, application, resource, command):
try:
response = await self._client.put(
f"{self.base_url}/dotbots" f"/{address}/{application.value}/{resource}",
headers={
"Accept": "application/json",
"Content-Type": "application/json",
},
content=command.model_dump_json(),
)
except httpx.ConnectError as exc:
self._logger.warning(f"Failed to send command: {exc}")
return
if response.status_code != 200:
self._logger.error(
"Cannot send command",
response=str(response),
status_code=response.status_code,
content=str(response.text),
)
[docs]
async def send_move_raw_command(self, address, application, command):
"""Send a move raw command to a DotBot."""
await self._send_command(address, application, "move_raw", command)
[docs]
async def send_rgb_led_command(self, address, command):
"""Send an RGB LED command to a DotBot."""
await self._send_command(address, ApplicationType.SailBot, "rgb_led", command)
[docs]
async def send_waypoint_command(self, address, application, command):
"""Send an waypoint command to a DotBot."""
await self._send_command(address, application, "waypoints", command)