Source code for dotbot.lighthouse2

# SPDX-FileCopyrightText: 2022-present Inria
# SPDX-FileCopyrightText: 2022-present Filip Maksimovic <filip.maksimovic@inria.fr>
# SPDX-FileCopyrightText: 2022-present Alexandre Abadie <alexandre.abadie@inria.fr>
#
# SPDX-License-Identifier: BSD-3-Clause

"""Module containing the API to convert LH2 raw data to relative positions."""

# pylint: disable=invalid-name,unspecified-encoding,no-member

import math
import os
import pickle
import sys
from ctypes import CDLL
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import List, Optional

import cv2
import numpy as np

from dotbot.logger import LOGGER
from dotbot.models import DotBotCalibrationStateModel
from dotbot.protocol import PayloadLh2CalibrationHomography, PayloadLh2RawData

if sys.platform == "win32":
    LIB_EXT = "dll"
elif sys.platform == "darwin":
    LIB_EXT = "dylib"
else:
    LIB_EXT = "so"

LH2_LIB_PATH = os.path.join(os.path.dirname(__file__), "lib", f"lh2.{LIB_EXT}")
LH2_LIB = CDLL(LH2_LIB_PATH)
REFERENCE_POINTS_DEFAULT = [
    [-0.1, 0.1],
    [0.1, 0.1],
    [-0.1, -0.1],
    [0.1, -0.1],
]
CALIBRATION_DIR = Path.home() / ".pydotbot"


def _lh2_raw_data_to_counts(raw_data: PayloadLh2RawData, func: callable) -> List[int]:
    counts = [0] * 2
    pos_A = 0
    pos_B = 0
    for i in range(2):
        index = 0
        if raw_data.locations[i].polynomial_index in [0, 1]:
            index = pos_A
            pos_A += 1
        elif raw_data.locations[i].polynomial_index in [0, 1]:
            index = 2 + pos_B
            pos_B += 1
        if index > 1:
            continue
        counts[index] = func(
            raw_data.locations[i].polynomial_index,
            raw_data.locations[i].bits >> (47 - raw_data.locations[i].offset),
        )
    return counts


[docs] def lh2_raw_data_to_counts(raw_data: PayloadLh2RawData) -> List[int]: """Convert bits sequence to an array of counts.""" return _lh2_raw_data_to_counts(raw_data, LH2_LIB.reverse_count_p)
[docs] def calculate_camera_point(count1, count2, poly_in): """Calculate camera points from counts.""" if poly_in < 2: period = 959000 if poly_in > 1: period = 957000 a1 = (count1 * 8 / period) * 2 * math.pi a2 = (count2 * 8 / period) * 2 * math.pi cam_x = -math.tan(0.5 * (a1 + a2)) if count1 < count2: cam_y = -math.sin(a2 / 2 - a1 / 2 - 60 * math.pi / 180) / math.tan(math.pi / 6) else: cam_y = -math.sin(a1 / 2 - a2 / 2 - 60 * math.pi / 180) / math.tan(math.pi / 6) return cam_x, cam_y
def _unitize(x_in, y_in): magnitude = np.sqrt(x_in**2 + y_in**2) return x_in / magnitude, y_in / magnitude
[docs] @dataclass class CalibrationData: """Class that stores calibration data.""" zeta: float random_rodriguez: np.array normal: np.array m: np.array
[docs] class LighthouseManagerState(Enum): """Enum for lighthouse manager internal state.""" NotCalibrated = 0 CalibrationInProgress = 1 Ready = 2 Calibrated = 3
[docs] class LighthouseManager: """Class to manage the LightHouse positionning state and workflow.""" def __init__(self): self.logger = LOGGER.bind(context=__name__) self.state = LighthouseManagerState.NotCalibrated self.reference_points = REFERENCE_POINTS_DEFAULT Path.mkdir(CALIBRATION_DIR, exist_ok=True) self.calibration_output_path = CALIBRATION_DIR / "calibration.out" self.calibration: PayloadLh2CalibrationHomography = self._load_calibration() self.calibration_points = np.zeros( (2, len(self.reference_points), 2), dtype=np.float64 ) self.calibration_points_available = [False] * len(self.reference_points) self.last_raw_data = None self.logger.info("Lighthouse initialized") @property def state_model(self) -> DotBotCalibrationStateModel: """Return the state as pydantic model.""" if self.state == LighthouseManagerState.CalibrationInProgress: return DotBotCalibrationStateModel(state="running") if self.state == LighthouseManagerState.Ready: return DotBotCalibrationStateModel(state="ready") if self.state == LighthouseManagerState.Calibrated: return DotBotCalibrationStateModel(state="done") return DotBotCalibrationStateModel(state="unknown") def _load_calibration(self) -> Optional[PayloadLh2CalibrationHomography]: if not os.path.exists(self.calibration_output_path): self.logger.info("No calibration file found") return None with open(self.calibration_output_path, "rb") as calibration_file: calibration = pickle.load(calibration_file) self.logger.info("Lighthouse calibration loaded") self.state = LighthouseManagerState.Calibrated return calibration
[docs] def add_calibration_point(self, index): """Register a new camera points for calibration.""" if self.last_raw_data is None: self.logger.warning("Missing raw data", index=index) return self.calibration_points_available[index] = True counts = lh2_raw_data_to_counts(self.last_raw_data) self.calibration_points[0][index] = np.asarray( calculate_camera_point( counts[0], counts[1], self.last_raw_data.locations[0].polynomial_index, ), dtype=np.float64, ) self.calibration_points[1][index] = np.asarray( calculate_camera_point( counts[0], counts[1], self.last_raw_data.locations[1].polynomial_index, ), dtype=np.float64, ) self.last_raw_data: PayloadLh2RawData = None if all(self.calibration_points_available) is False: self.state = LighthouseManagerState.CalibrationInProgress if all(self.calibration_points_available) is True: self.state = LighthouseManagerState.Ready self.logger.info("Calibration point added", index=index, state=self.state)
[docs] def compute_calibration(self): # pylint: disable=too-many-locals """Compute the calibration values and matrices.""" if self.state != LighthouseManagerState.Ready: self.logger.warning("Not ready, skipping calibration") return self.logger.info("Calibrating", points=self.calibration_points) camera_points = [[], []] for data in self.calibration_points[0]: camera_points[0].append(data) for data in self.calibration_points[1]: camera_points[1].append(data) camera_points_arr = np.asarray(camera_points, dtype=np.float64) homography_mat = cv2.findHomography( camera_points_arr[0][0 : len(camera_points[0])][:], camera_points_arr[1][0 : len(camera_points[1])][:], method=cv2.RANSAC, ransacReprojThreshold=0.001, )[0] _, S, V = np.linalg.svd(homography_mat) V = V.T s1 = S[0] / S[1] s3 = S[2] / S[1] zeta = s1 - s3 a1 = np.sqrt(1 - s3**2) b1 = np.sqrt(s1**2 - 1) a, b = _unitize(a1, b1) v1 = np.array(V[:, 0]) v3 = np.array(V[:, 2]) n = b * v1 + a * v3 if n[2] < 0: n = -n random_rodriguez = np.array( [ [ -n[1] / np.sqrt(n[0] * n[0] + n[1] * n[1]), n[0] / np.sqrt(n[0] * n[0] + n[1] * n[1]), 0, ], [ n[0] * n[2] / np.sqrt(n[0] * n[0] + n[1] * n[1]), n[1] * n[2] / np.sqrt(n[0] * n[0] + n[1] * n[1]), -np.sqrt(n[0] * n[0] + n[1] * n[1]), ], [-n[0], -n[1], -n[2]], ] ) pts_cam_new = np.hstack( (camera_points_arr[1], np.ones((len(camera_points_arr[1]), 1))) ) scales = (1 / zeta) / np.matmul(n, pts_cam_new.T) scales_matrix = np.vstack((scales, scales, scales)) final_points = scales_matrix * pts_cam_new.T final_points = final_points.T temporary_numpy_trash_heap = ( np.array([self.reference_points], dtype=np.float64) + 0.5 ) temporary_numpy_trash_heap_pt2 = temporary_numpy_trash_heap.squeeze() M, _ = cv2.findHomography( camera_points_arr[0], temporary_numpy_trash_heap_pt2, method=cv2.RANSAC, ransacReprojThreshold=0.001, ) calibration_data = CalibrationData(zeta, random_rodriguez, n, M) matrix_bytes = bytearray() for bytes_block in [ int(n * 1e6).to_bytes(4, "little", signed=True) for n in calibration_data.m.ravel() ]: matrix_bytes += bytes_block # Prepare homography matrix and send it to the robot self.calibration = PayloadLh2CalibrationHomography( index=0, homography_matrix=matrix_bytes, ) # Store calibration data as pickle for later reload with open(self.calibration_output_path, "wb") as output_file: pickle.dump(self.calibration, output_file) self.state = LighthouseManagerState.Calibrated self.logger.info("Calibration done", data=self.calibration)