# SPDX-FileCopyrightText: 2022-present Inria
# SPDX-FileCopyrightText: 2022-present Alexandre Abadie <alexandre.abadie@inria.fr>
#
# SPDX-License-Identifier: BSD-3-Clause
"""Module containing client code to interact with the controller REST API."""
import urllib.parse
from contextlib import asynccontextmanager
from typing import List, Optional
import httpx
from dotbot.logger import LOGGER, setup_logging
from dotbot.models import DotBotMapSizeModel, DotBotModel, DotBotQueryModel
from dotbot.protocol import ApplicationType
[docs]
class RestClient:
    """Client to interact with the controller REST API.

    Every request is sent through one shared ``httpx.AsyncClient`` created in
    the constructor; call :meth:`close` once the client is no longer needed.
    """

    def __init__(self, hostname, port, https):
        """Store the controller location and create the HTTP client.

        :param hostname: host part of the controller URL.
        :param port: port part of the controller URL.
        :param https: truthy to build ``https`` URLs, falsy for ``http``.
        """
        self.hostname = hostname
        self.port = port
        self.https = https
        # Logging is set up here, as a side effect of creating a client.
        setup_logging(None, "info", ["console"])
        self._logger = LOGGER.bind(context=__name__)
        self._client = httpx.AsyncClient()

    @property
    def base_url(self):
        """Returns the base URL of the controller REST API."""
        scheme = "https" if self.https else "http"
        return f"{scheme}://{self.hostname}:{self.port}/controller"

    async def close(self):
        """Close the underlying HTTP client."""
        await self._client.aclose()

    async def fetch_dotbots(
        self, query: "Optional[DotBotQueryModel]" = None
    ) -> "List[DotBotModel]":
        """Fetch DotBots matching the query.

        :param query: optional filter; its fields that are not ``None`` are
            URL-encoded and appended to the request as a query string.
        :return: one ``DotBotModel`` per entry of the JSON response, or an
            empty list when the connection fails or the response status is
            not 200 (a warning is logged in both cases).
        """
        url = f"{self.base_url}/dotbots"
        if query is not None:
            url += f"?{urllib.parse.urlencode(query.model_dump(exclude_none=True))}"
        try:
            response = await self._client.get(
                url,
                headers={"Accept": "application/json"},
            )
        except httpx.ConnectError as exc:
            self._logger.warning(f"Failed to fetch dotbots: {exc}")
            return []
        if response.status_code != 200:
            self._logger.warning(
                f"Failed to fetch dotbots: {response} {response.text}"
            )
            return []
        return [DotBotModel(**dotbot) for dotbot in response.json()]

    async def fetch_map_size(self) -> "Optional[DotBotMapSizeModel]":
        """Fetch DotBot area map size.

        :return: the map size built from the JSON response, or ``None`` when
            the connection to the controller fails (a warning is logged).
        :raises RuntimeError: if the response status is not 200.
        """
        try:
            response = await self._client.get(
                f"{self.base_url}/map_size",
                headers={"Accept": "application/json"},
            )
        except httpx.ConnectError as exc:
            self._logger.warning(f"Failed to fetch map size: {exc}")
            # A connection failure is reported with None rather than an
            # exception, hence the Optional return annotation.
            return None
        if response.status_code != 200:
            self._logger.warning(
                f"Failed to fetch map size: {response} {response.text}"
            )
            raise RuntimeError("Failed to fetch map size")
        return DotBotMapSizeModel(**response.json())

    async def _send_command(self, address, application, resource, command):
        """PUT a command to ``dotbots/<address>/<application>/<resource>``.

        Failures are logged and never raised: a connection error produces a
        warning, a response status other than 200 produces an error entry.

        :param address: address of the target, used as a URL path segment.
        :param application: object whose ``value`` attribute is used as a URL
            path segment.
        :param resource: name of the command resource (last URL segment).
        :param command: object whose ``model_dump_json()`` output is sent as
            the request body.
        """
        self._logger.info(
            "Sending command",
            address=address,
            application=application,
            resource=resource,
            command=command.__class__.__name__,
        )
        try:
            response = await self._client.put(
                f"{self.base_url}/dotbots/{address}/{application.value}/{resource}",
                headers={
                    "Accept": "application/json",
                    "Content-Type": "application/json",
                },
                content=command.model_dump_json(),
            )
        except httpx.ConnectError as exc:
            self._logger.warning(f"Failed to send command: {exc}")
            return
        if response.status_code != 200:
            self._logger.error(
                "Cannot send command",
                response=str(response),
                status_code=response.status_code,
                content=str(response.text),
            )

    async def send_move_raw_command(self, address, application, command):
        """Send a move raw command to a DotBot."""
        await self._send_command(address, application, "move_raw", command)

    async def send_rgb_led_command(self, address, command):
        """Send an RGB LED command to a DotBot."""
        # The command targets a DotBot, so it must be routed to the DotBot
        # application (it was previously routed to SailBot).
        await self._send_command(address, ApplicationType.DotBot, "rgb_led", command)

    async def send_waypoint_command(self, address, application, command):
        """Send a waypoint command to a DotBot."""
        await self._send_command(address, application, "waypoints", command)

    async def clear_position_history(self, address):
        """Clear the position history of a DotBot.

        Failures are logged and never raised: a connection error produces a
        warning, a response status other than 200 produces an error entry.

        :param address: address of the target, used as a URL path segment.
        """
        try:
            # NOTE(review): this request is a body-less PUT; confirm against
            # the controller API that PUT (and not DELETE) is the verb
            # expected by the positions endpoint.
            response = await self._client.put(
                f"{self.base_url}/dotbots/{address}/positions",
                headers={
                    "Accept": "application/json",
                    "Content-Type": "application/json",
                },
            )
        except httpx.ConnectError as exc:
            self._logger.warning(f"Failed to clear positions: {exc}")
            return
        if response.status_code != 200:
            self._logger.error(
                "Cannot clear positions",
                response=str(response),
                status_code=response.status_code,
                content=str(response.text),
            )
[docs]
@asynccontextmanager
async def rest_client(host, port, https):
    """Async context manager yielding a ``RestClient`` closed on exit.

    The client is built from ``host``, ``port`` and ``https`` and is closed
    when the ``async with`` block ends, whether or not it raised.
    """
    api = RestClient(host, port, https)
    try:
        yield api
    finally:
        await api.close()