14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285 | class Spawner:
"""
Class that spawns processes for the challenge.
"""
# Wait timeout for the start event
START_WAIT_TIMEOUT = 0.1
# Wait timeout for the stop event
STOP_WAIT_TIMEOUT = 0.1
def __init__(
self,
debug: bool,
yolo_version: str,
movement: bool = True,
rplidar_is_upside_down: bool = False,
rplidar_angle_rotation: Optional[float] = None,
):
"""
Initialize the Spawner class.
Args:
debug (bool): Flag to indicate if the spawner is in debug mode.
yolo_version (str): The version of YOLO to use for object detection.
movement (bool): Flag to indicate if the pilot should handle movement.
rplidar_is_upside_down (bool): Flag to indicate if the RPLidar is upside down.
rplidar_angle_rotation (Optional[float]): Angle rotation for the RPLidar, if any.
"""
# Initialize the flags
self.__debug = debug
self.__yolo_version = yolo_version
self.__movement = movement
# Initialize the RPLidar parameters
self.__rplidar_is_upside_down = rplidar_is_upside_down
self.__rplidar_angle_rotation = rplidar_angle_rotation
# Initialize the reentrant lock
self.__rlock = RLock()
# Initialize the queues, values and events
self.__start_event = Event()
self.__started_event = Event()
self.__parking_event = Event()
self.__stop_event = Event()
self.__completed_event = Event()
self.__writer_messages_queue = Queue()
self.__writer_stop_event = Event()
self.__serial_sender_messages_queue = Queue()
self.__bno08x_yaw_deg = Value('d', 0.0)
self.__bno08x_turns = Value('i', 0)
self.__challenge = Value('c', Challenge.NONE.as_char)
self.__rplidar_update_measures_event = Event()
self.__rplidar_measures_queue = Queue()
self.__photographer_capture_image_event = None
self.__photographer_images_queue = None
self.__photographer_preprocess_fn = OpenCV.preprocess_pil_image
self.__object_detector_model_g_inferences_queue = None
self.__object_detector_model_m_inferences_queue = None
self.__object_detector_model_r_inferences_queue = None
# Initialize the processes
self.__writer_process = None
self.__serial_communication_process = None
self.__rplidar_process = None
self.__photographer_process = None
self.__object_detector_process = None
self.__pilot_process = None
def _start(self) -> None:
"""
Start the spawner to initialize and manage the challenge processes.
Raises:
RuntimeError: If the spawner is already running or if the stop event is set.
"""
with self.__rlock:
# Check if the stop event is set
if self.__stop_event.is_set():
print("Stop event is set. RPLidar will not run.")
return
# Check if the RPLidar is already running
if self.__started_event.is_set():
print("Spawner is already running. Cannot start again.")
return
# Set the started event to signal that the Spawner has started
self.__started_event.set()
# Log
print("Spawner initialized.")
def _stop(self) -> None:
"""
Stop the spawner and clean up resources.
"""
print("Stopping Spawner...")
with self.__rlock:
# Wait for all processes to finish
self.__serial_communication_process.join() if self.__serial_communication_process else None
self.__rplidar_process.join() if self.__rplidar_process else None
self.__photographer_process.join() if self.__photographer_process else None
self.__object_detector_process.join() if self.__object_detector_process else None
self.__pilot_process.join() if self.__pilot_process else None
# Set the writer stop event to stop the writer process
self.__writer_stop_event.set()
self.__writer_process.join() if self.__writer_process else None
# Clear the events
self.__start_event.clear()
self.__started_event.clear()
self.__parking_event.clear()
self.__stop_event.clear()
self.__writer_stop_event.clear()
# Log
print("Spawner stopped.")
def run(self) -> None:
"""
Run the spawner to initialize and manage the challenge processes.
"""
# Loop to ensure the spawner runs until stopped
while not self.__completed_event.is_set():
try:
# Start the spawner
self._start()
# Start the writer process
self.__writer_process = Process(
target=writer_target,
args=(self.__debug,
self.__writer_messages_queue,
self.__writer_stop_event)
)
self.__writer_process.start()
# Start the serial communication process
self.__serial_communication_process = Process(
target=serial_communication_target,
args=(self.__debug,
self.__challenge,
self.__start_event,
self.__stop_event,
self.__bno08x_yaw_deg,
self.__bno08x_turns,
self.__serial_sender_messages_queue,
self.__writer_messages_queue)
)
self.__serial_communication_process.start()
# Initialize the RPLidar process
self.__rplidar_process = Process(
target=rplidar_target,
args=(self.__debug,
self.__rplidar_update_measures_event,
self.__rplidar_measures_queue,
self.__start_event,
self.__stop_event,
self.__writer_messages_queue),
kwargs={
'is_upside_down': self.__rplidar_is_upside_down,
'angle_rotation': self.__rplidar_angle_rotation
}
)
self.__rplidar_process.start()
# Wait for the start event to be set
while not self.__stop_event.is_set():
if self.__start_event.wait(timeout=self.START_WAIT_TIMEOUT):
break
if self.__stop_event.is_set():
self._stop()
continue
print("Spawner started.")
# Check if the challenge requires the photographer and the object detector
with self.__challenge.get_lock():
if self.__challenge.value == Challenge.WITH_OBSTACLES.as_char:
# Initialize the required queues and events for the photographer and object detector
self.__photographer_capture_image_event = Event()
self.__photographer_images_queue = Queue()
self.__object_detector_model_g_inferences_queue = Queue()
self.__object_detector_model_m_inferences_queue = Queue()
self.__object_detector_model_r_inferences_queue = Queue()
# Initialize the photographer process
self.__photographer_process = Process(
target=photographer_target,
args=(self.__debug,
self.__photographer_images_queue,
self.__photographer_capture_image_event,
self.__start_event,
self.__stop_event,
self.__writer_messages_queue,
self.__photographer_preprocess_fn,)
)
self.__photographer_process.start()
# Initialize the object detector process
self.__object_detector_process = Process(
target=object_detector_target,
args=(self.__debug,
self.__yolo_version,
self.__object_detector_model_g_inferences_queue,
self.__object_detector_model_m_inferences_queue,
self.__object_detector_model_r_inferences_queue,
self.__start_event,
self.__parking_event,
self.__stop_event,
self.__photographer_images_queue,
self.__writer_messages_queue)
)
self.__object_detector_process.start()
# Initialize the pilot process
self.__pilot_process = Process(
target=pilot_target,
args=(self.__debug,
self.__challenge,
self.__start_event,
self.__parking_event,
self.__stop_event,
self.__completed_event,
self.__rplidar_update_measures_event,
self.__rplidar_measures_queue,
self.__serial_sender_messages_queue,
self.__writer_messages_queue,
self.__bno08x_yaw_deg,
self.__bno08x_turns,
self.__movement,
self.__photographer_capture_image_event,
self.__object_detector_model_g_inferences_queue,
self.__object_detector_model_m_inferences_queue,
self.__object_detector_model_r_inferences_queue)
)
self.__pilot_process.start()
# Wait for the stop event to be set
while not self.__stop_event.wait(timeout=self.STOP_WAIT_TIMEOUT):
continue
# Stop the spawner
self._stop()
except KeyboardInterrupt:
print("Spawner interrupted by user (Ctrl+C). Shutting down...")
# Set the stop event
self.__stop_event.set()
# Stop the spawner
self._stop()
break
except Exception as e:
print(f"An error occurred in the Spawner: {e}")
self._stop()
def __del__(self):
"""
Destructor to clean up resources when the Spawner is no longer needed.
"""
self.__stop_event.set()
print(
"Spawner instance is being deleted. Resources will be cleaned up."
)
|