Saltar a contenido

Camera Module

Camera

Bases: CameraABC, LoggerConsumerProtocol

Class implementation that wraps the functionality required for the Raspberry Pi Camera.

Source code in devices\raspberry_pi_5\src\camera\__init__.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
class Camera(CameraABC, LoggerConsumerProtocol):
    """
    Class implementation that wraps the functionality required for the Raspberry Pi Camera.
    """

    # Logger configuration
    LOGGER_TAG = "Camera"

    def __init__(
        self,
        writer_messages_queue: Queue,
        width: int = WIDTH,
        height: int = HEIGHT,
        rotation: int = 0,
        video_config: Optional[Dict] = None
    ):
        """
        Initialize the camera with the specified width, height, and video configuration.

        Args:
            writer_messages_queue (Queue): Queue to hold log messages.
            width (int): Width of the camera image.
            height (int): Height of the camera image.
            video_config(Optional[Dict]): Configuration for video recording.
            rotation (int): Rotation angle for the camera.
        """
        # Initialize the logger
        self.__logger = Logger(writer_messages_queue, self.LOGGER_TAG)

        # Initialize the reentrant lock
        self.__rlock = RLock()

        # Configure the camera and video settings
        self.__picam2 = Picamera2()
        self.__picam2.set_controls(
            {"AwbMode": "auto"}
        )  # Set Auto White Balance (AWB)
        self.__config = self.__picam2.create_still_configuration(
            main={"size": (width, height)}
        )
        self.__picam2.configure(self.__config)

        # Configure rotation if specified
        if rotation:
            self.__picam2.set_controls({"Rotation": rotation})

        # Set the video configuration if provided and the started preview flag
        self.__video_config = video_config
        self.__started_preview = False

    @final
    def logger(self) -> Logger:
        return self.__logger

    @final
    def _start_preview(self) -> None:
        with self.__rlock:
            # Check if the preview is already started
            if self.__started_preview:
                return

            self.__picam2.start_preview()
            self.__started_preview = True

        # Log
        self.__logger.info("Camera preview started.")

    @final
    def _stop_preview(self) -> None:
        with self.__rlock:
            # Check if the preview is running
            if not self.__started_preview:
                return

            self.__picam2.stop_preview()
            self.__started_preview = False

        # Log
        self.__logger.info("Camera preview stopped.")

    @final
    def record_video(
        self,
        width: int = WIDTH,
        height: int = HEIGHT,
        duration: int = 10,
        file_path: str = 'video.h264',
        encoder=H264Encoder()
    ) -> None:
        with self.__rlock:
            # Stop the camera preview if it is running
            self._stop_preview()

            # Configure the camera for video recording
            if not self.__video_config:
                self.__video_config = self.__picam2.create_video_configuration(
                    main={"size": (width, height)},
                    display="preview"
                )
            self.__picam2.configure(self.__video_config)

            # Get the  output
            output = FileOutput(file_path)

            # Start the recording
            self.__picam2.start_recording(encoder, output)

            # Sleep for the duration of the recording
            sleep(duration)

            # Stop the recording
            self.__picam2.stop_recording()

        # Log
        self.__logger.info(
            f"Video of {duration} seconds recording saved to {file_path}."
        )

    @final
    def capture_image(
        self,
        adjust_duration: float = ADJUST_DURATION
    ) -> Image:
        with self.__rlock:
            # Start the camera preview
            self._start_preview()

            # Allow time for the camera to adjust
            sleep(adjust_duration)

            # Capture the image
            image = self.__picam2.capture()

            # Stop the camera preview if required
            self._stop_preview()

        # Log
        self.__logger.info("Captured image.")

        return image

    @final
    def capture_image_stream(
        self,
        image_format: str = IMAGE_FORMAT,
        adjust_duration: float = ADJUST_DURATION
    ) -> io.BytesIO:
        with self.__rlock:
            # Start the camera preview
            self._start_preview()

            # Allow time for the camera to adjust
            sleep(adjust_duration)

            # Capture the image stream
            image_stream = io.BytesIO()
            self.__picam2.capture(image_stream, format=image_format)

            # Stop the camera preview
            self._stop_preview()

        # Log
        self.__logger.info("Captured image stream.")

        return image_stream

    def __del__(self):
        """
        Destructor to clean up resources when the camera is no longer needed.
        """
        # Stop the camera preview
        self._stop_preview()

        # Stop the camera
        self.__picam2.close()

        # Log
        self.__logger.info(
            "Camera instance is being deleted. Resources will be cleaned up."
        )

__del__()

Destructor to clean up resources when the camera is no longer needed.

Source code in devices\raspberry_pi_5\src\camera\__init__.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
def __del__(self):
    """
    Destructor to clean up resources when the camera is no longer needed.
    """
    # Stop the camera preview
    self._stop_preview()

    # Stop the camera
    self.__picam2.close()

    # Log
    self.__logger.info(
        "Camera instance is being deleted. Resources will be cleaned up."
    )

__init__(writer_messages_queue, width=WIDTH, height=HEIGHT, rotation=0, video_config=None)

Initialize the camera with the specified width, height, and video configuration.

Parameters:

Name Type Description Default
writer_messages_queue Queue

Queue to hold log messages.

required
width int

Width of the camera image.

WIDTH
height int

Height of the camera image.

HEIGHT
video_config(Optional[Dict])

Configuration for video recording.

required
rotation int

Rotation angle for the camera.

0
Source code in devices\raspberry_pi_5\src\camera\__init__.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def __init__(
    self,
    writer_messages_queue: Queue,
    width: int = WIDTH,
    height: int = HEIGHT,
    rotation: int = 0,
    video_config: Optional[Dict] = None
):
    """
    Initialize the camera with the specified width, height, and video configuration.

    Args:
        writer_messages_queue (Queue): Queue to hold log messages.
        width (int): Width of the camera image.
        height (int): Height of the camera image.
        video_config(Optional[Dict]): Configuration for video recording.
        rotation (int): Rotation angle for the camera.
    """
    # Initialize the logger
    self.__logger = Logger(writer_messages_queue, self.LOGGER_TAG)

    # Initialize the reentrant lock
    self.__rlock = RLock()

    # Configure the camera and video settings
    self.__picam2 = Picamera2()
    self.__picam2.set_controls(
        {"AwbMode": "auto"}
    )  # Set Auto White Balance (AWB)
    self.__config = self.__picam2.create_still_configuration(
        main={"size": (width, height)}
    )
    self.__picam2.configure(self.__config)

    # Configure rotation if specified
    if rotation:
        self.__picam2.set_controls({"Rotation": rotation})

    # Set the video configuration if provided and the started preview flag
    self.__video_config = video_config
    self.__started_preview = False

abstracts

CameraABC

Bases: ABC

Abstract class that wraps the functionality required for the Raspberry Pi Camera.

Source code in devices\raspberry_pi_5\src\camera\abstracts.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
class CameraABC(ABC):
    """
    Abstract class that wraps the functionality required for the Raspberry Pi Camera.
    """

    @staticmethod
    def convert_image_stream_to_pil(image_stream: io.BytesIO) -> Image:
        """
        Convert a byte stream to a PIL image.

        Args:
            image_stream (io.BytesIO): Byte stream containing the image data.
        Returns:
            Image: Converted PIL image.
        """
        # Convert the image stream to a PIL image
        image_stream.seek(0)
        return Image.open(image_stream)

    @property
    @abstractmethod
    def logger(self) -> Logger:
        """
        Get the logger instance for the Camera.

        Returns:
            Logger: The logger instance.
        """
        pass

    @abstractmethod
    def _start_preview(self) -> None:
        """
        Start the camera preview.
        """
        pass

    @abstractmethod
    def _stop_preview(self) -> None:
        """
        Stop the camera preview.
        """
        pass

    @abstractmethod
    def capture_image_stream(
        self,
        image_format: str = IMAGE_FORMAT,
        adjust_duration: float = ADJUST_DURATION
    ) -> io.BytesIO:
        """
        Capture an image and return a byte stream.

        Args:
            image_format (str): Format of the image to be captured.
            adjust_duration (float): Duration to allow the camera to adjust before capturing the image.
        Returns:
            io.BytesIO: Captured image as a byte stream.
        """
        pass

    @abstractmethod
    def record_video(
        self, width: int, height: int, duration: int,
        file_path: str, encoder
    ) -> None:
        """
        Record a video with the camera.

        Args:
            width (int): Width of the video.
            height (int): Height of the video.
            duration (int): Duration of the video in seconds.
            file_path (str): Path to save the recorded video file.
            encoder: Encoder to use for video recording.
        """
        pass

    @staticmethod
    def correct_color(
        image: Image,
        factor: float = 1.1
    ) -> Image:
        """
        Apply color correction to the image.

        Args:
            image (Image): The original image.
            factor (float): Factor by which to enhance the color balance.
        Returns:
            Image: Color-corrected image.
        """
        return ImageEnhance.Color(image).enhance(factor)

logger abstractmethod property

Get the logger instance for the Camera.

Returns:

Name Type Description
Logger Logger

The logger instance.

capture_image_stream(image_format=IMAGE_FORMAT, adjust_duration=ADJUST_DURATION) abstractmethod

Capture an image and return a byte stream.

Parameters:

Name Type Description Default
image_format str

Format of the image to be captured.

IMAGE_FORMAT
adjust_duration float

Duration to allow the camera to adjust before capturing the image.

ADJUST_DURATION

Returns:
io.BytesIO: Captured image as a byte stream.

Source code in devices\raspberry_pi_5\src\camera\abstracts.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
@abstractmethod
def capture_image_stream(
    self,
    image_format: str = IMAGE_FORMAT,
    adjust_duration: float = ADJUST_DURATION
) -> io.BytesIO:
    """
    Capture an image and return a byte stream.

    Args:
        image_format (str): Format of the image to be captured.
        adjust_duration (float): Duration to allow the camera to adjust before capturing the image.
    Returns:
        io.BytesIO: Captured image as a byte stream.
    """
    pass

convert_image_stream_to_pil(image_stream) staticmethod

Convert a byte stream to a PIL image.

Parameters:

Name Type Description Default
image_stream BytesIO

Byte stream containing the image data.

required

Returns:
Image: Converted PIL image.

Source code in devices\raspberry_pi_5\src\camera\abstracts.py
17
18
19
20
21
22
23
24
25
26
27
28
29
@staticmethod
def convert_image_stream_to_pil(image_stream: io.BytesIO) -> Image:
    """
    Convert a byte stream to a PIL image.

    Args:
        image_stream (io.BytesIO): Byte stream containing the image data.
    Returns:
        Image: Converted PIL image.
    """
    # Convert the image stream to a PIL image
    image_stream.seek(0)
    return Image.open(image_stream)

correct_color(image, factor=1.1) staticmethod

Apply color correction to the image.

Parameters:

Name Type Description Default
image Image

The original image.

required
factor float

Factor by which to enhance the color balance.

1.1

Returns:
Image: Color-corrected image.

Source code in devices\raspberry_pi_5\src\camera\abstracts.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
@staticmethod
def correct_color(
    image: Image,
    factor: float = 1.1
) -> Image:
    """
    Apply color correction to the image.

    Args:
        image (Image): The original image.
        factor (float): Factor by which to enhance the color balance.
    Returns:
        Image: Color-corrected image.
    """
    return ImageEnhance.Color(image).enhance(factor)

record_video(width, height, duration, file_path, encoder) abstractmethod

Record a video with the camera.

Parameters:

Name Type Description Default
width int

Width of the video.

required
height int

Height of the video.

required
duration int

Duration of the video in seconds.

required
file_path str

Path to save the recorded video file.

required
encoder

Encoder to use for video recording.

required
Source code in devices\raspberry_pi_5\src\camera\abstracts.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
@abstractmethod
def record_video(
    self, width: int, height: int, duration: int,
    file_path: str, encoder
) -> None:
    """
    Record a video with the camera.

    Args:
        width (int): Width of the video.
        height (int): Height of the video.
        duration (int): Duration of the video in seconds.
        file_path (str): Path to save the recorded video file.
        encoder: Encoder to use for video recording.
    """
    pass

PhotographerABC

Bases: ABC

Abstract class to handle image processing for the camera.

Source code in devices\raspberry_pi_5\src\camera\abstracts.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
class PhotographerABC(ABC):
    """
    Abstract class to handle image processing for the camera.
    """

    @abstractmethod
    def logger(self) -> Logger:
        """
        Get the logger instance for the Photographer.

        Returns:
            Logger: The logger instance.
        """
        pass

    @abstractmethod
    def _start(self) -> None:
        """
        Start the photographer.

        Raises:
            RuntimeError: If the photographer fails to start.
        """
        pass

    @abstractmethod
    def _stop(self) -> None:
        """
        Stop the photographer.
        """
        pass

    @abstractmethod
    def _capture_image(self) -> None:
        """
        Capture an image from the camera and sends it to the corresponding queue.
        """
        pass

    @abstractmethod
    def run(self):
        """
        Loop to capture images and put them in the input photographer.
        """
        pass

logger() abstractmethod

Get the logger instance for the Photographer.

Returns:

Name Type Description
Logger Logger

The logger instance.

Source code in devices\raspberry_pi_5\src\camera\abstracts.py
112
113
114
115
116
117
118
119
120
@abstractmethod
def logger(self) -> Logger:
    """
    Get the logger instance for the Photographer.

    Returns:
        Logger: The logger instance.
    """
    pass

run() abstractmethod

Loop to capture images and put them in the input photographer.

Source code in devices\raspberry_pi_5\src\camera\abstracts.py
146
147
148
149
150
151
@abstractmethod
def run(self):
    """
    Loop to capture images and put them in the input photographer.
    """
    pass

multiprocessing

photographer_target(debug, images_queue, capture_image_event, start_event, stop_event, writer_messages_queue, preprocess_fn, server_messages_queue=None)

Target function for a multiprocessing process that handles photography tasks.

Parameters:

Name Type Description Default
debug bool

Flag to indicate if the photographer is in debug mode.

required
images_queue Queue

Queue to hold input images for processing.

required
capture_image_event Event

Event to signal when an image should be captured.

required
start_event Event

Event to signal when the photographer should start.

required
stop_event Event

Event to signal when the logger should stop.

required
writer_messages_queue Queue

Queue to hold log messages.

required
preprocess_fn Callable[[Image], ndarray]

Callable[[Image], np.ndarray]: Function to preprocess images before inference.

required
server_messages_queue Optional[Queue]

Queue to broadcast messages through the websockets server, if any.

None
Source code in devices\raspberry_pi_5\src\camera\multiprocessing.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def photographer_target(
    debug: bool,
    images_queue: Queue,
    capture_image_event: EventCls,
    start_event: EventCls,
    stop_event: EventCls,
    writer_messages_queue: Queue,
    preprocess_fn: Callable[[Image], np.ndarray],
    server_messages_queue: Optional[Queue] = None
):
    """
    Target function for a multiprocessing process that handles photography tasks.

    Args:
        debug (bool): Flag to indicate if the photographer is in debug mode.
        images_queue (Queue): Queue to hold input images for processing.
        capture_image_event (EventCls): Event to signal when an image should be captured.
        start_event (EventCls): Event to signal when the photographer should start.
        stop_event (EventCls): Event to signal when the logger should stop.
        writer_messages_queue (Queue): Queue to hold log messages.
        preprocess_fn: Callable[[Image], np.ndarray]: Function to preprocess images before inference.
        server_messages_queue (Optional[Queue]): Queue to broadcast messages through the websockets server, if any.
    """
    print(
        "Initializing Photographer in multiprocessing mode. Process ID: ",
        os.getpid()
    )

    # Initialize the camera
    camera = Camera(writer_messages_queue=writer_messages_queue)

    # Initialize the photographer
    photographer = Photographer(
        debug=debug,
        camera=camera,
        images_queue=images_queue,
        capture_image_event=capture_image_event,
        start_event=start_event,
        stop_event=stop_event,
        writer_messages_queue=writer_messages_queue,
        preprocess_fn=preprocess_fn,
        server_messages_queue=server_messages_queue
    )

    # Run the photographer
    photographer.run()

photographer

Photographer

Bases: PhotographerABC, LoggerConsumerProtocol

Class to handle image processing for the camera.

Source code in devices\raspberry_pi_5\src\camera\photographer.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
class Photographer(PhotographerABC, LoggerConsumerProtocol):
    """
    Class to handle image processing for the camera.
    """

    # Logger configuration
    LOGGER_TAG = "Photographer"

    # Wait timeout
    WAIT_TIMEOUT = 0.1

    # Wait timeout for the start event
    START_WAIT_TIMEOUT = 0.1

    def __init__(
        self,
        debug: bool,
        camera: CameraABC,
        images_queue: Queue,
        capture_image_event: EventCls,
        start_event: EventCls,
        stop_event: EventCls,
        writer_messages_queue: Queue,
        preprocess_fn: Callable[[Image], np.ndarray],
        server_messages_queue: Optional[Queue] = None
    ):
        """
        Initialize the Photographer class.

        Args:
            debug (bool): Flag to indicate if the photographer is in debug mode.
            camera (CameraABC): Camera instance for capturing images.
            images_queue (Queue): Queue to hold input images for processing.
            capture_image_event (EventCls): Event to signal when an image should be captured.
            start_event (EventCls): Event to signal when the photographer should start.
            stop_event (EventCls): Event to signal when the photographer should stop processing images.
            writer_messages_queue (Queue): Queue to hold log messages.
            preprocess_fn: Callable[[Image], np.ndarray]: Function to preprocess images before inference.
            server_messages_queue (Optional[Queue]): Queue to broadcast messages through the websockets server, if any.
        """
        # Initialize the debug flag
        self.__debug = debug

        # Initialize the queues and events
        self.__images_queue = images_queue
        self.__capture_image_event = capture_image_event
        self.__start_event = start_event
        self.__started_event = Event()
        self.__deleted_event = Event()
        self.__stop_event = stop_event

        # Check the type of camera
        is_instance(camera, CameraABC)
        self.__camera: CameraABC = camera

        # Initialize the reentrant lock
        self.__rlock = RLock()

        # Initialize the logger
        self.__logger = Logger(
            writer_messages_queue,
            tag=self.LOGGER_TAG,
            debug=self.__debug
            )

        # Check the type of preprocess function
        is_instance(preprocess_fn, Callable)
        self.__preprocess_fn = preprocess_fn

        # Initialize the dispatcher for broadcasting messages
        self.__dispatcher = Dispatcher(
            server_messages_queue,
            writer_messages_queue
        ) if server_messages_queue else None

        # Initialize the image counter
        self.__imager_counter = 0

    @final
    @property
    def logger(self) -> Logger:
        return self.__logger

    @final
    def _start(self) -> None:
        with self.__rlock:
            # Check if the stop event is set
            if self.__stop_event.is_set():
                raise RuntimeError(
                    "Stop event is set. Photographer will not run."
                )

            # Check if the photographer is already running
            if self.__started_event.is_set():
                raise RuntimeError(
                    "Photographer is already running. Cannot start again."
                )

            # Set the started event
            self.__started_event.set()

        # Log
        self.__logger.info("Initialized.")

    @final
    def _stop(self) -> None:
        with self.__rlock:
            # Clear the started event
            self.__started_event.clear()

            # Clear the deleted event
            self.__deleted_event.clear()

            # Clear the capture image event
            self.__capture_image_event.clear()

            # Reset the image counter
            self.__imager_counter = 0

        # Log
        self.__logger.info("Stopped.")

    @final
    def _capture_image(self) -> None:
        # Wait for the capture image event
        capture_image = self.__capture_image_event.wait(
            timeout=self.WAIT_TIMEOUT
        )
        if not capture_image:
            return None

        # Capture image stream from camera
        image_stream = self.__camera.capture_image_stream()

        # Convert the image stream to a PIL Image
        image = self.__camera.convert_image_stream_to_pil(image_stream)

        # Preprocess the image
        preprocessed_image = self.__preprocess_fn(image)

        # Put image in input image processing queue
        self.__images_queue.put(preprocessed_image)

        # Increment the image counter
        self.__imager_counter += 1

        # Log
        self.__logger.debug(
            f"Image {self.__imager_counter} added to images queue."
            )

        # Clear the capture image event
        self.__capture_image_event.clear()

        # If the dispatcher is available, broadcast the original image
        self.__dispatcher.broadcast_original_image(
            image
        ) if self.__dispatcher else None

    @final
    @ignore_sigint
    @log_on_error()
    def run(self):
        # Start the photographer
        self._start()

        # Wait for the start event
        self.__logger.info("Waiting for the start event...")
        while not self.__stop_event.is_set() and not self.__deleted_event.is_set():
            if self.__start_event.wait(timeout=self.START_WAIT_TIMEOUT):
                break
        if self.__stop_event.is_set() or self.__deleted_event.is_set():
            # Stop the photographer if the stop or deleted event is set
            self._stop()
            return
        self.__logger.info("Started.")

        try:
            # Capture images until the photographer is stopped
            while not self.__stop_event.is_set() and not self.__deleted_event.is_set():
                self._capture_image()

            # Stop the photographer
            self._stop()

        except Exception as e:
            # Stop the photographer in case of an exception
            self._stop()
            raise e

    def __del__(self):
        """
        Destructor to clean up resources when the photographer is no longer needed.
        """
        self.__deleted_event.set()

        # Log
        self.__logger.info(
            "Instance is being deleted. Resources will be cleaned up."
        )

__del__()

Destructor to clean up resources when the photographer is no longer needed.

Source code in devices\raspberry_pi_5\src\camera\photographer.py
207
208
209
210
211
212
213
214
215
216
def __del__(self):
    """
    Destructor to clean up resources when the photographer is no longer needed.
    """
    self.__deleted_event.set()

    # Log
    self.__logger.info(
        "Instance is being deleted. Resources will be cleaned up."
    )

__init__(debug, camera, images_queue, capture_image_event, start_event, stop_event, writer_messages_queue, preprocess_fn, server_messages_queue=None)

Initialize the Photographer class.

Parameters:

Name Type Description Default
debug bool

Flag to indicate if the photographer is in debug mode.

required
camera CameraABC

Camera instance for capturing images.

required
images_queue Queue

Queue to hold input images for processing.

required
capture_image_event Event

Event to signal when an image should be captured.

required
start_event Event

Event to signal when the photographer should start.

required
stop_event Event

Event to signal when the photographer should stop processing images.

required
writer_messages_queue Queue

Queue to hold log messages.

required
preprocess_fn Callable[[Image], ndarray]

Callable[[Image], np.ndarray]: Function to preprocess images before inference.

required
server_messages_queue Optional[Queue]

Queue to broadcast messages through the websockets server, if any.

None
Source code in devices\raspberry_pi_5\src\camera\photographer.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def __init__(
    self,
    debug: bool,
    camera: CameraABC,
    images_queue: Queue,
    capture_image_event: EventCls,
    start_event: EventCls,
    stop_event: EventCls,
    writer_messages_queue: Queue,
    preprocess_fn: Callable[[Image], np.ndarray],
    server_messages_queue: Optional[Queue] = None
):
    """
    Initialize the Photographer class.

    Args:
        debug (bool): Flag to indicate if the photographer is in debug mode.
        camera (CameraABC): Camera instance for capturing images.
        images_queue (Queue): Queue to hold input images for processing.
        capture_image_event (EventCls): Event to signal when an image should be captured.
        start_event (EventCls): Event to signal when the photographer should start.
        stop_event (EventCls): Event to signal when the photographer should stop processing images.
        writer_messages_queue (Queue): Queue to hold log messages.
        preprocess_fn: Callable[[Image], np.ndarray]: Function to preprocess images before inference.
        server_messages_queue (Optional[Queue]): Queue to broadcast messages through the websockets server, if any.
    """
    # Initialize the debug flag
    self.__debug = debug

    # Initialize the queues and events
    self.__images_queue = images_queue
    self.__capture_image_event = capture_image_event
    self.__start_event = start_event
    self.__started_event = Event()
    self.__deleted_event = Event()
    self.__stop_event = stop_event

    # Check the type of camera
    is_instance(camera, CameraABC)
    self.__camera: CameraABC = camera

    # Initialize the reentrant lock
    self.__rlock = RLock()

    # Initialize the logger
    self.__logger = Logger(
        writer_messages_queue,
        tag=self.LOGGER_TAG,
        debug=self.__debug
        )

    # Check the type of preprocess function
    is_instance(preprocess_fn, Callable)
    self.__preprocess_fn = preprocess_fn

    # Initialize the dispatcher for broadcasting messages
    self.__dispatcher = Dispatcher(
        server_messages_queue,
        writer_messages_queue
    ) if server_messages_queue else None

    # Initialize the image counter
    self.__imager_counter = 0

server

StreamingServer

Bases: BaseHTTPRequestHandler

Streaming server for live video feed.

Source code in devices\raspberry_pi_5\src\camera\server\__init__.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
class StreamingServer(BaseHTTPRequestHandler):
    """
    Streaming server for live video feed.
    """

    @staticmethod
    def generate_frames():
        """
        Generate frames from the camera using libcamera-vid.
        """
        # Execute the libcamera-vid command to capture video
        command = f'libcamera-vid -n -t 0 --width {WIDTH} --height {HEIGHT} --framerate {FPS} --codec {CODEC} -o -'
        process = subprocess.Popen(
            shlex.split(command), stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            bufsize=-1
        )

        try:
            while True:
                # Read the JPEG header (2 bytes)
                header = process.stdout.read(2)
                if not header:
                    print("No more data from libcamera-vid (header)")
                    break
                if header != b'\xff\xd8':
                    print(f"Incorrect JPEG SOI marker: {header}")
                    continue

                # Read the rest of the JPEG frame
                frame_data = header
                while True:
                    byte = process.stdout.read(1)
                    if not byte:
                        print("No more data from libcamera-vid (frame)")
                        break

                    frame_data += byte
                    if frame_data.endswith(b'\xff\xd9'):
                        print(
                            f"Found complete JPEG frame, size: {len(frame_data)}"
                        )
                        yield (frame_data)
                        break
        finally:
            process.terminate()
            stderr_output = process.stderr.read().decode()
            if stderr_output:
                print(f"libcamera-vid stderr: {stderr_output}")

    def do_GET(self):
        """
        Handle GET requests for the streaming server.
        """
        if self.path == '/':
            self.send_response(200)
            self.send_header('Content-type', 'text/html')
            self.end_headers()
            self.wfile.write(
                b'<html><head></head><body><h1>Live Stream</h1><img src="stream.mjpg" width="640" height="480" /></body></html>'
            )
        elif self.path == '/stream.mjpg':
            self.send_response(200)
            self.send_header(
                'Content-type',
                'multipart/x-mixed-replace; boundary=frame'
            )
            self.end_headers()
            try:
                # Start generating frames
                for frame in self.generate_frames():
                    # Write the frame to the response
                    self.wfile.write(
                        b'--frame\r\n'
                        b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n'
                    )
            except Exception as e:
                print(f"Error streaming: {e}")
        else:
            self.send_error(404)
            self.end_headers()

do_GET()

Handle GET requests for the streaming server.

Source code in devices\raspberry_pi_5\src\camera\server\__init__.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def do_GET(self):
    """
    Handle GET requests for the streaming server.
    """
    if self.path == '/':
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        self.wfile.write(
            b'<html><head></head><body><h1>Live Stream</h1><img src="stream.mjpg" width="640" height="480" /></body></html>'
        )
    elif self.path == '/stream.mjpg':
        self.send_response(200)
        self.send_header(
            'Content-type',
            'multipart/x-mixed-replace; boundary=frame'
        )
        self.end_headers()
        try:
            # Start generating frames
            for frame in self.generate_frames():
                # Write the frame to the response
                self.wfile.write(
                    b'--frame\r\n'
                    b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n'
                )
        except Exception as e:
            print(f"Error streaming: {e}")
    else:
        self.send_error(404)
        self.end_headers()

generate_frames() staticmethod

Generate frames from the camera using libcamera-vid.

Source code in devices\raspberry_pi_5\src\camera\server\__init__.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@staticmethod
def generate_frames():
    """
    Generate frames from the camera using libcamera-vid.
    """
    # Execute the libcamera-vid command to capture video
    command = f'libcamera-vid -n -t 0 --width {WIDTH} --height {HEIGHT} --framerate {FPS} --codec {CODEC} -o -'
    process = subprocess.Popen(
        shlex.split(command), stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        bufsize=-1
    )

    try:
        while True:
            # Read the JPEG header (2 bytes)
            header = process.stdout.read(2)
            if not header:
                print("No more data from libcamera-vid (header)")
                break
            if header != b'\xff\xd8':
                print(f"Incorrect JPEG SOI marker: {header}")
                continue

            # Read the rest of the JPEG frame
            frame_data = header
            while True:
                byte = process.stdout.read(1)
                if not byte:
                    print("No more data from libcamera-vid (frame)")
                    break

                frame_data += byte
                if frame_data.endswith(b'\xff\xd9'):
                    print(
                        f"Found complete JPEG frame, size: {len(frame_data)}"
                    )
                    yield (frame_data)
                    break
    finally:
        process.terminate()
        stderr_output = process.stderr.read().decode()
        if stderr_output:
            print(f"libcamera-vid stderr: {stderr_output}")