Saltar a contenido

Spawner Module

Spawner

Class that spawns processes for the challenge.

Source code in devices\raspberry_pi_5\src\spawner\__init__.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
class Spawner:
    """
    Class that spawns processes for the challenge.
    """

    # Wait timeout for the start event
    START_WAIT_TIMEOUT = 0.1

    # Wait timeout for the stop event
    STOP_WAIT_TIMEOUT = 0.1

    def __init__(
        self,
        debug: bool,
        yolo_version: str,
        movement: bool = True,
        rplidar_is_upside_down: bool = False,
        rplidar_angle_rotation: Optional[float] = None,
    ):
        """
        Initialize the Spawner class.

        Args:
            debug (bool): Flag to indicate if the spawner is in debug mode.
            yolo_version (str): The version of YOLO to use for object detection.
            movement (bool): Flag to indicate if the pilot should handle movement.
            rplidar_is_upside_down (bool): Flag to indicate if the RPLidar is upside down.
            rplidar_angle_rotation (Optional[float]): Angle rotation for the RPLidar, if any.
        """
        # Initialize the flags
        self.__debug = debug
        self.__yolo_version = yolo_version
        self.__movement = movement

        # Initialize the RPLidar parameters
        self.__rplidar_is_upside_down = rplidar_is_upside_down
        self.__rplidar_angle_rotation = rplidar_angle_rotation

        # Initialize the reentrant lock
        self.__rlock = RLock()

        # Initialize the queues, values and events
        self.__start_event = Event()
        self.__started_event = Event()
        self.__parking_event = Event()
        self.__stop_event = Event()
        self.__completed_event = Event()
        self.__writer_messages_queue = Queue()
        self.__writer_stop_event = Event()
        self.__serial_sender_messages_queue = Queue()
        self.__bno08x_yaw_deg = Value('d', 0.0)
        self.__bno08x_turns = Value('i', 0)
        self.__challenge = Value('c', Challenge.NONE.as_char)
        self.__rplidar_update_measures_event = Event()
        self.__rplidar_measures_queue = Queue()
        self.__photographer_capture_image_event = None
        self.__photographer_images_queue = None
        self.__photographer_preprocess_fn = OpenCV.preprocess_pil_image
        self.__object_detector_model_g_inferences_queue = None
        self.__object_detector_model_m_inferences_queue = None
        self.__object_detector_model_r_inferences_queue = None

        # Initialize the processes
        self.__writer_process = None
        self.__serial_communication_process = None
        self.__rplidar_process = None
        self.__photographer_process = None
        self.__object_detector_process = None
        self.__pilot_process = None

    def _start(self) -> None:
        """
        Start the spawner to initialize and manage the challenge processes.

        Raises:
            RuntimeError: If the spawner is already running or if the stop event is set.
        """
        with self.__rlock:
            # Check if the stop event is set
            if self.__stop_event.is_set():
                print("Stop event is set. RPLidar will not run.")
                return

            # Check if the RPLidar is already running
            if self.__started_event.is_set():
                print("Spawner is already running. Cannot start again.")
                return

            # Set the started event to signal that the Spawner has started
            self.__started_event.set()

        # Log
        print("Spawner initialized.")

    def _stop(self) -> None:
        """
        Stop the spawner and clean up resources.
        """
        print("Stopping Spawner...")

        with self.__rlock:
            # Wait for all processes to finish
            self.__serial_communication_process.join() if self.__serial_communication_process else None
            self.__rplidar_process.join() if self.__rplidar_process else None
            self.__photographer_process.join() if self.__photographer_process else None
            self.__object_detector_process.join() if self.__object_detector_process else None
            self.__pilot_process.join() if self.__pilot_process else None

            # Set the writer stop event to stop the writer process
            self.__writer_stop_event.set()
            self.__writer_process.join() if self.__writer_process else None

            # Clear the events
            self.__start_event.clear()
            self.__started_event.clear()
            self.__parking_event.clear()
            self.__stop_event.clear()
            self.__writer_stop_event.clear()

        # Log
        print("Spawner stopped.")

    def run(self) -> None:
        """
        Run the spawner to initialize and manage the challenge processes.
        """
        # Loop to ensure the spawner runs until stopped
        while not self.__completed_event.is_set():
            try:
                # Start the spawner
                self._start()

                # Start the writer process
                self.__writer_process = Process(
                    target=writer_target,
                    args=(self.__debug,
                        self.__writer_messages_queue,
                        self.__writer_stop_event)
                )
                self.__writer_process.start()

                # Start the serial communication process
                self.__serial_communication_process = Process(
                    target=serial_communication_target,
                    args=(self.__debug,
                        self.__challenge,
                        self.__start_event,
                        self.__stop_event,
                        self.__bno08x_yaw_deg,
                        self.__bno08x_turns,
                        self.__serial_sender_messages_queue,
                        self.__writer_messages_queue)
                )
                self.__serial_communication_process.start()

                # Initialize the RPLidar process
                self.__rplidar_process = Process(
                    target=rplidar_target,
                    args=(self.__debug,
                        self.__rplidar_update_measures_event,
                        self.__rplidar_measures_queue,
                        self.__start_event,
                        self.__stop_event,
                        self.__writer_messages_queue),
                    kwargs={
                        'is_upside_down': self.__rplidar_is_upside_down,
                        'angle_rotation': self.__rplidar_angle_rotation
                    }
                )
                self.__rplidar_process.start()

                # Wait for the start event to be set
                while not self.__stop_event.is_set():
                    if self.__start_event.wait(timeout=self.START_WAIT_TIMEOUT):
                        break
                if self.__stop_event.is_set():
                    self._stop()
                    continue
                print("Spawner started.")

                # Check if the challenge requires the photographer and the object detector
                with self.__challenge.get_lock():
                    if self.__challenge.value == Challenge.WITH_OBSTACLES.as_char:
                        # Initialize the required queues and events for the photographer and object detector
                        self.__photographer_capture_image_event = Event()
                        self.__photographer_images_queue = Queue()
                        self.__object_detector_model_g_inferences_queue = Queue()
                        self.__object_detector_model_m_inferences_queue = Queue()
                        self.__object_detector_model_r_inferences_queue = Queue()

                        # Initialize the photographer process
                        self.__photographer_process = Process(
                            target=photographer_target,
                            args=(self.__debug,
                                self.__photographer_images_queue,
                                self.__photographer_capture_image_event,
                                self.__start_event,
                                self.__stop_event,
                                self.__writer_messages_queue,
                                self.__photographer_preprocess_fn,)
                        )
                        self.__photographer_process.start()

                        # Initialize the object detector process
                        self.__object_detector_process = Process(
                            target=object_detector_target,
                            args=(self.__debug,
                                self.__yolo_version,
                                self.__object_detector_model_g_inferences_queue,
                                self.__object_detector_model_m_inferences_queue,
                                self.__object_detector_model_r_inferences_queue,
                                self.__start_event,
                                self.__parking_event,
                                self.__stop_event,
                                self.__photographer_images_queue,
                                self.__writer_messages_queue)
                        )
                        self.__object_detector_process.start()

                # Initialize the pilot process
                self.__pilot_process = Process(
                    target=pilot_target,
                    args=(self.__debug,
                        self.__challenge,
                        self.__start_event,
                        self.__parking_event,
                        self.__stop_event,
                        self.__completed_event,
                        self.__rplidar_update_measures_event,
                        self.__rplidar_measures_queue,
                        self.__serial_sender_messages_queue,
                        self.__writer_messages_queue,
                        self.__bno08x_yaw_deg,
                        self.__bno08x_turns,
                        self.__movement,
                        self.__photographer_capture_image_event,
                        self.__object_detector_model_g_inferences_queue,
                        self.__object_detector_model_m_inferences_queue,
                        self.__object_detector_model_r_inferences_queue)
                )
                self.__pilot_process.start()

                # Wait for the stop event to be set
                while not self.__stop_event.wait(timeout=self.STOP_WAIT_TIMEOUT):
                    continue

                # Stop the spawner
                self._stop()

            except KeyboardInterrupt:
                print("Spawner interrupted by user (Ctrl+C). Shutting down...")

                # Set the stop event
                self.__stop_event.set()

                # Stop the spawner
                self._stop()
                break

            except Exception as e:
                print(f"An error occurred in the Spawner: {e}")
                self._stop()

    def __del__(self):
        """
        Destructor to clean up resources when the Spawner is no longer needed.
        """
        self.__stop_event.set()

        print(
            "Spawner instance is being deleted. Resources will be cleaned up."
        )

__del__()

Destructor to clean up resources when the Spawner is no longer needed.

Source code in devices\raspberry_pi_5\src\spawner\__init__.py
277
278
279
280
281
282
283
284
285
def __del__(self):
    """
    Destructor to clean up resources when the Spawner is no longer needed.
    """
    self.__stop_event.set()

    print(
        "Spawner instance is being deleted. Resources will be cleaned up."
    )

__init__(debug, yolo_version, movement=True, rplidar_is_upside_down=False, rplidar_angle_rotation=None)

Initialize the Spawner class.

Parameters:

Name Type Description Default
debug bool

Flag to indicate if the spawner is in debug mode.

required
yolo_version str

The version of YOLO to use for object detection.

required
movement bool

Flag to indicate if the pilot should handle movement.

True
rplidar_is_upside_down bool

Flag to indicate if the RPLidar is upside down.

False
rplidar_angle_rotation Optional[float]

Angle rotation for the RPLidar, if any.

None
Source code in devices\raspberry_pi_5\src\spawner\__init__.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def __init__(
    self,
    debug: bool,
    yolo_version: str,
    movement: bool = True,
    rplidar_is_upside_down: bool = False,
    rplidar_angle_rotation: Optional[float] = None,
):
    """
    Initialize the Spawner class.

    Args:
        debug (bool): Flag to indicate if the spawner is in debug mode.
        yolo_version (str): The version of YOLO to use for object detection.
        movement (bool): Flag to indicate if the pilot should handle movement.
        rplidar_is_upside_down (bool): Flag to indicate if the RPLidar is upside down.
        rplidar_angle_rotation (Optional[float]): Angle rotation for the RPLidar, if any.
    """
    # Initialize the flags
    self.__debug = debug
    self.__yolo_version = yolo_version
    self.__movement = movement

    # Initialize the RPLidar parameters
    self.__rplidar_is_upside_down = rplidar_is_upside_down
    self.__rplidar_angle_rotation = rplidar_angle_rotation

    # Initialize the reentrant lock
    self.__rlock = RLock()

    # Initialize the queues, values and events
    self.__start_event = Event()
    self.__started_event = Event()
    self.__parking_event = Event()
    self.__stop_event = Event()
    self.__completed_event = Event()
    self.__writer_messages_queue = Queue()
    self.__writer_stop_event = Event()
    self.__serial_sender_messages_queue = Queue()
    self.__bno08x_yaw_deg = Value('d', 0.0)
    self.__bno08x_turns = Value('i', 0)
    self.__challenge = Value('c', Challenge.NONE.as_char)
    self.__rplidar_update_measures_event = Event()
    self.__rplidar_measures_queue = Queue()
    self.__photographer_capture_image_event = None
    self.__photographer_images_queue = None
    self.__photographer_preprocess_fn = OpenCV.preprocess_pil_image
    self.__object_detector_model_g_inferences_queue = None
    self.__object_detector_model_m_inferences_queue = None
    self.__object_detector_model_r_inferences_queue = None

    # Initialize the processes
    self.__writer_process = None
    self.__serial_communication_process = None
    self.__rplidar_process = None
    self.__photographer_process = None
    self.__object_detector_process = None
    self.__pilot_process = None

run()

Run the spawner to initialize and manage the challenge processes.

Source code in devices\raspberry_pi_5\src\spawner\__init__.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
def run(self) -> None:
    """
    Run the spawner to initialize and manage the challenge processes.
    """
    # Loop to ensure the spawner runs until stopped
    while not self.__completed_event.is_set():
        try:
            # Start the spawner
            self._start()

            # Start the writer process
            self.__writer_process = Process(
                target=writer_target,
                args=(self.__debug,
                    self.__writer_messages_queue,
                    self.__writer_stop_event)
            )
            self.__writer_process.start()

            # Start the serial communication process
            self.__serial_communication_process = Process(
                target=serial_communication_target,
                args=(self.__debug,
                    self.__challenge,
                    self.__start_event,
                    self.__stop_event,
                    self.__bno08x_yaw_deg,
                    self.__bno08x_turns,
                    self.__serial_sender_messages_queue,
                    self.__writer_messages_queue)
            )
            self.__serial_communication_process.start()

            # Initialize the RPLidar process
            self.__rplidar_process = Process(
                target=rplidar_target,
                args=(self.__debug,
                    self.__rplidar_update_measures_event,
                    self.__rplidar_measures_queue,
                    self.__start_event,
                    self.__stop_event,
                    self.__writer_messages_queue),
                kwargs={
                    'is_upside_down': self.__rplidar_is_upside_down,
                    'angle_rotation': self.__rplidar_angle_rotation
                }
            )
            self.__rplidar_process.start()

            # Wait for the start event to be set
            while not self.__stop_event.is_set():
                if self.__start_event.wait(timeout=self.START_WAIT_TIMEOUT):
                    break
            if self.__stop_event.is_set():
                self._stop()
                continue
            print("Spawner started.")

            # Check if the challenge requires the photographer and the object detector
            with self.__challenge.get_lock():
                if self.__challenge.value == Challenge.WITH_OBSTACLES.as_char:
                    # Initialize the required queues and events for the photographer and object detector
                    self.__photographer_capture_image_event = Event()
                    self.__photographer_images_queue = Queue()
                    self.__object_detector_model_g_inferences_queue = Queue()
                    self.__object_detector_model_m_inferences_queue = Queue()
                    self.__object_detector_model_r_inferences_queue = Queue()

                    # Initialize the photographer process
                    self.__photographer_process = Process(
                        target=photographer_target,
                        args=(self.__debug,
                            self.__photographer_images_queue,
                            self.__photographer_capture_image_event,
                            self.__start_event,
                            self.__stop_event,
                            self.__writer_messages_queue,
                            self.__photographer_preprocess_fn,)
                    )
                    self.__photographer_process.start()

                    # Initialize the object detector process
                    self.__object_detector_process = Process(
                        target=object_detector_target,
                        args=(self.__debug,
                            self.__yolo_version,
                            self.__object_detector_model_g_inferences_queue,
                            self.__object_detector_model_m_inferences_queue,
                            self.__object_detector_model_r_inferences_queue,
                            self.__start_event,
                            self.__parking_event,
                            self.__stop_event,
                            self.__photographer_images_queue,
                            self.__writer_messages_queue)
                    )
                    self.__object_detector_process.start()

            # Initialize the pilot process
            self.__pilot_process = Process(
                target=pilot_target,
                args=(self.__debug,
                    self.__challenge,
                    self.__start_event,
                    self.__parking_event,
                    self.__stop_event,
                    self.__completed_event,
                    self.__rplidar_update_measures_event,
                    self.__rplidar_measures_queue,
                    self.__serial_sender_messages_queue,
                    self.__writer_messages_queue,
                    self.__bno08x_yaw_deg,
                    self.__bno08x_turns,
                    self.__movement,
                    self.__photographer_capture_image_event,
                    self.__object_detector_model_g_inferences_queue,
                    self.__object_detector_model_m_inferences_queue,
                    self.__object_detector_model_r_inferences_queue)
            )
            self.__pilot_process.start()

            # Wait for the stop event to be set
            while not self.__stop_event.wait(timeout=self.STOP_WAIT_TIMEOUT):
                continue

            # Stop the spawner
            self._stop()

        except KeyboardInterrupt:
            print("Spawner interrupted by user (Ctrl+C). Shutting down...")

            # Set the stop event
            self.__stop_event.set()

            # Stop the spawner
            self._stop()
            break

        except Exception as e:
            print(f"An error occurred in the Spawner: {e}")
            self._stop()