
Hailo Module

Hailo

Bases: HailoABC, LoggerConsumerProtocol

Class to handle Hailo inferences.
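
A minimal usage sketch follows, assuming the surrounding project wires the handler with standard queues and events; the import of Hailo and MODEL_G, the file paths, and the class-color mapping are illustrative placeholders, not part of this page.

from queue import Queue
from threading import Event, Thread

# Hypothetical wiring; all names below are the caller's own objects.
processed_images_queue = Queue()   # preprocessed np.ndarray frames go in here
inferences_queue = Queue()         # ImageBoundingBoxes results come out here
writer_messages_queue = Queue()    # consumed by the project's log writer
start_event, stop_event = Event(), Event()

hailo = Hailo(
    debug=True,
    model_name=MODEL_G,                          # placeholder model name constant
    hef_file_path="path/to/model.hef",           # placeholder paths
    labels_path="path/to/labels.txt",
    class_colors={0: (0, 255, 0)},               # placeholder class ID -> color mapping
    processed_images_queue=processed_images_queue,
    inferences_queue=inferences_queue,
    start_event=start_event,
    stop_event=stop_event,
    writer_messages_queue=writer_messages_queue,
)

thread = Thread(target=hailo.run)                # run() blocks until stop_event is set
thread.start()
start_event.set()                                # release the handler from its start wait

# ...feed frames with processed_images_queue.put(frame) and read results
# from inferences_queue.get(); finally:
stop_event.set()
thread.join()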

Source code in devices\raspberry_pi_5\src\hailo\__init__.py
class Hailo(HailoABC, LoggerConsumerProtocol):
    """
    Class to handle Hailo inferences.
    """

    # Logger configuration
    LOGGER_TAG = "Hailo"

    # Image allowed extensions
    IMAGE_ALLOWED_EXTENSIONS: tuple = ('.jpg', '.png', '.bmp', '.jpeg')

    # Current model file paths
    NO_PARKING_MODELS_NAME = [MODEL_G, MODEL_R]
    PARKING_MODELS_NAME = [MODEL_M]

    # Batch size
    BATCH_SIZE = 1

    # Wait timeout
    WAIT_TIMEOUT = 0.1

    # Job timeout
    JOB_TIMEOUT = 5000

    # Wait timeout for the start event
    START_WAIT_TIMEOUT = 0.1

    def __init__(
        self,
        debug: bool,
        model_name: str,
        hef_file_path: str | os.PathLike[str],
        labels_path: str | os.PathLike[str],
        class_colors: tuple[tuple[int, int, int]],
        processed_images_queue: Queue,
        inferences_queue: Queue,
        start_event: EventCls,
        stop_event: EventCls,
        writer_messages_queue: Queue,
        multi_threading: bool = True,
        multiprocessing: bool = False,
        batch_size: int = BATCH_SIZE,
        input_type: Optional[str] = None,
        output_type: Optional[Dict[str, str]] = None
    ) -> None:
        """
        Initialize the Hailo handler class.

        Args:
            debug (bool): Flag to indicate if the Hailo handler is in debug mode.
            model_name (str): Name of the YOLO model.
            hef_file_path (str | os.PathLike[str]): Path to the HEF file.
            labels_path (str | os.PathLike[str]): Path to the labels file.
            class_colors (tuple[tuple[int, int, int]]): Tuple mapping class IDs to RGB colors.
            processed_images_queue (Queue): Queue to hold input images for processing.
            inferences_queue (Queue): Queue to hold the inferences from the Hailo handlers.
            start_event (EventCls): Event to signal when the Hailo handler should start.
            stop_event (EventCls): Event to signal when the Hailo handler should stop.
            writer_messages_queue (Queue): Queue to hold log messages.
            multi_threading (bool): Whether to enable multi-threading.
            multiprocessing (bool): Whether to enable multiprocessing.
            batch_size (int): Batch size for inference.
            input_type (Optional[str]): Format type of the input stream.
            output_type (Optional[Dict[str, str]]): Format type of the output stream.
        """
        # Initialize the debug flag
        self.__debug = debug

        # Initialize the queues and events
        self.__processed_images_queue = processed_images_queue
        self.__inferences_queue = inferences_queue
        self.__start_event = start_event
        self.__started_event = Event()
        self.__deleted_event = Event()
        self.__stop_event = stop_event

        # Initialize the logger
        self.__logger_tag = f"{self.LOGGER_TAG}_{model_name}"
        self.__logger = Logger(
            writer_messages_queue,
            tag=self.__logger_tag,
            debug=self.__debug
            )

        # Initialize the reentrant lock
        self.__rlock = RLock()

        # Check the type of model name
        is_instance(model_name, str)
        self.__model_name = model_name

        # Check the HEF file path
        is_instance(hef_file_path, str)
        Files.ensure_directory_exists(hef_file_path)
        self.__hef_file_path = hef_file_path

        # Check the labels path
        is_instance(labels_path, str)
        Files.ensure_directory_exists(labels_path)
        self.__labels = Files.get_labels_from_txt(labels_path)

        # Check the type of class colors
        is_instance(class_colors, Dict)
        self.__class_colors = class_colors

        # Check the type of batch size
        is_instance(batch_size, int)
        self.__batch_size = batch_size

        # Initialize the multiprocessing and multi-threading flags
        is_instance(multi_threading, bool)
        self.__multi_threading = multi_threading
        is_instance(multiprocessing, bool)
        self.__multiprocessing = multiprocessing

        # Initialize the input type
        is_instance(input_type, (str, NoneType,))
        self.__input_type = input_type

        # Initialize the output type
        self.__output_type: Optional[Dict[str, str]] = output_type

        # Initialize the target, HEF, infer model and job
        self.__target = None
        self.__hef = None
        self.__infer_model = None
        self.__job = None

    @final
    @property
    def logger(self) -> Logger:
        return self.__logger

    @final
    def _set_input_type(self, input_type: Optional[str] = None) -> None:
        self.__infer_model.input().set_format_type(
            getattr(FormatType, input_type)
        )

    @final
    def _set_output_type(
        self, output_type_dict: Optional[
            Dict[str, str]] = None
    ) -> None:
        for output_name, output_type in output_type_dict.items():
            self.__infer_model.output(output_name).set_format_type(
                getattr(FormatType, output_type)
            )

    @final
    def _get_output_type_str(self, output_info) -> str | None:
        if not self.__output_type:
            return str(output_info.format.type).split(".")[1].lower()
        else:
            return self.__output_type[output_info.name].lower()

    @final
    def get_input_shape(self) -> tuple[int, ...]:
        # Assumes one input
        return self.__hef.get_input_vstream_infos()[0].shape

    @final
    def _create_bindings(self, configured_infer_model) -> object:
        if not self.__output_type:
            output_buffers = {
                output_info.name: np.empty(
                    self.__infer_model.output(output_info.name).shape,
                    dtype=(getattr(np, self._get_output_type_str(output_info)))
                )
                for output_info in self.__hef.get_output_vstream_infos()
            }
        else:
            output_buffers = {
                name: np.empty(
                    self.__infer_model.output(name).shape,
                    dtype=(getattr(np, self.__output_type[name].lower()))
                )
                for name in self.__output_type
            }
        return configured_infer_model.create_bindings(
            output_buffers=output_buffers
        )

    @final
    def _callback(
        self, completion_info, bindings, preprocessed_image: np.ndarray
    ) -> None:
        if completion_info.exception:
            self.__logger.warning(
                f'Inference error: {completion_info.exception}'
                )
            return

        # If the model has a single output, return the output buffer.
        if len(bindings._output_names) == 1:
            result = bindings.output().get_buffer()

        # Else, return a dictionary of output buffers, where the keys are the output names.
        else:
            result = {
                name: np.expand_dims(
                    bindings.output(name).get_buffer(), axis=0
                )
                for name in bindings._output_names
            }
        self.__inferences_queue.put(ImageBoundingBoxes.from_hailo(result))

    @final
    def _start(self):
        with self.__rlock:
            # Check if the stop event is set
            if self.__stop_event.is_set():
                raise RuntimeError(
                    f"Stop event is set. Hailo handler for model '{self.__model_name}' will not run."
                )

            # Check if the Hailo handler for the given model name is already running
            if self.__started_event.is_set():
                raise RuntimeError(
                    f"Hailo handler for model '{self.__model_name}' is already running. Cannot start again."
                )

            # Set the started event to signal that the Hailo handler has started
            self.__started_event.set()

        # Log
        self.__logger.info(f"Initialized.")

    @final
    def _stop(self) -> None:
        # Check if there are any remaining jobs
        if self.__job:
            self.__job.wait(self.JOB_TIMEOUT)

        with self.__rlock:
            # Clear the started event
            self.__started_event.clear()

            # Clear the deleted event
            self.__deleted_event.clear()

            # Clear the job
            self.__job = None

        # Log
        self.__logger.info("Stopped.")

    @final
    def _infer_latest_preprocessed_image(self, configured_infer_model) -> None:
        try:
            # Get a preprocessed image from the input queue
            preprocessed_image = self.__processed_images_queue.get(
                timeout=self.WAIT_TIMEOUT
            )

        except Empty:
            return None

        # Check the type of preprocessed_image
        is_instance(preprocessed_image, np.ndarray)

        # Create the bindings for the input and output buffers
        bindings = self._create_bindings(configured_infer_model)
        bindings.input().set_buffer(np.array(preprocessed_image))

        configured_infer_model.wait_for_async_ready(
            timeout_ms=self.JOB_TIMEOUT
        )
        self.__job = configured_infer_model.run_async(
            bindings, partial(
                self._callback,
                preprocessed_image=preprocessed_image,
                bindings=bindings
            )
        )

    @final
    @ignore_sigint
    @log_on_error()
    def run(self) -> None:
        # Start the hailo handler
        self._start()

        # Wait for the start event
        self.__logger.info("Waiting for the start event...")
        while not self.__stop_event.is_set() and not self.__deleted_event.is_set():
            if self.__start_event.wait(self.START_WAIT_TIMEOUT):
                break
        if self.__stop_event.is_set() or self.__deleted_event.is_set():
            # Stop the Hailo handler if the stop or deleted event is set
            self._stop()
            return
        self.__logger.info("Started.")

        try:
            # Create the VDevice parameters
            params = VDevice.create_params()

            # Set the scheduling algorithm to round-robin to activate the scheduler
            params.scheduling_algorithm = HailoSchedulingAlgorithm.ROUND_ROBIN

            # Set the group ID to SHARED
            if self.__multi_threading or self.__multiprocessing:
                params.group_id = "SHARED"

            # Enable multi-processing service
            if self.__multiprocessing:
                params.multi_process_service = True

            # Set the VDevice parameters
            self.__target = VDevice(params)

            # Set the HEF model
            self.__hef = HEF(self.__hef_file_path)
            self.__infer_model = self.__target.create_infer_model(
                self.__hef_file_path
            )
            self.__infer_model.set_batch_size(self.__batch_size)

            # Set the input and output types
            if self.__input_type:
                self._set_input_type(self.__input_type)
            if self.__output_type:
                self._set_output_type(self.__output_type)

            with self.__infer_model.configure() as configured_infer_model:
                while not self.__stop_event.is_set() and not self.__deleted_event.is_set():
                    self._infer_latest_preprocessed_image(
                        configured_infer_model
                        )

            # Stop the Hailo handler
            self._stop()

        except Exception as e:
            # Stop the Hailo handler in case of an exception
            self._stop()
            raise e

    def __del__(self):
        """
        Destructor to clean up resources when the Hailo handler is no longer needed.
        """
        self.__deleted_event.set()

        # Log
        self.__logger.info(
            "Instance is being deleted. Resources will be cleaned up."
        )

__del__()

Destructor to clean up resources when the Hailo handler is no longer needed.

Source code in devices\raspberry_pi_5\src\hailo\__init__.py
def __del__(self):
    """
    Destructor to clean up resources when the Hailo handler is no longer needed.
    """
    self.__deleted_event.set()

    # Log
    self.__logger.info(
        "Instance is being deleted. Resources will be cleaned up."
    )

__init__(debug, model_name, hef_file_path, labels_path, class_colors, processed_images_queue, inferences_queue, start_event, stop_event, writer_messages_queue, multi_threading=True, multiprocessing=False, batch_size=BATCH_SIZE, input_type=None, output_type=None)

Initialize the Hailo handler class.

Parameters:

debug (bool): Flag to indicate if the Hailo handler is in debug mode. Required.
model_name (str): Name of the YOLO model. Required.
hef_file_path (str | PathLike[str]): Path to the HEF file. Required.
labels_path (str | PathLike[str]): Path to the labels file. Required.
class_colors (tuple[tuple[int, int, int]]): Tuple mapping class IDs to RGB colors. Required.
processed_images_queue (Queue): Queue to hold input images for processing. Required.
inferences_queue (Queue): Queue to hold the inferences from the Hailo handlers. Required.
start_event (Event): Event to signal when the Hailo handler should start. Required.
stop_event (Event): Event to signal when the Hailo handler should stop. Required.
writer_messages_queue (Queue): Queue to hold log messages. Required.
multi_threading (bool): Whether to enable multi-threading. Default: True.
multiprocessing (bool): Whether to enable multiprocessing. Default: False.
batch_size (int): Batch size for inference. Default: BATCH_SIZE.
input_type (Optional[str]): Format type of the input stream. Default: None.
output_type (Optional[Dict[str, str]]): Format type of the output stream. Default: None.
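
In the source below, input_type is resolved with getattr(FormatType, input_type) and each output_type value is resolved both as a FormatType member and, lowercased, as a NumPy dtype name, so the expected values are strings such as "UINT8" or "FLOAT32". A small hedged example of the shape of these arguments (the output stream name is a placeholder):

# Hypothetical values; the output stream name is a placeholder taken from the HEF.
input_type = "UINT8"
output_type = {
    "model/output_layer_0": "FLOAT32",   # "FLOAT32".lower() -> np.float32 buffer dtype
}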
Source code in devices\raspberry_pi_5\src\hailo\__init__.py
def __init__(
    self,
    debug: bool,
    model_name: str,
    hef_file_path: str | os.PathLike[str],
    labels_path: str | os.PathLike[str],
    class_colors: tuple[tuple[int, int, int]],
    processed_images_queue: Queue,
    inferences_queue: Queue,
    start_event: EventCls,
    stop_event: EventCls,
    writer_messages_queue: Queue,
    multi_threading: bool = True,
    multiprocessing: bool = False,
    batch_size: int = BATCH_SIZE,
    input_type: Optional[str] = None,
    output_type: Optional[Dict[str, str]] = None
) -> None:
    """
    Initialize the Hailo handler class.

    Args:
        debug (bool): Flag to indicate if the Hailo handler is in debug mode.
        model_name (str): Name of the YOLO model.
        hef_file_path (str | os.PathLike[str]): Path to the HEF file.
        labels_path (str | os.PathLike[str]): Path to the labels file.
        class_colors (tuple[tuple[int, int, int]]): Tuple mapping class IDs to RGB colors.
        processed_images_queue (Queue): Queue to hold input images for processing.
        inferences_queue (Queue): Queue to hold the inferences from the Hailo handlers.
        start_event (EventCls): Event to signal when the Hailo handler should start.
        stop_event (EventCls): Event to signal when the Hailo handler should stop.
        writer_messages_queue (Queue): Queue to hold log messages.
        multi_threading (bool): Whether to enable multi-threading.
        multiprocessing (bool): Whether to enable multiprocessing.
        batch_size (int): Batch size for inference.
        input_type (Optional[str]): Format type of the input stream.
        output_type (Optional[Dict[str, str]]): Format type of the output stream.
    """
    # Initialize the debug flag
    self.__debug = debug

    # Initialize the queues and events
    self.__processed_images_queue = processed_images_queue
    self.__inferences_queue = inferences_queue
    self.__start_event = start_event
    self.__started_event = Event()
    self.__deleted_event = Event()
    self.__stop_event = stop_event

    # Initialize the logger
    self.__logger_tag = f"{self.LOGGER_TAG}_{model_name}"
    self.__logger = Logger(
        writer_messages_queue,
        tag=self.__logger_tag,
        debug=self.__debug
        )

    # Initialize the reentrant lock
    self.__rlock = RLock()

    # Check the type of model name
    is_instance(model_name, str)
    self.__model_name = model_name

    # Check the HEF file path
    is_instance(hef_file_path, str)
    Files.ensure_directory_exists(hef_file_path)
    self.__hef_file_path = hef_file_path

    # Check the labels path
    is_instance(labels_path, str)
    Files.ensure_directory_exists(labels_path)
    self.__labels = Files.get_labels_from_txt(labels_path)

    # Check the type of class colors
    is_instance(class_colors, Dict)
    self.__class_colors = class_colors

    # Check the type of batch size
    is_instance(batch_size, int)
    self.__batch_size = batch_size

    # Initialize the multiprocessing and multi-threading flags
    is_instance(multi_threading, bool)
    self.__multi_threading = multi_threading
    is_instance(multiprocessing, bool)
    self.__multiprocessing = multiprocessing

    # Initialize the input type
    is_instance(input_type, (str, NoneType,))
    self.__input_type = input_type

    # Initialize the output type
    self.__output_type: Optional[Dict[str, str]] = output_type

    # Initialize the target, HEF, infer model and job
    self.__target = None
    self.__hef = None
    self.__infer_model = None
    self.__job = None

abstracts

HailoABC

Bases: ABC

Abstract base class for Hailo handlers.

Source code in devices\raspberry_pi_5\src\hailo\abstracts.py
class HailoABC(ABC):
    """
    Abstract base class for Hailo handlers.
    """

    @staticmethod
    def preprocess(
        image: Image,
        width: int = WIDTH,
        height: int = HEIGHT
    ) -> np.ndarray:
        """
        Resize image with unchanged aspect ratio using padding.

        Args:
            image (Image): Input image.
            width (int): Model input width.
            height (int): Model input height.
        Returns:
            np.ndarray: Preprocessed and padded image.
        """
        # Convert image to numpy array
        image = np.array(image)

        # Resize image with unchanged aspect ratio using padding
        img_height, img_width, _ = image.shape[:3]
        scale = min(width / img_width, height / img_height)
        new_img_width, new_img_height = int(img_width * scale), int(
            img_height * scale
        )
        image = cv2.resize(
            image, (new_img_width, new_img_height),
            interpolation=cv2.INTER_CUBIC
        )

        # Calculate padding and create padded image
        padded_image = np.full(
            (height, width, 3), PADDING_COLOR,
            dtype=np.uint8
        )
        x_offset = (width - new_img_width) // 2
        y_offset = (height - new_img_height) // 2
        padded_image[y_offset:y_offset + new_img_height,
                     x_offset:x_offset + new_img_width] = image
        return padded_image

    @abstractmethod
    def logger(self) -> Logger:
        """
        Get the logger instance for the Hailo.

        Returns:
            Logger: The logger instance.
        """
        pass

    @abstractmethod
    def _set_input_type(self, input_type: Optional[str] = None) -> None:
        """
        Set the input type for the HEF model. If the model has multiple inputs,
        it will set the same type for all of them.

        Args:
            input_type (Optional[str]): Format type of the input stream.
        """
        pass

    @abstractmethod
    def _set_output_type(
        self,
        output_type_dict: Optional[Dict[str, str]] = None
    ) -> None:
        """
        Set the output type for the HEF model. If the model has multiple outputs,
        it will set the same type for all of them.

        Args:
            output_type_dict (Optional[Dict[str, str]]): Format type of the output stream.
        """
        pass

    @abstractmethod
    def _get_output_type_str(self, output_info) -> str | None:
        """
        Get the output type string for the HEF model.

        Args:
            output_info: Information about the output stream.
        """
        pass

    @abstractmethod
    def get_input_shape(self) -> tuple[int, ...]:
        """
        Get the shape of the model's input layer.

        Returns:
            tuple[int, ...]: Shape of the model's input layer.
        """
        pass

    @abstractmethod
    def _create_bindings(self, configured_infer_model) -> object:
        """
        Create bindings for input and output buffers.

        Args:
            configured_infer_model: The configured inference model.

        Returns:
            object: Bindings object with input and output buffers.
        """
        pass

    @abstractmethod
    def _callback(
        self,
        completion_info,
        bindings,
        preprocessed_image: np.ndarray
    ) -> None:
        """
        Callback function for handling inference results.

        Args:
            completion_info: Information about the completion of the inference task.
            bindings: Binding objects containing input and output buffers.
            preprocessed_image (np.ndarray): Preprocessed image used for inference.
        """
        pass

    @abstractmethod
    def _start(self) -> None:
        """
        Start the Hailo handler.

        Raises:
            RuntimeError: If the Hailo handler cannot be started.
        """
        pass

    @abstractmethod
    def _stop(self) -> None:
        """
        Stop the Hailo handler.
        """
        pass

    @abstractmethod
    def _infer_latest_preprocessed_image(self, configured_infer_model) -> None:
        """
        Run inference on the latest preprocessed image.

        Args:
            configured_infer_model: The configured inference model to run.
        """
        pass

    @abstractmethod
    def run(self) -> None:
        """
        Run the inference loop.

        This method continuously retrieves batches of images from the input queue,
        preprocesses them, and runs inference using the configured infer model.
        """
        pass

get_input_shape() abstractmethod

Get the shape of the model's input layer.

Returns:

tuple[int, ...]: Shape of the model's input layer.
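
A small hedged illustration of how the returned shape can drive preprocessing, assuming a single input with a (height, width, channels) layout; hailo and image are the caller's own objects:

# Assumed layout: (height, width, channels) for a single-input HEF.
height, width, _ = hailo.get_input_shape()
preprocessed = HailoABC.preprocess(image, width=width, height=height)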

Source code in devices\raspberry_pi_5\src\hailo\abstracts.py
@abstractmethod
def get_input_shape(self) -> tuple[int, ...]:
    """
    Get the shape of the model's input layer.

    Returns:
        tuple[int, ...]: Shape of the model's input layer.
    """
    pass

logger() abstractmethod

Get the logger instance for the Hailo.

Returns:

Logger: The logger instance.

Source code in devices\raspberry_pi_5\src\hailo\abstracts.py
@abstractmethod
def logger(self) -> Logger:
    """
    Get the logger instance for the Hailo.

    Returns:
        Logger: The logger instance.
    """
    pass

preprocess(image, width=WIDTH, height=HEIGHT) staticmethod

Resize image with unchanged aspect ratio using padding.

Parameters:

image (Image): Input image. Required.
width (int): Model input width. Default: WIDTH.
height (int): Model input height. Default: HEIGHT.

Returns:
np.ndarray: Preprocessed and padded image.
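
A worked example of the letterboxing arithmetic above, using an illustrative 1280x720 frame and a 640x640 model input:

# Illustrative numbers: a 1280x720 frame letterboxed into a 640x640 model input.
img_width, img_height = 1280, 720
width, height = 640, 640

scale = min(width / img_width, height / img_height)    # min(0.5, 0.888...) = 0.5
new_img_width = int(img_width * scale)                  # 640
new_img_height = int(img_height * scale)                # 360

x_offset = (width - new_img_width) // 2                 # 0 columns of padding
y_offset = (height - new_img_height) // 2               # 140 rows of padding top and bottom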

Source code in devices\raspberry_pi_5\src\hailo\abstracts.py
@staticmethod
def preprocess(
    image: Image,
    width: int = WIDTH,
    height: int = HEIGHT
) -> np.ndarray:
    """
    Resize image with unchanged aspect ratio using padding.

    Args:
        image (Image): Input image.
        width (int): Model input width.
        height (int): Model input height.
    Returns:
        np.ndarray: Preprocessed and padded image.
    """
    # Convert image to numpy array
    image = np.array(image)

    # Resize image with unchanged aspect ratio using padding
    img_height, img_width, _ = image.shape[:3]
    scale = min(width / img_width, height / img_height)
    new_img_width, new_img_height = int(img_width * scale), int(
        img_height * scale
    )
    image = cv2.resize(
        image, (new_img_width, new_img_height),
        interpolation=cv2.INTER_CUBIC
    )

    # Calculate padding and create padded image
    padded_image = np.full(
        (height, width, 3), PADDING_COLOR,
        dtype=np.uint8
    )
    x_offset = (width - new_img_width) // 2
    y_offset = (height - new_img_height) // 2
    padded_image[y_offset:y_offset + new_img_height,
                 x_offset:x_offset + new_img_width] = image
    return padded_image

run() abstractmethod

Run the inference loop.

This method continuously retrieves batches of images from the input queue,
preprocesses them, and runs inference using the configured infer model.

Source code in devices\raspberry_pi_5\src\hailo\abstracts.py
@abstractmethod
def run(self) -> None:
    """
    Run the inference loop.

    This method continuously retrieves batches of images from the input queue,
    preprocesses them, and runs inference using the configured infer model.
    """
    pass

ObjectDetectorABC

Bases: ABC

Abstract class to handle object detection using Hailo handlers.

Source code in devices\raspberry_pi_5\src\hailo\abstracts.py
class ObjectDetectorABC(ABC):
    """
    Abstract class to handle object detection using Hailo handlers.
    """

    @abstractmethod
    def logger(self) -> Logger:
        """
        Get the logger instance for the ObjectDetector.

        Returns:
            Logger: The logger instance.
        """
        pass

    @abstractmethod
    def _start(self) -> None:
        """
        Start the object detection process.

        Raises:
            RuntimeError: If the object detection process cannot be started.
        """
        pass

    @abstractmethod
    def _stop(self) -> None:
        """
        Stop the object detection process.
        """
        pass

    @abstractmethod
    def run(self) -> None:
        """
        The main loop to run the object detection using Hailo handlers.
        """
        pass

logger() abstractmethod

Get the logger instance for the ObjectDetector.

Returns:

Logger: The logger instance.

Source code in devices\raspberry_pi_5\src\hailo\abstracts.py
@abstractmethod
def logger(self) -> Logger:
    """
    Get the logger instance for the ObjectDetector.

    Returns:
        Logger: The logger instance.
    """
    pass

run() abstractmethod

The main loop to run the object detection using Hailo handlers.

Source code in devices\raspberry_pi_5\src\hailo\abstracts.py
@abstractmethod
def run(self) -> None:
    """
    The main loop to run the object detection using Hailo handlers.
    """
    pass

multiprocessing

object_detector_target(debug, yolo_version, model_g_inferences_queue, model_m_inferences_queue, model_r_inferences_queue, start_event, parking_event, stop_event, photographer_images_queue, writer_messages_queue)

Target function for a multiprocessing process that handles the
ObjectDetector.

Parameters:

debug (bool): Flag to indicate if the object detector is in debug mode. Required.
yolo_version (str): The version of YOLO to use for object detection. Required.
model_g_inferences_queue (Queue): Queue to hold inferences for model G. Required.
model_m_inferences_queue (Queue): Queue to hold inferences for model M. Required.
model_r_inferences_queue (Queue): Queue to hold inferences for model R. Required.
start_event (Event): Event to signal when the object detector should start. Required.
parking_event (Event): Event to signal the parking state of the robot. Required.
stop_event (Event): Event to signal when the object detector should stop. Required.
photographer_images_queue (Queue): Queue to hold input images for processing. Required.
writer_messages_queue (Queue): Queue to hold log messages. Required.
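
A minimal launch sketch, assuming multiprocessing.Queue and Event objects created by the caller; all names below, including the YOLO version string, are placeholders for the caller's own wiring:

# Hypothetical wiring; all queues and events are created by the caller.
from multiprocessing import Event, Process, Queue

queues = {name: Queue() for name in ("g", "m", "r", "images", "writer")}
start_event, parking_event, stop_event = Event(), Event(), Event()

process = Process(
    target=object_detector_target,
    kwargs=dict(
        debug=False,
        yolo_version="v8",                     # placeholder YOLO version string
        model_g_inferences_queue=queues["g"],
        model_m_inferences_queue=queues["m"],
        model_r_inferences_queue=queues["r"],
        start_event=start_event,
        parking_event=parking_event,
        stop_event=stop_event,
        photographer_images_queue=queues["images"],
        writer_messages_queue=queues["writer"],
    ),
)
process.start()
start_event.set()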
Source code in devices\raspberry_pi_5\src\hailo\multiprocessing.py
def object_detector_target(
    debug: bool,
    yolo_version: str,
    model_g_inferences_queue: Queue,
    model_m_inferences_queue: Queue,
    model_r_inferences_queue: Queue,
    start_event: EventCls,
    parking_event: EventCls,
    stop_event: EventCls,
    photographer_images_queue: Queue,
    writer_messages_queue: Queue,
):
    """
    Target function for a multiprocessing process that handles the
    ObjectDetector.

    Args:
        debug (bool): Flag to indicate if the object detector is in debug mode.
        yolo_version (str): The version of YOLO to use for object detection.
        model_g_inferences_queue (Queue): Queue to hold inferences for model G.
        model_m_inferences_queue (Queue): Queue to hold inferences for model M.
        model_r_inferences_queue (Queue): Queue to hold inferences for model R.
        start_event (EventCls): Event to signal when the object detector should start.
        parking_event (EventCls): Event to signal the parking state of the robot.
        stop_event (EventCls): Event to signal when the object detector should stop.
        photographer_images_queue (Queue): Queue to hold input images for processing.
        writer_messages_queue (Queue): Queue to hold log messages.
    """
    print(
        "Initializing ObjectDetector in multiprocessing mode. Process ID: ",
        os.getpid()
    )

    # Initialize the object detector
    object_detector = ObjectDetector(
        debug=debug,
        yolo_version=yolo_version,
        model_g_inferences_queue=model_g_inferences_queue,
        model_m_inferences_queue=model_m_inferences_queue,
        model_r_inferences_queue=model_r_inferences_queue,
        start_event=start_event,
        parking_event=parking_event,
        stop_event=stop_event,
        photographer_images_queue=photographer_images_queue,
        writer_messages_queue=writer_messages_queue
    )

    # Run the object detector
    object_detector.run()

object_detector

ObjectDetector

Bases: ObjectDetectorABC, LoggerConsumerProtocol

Class to handle object detection using Hailo handlers.
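
As the run() source below shows, the detector works in two phases: frames from photographer_images_queue are fanned out to the G and R handlers until parking_event is set, after which those handlers are stopped and the M (parking) handler takes over. A hedged sketch of how a caller drives that switch; camera_frames() and robot_is_parking() are placeholders, and the queue and events are the same objects passed to the constructor:

# Hypothetical driver loop around an already-running ObjectDetector.
start_event.set()                          # let the detector leave its start wait

for frame in camera_frames():              # placeholder frame source
    photographer_images_queue.put(frame)   # phase 1: consumed by the G and R handlers
    if robot_is_parking():                 # placeholder condition
        parking_event.set()                # switch run() over to the M handler
        break

# Frames put after the switch are routed to the M handler until stop_event is set.
stop_event.set()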

Source code in devices\raspberry_pi_5\src\hailo\object_detector.py
class ObjectDetector(ObjectDetectorABC, LoggerConsumerProtocol):
    """
    Class to handle object detection using Hailo handlers.
    """

    # Logger configuration
    LOGGER_TAG = 'ObjectDetector'

    # Wait timeout
    WAIT_TIMEOUT = 0.1

    # Wait timeout for the start event
    START_WAIT_TIMEOUT = 0.1

    def __init__(
        self,
        debug: bool,
        yolo_version: str,
        model_g_inferences_queue: Queue,
        model_m_inferences_queue: Queue,
        model_r_inferences_queue: Queue,
        start_event: EventCls,
        parking_event: EventCls,
        stop_event: EventCls,
        photographer_images_queue: Queue,
        writer_messages_queue: Queue,
    ) -> None:
        """
        Initialize the ObjectDetector class.

        Args:
            debug (bool): Flag to indicate if the object detector is in debug mode.
            yolo_version (str): The version of YOLO to use for object detection.
            model_g_inferences_queue (Queue): Queue to hold inferences for model G.
            model_m_inferences_queue (Queue): Queue to hold inferences for model M.
            model_r_inferences_queue (Queue): Queue to hold inferences for model R.
            start_event (EventCls): Event to signal when the object detector should start.
            parking_event (EventCls): Event to signal the parking state of the robot.
            stop_event (EventCls): Event to signal when the object detector should stop.
            photographer_images_queue (Queue): Queue to hold input images for processing.
            writer_messages_queue (Queue): Queue to hold log messages.
        """
        # Initialize the debug flag
        self.__debug = debug

        # Initialize the queues and events
        self.__photographer_images_queue = photographer_images_queue
        self.__started_event = Event()
        self.__start_event = start_event
        self.__parking_event = parking_event
        self.__deleted_event = Event()
        self.__stop_event = stop_event
        self.__processed_images_queues = {}
        self.__stop_events = {}
        self.__inferences_queues = {
            MODEL_G: model_g_inferences_queue,
            MODEL_M: model_m_inferences_queue,
            MODEL_R: model_r_inferences_queue,
        }
        for model_name in MODELS_NAME:
            self.__processed_images_queues[model_name] = Queue()
            self.__stop_events[model_name] = Event()

        # Initialize the logger
        self.__logger = Logger(
            writer_messages_queue,
            tag=self.LOGGER_TAG,
            debug=self.__debug
            )

        # Initialize the reentrant lock
        self.__rlock = RLock()

        # Initialize the thread
        self.__thread = None

        # Create the Hailo handlers
        self.__hailo_handlers = {}
        self.__hailo_handler_threads: Dict[str, Thread | None] = {}
        for model_name in MODELS_NAME:
            # Get the HEF file paths
            hef_file_path = Files.get_model_hailo_suite_compiled_hef_file_path(
                model_name, yolo_version
            )

            # Get the labels file paths
            labels_file_path = Files.get_hailo_labels_file_path(model_name)

            # Get the model class colors
            model_class_colors = OpenCV.get_model_classes_color_palette(
                model_name
            )

            # Create the Hailo handler
            hailo_handler = Hailo(
                debug=debug,
                model_name=model_name,
                hef_file_path=hef_file_path,
                labels_path=labels_file_path,
                class_colors=model_class_colors,
                processed_images_queue=self.__processed_images_queues[
                    model_name],
                inferences_queue=self.__inferences_queues[model_name],
                start_event=self.__start_event,
                stop_event=self.__stop_events[model_name],
                writer_messages_queue=writer_messages_queue
            )
            self.__hailo_handlers[model_name] = hailo_handler

            # Initialize the thread
            self.__hailo_handler_threads[model_name] = None

    @final
    @property
    def logger(self) -> Logger:
        return self.__logger

    @final
    def _start(self) -> None:
        with self.__rlock:
            # Check if the stop event is set
            if self.__stop_event.is_set():
                raise RuntimeError(
                    "Stop event is set. ObjectDetector will not run."
                )

            # Check if the object detector is already running
            if self.__started_event.is_set():
                raise RuntimeError(
                    "ObjectDetector is already running. Cannot start again."
                )

            # Set the started event
            self.__started_event.set()

        # Log
        self.__logger.info("Initialized.")

    @final
    def _stop(self) -> None:
        # Stop the Hailo handler threads
        for model_name in MODELS_NAME:
            # Set the stop event for the model handler
            self.__stop_events[model_name].set()

            # Wait for the Hailo handler thread to finish
            if self.__hailo_handler_threads[model_name] is not None:
                self.__logger.info(
                    f"Stopping Hailo handler for {model_name} model..."
                )
                self.__hailo_handler_threads[model_name].join()
                self.__hailo_handler_threads[model_name] = None

        with self.__rlock:
            # Clear the started event
            self.__started_event.clear()

            # Set the deleted event to signal that the object detector is being deleted
            self.__deleted_event.set()

        # Log
        self.__logger.info("Stopped.")

    @final
    @ignore_sigint
    @log_on_error()
    def run(self) -> None:
        # Start the object detector
        self._start()

        # Wait for the start event to be set
        self.__logger.info("Waiting for the start event...")
        while not self.__stop_event.is_set() and not self.__deleted_event.is_set():
            if self.__start_event.wait(self.START_WAIT_TIMEOUT):
                break
        if self.__stop_event.is_set() or self.__deleted_event.is_set():
            # Stop the object detector if the stop or deleted event is set
            self._stop()
            return
        self.__logger.info("Started.")

        try:
            for model_name in MODELS_NAME:
                # Initialize the Hailo handler thread
                hailo_handler = self.__hailo_handlers[model_name]
                hailo_handler_thread = Thread(target=hailo_handler.run)

                # Start only the G and R model handlers
                if model_name in [MODEL_G, MODEL_R]:
                    hailo_handler_thread.start()

                # Store the thread in the dictionary
                self.__hailo_handler_threads[model_name] = hailo_handler_thread

            # Process images for G and R models
            self.__logger.info("Starting Hailo handlers for G and R models...")
            while (not self.__stop_event.is_set()
                   and not self.__deleted_event.is_set()
                   and not self.__parking_event.is_set()):
                try:
                    # Get the image from the photographer images queue
                    image = self.__photographer_images_queue.get(
                        timeout=self.WAIT_TIMEOUT
                    )

                except Empty:
                    continue

                # Put the model G and R images in the Hailo handler processed images queues
                for model_name in [MODEL_G, MODEL_R]:
                    self.__processed_images_queues[model_name].put(image)

            # Stop the Hailo handlers for G and R models
            for model_name in [MODEL_G, MODEL_R]:
                # Set the stop event for the model handler
                self.__stop_events[model_name].set()

                # Wait for the Hailo handler thread to finish
                self.__logger.info(
                    f"Stopping Hailo handler for {model_name} model..."
                )
                self.__hailo_handler_threads[model_name].join()
                self.__hailo_handler_threads[model_name] = None

            # Start the Hailo handler thread for model M
            self.__hailo_handler_threads[MODEL_M].start()

            # Process images for model M
            self.__logger.info("Starting Hailo handler for M model...")
            while not self.__stop_event.is_set() and not self.__deleted_event.is_set():
                try:
                    # Get the image from the photographer images queue
                    image = self.__photographer_images_queue.get(
                        timeout=self.WAIT_TIMEOUT
                    )

                except Empty:
                    continue

                # Put the model M image in the Hailo handler processed images queue
                self.__processed_images_queues[MODEL_M].put(image)

            # Stop the object detector
            self._stop()

        except Exception as e:
            # Stop the object detector in case of an exception
            self._stop()
            raise e

    def __del__(self):
        """
        Destructor to clean up resources when the ObjectDetector is no longer needed.
        """
        self.__deleted_event.set()

        # Log
        self.__logger.info(
            "Instance is being deleted. Resources will be cleaned up."
        )

__del__()

Destructor to clean up resources when the ObjectDetector is no longer needed.

Source code in devices\raspberry_pi_5\src\hailo\object_detector.py
def __del__(self):
    """
    Destructor to clean up resources when the ObjectDetector is no longer needed.
    """
    self.__deleted_event.set()

    # Log
    self.__logger.info(
        "Instance is being deleted. Resources will be cleaned up."
    )

__init__(debug, yolo_version, model_g_inferences_queue, model_m_inferences_queue, model_r_inferences_queue, start_event, parking_event, stop_event, photographer_images_queue, writer_messages_queue)

Initialize the ObjectDetector class.

Parameters:

debug (bool): Flag to indicate if the object detector is in debug mode. Required.
yolo_version (str): The version of YOLO to use for object detection. Required.
model_g_inferences_queue (Queue): Queue to hold inferences for model G. Required.
model_m_inferences_queue (Queue): Queue to hold inferences for model M. Required.
model_r_inferences_queue (Queue): Queue to hold inferences for model R. Required.
start_event (Event): Event to signal when the object detector should start. Required.
parking_event (Event): Event to signal the parking state of the robot. Required.
stop_event (Event): Event to signal when the object detector should stop. Required.
photographer_images_queue (Queue): Queue to hold input images for processing. Required.
writer_messages_queue (Queue): Queue to hold log messages. Required.
Source code in devices\raspberry_pi_5\src\hailo\object_detector.py
def __init__(
    self,
    debug: bool,
    yolo_version: str,
    model_g_inferences_queue: Queue,
    model_m_inferences_queue: Queue,
    model_r_inferences_queue: Queue,
    start_event: EventCls,
    parking_event: EventCls,
    stop_event: EventCls,
    photographer_images_queue: Queue,
    writer_messages_queue: Queue,
) -> None:
    """
    Initialize the ObjectDetector class.

    Args:
        debug (bool): Flag to indicate if the object detector is in debug mode.
        yolo_version (str): The version of YOLO to use for object detection.
        model_g_inferences_queue (Queue): Queue to hold inferences for model G.
        model_m_inferences_queue (Queue): Queue to hold inferences for model M.
        model_r_inferences_queue (Queue): Queue to hold inferences for model R.
        start_event (EventCls): Event to signal when the object detector should start.
        parking_event (EventCls): Event to signal the parking state of the robot.
        stop_event (EventCls): Event to signal when the object detector should stop.
        photographer_images_queue (Queue): Queue to hold input images for processing.
        writer_messages_queue (Queue): Queue to hold log messages.
    """
    # Initialize the debug flag
    self.__debug = debug

    # Initialize the queues and events
    self.__photographer_images_queue = photographer_images_queue
    self.__started_event = Event()
    self.__start_event = start_event
    self.__parking_event = parking_event
    self.__deleted_event = Event()
    self.__stop_event = stop_event
    self.__processed_images_queues = {}
    self.__stop_events = {}
    self.__inferences_queues = {
        MODEL_G: model_g_inferences_queue,
        MODEL_M: model_m_inferences_queue,
        MODEL_R: model_r_inferences_queue,
    }
    for model_name in MODELS_NAME:
        self.__processed_images_queues[model_name] = Queue()
        self.__stop_events[model_name] = Event()

    # Initialize the logger
    self.__logger = Logger(
        writer_messages_queue,
        tag=self.LOGGER_TAG,
        debug=self.__debug
        )

    # Initialize the reentrant lock
    self.__rlock = RLock()

    # Initialize the thread
    self.__thread = None

    # Create the Hailo handlers
    self.__hailo_handlers = {}
    self.__hailo_handler_threads: Dict[str, Thread | None] = {}
    for model_name in MODELS_NAME:
        # Get the HEF file paths
        hef_file_path = Files.get_model_hailo_suite_compiled_hef_file_path(
            model_name, yolo_version
        )

        # Get the labels file paths
        labels_file_path = Files.get_hailo_labels_file_path(model_name)

        # Get the model class colors
        model_class_colors = OpenCV.get_model_classes_color_palette(
            model_name
        )

        # Create the Hailo handler
        hailo_handler = Hailo(
            debug=debug,
            model_name=model_name,
            hef_file_path=hef_file_path,
            labels_path=labels_file_path,
            class_colors=model_class_colors,
            processed_images_queue=self.__processed_images_queues[
                model_name],
            inferences_queue=self.__inferences_queues[model_name],
            start_event=self.__start_event,
            stop_event=self.__stop_events[model_name],
            writer_messages_queue=writer_messages_queue
        )
        self.__hailo_handlers[model_name] = hailo_handler

        # Initialize the thread
        self.__hailo_handler_threads[model_name] = None