Source code for dotbot.joystick

# SPDX-FileCopyrightText: 2022-present Inria
# SPDX-FileCopyrightText: 2022-present Alexandre Abadie <alexandre.abadie@inria.fr>
#
# SPDX-License-Identifier: BSD-3-Clause

"""Module implementing a joystick Dotbot controller."""

# pylint: disable=attribute-defined-outside-init

import asyncio
import os
import sys

import click

from dotbot import (
    CONTROLLER_HTTP_HOSTNAME_DEFAULT,
    CONTROLLER_HTTP_PORT_DEFAULT,
    DOTBOT_ADDRESS_DEFAULT,
    pydotbot_version,
)
from dotbot.logger import LOGGER, setup_logging
from dotbot.models import DotBotMoveRawCommandModel
from dotbot.protocol import ApplicationType
from dotbot.rest import rest_client

# Pygame support prompt is annoying, it can be hidden using an environment variable
os.environ["PYGAME_HIDE_SUPPORT_PROMPT"] = "1"
import pygame  # noqa: E402, pylint: disable=wrong-import-order, wrong-import-position

JOYSTICK_HYSTERERIS_THRES = 0.09
JOYSTICK_AXIS_COUNT = 4
REFRESH_PERIOD = 0.05
NULL_POSITION = [0.0, 0.0, 0.0, 0.0]

DOTBOT_APPLICATION_DEFAULT = "sailbot"
APPLICATION_TYPE_MAP = {
    "dotbot": ApplicationType.DotBot,
    "sailbot": ApplicationType.SailBot,
}


[docs] class JoystickController: """A Dotbot controller for a joystick interface.""" def __init__(self, joystick_index, client, dotbot_address, application): """Initialize the joystick controller.""" self.api = rest_client self.dotbots = [] self.dotbot_address = dotbot_address self.application = APPLICATION_TYPE_MAP[application] pygame.init() # pylint: disable=no-member pygame.joystick.init() # joysticks initialization self._logger = LOGGER.bind(context=__name__) if pygame.joystick.get_count() < joystick_index + 1: self._logger.error("No joystick connected") sys.exit("Error: No joystick connected.\nExiting program...") self.joystick = pygame.joystick.Joystick( joystick_index ) # instantiation of a joystick self.joystick.init() # initialization of the joystick num_axes = self.joystick.get_numaxes() if num_axes < JOYSTICK_AXIS_COUNT: self._logger.error("Not enough axes") sys.exit( f"Not enough axes on your joystick. {num_axes} found, expected at least {JOYSTICK_AXIS_COUNT}." ) self.previous_positions = NULL_POSITION self._logger.info("Controller initialized", num_axes=num_axes) @property def selected_dotbot(self): _selected_dotbot = self.dotbot_address if _selected_dotbot == DOTBOT_ADDRESS_DEFAULT: if self.dotbots and self.dotbots[0]["status"] == 0: _selected_dotbot = self.dotbots[0]["address"] else: self._logger.info("No active DotBot") return elif _selected_dotbot not in [dotbot["address"] for dotbot in self.dotbots]: self._logger.info("Active DotBot not available") return return _selected_dotbot
[docs] def pos_from_joystick(self): """Fetch positions of the joystick.""" pygame.event.pump() # queue needs to be pumped positions = [] for axis_idx in range(JOYSTICK_AXIS_COUNT): axis = self.joystick.get_axis(axis_idx) # dead zones if axis_idx % 2: axis = -axis if -JOYSTICK_HYSTERERIS_THRES < axis <= JOYSTICK_HYSTERERIS_THRES: axis = 0.0 # from [-1;1] to [-127;127] positions.append(axis * 127) return positions
[docs] async def fetch_active_dotbots(self): while 1: self.dotbots = await self.api.fetch_active_dotbots() await asyncio.sleep(1)
[docs] async def start(self): """Starts to read continuously joystick positions.""" asyncio.create_task(self.fetch_active_dotbots()) while True: # fetch positions from joystick positions = self.pos_from_joystick() if positions != NULL_POSITION or self.previous_positions != NULL_POSITION: self._logger.info("refresh positions", positions=positions) await self.api.send_move_raw_command( self.selected_dotbot, self.application, DotBotMoveRawCommandModel( left_x=int(positions[0]), left_y=int(positions[1]), right_x=int(positions[2]), right_y=int(positions[3]), ), ) self.previous_positions = positions await asyncio.sleep(REFRESH_PERIOD) # 50ms delay between each update
@click.command() @click.option( "-j", "--joystick", type=int, default=0, help="Index of the joystick to use. Defaults to 0", ) @click.option( "-h", "--hostname", type=str, default=CONTROLLER_HTTP_HOSTNAME_DEFAULT, help="Hostname of the controller. Defaults to 'localhost'", ) @click.option( "-p", "--port", type=int, default=CONTROLLER_HTTP_PORT_DEFAULT, help=f"HTTP port. Defaults to '{CONTROLLER_HTTP_PORT_DEFAULT}'", ) @click.option( "-s", "--https", is_flag=True, default=False, help="Use HTTPS protocol instead of HTTP", ) @click.option( "-d", "--dotbot-address", type=str, default=DOTBOT_ADDRESS_DEFAULT, help=f"Address in hex of the DotBot to control. Defaults to {DOTBOT_ADDRESS_DEFAULT:>0{16}}", ) @click.option( "-a", "--application", type=click.Choice(["dotbot", "sailbot"]), default=DOTBOT_APPLICATION_DEFAULT, help=f"Application to control. Defaults to {DOTBOT_APPLICATION_DEFAULT}", ) @click.option( "--log-level", type=click.Choice(["debug", "info", "warning", "error"]), default="info", help="Logging level. Defaults to info", ) def main(joystick, hostname, port, https, dotbot_address, application, log_level): """DotBot joystick controller.""" asyncio.run( cli(joystick, hostname, port, https, dotbot_address, application, log_level) )
[docs] async def cli(joystick, hostname, port, https, dotbot_address, application, log_level): print(f"Welcome to the DotBots joystick interface (version: {pydotbot_version()}).") setup_logging(None, log_level, ["console"]) async with rest_client(hostname, port, https) as client: joystick_controller = JoystickController( joystick, client, dotbot_address, application, ) try: await joystick_controller.start() except (SystemExit, KeyboardInterrupt): sys.exit(0)
if __name__ == "__main__": main() # pragma: nocover, pylint: disable=no-value-for-parameter