Saltar a contenido

Pilot Module

Pilot

Bases: PilotABC, LoggerConsumerProtocol

Class for the Pilot handler.

This class defines the interface for a Pilot handler, which is responsible for
controlling the robot's movements.

Source code in devices\raspberry_pi_5\src\pilot\__init__.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
class Pilot(PilotABC, LoggerConsumerProtocol):
    """
    Class for the Pilot handler.

    This class defines the interface for a Pilot handler, which is responsible for
    controlling the robot's movements.
    """

    # Logger configuration
    LOGGER_TAG = "Pilot"

    # Start delay
    START_DELAY = 5

    # RPLidar wait delay
    RPLIDAR_WAIT_DELAY = 0.1

    # Update delay
    MOTOR_DELAY = 0.2
    SERVO_DELAY = 0.05
    GYROSCOPE_DELAY = 0.05

    # Wait timeout for the start event
    START_WAIT_TIMEOUT = 0.1

    def __init__(
        self,
        debug: bool,
        challenge: ValueCls,
        start_event: EventCls,
        parking_event: EventCls,
        stop_event: EventCls,
        completed_event: EventCls,
        rplidar_update_measures_event: EventCls,
        rplidar_measures_queue: Queue,
        serial_sender_messages_queue: Queue,
        writer_messages_queue: Queue,
        bno08x_yaw_deg: ValueCls,
        bno08x_turns: ValueCls,
        movement: bool = True,
        photographer_capture_image_event: Optional[EventCls] = None,
        detector_model_g_inferences_queue: Optional[Queue] = None,
        detector_model_m_inferences_queue: Optional[Queue] = None,
        detector_model_r_inferences_queue: Optional[Queue] = None,
    ):
        """
        Initialize the Pilot class.

        Args:
            debug (bool): Flag to indicate if the pilot is in debug mode.
            challenge (ValueCls): Shared value to hold the current challenge.
            start_event (EventCls): Event to signal when the pilot should start.
            parking_event (EventCls): Event to signal the parking state of the robot.
            stop_event (EventCls): Event to signal when the pilot should stop.
            completed_event (EventCls): Event to signal when the challenge has been completed successfully.
            rplidar_update_measures_event (EventCls): Event to signal when the RPLidar should update measures.
            rplidar_measures_queue (Queue): Queue to hold RPLidar measures.
            serial_sender_messages_queue (Queue): Queue to hold outgoing messages to the serial port.
            writer_messages_queue (Queue): Queue to hold log messages.
            bno08x_yaw_deg (ValueCls): Shared value for the BNO08X yaw angle in degrees.
            bno08x_turns (ValueCls): Shared value for the BNO08X turns.
            movement (bool): Flag to indicate if the pilot should handle movement.
            photographer_capture_image_event (Optional[EventCls]): Event to signal when the photographer should capture an image.
            detector_model_g_inferences_queue (Optional[Queue]): Queue for model G inferences.
            detector_model_m_inferences_queue (Optional[Queue]): Queue for model M inferences.
            detector_model_r_inferences_queue (Optional[Queue]): Queue for model R inferences.
        """
        # Initialize the debug flag
        self.__debug = debug

        # Initialize the values, queues and events
        self.__challenge = challenge
        self.__started_event = Event()
        self.__start_event = start_event
        self.__parking_event = parking_event
        self.__deleted_event = Event()
        self.__stop_event = stop_event
        self.__completed_event = completed_event
        self.__rplidar_update_measures_event = rplidar_update_measures_event
        self.__rplidar_measures_queue = rplidar_measures_queue
        self.__photographer_capture_image_event = photographer_capture_image_event
        self.__detector_model_g_inferences_queue = detector_model_g_inferences_queue
        self.__detector_model_m_inferences_queue = detector_model_m_inferences_queue
        self.__detector_model_r_inferences_queue = detector_model_r_inferences_queue
        self.__bno08x_yaw_deg = bno08x_yaw_deg
        self.__bno08x_turns = bno08x_turns

        # Initialize the serial communication dispatcher
        self.__serial_dispatcher = SerialDispatcher(
            serial_sender_messages_queue
            )

        # Initialize the logger
        self.__logger = Logger(
            writer_messages_queue,
            tag=self.LOGGER_TAG,
            debug=self.__debug
            )

        # Initialize the reentrant lock
        self.__rlock = RLock()

        # Initialize the motor speed, servo angle and movement state
        self.__motor_speed = 0.0
        self.__servo_angle = SERVO_CENTER_ANGLE
        self.__movement = movement

        # Initialize the turn flags
        self.__turn_direction = TurnDirection.NONE
        self.__is_turning = False

        # Initialize the updated flags
        self.__motor_speed_updated = False
        self.__servo_angle_updated = False

        # Initialize the average distances dictionary
        self.__average_distances = {direction: 0.0 for direction in CardinalDirection}

    @final
    @property
    def logger(self) -> Logger:
        return self.__logger

    @final
    def _set_motor_speed(self, speed: float):
        # Check if the speed is the same as the current speed
        if self.__motor_speed == speed:
            return

        # Check if the speed is within the full range
        self._check_motor_speed_full_range(speed)
        self.__motor_speed = speed

        # Send the speed message to the serial communication
        if self.__movement:
            self.__serial_dispatcher.send_motor_speed_message(
                self.__motor_speed
            )
            self.__motor_speed_updated = True

        # Log
        self.__logger.info(f"Set motor speed to: {speed}")

    @final
    def _set_motor_stop(self):
        """
        Sets the speed of the ESC motor to 0.
        """
        self._set_motor_speed(0.0)

    @final
    def _set_motor_forward(self, speed: float):
        self._check_motor_speed_half_range(speed)
        self._set_motor_speed(speed)

    @final
    def _set_motor_backward(self, speed: float):
        self._check_motor_speed_half_range(speed)
        self._set_motor_speed(-speed)

    @final
    def _set_servo_angle(self, angle: int):
        # Check if the angle is the same as the current angle
        if self.__servo_angle == angle:
            return

        # Check if the angle is within the valid range
        self._check_servo_angle(angle)
        self.__servo_angle = angle

        if self.__movement:
            self.__serial_dispatcher.send_servo_angle_message(
                self.__servo_angle
            )
            self.__servo_angle_updated = True

        # Log
        self.__logger.info(f"Set servo angle to: {angle}deg")

    @final
    def _set_servo_angle_relative_to_center(self, relative_angle: int):
        if not (SERVO_LEFT_LIMIT <= relative_angle <= SERVO_RIGHT_LIMIT):
            raise ValueError(
                f"Relative angle must be between {SERVO_LEFT_LIMIT} and"
                f" {SERVO_RIGHT_LIMIT} degrees"
            )

        self._set_servo_angle(SERVO_CENTER_ANGLE + relative_angle)

    @final
    def _set_servo_to_center(self):
        self._set_servo_angle(SERVO_CENTER_ANGLE)

    @final
    def _set_servo_to_right(self, angle: int):
        if not 0 < angle <= SERVO_RIGHT_LIMIT:
            raise ValueError(
                f"Angle must be between 0 and {SERVO_RIGHT_LIMIT} degrees for right movement"
            )

        self._set_servo_angle(SERVO_CENTER_ANGLE - angle)

    @final
    def _is_servo_to_right(self) -> bool:
        return self.__servo_angle < SERVO_CENTER_ANGLE

    @final
    def _set_servo_to_left(self, angle: int):
        if not 0 < angle <= abs(SERVO_LEFT_LIMIT):
            raise ValueError(
                f"Angle must be between 0 and {abs(SERVO_LEFT_LIMIT)} degrees for left movement"
            )

        self._set_servo_angle(SERVO_CENTER_ANGLE + angle)

    @final
    def _is_servo_to_left(self) -> bool:
        return self.__servo_angle > SERVO_CENTER_ANGLE

    @final
    def _set_servo_to_opposite(self, angle: int):
        if angle == SERVO_CENTER_ANGLE:
            return

        # Calculate the opposite angle
        angle_diff = abs(self.__servo_angle - SERVO_CENTER_ANGLE)
        if self._is_servo_to_right():
            self._set_servo_to_left(angle_diff)
        elif self._is_servo_to_left():
            self._set_servo_to_right(angle_diff)

    @final
    def _get_rplidar_measures(self) -> Dict[int, Measure] | None:
        # Set the event to signal that the RPLidar should update measures
        self.__rplidar_update_measures_event.set()

        # Get the measures from the queue
        while not self.__stop_event.is_set() and not self.__deleted_event.is_set():
            try:
                return self.__rplidar_measures_queue.get(
                    timeout=self.RPLIDAR_WAIT_DELAY
                    )

            except Empty:
                continue

    @final
    def _update_rplidar_average_distances(self) -> None:
        # Get the RPLidar measures
        measures = self._get_rplidar_measures()
        if measures is None:
            raise TimeoutError(
                "RPLidar measures could not be retrieved within the timeout."
            )

        # Calculate the average north, west and east distances
        self.__average_distances = self._calculate_average_distance(
            measures,
            *CardinalDirection
        )

    def _calculate_sleep_delay(
            self,
            start_time: float
        ) -> float:
        """
        Calculate the sleep delay based on the start time and the update delay.

        Args:
            start_time (float): The start time in seconds.
        Returns:
            float: The calculated sleep delay.
        """
        # Determine the update delay based on the commands sent
        if self.__motor_speed_updated:
            update_delay = self.MOTOR_DELAY
        elif self.__servo_angle_updated:
            update_delay = self.SERVO_DELAY
        else:
            update_delay = self.GYROSCOPE_DELAY

        # Reset the updated flags
        self.__motor_speed_updated = False
        self.__servo_angle_updated = False

        # Calculate the elapsed time since the start time
        elapsed_time = monotonic() - start_time
        delay = update_delay - elapsed_time
        return 0.0 if delay < 0 else delay

    def _sleep(self, start_time: float):
        """
        Sleep for the calculated delay based on the start time.

        Args:
            start_time (float): The start time in seconds.
        """
        sleep(self._calculate_sleep_delay(start_time))


    def _get_distance(
        self,
        direction: CardinalDirection,
    ) -> float:
        """
        Get the distance for a specific direction from the measures.

        Args:
            direction (CardinalDirection): The direction to get the distance for.
        Returns:
            float: The distance for the specified direction.
        """
        return self.__average_distances.get(direction, 0.0)

    @final
    def _challenge_with_obstacles(self):
        raise NotImplementedError(
            "Challenge with obstacles is not implemented yet."
        )

    @final
    def _challenge_without_obstacles(self):
        # Initialize the last turns
        last_turns = self.__bno08x_turns.value

        # Turns counter with RPLidar 
        rplidar_turns_counter = 0

        while not self.__stop_event.is_set() and not self.__deleted_event.is_set():
            # Get the start time
            start_time = monotonic()

            # Get the average distances from the RPLidar
            self._update_rplidar_average_distances()
            west_avg_dist = self._get_distance(CardinalDirection.WEST)
            east_avg_dist = self._get_distance(CardinalDirection.EAST)
            north_avg_dist = self._get_distance(CardinalDirection.NORTH)
            north_northeast_avg_dist = self._get_distance(CardinalDirection.NORTH_NORTHEAST)
            north_northwest_avg_dist = self._get_distance(CardinalDirection.NORTH_NORTHWEST)
            self.__logger.debug(
                f"North: {north_avg_dist}, West: {west_avg_dist}, East: {east_avg_dist}"
            )

            # Check if one of them is 0
            if (north_avg_dist == 0 or
                    west_avg_dist == 0 or east_avg_dist == 0):
                self.__logger.warning(
                    "One of the average distances is 0. This may cause unexpected behavior. Waiting for new measures..."
                )

                # Sleep
                self._sleep(start_time)
                continue

            # Check if the front distance is below the safety threshold
            if north_avg_dist < SAFETY_FRONT_DISTANCE_START_THRESHOLD:
                # Store the current servo angle
                previous_servo_angle = self.__servo_angle
                previous_motor_speed = self.__motor_speed

                # Log the warning
                self.__logger.warning(
                    f"Front distance {north_avg_dist} is below the safety threshold."
                )
                self._set_servo_to_center()
                self._set_motor_backward(MOTOR_SPEED_NORMAL)

                # Sleep for a short time before checking again
                self._sleep(start_time)

                while not self.__stop_event.is_set() and not self.__deleted_event.is_set():
                    # Update the start time
                    start_time = monotonic()

                    # Get the average distances again
                    self._update_rplidar_average_distances()
                    north_avg_dist = self.__average_distances[CardinalDirection.NORTH]

                    if north_avg_dist >= SAFETY_FRONT_DISTANCE_STOP_THRESHOLD:
                        # Stop the motor
                        self._set_motor_stop()
                        self.__logger.info(
                            "Safety front distance threshold reached. Stopping the robot."
                        )
                        break

                    # Sleep for a short time before checking again
                    self._sleep(start_time)

                # Log the recovery
                self.__logger.info(
                    "Front distance is now safe. Recovering the robot."
                )

                # Update the start time
                start_time = monotonic()

                # Set previous servo angle and motor speed back to normal
                if self.__is_turning:
                    self._set_servo_angle(previous_servo_angle)
                else:
                    self._set_servo_to_opposite(previous_servo_angle)
                self._set_motor_speed(previous_motor_speed)

                # Sleep for a short time before continuing
                self._sleep(start_time)
                continue


            # Check for the current turn and center the servo if necessary
            if self.__is_turning:
                # Check if the front distance is safe to stop turning
                turns = self.__bno08x_turns.value
                if turns > last_turns:
                    self.__logger.info(
                        f"Detected a turn. Current turns: {turns}, Last turns: {last_turns}"
                    )
                    self._set_servo_to_center()
                    self.__is_turning = False

                    # Update for the next check
                    last_turns = turns

                elif north_avg_dist >= FRONT_STOP_TURN_DISTANCE_THRESHOLD:
                    self.__logger.info(
                        "Front distance is safe. Stopping the turning state."
                    )
                    self._set_servo_to_center()
                    self.__is_turning = False

                # Sleep
                self._sleep(start_time)
                continue

            # Check if it's almost time to stop
            if last_turns >= TURNS or rplidar_turns_counter >= TURNS:
                self._set_servo_to_center()
                self._set_motor_speed(MOTOR_SPEED_NORMAL)

                while not self.__stop_event.is_set() and not self.__deleted_event.is_set():
                    # Update the start time
                    start_time = monotonic()

                    # Get the average distances
                    self._update_rplidar_average_distances()
                    north_avg_dist = self.__average_distances[CardinalDirection.NORTH]

                    if north_avg_dist <= STOP_DISTANCE_THRESHOLD:
                        # Set the completed event
                        self.__completed_event.set()

                        # Stop the motor
                        self._set_motor_stop()

                        self.__logger.info(
                            "Challenge completed successfully. Stopping the robot."
                        )

                        # Sleep for a short time before exiting
                        self._sleep(start_time)
                        return

                    # Sleep for a short time before checking again
                    self._sleep(start_time)

            # Check if the robot should move forward or turn
            if north_avg_dist >= FRONT_START_TURN_DISTANCE_THRESHOLD:
                if (north_northeast_avg_dist >= SIDE_DISTANCE_THRESHOLD and
                        north_northwest_avg_dist >= SIDE_DISTANCE_THRESHOLD):
                    self._set_motor_speed(MOTOR_SPEED_FAST)
                else:
                    self._set_motor_speed(MOTOR_SPEED_NORMAL)

                # Check if the servo should make a little turn to the left or right in order to center the robot
                if east_avg_dist >= west_avg_dist * (
                        1 + SIDE_DISTANCE_DIFFERENCE_PERCENTAGE):
                    self._set_servo_to_right(SERVO_SMALL_TURN_ANGLE)

                elif west_avg_dist >= east_avg_dist * (
                        1 + SIDE_DISTANCE_DIFFERENCE_PERCENTAGE):
                    self._set_servo_to_left(SERVO_SMALL_TURN_ANGLE)

                else:
                    self._set_servo_to_center()

            else:
                self._set_motor_speed(MOTOR_SPEED_NORMAL)

                # Check if the turn direction is set
                if self.__turn_direction == TurnDirection.RIGHT:
                    if east_avg_dist >= FRONT_START_TURN_DISTANCE_THRESHOLD:
                        self._set_servo_to_right(SERVO_BIG_TURN_ANGLE)
                        self.__is_turning = True
                        rplidar_turns_counter += 1

                elif self.__turn_direction == TurnDirection.LEFT:
                    if west_avg_dist >= FRONT_START_TURN_DISTANCE_THRESHOLD:
                        self._set_servo_to_left(SERVO_BIG_TURN_ANGLE)
                        self.__is_turning = True
                        rplidar_turns_counter += 1

                elif self.__turn_direction == TurnDirection.NONE:
                    # Check if the robot should turn left or right based on the side distances
                    if east_avg_dist >= SIDE_DISTANCE_THRESHOLD:
                        self._set_servo_to_right(SERVO_BIG_TURN_ANGLE)
                        self.__turn_direction = TurnDirection.RIGHT
                        self.__is_turning = True
                        rplidar_turns_counter += 1

                    elif west_avg_dist >= SIDE_DISTANCE_THRESHOLD:
                        self._set_servo_to_left(SERVO_BIG_TURN_ANGLE)
                        self.__turn_direction = TurnDirection.LEFT
                        self.__is_turning = True
                        rplidar_turns_counter += 1

            # Sleep for the calculated delay
            self._sleep(start_time)

    @final
    def _start(self):
        with self.__rlock:
            # Check if the stop event is set
            if self.__stop_event.is_set():
                raise RuntimeError(
                    "Stop event is set. Pilot will not run."
                )

            # Check if the Pilot is already running
            if self.__started_event.is_set():
                raise RuntimeError(
                    "Pilot is already running. Cannot start again."
                )

            # Set the started event to signal that the Pilot has started
            self.__started_event.set()

        # Log
        self.__logger.info("Initialized.")

    @final
    def _stop(self):
        with self.__rlock:
            # Clear the started event
            self.__started_event.clear()

            # Clear the deleted event
            self.__deleted_event.clear()

            # Set the stop event
            self.__stop_event.set()

            # Clear the motor speed, servo angle and turning flag
            self.__motor_speed = 0.0
            self.__servo_angle = SERVO_CENTER_ANGLE

            # Clear the turn flags
            self.__turn_direction = TurnDirection.NONE
            self.__is_turning = False

            # Clear the updated flags
            self.__motor_speed_updated = False
            self.__servo_angle_updated = False

            # Clear the average distances dictionary
            self.__average_distances = {direction: 0.0 for direction in
                                        CardinalDirection}

        # Log
        self.__logger.info("Stopped.")

    @final
    @ignore_sigint
    @log_on_error()
    def run(self):
        # Start the pilot
        self._start()

        # Wait for the start event to be set
        self.__logger.info("Waiting for the start event...")
        while not self.__stop_event.is_set() and not self.__deleted_event.is_set():
            if self.__start_event.wait(timeout=self.START_WAIT_TIMEOUT):
                break
        if self.__stop_event.is_set() or self.__deleted_event.is_set():
            # Stop the Pilot if the stop or deleted event is set
            self._stop()
            return
        self.__logger.info("Started.")

        try:
            # Start the corresponding challenge handler
            if self.__challenge.value == Challenge.WITHOUT_OBSTACLES.as_char:
                self._challenge_without_obstacles()
            elif self.__challenge.value == Challenge.WITH_OBSTACLES.as_char:
                self._challenge_with_obstacles()
            else:
                raise ValueError(
                    f"Unknown challenge: {self.__challenge.value}"
                )

            # Stop the Pilot
            self._stop()

        except Exception as e:
            # Stop the Pilot in case of an exception
            self._stop()
            raise e

    def __del__(self):
        """
        Destructor to clean up resources when the Pilot is no longer needed.
        """
        self.__deleted_event.set()

        # Log
        self.__logger.info(
            "Instance is being deleted. Resources will be cleaned up."
        )

__del__()

Destructor to clean up resources when the Pilot is no longer needed.

Source code in devices\raspberry_pi_5\src\pilot\__init__.py
644
645
646
647
648
649
650
651
652
653
def __del__(self):
    """
    Destructor to clean up resources when the Pilot is no longer needed.
    """
    self.__deleted_event.set()

    # Log
    self.__logger.info(
        "Instance is being deleted. Resources will be cleaned up."
    )

__init__(debug, challenge, start_event, parking_event, stop_event, completed_event, rplidar_update_measures_event, rplidar_measures_queue, serial_sender_messages_queue, writer_messages_queue, bno08x_yaw_deg, bno08x_turns, movement=True, photographer_capture_image_event=None, detector_model_g_inferences_queue=None, detector_model_m_inferences_queue=None, detector_model_r_inferences_queue=None)

Initialize the Pilot class.

Parameters:

Name Type Description Default
debug bool

Flag to indicate if the pilot is in debug mode.

required
challenge Value

Shared value to hold the current challenge.

required
start_event Event

Event to signal when the pilot should start.

required
parking_event Event

Event to signal the parking state of the robot.

required
stop_event Event

Event to signal when the pilot should stop.

required
completed_event Event

Event to signal when the challenge has been completed successfully.

required
rplidar_update_measures_event Event

Event to signal when the RPLidar should update measures.

required
rplidar_measures_queue Queue

Queue to hold RPLidar measures.

required
serial_sender_messages_queue Queue

Queue to hold outgoing messages to the serial port.

required
writer_messages_queue Queue

Queue to hold log messages.

required
bno08x_yaw_deg Value

Shared value for the BNO08X yaw angle in degrees.

required
bno08x_turns Value

Shared value for the BNO08X turns.

required
movement bool

Flag to indicate if the pilot should handle movement.

True
photographer_capture_image_event Optional[Event]

Event to signal when the photographer should capture an image.

None
detector_model_g_inferences_queue Optional[Queue]

Queue for model G inferences.

None
detector_model_m_inferences_queue Optional[Queue]

Queue for model M inferences.

None
detector_model_r_inferences_queue Optional[Queue]

Queue for model R inferences.

None
Source code in devices\raspberry_pi_5\src\pilot\__init__.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def __init__(
    self,
    debug: bool,
    challenge: ValueCls,
    start_event: EventCls,
    parking_event: EventCls,
    stop_event: EventCls,
    completed_event: EventCls,
    rplidar_update_measures_event: EventCls,
    rplidar_measures_queue: Queue,
    serial_sender_messages_queue: Queue,
    writer_messages_queue: Queue,
    bno08x_yaw_deg: ValueCls,
    bno08x_turns: ValueCls,
    movement: bool = True,
    photographer_capture_image_event: Optional[EventCls] = None,
    detector_model_g_inferences_queue: Optional[Queue] = None,
    detector_model_m_inferences_queue: Optional[Queue] = None,
    detector_model_r_inferences_queue: Optional[Queue] = None,
):
    """
    Initialize the Pilot class.

    Args:
        debug (bool): Flag to indicate if the pilot is in debug mode.
        challenge (ValueCls): Shared value to hold the current challenge.
        start_event (EventCls): Event to signal when the pilot should start.
        parking_event (EventCls): Event to signal the parking state of the robot.
        stop_event (EventCls): Event to signal when the pilot should stop.
        completed_event (EventCls): Event to signal when the challenge has been completed successfully.
        rplidar_update_measures_event (EventCls): Event to signal when the RPLidar should update measures.
        rplidar_measures_queue (Queue): Queue to hold RPLidar measures.
        serial_sender_messages_queue (Queue): Queue to hold outgoing messages to the serial port.
        writer_messages_queue (Queue): Queue to hold log messages.
        bno08x_yaw_deg (ValueCls): Shared value for the BNO08X yaw angle in degrees.
        bno08x_turns (ValueCls): Shared value for the BNO08X turns.
        movement (bool): Flag to indicate if the pilot should handle movement.
        photographer_capture_image_event (Optional[EventCls]): Event to signal when the photographer should capture an image.
        detector_model_g_inferences_queue (Optional[Queue]): Queue for model G inferences.
        detector_model_m_inferences_queue (Optional[Queue]): Queue for model M inferences.
        detector_model_r_inferences_queue (Optional[Queue]): Queue for model R inferences.
    """
    # Initialize the debug flag
    self.__debug = debug

    # Initialize the values, queues and events
    self.__challenge = challenge
    self.__started_event = Event()
    self.__start_event = start_event
    self.__parking_event = parking_event
    self.__deleted_event = Event()
    self.__stop_event = stop_event
    self.__completed_event = completed_event
    self.__rplidar_update_measures_event = rplidar_update_measures_event
    self.__rplidar_measures_queue = rplidar_measures_queue
    self.__photographer_capture_image_event = photographer_capture_image_event
    self.__detector_model_g_inferences_queue = detector_model_g_inferences_queue
    self.__detector_model_m_inferences_queue = detector_model_m_inferences_queue
    self.__detector_model_r_inferences_queue = detector_model_r_inferences_queue
    self.__bno08x_yaw_deg = bno08x_yaw_deg
    self.__bno08x_turns = bno08x_turns

    # Initialize the serial communication dispatcher
    self.__serial_dispatcher = SerialDispatcher(
        serial_sender_messages_queue
        )

    # Initialize the logger
    self.__logger = Logger(
        writer_messages_queue,
        tag=self.LOGGER_TAG,
        debug=self.__debug
        )

    # Initialize the reentrant lock
    self.__rlock = RLock()

    # Initialize the motor speed, servo angle and movement state
    self.__motor_speed = 0.0
    self.__servo_angle = SERVO_CENTER_ANGLE
    self.__movement = movement

    # Initialize the turn flags
    self.__turn_direction = TurnDirection.NONE
    self.__is_turning = False

    # Initialize the updated flags
    self.__motor_speed_updated = False
    self.__servo_angle_updated = False

    # Initialize the average distances dictionary
    self.__average_distances = {direction: 0.0 for direction in CardinalDirection}

abstracts

PilotABC

Bases: ABC

Abstract class for the Pilot handler.

This class defines the interface for a Pilot handler, which is responsible for
controlling the robot's movements.

Source code in devices\raspberry_pi_5\src\pilot\abstracts.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
class PilotABC(ABC):
    """
    Abstract class for the Pilot handler.

    This class defines the interface for a Pilot handler, which is responsible for
    controlling the robot's movements.
    """

    @staticmethod
    def _check_motor_speed_half_range(speed: float):
        """
        Check the speed value to ensure it is within the valid half range for ESC motors.

        Args:
            speed (float): The speed value to check.

        Raises:
            ValueError: If the speed is not within the valid half range.
        """
        if not (0 < speed <= MOTOR_SPEED_RANGE[1]):
            raise ValueError(
                f"Speed must be between 0 and {MOTOR_SPEED_RANGE[1]}"
            )

    @staticmethod
    def _check_motor_speed_full_range(speed: float):
        """
        Check the speed value to ensure it is within the valid full range for ESC motors.

        Args:
            speed (float): The speed value to check.

        Raises:
            ValueError: If the speed is not within the valid full range.
        """
        if not (MOTOR_SPEED_RANGE[0] <= speed <= MOTOR_SPEED_RANGE[1]):
            raise ValueError(
                f"Speed must be between {MOTOR_SPEED_RANGE[0]} and {MOTOR_SPEED_RANGE[1]}"
            )

    @classmethod
    def _calculate_average_distance_from_angle(
        cls,
        measures: Dict[int, Measure],
        middle_angle: int,
        width: int = ANGLE_WIDTH
    ) -> float:
        """
        Calculate the average distance for a given list of angles.

        Args:
            measures (Dict[int, Measure]): Dictionary of measures indexed by angle.
            middle_angle (int): The middle angle to start the averaging from.
            width (int): The sum of the angles to consider with both sides and the middle angle.
        Returns:
            float: The average distance for the specified angles.
        Raises:
            ValueError: If the width is not an even number or if it is greater than or equal to 360 degrees.
        """
        # Initialize total distance and count
        total_distance = 0.0
        count = 0

        # Calculate the range of angles to consider
        if width % 2 == 0:
            raise ValueError("Width must be an odd number.")
        if width < 1:
            raise ValueError("Width must be greater than 0.")
        if width >= 360:
            raise ValueError("Width must be less than 360 degrees.")

        # Check if the width is 1, in which case we only consider the middle angle
        if width == 1:
            return measures.get(
                middle_angle,
                Measure(middle_angle, 0.0, 0)
                ).distance

        # Calculate the angles to consider
        angles = []
        width_per_side = (width - 1) // 2
        left_angle = middle_angle - width_per_side
        right_angle = middle_angle + width_per_side
        if left_angle < 0:
            angles = [
                *angles,
                *range(360 + left_angle, 360)
            ]
        if right_angle >= 360:
            angles = [
                *angles,
                *range(0, right_angle - 360 + 1)
            ]

        angles = [
            *angles,
            *range(
                max(left_angle, 0),
                min(360, right_angle + 1)
            )
        ]

        for angle in angles:
            measure = measures.get(angle, None)
            if measure is None:
                continue

            # Check the distance and quality
            if measure.distance == 0.0 or measure.quality == 0:
                continue

            total_distance += measure.distance
            count += 1
        return total_distance / count if count > 0 else 0.0

    @classmethod
    def _calculate_average_distance_from_direction(
        cls,
        measures: Dict[int, Measure],
        direction: CardinalDirection,
        width: int = ANGLE_WIDTH
    ) -> float:
        """
        Calculate the average distance for a given list of angles.

        Args:
            measures (Dict[int, Measure]): Dictionary of measures indexed by angle.
            width (int): The sum of the angles to consider with both sides and the middle angle.
            direction (CardinalDirection): Direction to calculate the average distance for.
        Returns:
            float: The average distance for the specified angles.
        Raises:
            ValueError: If the direction is not valid or no measures are found.
        """
        direction_angle = DIRECTION_TO_ANGLE.get(direction, None)
        if direction_angle is None:
            raise ValueError(f"No angle found for direction: {direction}")

        # Round the angle
        direction_angle = ceil(direction_angle) if direction_angle >= 180 else floor(direction_angle)

        return cls._calculate_average_distance_from_angle(
            measures,
            int(direction_angle),
            width
        )

    @classmethod
    def _calculate_average_distance(
        cls,
        measures: Dict[int, Measure],
        *directions: CardinalDirection,
    ) -> Dict[CardinalDirection, float]:
        """
        Calculate the average distances for the specified directions.

        Args:
            measures (Dict[int, Measure]): Dictionary of measures indexed by angle.
            *directions (CardinalDirection): Directions to calculate the average distances for.
        Returns:
            Dict[Direction, float]: Dictionary with directions as keys and their average distances as values.
        """
        avg_distances = {}
        for direction in directions:
            avg_distances[direction] = cls._calculate_average_distance_from_direction(
                measures, direction
            )
        return avg_distances

    @abstractmethod
    def logger(self) -> Logger:
        """
        Get the logger instance for the Pilot.

        Returns:
            Logger: The logger instance.
        """
        pass

    @abstractmethod
    def _set_motor_speed(self, speed: float):
        """
        Sets the speed of the ESC motor.

        Args:
            speed (float): Speed value between -1.0 (full reverse) and 1.0 (full forward).
        """
        pass

    @abstractmethod
    def _set_motor_stop(self):
        """
        Sets the speed of the ESC motor to 0.
        """
        pass

    @abstractmethod
    def _set_motor_forward(self, speed: float):
        """
        Sets the speed of the ESC motor forward.

        Args:
            speed (float): Speed value between 0 and 1.0 (full forward).
        """
        pass

    @abstractmethod
    def _set_motor_backward(self, speed: float):
        """
        Sets the speed of the ESC motor backward.

        Args:
            speed (float): Speed value between 0 and 1.0 (full backward).
        """
        pass

    @staticmethod
    def _check_servo_angle(angle: int):
        """
        Checks if the servo motor angle is within the valid range.

        Args:
            angle (int): Angle value to check.

        Raises:
            ValueError: If the angle is not within the valid range.
        """
        if not 0 <= angle <= SERVO_ACTUATION_RANGE:
            raise ValueError(
                f"Angle must be between 0 and {SERVO_ACTUATION_RANGE} degrees"
            )

    @abstractmethod
    def _set_servo_angle(self, angle: int):
        """
        Sets the angle of the servo motor.

        Args:
            angle (int): Angle value between 0 and the actuation range.

        Raises:
            ValueError: If the angle is not within the valid range.
        """
        pass

    @abstractmethod
    def _set_servo_angle_relative_to_center(self, relative_angle: int):
        """
        Sets the angle of the servo motor relative to the center
        position.

        Args:
            relative_angle (int): Relative angle value between -90 and 90 degrees.

        Raises:
            ValueError: If the relative angle is not within the left and right
            limits.
        """
        pass

    @abstractmethod
    def _set_servo_to_center(self):
        """
        Centers the servo motor to the middle position.
        """
        pass

    @abstractmethod
    def _set_servo_to_right(self, angle: int):
        """
        Sets the servo motor to the right by a specified angle.

        Args:
            angle (int): Angle value to move the servo to the right, must be between 0 and right limit.

        Raises:
            ValueError: If the angle is not within the right limit.
        """
        pass

    @abstractmethod
    def _is_servo_to_right(self) -> bool:
        """
        Check if the servo is in the right position.

        Returns:
            bool: True if the servo is in the right position, False otherwise.
        """
        pass

    @abstractmethod
    def _set_servo_to_left(self, angle: int):
        """
        Sets the servo motor to the left by a specified angle.

        Args:
            angle (int): Angle value to move the servo to the left, must be between 0 and left limit.

        Raises:
            ValueError: If the angle is not within the left limit.
        """
        pass

    @abstractmethod
    def _is_servo_to_right(self) -> bool:
        """
        Check if the servo is in the right position.

        Returns:
            bool: True if the servo is in the right position, False otherwise.
        """
        pass

    @abstractmethod
    def _set_servo_to_opposite(self, angle: int):
        """
        Sets the servo angle to the opposite direction.

        Args:
            angle (int): The angle to set the servo to.
        """
        pass

    @abstractmethod
    def _get_rplidar_measures(self) -> Dict[int, Measure]:
        """
        Gets the RPLidar measures.

        Returns:
            Dict[int, Measure]: A dictionary containing the RPLidar measures.
        Raises:
            TimeoutError: If the RPLidar measures cannot be retrieved within a timeout.
        """
        pass

    @abstractmethod
    def _update_rplidar_average_distances(self) -> None:
        """
        Updates the average distances from the RPLidar measures.
        """
        pass

    @abstractmethod
    def _challenge_without_obstacles(self):
        """
        Handles the challenge without obstacles.
        """
        pass

    @abstractmethod
    def _challenge_with_obstacles(self):
        """
        Handles the challenge with obstacles.
        """
        pass

    @abstractmethod
    def _start(self):
        """
        Starts the pilot handler.

        Raises:
            RuntimeError: If the pilot handler cannot be started.
        """
        pass

    @abstractmethod
    def _stop(self):
        """
        Stops the pilot handler.
        """
        pass

    @abstractmethod
    def run(self):
        """
        Runs the pilot handler.
        """
        pass

logger() abstractmethod

Get the logger instance for the Pilot.

Returns:

Name Type Description
Logger Logger

The logger instance.

Source code in devices\raspberry_pi_5\src\pilot\abstracts.py
185
186
187
188
189
190
191
192
193
@abstractmethod
def logger(self) -> Logger:
    """
    Get the logger instance for the Pilot.

    Returns:
        Logger: The logger instance.
    """
    pass

run() abstractmethod

Runs the pilot handler.

Source code in devices\raspberry_pi_5\src\pilot\abstracts.py
389
390
391
392
393
394
@abstractmethod
def run(self):
    """
    Runs the pilot handler.
    """
    pass

enums

CardinalDirection

Bases: Enum

Enum to represent the different cardinal directions that the RPLidar can face.

Source code in devices\raspberry_pi_5\src\pilot\enums.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@unique
class CardinalDirection(Enum):
    """
    Enum to represent the different cardinal directions that the RPLidar can face.
    """
    NORTH = 1
    WEST = 2
    EAST = 3
    SOUTH = 4
    NORTHWEST = 5
    NORTHEAST = 6
    SOUTHWEST = 7
    SOUTHEAST = 8
    WEST_NORTHWEST = 9
    NORTH_NORTHWEST = 10
    EAST_NORTHEAST = 11
    NORTH_NORTHEAST = 12
    WEST_SOUTHWEST = 13
    EAST_SOUTHEAST = 14
    SOUTH_SOUTHWEST = 15
    SOUTH_SOUTHEAST = 16

    def get_name(self) -> str:
        """
        Get the direction name in lowercase.

        Returns:
            str: The direction name in lowercase.
        """
        return self.name.lower()

    @classmethod
    def from_string(cls, direction_str: str) -> 'CardinalDirection':
        """
        Convert a string to a Direction enum value.

        Args:
            direction_str (str): The string representation of the direction.
        Returns:
            CardinalDirection: The corresponding Direction enum value.
        """
        return map_string_to_enum(direction_str.upper(), cls)

from_string(direction_str) classmethod

Convert a string to a Direction enum value.

Parameters:

Name Type Description Default
direction_str str

The string representation of the direction.

required

Returns:
CardinalDirection: The corresponding Direction enum value.

Source code in devices\raspberry_pi_5\src\pilot\enums.py
37
38
39
40
41
42
43
44
45
46
47
@classmethod
def from_string(cls, direction_str: str) -> 'CardinalDirection':
    """
    Convert a string to a Direction enum value.

    Args:
        direction_str (str): The string representation of the direction.
    Returns:
        CardinalDirection: The corresponding Direction enum value.
    """
    return map_string_to_enum(direction_str.upper(), cls)

get_name()

Get the direction name in lowercase.

Returns:

Name Type Description
str str

The direction name in lowercase.

Source code in devices\raspberry_pi_5\src\pilot\enums.py
28
29
30
31
32
33
34
35
def get_name(self) -> str:
    """
    Get the direction name in lowercase.

    Returns:
        str: The direction name in lowercase.
    """
    return self.name.lower()

TurnDirection

Bases: Enum

Enum to represent the direction of a turn.

Source code in devices\raspberry_pi_5\src\pilot\enums.py
49
50
51
52
53
54
55
56
class TurnDirection(Enum):
    """
    Enum to represent the direction of a turn.
    """

    LEFT = 1
    RIGHT = 2
    NONE = 3

multiprocessing

pilot_target(debug, challenge, start_event, parking_event, stop_event, completed_event, rplidar_update_measures_event, rplidar_measures_queue, serial_sender_messages_queue, writer_messages_queue, bno08x_yaw_deg, bno08x_turns, movement=True, photographer_capture_image_event=None, detector_model_g_inferences_queue=None, detector_model_m_inferences_queue=None, detector_model_r_inferences_queue=None)

Target function for a multiprocessing process that handles the Pilot.

Parameters:

Name Type Description Default
debug bool

Flag to indicate if the pilot is in debug mode.

required
challenge Value

Shared value to hold the current challenge.

required
start_event Event

Event to signal when the pilot should start.

required
parking_event Event

Event to signal the parking state of the robot.

required
stop_event Event

Event to signal when the pilot should stop.

required
completed_event Event

Event to signal when the challenge has been completed successfully.

required
rplidar_update_measures_event Event

Event to signal when the RPLidar should update measures.

required
rplidar_measures_queue Queue

Queue to hold RPLidar measures.

required
serial_sender_messages_queue Queue

Queue to hold outgoing messages to the serial port.

required
writer_messages_queue Queue

Queue to hold log messages.

required
bno08x_yaw_deg Value

Shared value for the BNO08X yaw angle in degrees.

required
bno08x_turns Value

Shared value for the BNO08X turns.

required
movement bool

Flag to indicate if the pilot should handle movement.

True
photographer_capture_image_event Optional[Event]

Event to signal when the photographer should capture an image.

None
detector_model_g_inferences_queue Optional[Queue]

Queue for model G inferences.

None
detector_model_m_inferences_queue Optional[Queue]

Queue for model M inferences.

None
detector_model_r_inferences_queue Optional[Queue]

Queue for model R inferences.

None
Source code in devices\raspberry_pi_5\src\pilot\multiprocessing.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def pilot_target(
    debug: bool,
    challenge: ValueCls,
    start_event: EventCls,
    parking_event: EventCls,
    stop_event: EventCls,
    completed_event: EventCls,
    rplidar_update_measures_event: EventCls,
    rplidar_measures_queue: Queue,
    serial_sender_messages_queue: Queue,
    writer_messages_queue: Queue,
    bno08x_yaw_deg: ValueCls,
    bno08x_turns: ValueCls,
    movement: bool = True,
    photographer_capture_image_event: Optional[EventCls] = None,
    detector_model_g_inferences_queue: Optional[Queue] = None,
    detector_model_m_inferences_queue: Optional[Queue] = None,
    detector_model_r_inferences_queue: Optional[Queue] = None,
) -> None:
    """
    Target function for a multiprocessing process that handles the Pilot.

    Args:
        debug (bool): Flag to indicate if the pilot is in debug mode.
        challenge (ValueCls): Shared value to hold the current challenge.
        start_event (EventCls): Event to signal when the pilot should start.
        parking_event (EventCls): Event to signal the parking state of the robot.
        stop_event (EventCls): Event to signal when the pilot should stop.
        completed_event (EventCls): Event to signal when the challenge has been completed successfully.
        rplidar_update_measures_event (Event): Event to signal when the RPLidar should update measures.
        rplidar_measures_queue (Queue): Queue to hold RPLidar measures.
        serial_sender_messages_queue (Queue): Queue to hold outgoing messages to the serial port.
        writer_messages_queue (Queue): Queue to hold log messages.
        bno08x_yaw_deg (ValueCls): Shared value for the BNO08X yaw angle in degrees.
        bno08x_turns (ValueCls): Shared value for the BNO08X turns.
        movement (bool): Flag to indicate if the pilot should handle movement.
        photographer_capture_image_event (Optional[EventCls]): Event to signal when the photographer should capture an image.
        detector_model_g_inferences_queue (Optional[Queue]): Queue for model G inferences.
        detector_model_m_inferences_queue (Optional[Queue]): Queue for model M inferences.
        detector_model_r_inferences_queue (Optional[Queue]): Queue for model R inferences.
    """
    print(
        "Initializing Pilot in multiprocessing mode. Process ID: ",
        os.getpid()
    )

    # Initialize the Pilot
    pilot = Pilot(
        debug=debug,
        challenge=challenge,
        start_event=start_event,
        parking_event=parking_event,
        stop_event=stop_event,
        completed_event=completed_event,
        rplidar_update_measures_event=rplidar_update_measures_event,
        rplidar_measures_queue=rplidar_measures_queue,
        serial_sender_messages_queue=serial_sender_messages_queue,
        writer_messages_queue=writer_messages_queue,
        bno08x_yaw_deg=bno08x_yaw_deg,
        bno08x_turns=bno08x_turns,
        movement=movement,
        photographer_capture_image_event=photographer_capture_image_event,
        detector_model_g_inferences_queue=detector_model_g_inferences_queue,
        detector_model_m_inferences_queue=detector_model_m_inferences_queue,
        detector_model_r_inferences_queue=detector_model_r_inferences_queue
    )

    # Run the Pilot
    pilot.run()