Saltar a contenido

Server Module

WebSocketServer

Bases: WebSocketServerABC, LoggerConsumerProtocol

Class for a WebSocket server that handles real-time tracking updates.
It allows clients to connect and receive messages about tracking events.

This is only used on practices, not in the competition, to test new features and models in real-time.

Source code in devices\raspberry_pi_5\src\server\__init__.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
class WebSocketServer(WebSocketServerABC, LoggerConsumerProtocol):
    """
    Class for a WebSocket server that handles real-time tracking updates.
    It allows clients to connect and receive messages about tracking events.

    This is only used on practices, not in the competition, to test new features and models in real-time.
    """

    # Logger configuration
    LOGGER_TAG = "WebSocketServer"

    # Wait timeout
    WAIT_TIMEOUT = 0.1

    def __init__(
        self,
        debug: bool,
        messages_queue: Queue,
        parking_event: EventCls,
        stop_event: EventCls,
        writer_messages_queue: Queue,
        host: str = HOST,
        port: int = PORT
    ):
        """
        Initializes the WebSocket server with the specified host and port.

        Args:
            debug (bool): Flag to indicate if the WebSocket server is in debug mode.
            messages_queue (Queue): Queue to broadcast messages through the websockets server.
            parking_event (EventCls): Event to signal the parking state of the robot.
            stop_event (EventCls): Event to signal when the websockets server should stop.
            writer_messages_queue (Queue): Queue to hold log messages.
            host (str): The host address for the WebSocket server.
            port (int): The port number for the WebSocket server.
        """
        # Initialize the debug flag
        self.__debug = debug

        # Initialize the messages queue and events
        self.__messages_queue = messages_queue
        self.__started_event = Event()
        self.__parking_event = parking_event
        self.__deleted_event = Event()
        self.__stop_event = stop_event

        # Initialize the logger
        self.__logger = Logger(
            writer_messages_queue,
            tag=self.LOGGER_TAG,
            debug=self.__debug
            )

        # Check the type of host
        is_instance(host, str)
        self.__host = host

        # Check the type of port
        is_instance(port, int)
        self.__port = port

        # Initialize the reentrant lock
        self.__rlock = RLock()

        # Initialize the connected clients set
        self.__connected_clients = set()

        # Initialize the broadcast thread
        self.__broadcast_thread = None

    @final
    @property
    def logger(self) -> Logger:
        return self.__logger

    @final
    async def _reactive_handler(self, connection) -> None:
        # Add the client to the set of connected clients
        self.__connected_clients.add(connection)
        self.__logger.debug(f"Client connected: {connection.remote_address}")

        # Send a welcome message immediately upon connection
        await self._send_message(
            connection, Message(
                Tag.CONNECTION_STATUS,
                "Connected to WebsocketServer"
            )
        )

        try:
            while not self.__stop_event.is_set():
                msg = await connection.recv()

                # Log
                self.__logger.debug(f"Received message: {msg}")

                # Check if the message is a stop event
                if msg == Tag.STOP_EVENT:
                    self.__logger.debug(
                        "Stop event received. Stopping the server..."
                    )
                    self.__stop_event.set()

                # Check if the message is a parking event
                elif msg == Tag.PARKING_EVENT:
                    if self.__parking_event.is_set():
                        self.__logger.debug(
                            "Parking event received. Resuming processing..."
                        )
                        self.__parking_event.clear()
                    else:
                        self.__logger.debug(
                            "Parking event received. Pausing processing..."
                        )
                        self.__parking_event.set()

                else:
                    # Unknown message type
                    self.__logger.warning(f"Unknown message type: {msg}")

                    await self._send_message(
                        connection,
                        Message(
                            Tag.UNKNOWN_TAG,
                            "Unknown message type received."
                        )
                    )
                    continue

                # Broadcast the received message to all connected clients
                await self._broadcast_message(msg)

        except exceptions.ConnectionClosedOK:
            self.__logger.error(
                f"Client {connection.remote_address} disconnected gracefully."
            )

        except exceptions.ConnectionClosedError as e:
            self.__logger.error(
                f"Client {connection.remote_address} disconnected with error: {e}"
            )

        except Exception as e:
            self.__logger.error(
                f"An unexpected error occurred with {connection.remote_address}: {e}"
            )

        finally:
            # Remove the client from the set of connected clients
            self.__connected_clients.discard(connection)
            self.__logger.debug(
                f"Client {connection.remote_address} disconnected."
            )

    @final
    async def _send_message(self, connection, msg: Message):
        try:
            # Check the type of connection
            is_instance(msg, Message)

            # Check if the connection is still open
            if connection.open:
                # Send the message to the client
                await connection.send(str(msg))

        except Exception as e:
            self.__logger.error(
                f"Error sending message to {connection.remote_address}: {e}"
            )

    @final
    async def _broadcast_message(self, msg: Message):
        if not self.__connected_clients:
            return

        try:
            # Check the type of msg
            is_instance(msg, Message)

            # Broadcast the message to all connected clients
            await asyncio.gather(
                *(client.send(str(msg)) for client in self.__connected_clients),
                return_exceptions=True
            )

        except Exception as e:
            self.__logger.error(
                f"Unexpected error while broadcasting message: {e}"
            )

    @final
    async def _broadcast_last_message(self) -> None:
        try:
            # Process any remaining messages in the queue
            msg = self.__messages_queue.get(timeout=self.WAIT_TIMEOUT)

            # Broadcast the last message to all connected clients
            await self._broadcast_message(msg)

        except Empty:
            # If the queue is empty, do nothing
            return None

    @final
    async def _broadcast_handler(self):
        while not self.__stop_event.is_set() and not self.__deleted_event.is_set():
            # Broadcast the last message if available
            await self._broadcast_last_message()

        # Check if there are any remaining messages in the queue
        while not self.__messages_queue.empty():
            # Broadcast the last message if available
            await self._broadcast_last_message()

    @final
    def _start(self) -> None:
        with self.__rlock:
            # Check if the stop event is set
            if self.__stop_event.is_set():
                raise RuntimeError(
                    "Stop event is set. WebSocket server will not run."
                    )

            # Check if the websocket server is already running
            if self.__started_event.is_set():
                raise RuntimeError(
                    "WebSocket server is already running. Cannot start again."
                    )

            # Set the opened event to signal that the websocket server is ready
            self.__started_event.set()

        # Log
        self.__logger.info("Initialized.")

    @final
    def _stop(self) -> None:
        with self.__rlock:
            # Clear the started event
            self.__started_event.clear()

            # Clear the deleted event
            self.__deleted_event.clear()

        # Log
        self.__logger.info("Stopped.")

    @final
    @ignore_sigint
    @log_on_error()
    async def run(self):
        with self.__rlock:
            # Check if the stop event is set
            if self.__stop_event.is_set():
                self.__logger.warning(
                    "Stop event is set. WebSocket server will not run."
                )
                return

            # Check if the websocket server is already running
            if self.__started_event.is_set():
                self.__logger.warning(
                    "WebSocket server is already running. Cannot start again."
                )
                return

            # Set the opened event to signal that the websocket server is ready
            self.__started_event.set()

        # Get the local IP address
        local_ip = get_local_ip()

        # Create a thread to handle broadcasting messages
        self.__broadcast_thread = Thread(
            target=self._broadcast_handler
        )
        self.__broadcast_thread.start()

        # Start the WebSocket server
        self.__logger.debug("WebSocket server is starting...")

        async def monitor_events():
            try:
                await asyncio.wait(
                    [asyncio.to_thread(self.__stop_event.wait),
                     asyncio.to_thread(self.__deleted_event.wait)],
                    return_when=asyncio.FIRST_COMPLETED
                )

            except asyncio.CancelledError:
                pass

        monitor_task = asyncio.create_task(monitor_events())

        try:
            async with serve(self._reactive_handler, self.__host, self.__port):
                self.__logger.info(
                    f"WebSocket server started successfully on ws://{local_ip}:{self.__port}"
                )
                await monitor_task
        finally:
            monitor_task.cancel()

        # Wait for the broadcast thread to finish
        self.__broadcast_thread.join()
        self.__broadcast_thread = None

        # Clear the opened event
        with self.__rlock:
            self.__started_event.clear()

        # Log the stopping of the server
        self.__logger.info("WebSocket server stopped.")

    def __del__(self):
        """
        Destructor to clean up resources when the websockets server is no longer needed.
        """
        self.__deleted_event.set()

        # Log
        self.__logger.debug(
            "Instance is being deleted. Resources will be cleaned up."
        )

__del__()

Destructor to clean up resources when the websockets server is no longer needed.

Source code in devices\raspberry_pi_5\src\server\__init__.py
336
337
338
339
340
341
342
343
344
345
def __del__(self):
    """
    Destructor to clean up resources when the websockets server is no longer needed.
    """
    self.__deleted_event.set()

    # Log
    self.__logger.debug(
        "Instance is being deleted. Resources will be cleaned up."
    )

__init__(debug, messages_queue, parking_event, stop_event, writer_messages_queue, host=HOST, port=PORT)

Initializes the WebSocket server with the specified host and port.

Parameters:

Name Type Description Default
debug bool

Flag to indicate if the WebSocket server is in debug mode.

required
messages_queue Queue

Queue to broadcast messages through the websockets server.

required
parking_event Event

Event to signal the parking state of the robot.

required
stop_event Event

Event to signal when the websockets server should stop.

required
writer_messages_queue Queue

Queue to hold log messages.

required
host str

The host address for the WebSocket server.

HOST
port int

The port number for the WebSocket server.

PORT
Source code in devices\raspberry_pi_5\src\server\__init__.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
def __init__(
    self,
    debug: bool,
    messages_queue: Queue,
    parking_event: EventCls,
    stop_event: EventCls,
    writer_messages_queue: Queue,
    host: str = HOST,
    port: int = PORT
):
    """
    Initializes the WebSocket server with the specified host and port.

    Args:
        debug (bool): Flag to indicate if the WebSocket server is in debug mode.
        messages_queue (Queue): Queue to broadcast messages through the websockets server.
        parking_event (EventCls): Event to signal the parking state of the robot.
        stop_event (EventCls): Event to signal when the websockets server should stop.
        writer_messages_queue (Queue): Queue to hold log messages.
        host (str): The host address for the WebSocket server.
        port (int): The port number for the WebSocket server.
    """
    # Initialize the debug flag
    self.__debug = debug

    # Initialize the messages queue and events
    self.__messages_queue = messages_queue
    self.__started_event = Event()
    self.__parking_event = parking_event
    self.__deleted_event = Event()
    self.__stop_event = stop_event

    # Initialize the logger
    self.__logger = Logger(
        writer_messages_queue,
        tag=self.LOGGER_TAG,
        debug=self.__debug
        )

    # Check the type of host
    is_instance(host, str)
    self.__host = host

    # Check the type of port
    is_instance(port, int)
    self.__port = port

    # Initialize the reentrant lock
    self.__rlock = RLock()

    # Initialize the connected clients set
    self.__connected_clients = set()

    # Initialize the broadcast thread
    self.__broadcast_thread = None

abstracts

DispatcherABC

Abstract class for a dispatcher that handles broadcasting messages
and images

Source code in devices\raspberry_pi_5\src\server\abstracts.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
class DispatcherABC:
    """
    Abstract class for a dispatcher that handles broadcasting messages
    and images
    """

    @abstractmethod
    def _broadcast_message(self, msg: Message):
        """
        Add a message to the messages queue to be sent to all connected clients.

        Args:
            msg (Message): The message to broadcast.
        """
        pass

    @abstractmethod
    def _broadcast_image_with_tag(self, tag: Tag, img: Image):
        """
        Adds an image with a specific tag to the messages queue to be sent to all connected clients.

        Args:
            tag (Tag): The tag associated with the image.
            img (Image): The image to broadcast.
        """
        pass

    @abstractmethod
    def broadcast_original_image(self, img: Image):
        """
        Adds the original image to the messages queue to be sent to all connected clients.

        Args:
            img (Image): The original image to broadcast.
        """
        pass

    @abstractmethod
    def broadcast_model_image(self, img: Image, model_name: str):
        """
        Adds a model-processed image to the messages queue to be sent to all connected clients.

        Args:
            img (Image): The image to broadcast.
            model_name (str): The name of the model that processed the image.
        """
        pass

    @abstractmethod
    def broadcast_serial_incoming_message(self, msg: str):
        """
        Adds a serial incoming message to the messages queue to be sent to all connected clients.

        Args:
            msg (str): The serial incoming message to broadcast.
        """
        pass

    @abstractmethod
    def broadcast_serial_outgoing_message(self, msg: str):
        """
        Adds a serial outgoing message to the messages queue to be sent to all connected clients.

        Args:
            msg (str): The serial outgoing message to broadcast.
        """
        pass

    @abstractmethod
    def broadcast_rplidar_measure(self, measure: Measure):
        """
        Adds a RPLIDAR measure to the messages queue to be sent to all connected clients.

        Args:
            measure (Measure): The RPLIDAR measure to broadcast.
        """
        pass

broadcast_model_image(img, model_name) abstractmethod

Adds a model-processed image to the messages queue to be sent to all connected clients.

Parameters:

Name Type Description Default
img Image

The image to broadcast.

required
model_name str

The name of the model that processed the image.

required
Source code in devices\raspberry_pi_5\src\server\abstracts.py
134
135
136
137
138
139
140
141
142
143
@abstractmethod
def broadcast_model_image(self, img: Image, model_name: str):
    """
    Adds a model-processed image to the messages queue to be sent to all connected clients.

    Args:
        img (Image): The image to broadcast.
        model_name (str): The name of the model that processed the image.
    """
    pass

broadcast_original_image(img) abstractmethod

Adds the original image to the messages queue to be sent to all connected clients.

Parameters:

Name Type Description Default
img Image

The original image to broadcast.

required
Source code in devices\raspberry_pi_5\src\server\abstracts.py
124
125
126
127
128
129
130
131
132
@abstractmethod
def broadcast_original_image(self, img: Image):
    """
    Adds the original image to the messages queue to be sent to all connected clients.

    Args:
        img (Image): The original image to broadcast.
    """
    pass

broadcast_rplidar_measure(measure) abstractmethod

Adds a RPLIDAR measure to the messages queue to be sent to all connected clients.

Parameters:

Name Type Description Default
measure Measure

The RPLIDAR measure to broadcast.

required
Source code in devices\raspberry_pi_5\src\server\abstracts.py
165
166
167
168
169
170
171
172
173
@abstractmethod
def broadcast_rplidar_measure(self, measure: Measure):
    """
    Adds a RPLIDAR measure to the messages queue to be sent to all connected clients.

    Args:
        measure (Measure): The RPLIDAR measure to broadcast.
    """
    pass

broadcast_serial_incoming_message(msg) abstractmethod

Adds a serial incoming message to the messages queue to be sent to all connected clients.

Parameters:

Name Type Description Default
msg str

The serial incoming message to broadcast.

required
Source code in devices\raspberry_pi_5\src\server\abstracts.py
145
146
147
148
149
150
151
152
153
@abstractmethod
def broadcast_serial_incoming_message(self, msg: str):
    """
    Adds a serial incoming message to the messages queue to be sent to all connected clients.

    Args:
        msg (str): The serial incoming message to broadcast.
    """
    pass

broadcast_serial_outgoing_message(msg) abstractmethod

Adds a serial outgoing message to the messages queue to be sent to all connected clients.

Parameters:

Name Type Description Default
msg str

The serial outgoing message to broadcast.

required
Source code in devices\raspberry_pi_5\src\server\abstracts.py
155
156
157
158
159
160
161
162
163
@abstractmethod
def broadcast_serial_outgoing_message(self, msg: str):
    """
    Adds a serial outgoing message to the messages queue to be sent to all connected clients.

    Args:
        msg (str): The serial outgoing message to broadcast.
    """
    pass

WebSocketServerABC

Bases: ABC

Abstract class for a WebSocket server that handles real-time tracking updates.

Source code in devices\raspberry_pi_5\src\server\abstracts.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class WebSocketServerABC(ABC):
    """
    Abstract class for a WebSocket server that handles real-time tracking updates.
    """

    @abstractmethod
    def logger(self) -> Logger:
        """
        Get the logger instance for the WebSocketServer.

        Returns:
            Logger: The logger instance.
        """
        pass

    @abstractmethod
    async def _reactive_handler(self, connection) -> None:
        """
        Handles WebSocket connections and messages.

        Args:
            connection: The WebSocket connection object.
        """
        pass

    @abstractmethod
    async def _send_message(self, connection, msg: Message):
        """
        Sends a message to a specific WebSocket connection.

        Args:
            connection: The WebSocket connection to send the message to.
            msg (Message): The message to send.
        """
        pass

    @abstractmethod
    async def _broadcast_message(self, msg: Message):
        """
        Broadcasts a message to all connected WebSocket clients.

        Args:
            msg (Message): The message to broadcast.
        """
        pass

    @abstractmethod
    async def _broadcast_last_message(self) -> None:
        """
        Processes the last message in the queue and broadcasts it to all connected clients.
        """
        pass

    @abstractmethod
    async def _broadcast_handler(self):
        """
        Continuously checks the messages queue and broadcasts the last message
        to all connected clients until the stop event is set.
        """
        pass

    @abstractmethod
    def _start(self) -> None:
        """
        Starts the WebSocket server and initializes necessary components.

        Raises:
            RuntimeError: If the WebSocket server fails to start.
        """
        pass

    @abstractmethod
    def _stop(self) -> None:
        """
        Stops the WebSocket server and cleans up resources.
        """
        pass

    @abstractmethod
    async def run(self):
        """
        Starts the WebSocket server and listens for incoming connections and messages.
        """
        pass

logger() abstractmethod

Get the logger instance for the WebSocketServer.

Returns:

Name Type Description
Logger Logger

The logger instance.

Source code in devices\raspberry_pi_5\src\server\abstracts.py
16
17
18
19
20
21
22
23
24
@abstractmethod
def logger(self) -> Logger:
    """
    Get the logger instance for the WebSocketServer.

    Returns:
        Logger: The logger instance.
    """
    pass

run() abstractmethod async

Starts the WebSocket server and listens for incoming connections and messages.

Source code in devices\raspberry_pi_5\src\server\abstracts.py
89
90
91
92
93
94
@abstractmethod
async def run(self):
    """
    Starts the WebSocket server and listens for incoming connections and messages.
    """
    pass

client

ws_client() async

WebSocket client that connects to the server and sends user input.
This client will send the user's name and age to the server
and listen for incoming messages indefinitely.

Source code in devices\raspberry_pi_5\src\server\client.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
async def ws_client():
    """
    WebSocket client that connects to the server and sends user input.
    This client will send the user's name and age to the server
    and listen for incoming messages indefinitely.
    """
    # Connect to the server
    url = f'ws://{HOST}:{PORT}'
    print(f"Connecting to WebSocket server at {url}...")
    async with connect(url) as ws:
        # Stay alive forever, listen to incoming msgs
        while True:
            msg = await ws.recv()
            print(msg)

dispatcher

Dispatcher

Bases: DispatcherABC

Class for a dispatcher that handles broadcasting messages and images

Source code in devices\raspberry_pi_5\src\server\dispatcher.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
class Dispatcher(DispatcherABC):
    """
    Class for a dispatcher that handles broadcasting messages and images
    """

    # Logger configuration
    LOGGER_TAG = "Dispatcher"

    def __init__(
        self,
        server_messages_queue: Queue,
        writer_messages_queue: Queue
    ):
        """
        Initializes the Dispatcher class.

        Args:
            server_messages_queue (Queue): Queue to broadcast messages through the websockets server.
            writer_messages_queue (Queue): Queue to hold log messages.
        """
        # Initialize the server messages queue
        self.__server_messages_queue = server_messages_queue

        # Initialize the logger
        self.__logger = Logger(
            writer_messages_queue, self.LOGGER_TAG,
            unique_tag=True
        )

    @final
    def _broadcast_message(self, msg: Message):
        # Check the type of message
        is_instance(msg, Message)

        # Put the message in the server messages queue
        self.__server_messages_queue.put(msg)

    @final
    def _broadcast_image_with_tag(self, tag: Tag, img: Image):
        try:
            # Open the image and convert it to a binary stream
            img_stream = io.BytesIO()
            img.save(img_stream, format=IMAGE_FORMAT)
            img_stream.seek(0)
            binary_data = img_stream.read()

            # Send the tagged binary data to the clients
            self._broadcast_message(Message(tag, str(binary_data)))

        except Exception as e:
            self.__logger.error(
                f"Error sending image: {e}"
            ) if self.__logger else None

    @final
    def broadcast_original_image(self, img: Image):
        """
        Broadcasts the original image to all connected clients.
        """
        self._broadcast_image_with_tag(Tag.IMAGE_ORIGINAL, img)

    def _broadcast_model_g_image(self, img: Image):
        """
        Broadcasts the image processed by model G to all connected clients.
        """
        self._broadcast_image_with_tag(Tag.IMAGE_MODEL_G, img)

    def _broadcast_model_m_image(self, img: Image):
        """
        Broadcasts the image processed by model M to all connected clients.
        """
        self._broadcast_image_with_tag(Tag.IMAGE_MODEL_M, img)

    def _broadcast_model_r_image(self, img: Image):
        """
        Broadcasts the image processed by model R to all connected clients.
        """
        self._broadcast_image_with_tag(Tag.IMAGE_MODEL_R, img)

    @final
    def broadcast_model_image(self, img: Image, model_name: str):
        # Check the type of model_name
        is_instance(model_name, str)

        # Check the type of image
        is_instance(img, Image)

        if model_name == MODEL_G:
            self._broadcast_model_g_image(img)

        elif model_name == MODEL_M:
            self._broadcast_model_m_image(img)

        elif model_name == MODEL_R:
            self._broadcast_model_r_image(img)

        else:
            raise ValueError(f"Unknown model name: {model_name}")

    @final
    def broadcast_serial_incoming_message(self, msg: str):
        # Check the type of msg
        is_instance(msg, str)

        # Send a tagged message
        self._broadcast_message(Message(Tag.SERIAL_INCOMING_MESSAGE, msg))

    @final
    def broadcast_serial_outgoing_message(self, msg: str):
        # Check the type of msg
        is_instance(msg, str)

        # Send a tagged message
        self._broadcast_message(Message(Tag.SERIAL_OUTGOING_MESSAGE, msg))

    @final
    def broadcast_rplidar_measure(self, measure: Measure):
        # Check the type of measure
        is_instance(measure, Measure)

        # Send a tagged message
        self._broadcast_message(Message(Tag.RPLIDAR_MEASURES, str(measure)))

__init__(server_messages_queue, writer_messages_queue)

Initializes the Dispatcher class.

Parameters:

Name Type Description Default
server_messages_queue Queue

Queue to broadcast messages through the websockets server.

required
writer_messages_queue Queue

Queue to hold log messages.

required
Source code in devices\raspberry_pi_5\src\server\dispatcher.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def __init__(
    self,
    server_messages_queue: Queue,
    writer_messages_queue: Queue
):
    """
    Initializes the Dispatcher class.

    Args:
        server_messages_queue (Queue): Queue to broadcast messages through the websockets server.
        writer_messages_queue (Queue): Queue to hold log messages.
    """
    # Initialize the server messages queue
    self.__server_messages_queue = server_messages_queue

    # Initialize the logger
    self.__logger = Logger(
        writer_messages_queue, self.LOGGER_TAG,
        unique_tag=True
    )

broadcast_original_image(img)

Broadcasts the original image to all connected clients.

Source code in devices\raspberry_pi_5\src\server\dispatcher.py
70
71
72
73
74
75
@final
def broadcast_original_image(self, img: Image):
    """
    Broadcasts the original image to all connected clients.
    """
    self._broadcast_image_with_tag(Tag.IMAGE_ORIGINAL, img)

enums

Tag

Bases: Enum

Enum to represent the tags of messages sent and received from the server.
This is used for categorizing or filtering messages based on their type.

Source code in devices\raspberry_pi_5\src\server\enums.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@unique
class Tag(Enum):
    """
    Enum to represent the tags of messages sent and received from the server.
    This is used for categorizing or filtering messages based on their type.
    """
    CONNECTION_STATUS = 1
    SERIAL_INCOMING_MESSAGE = 2
    SERIAL_OUTGOING_MESSAGE = 3
    ORIGINAL_IMAGE = 4
    MODEL_G_IMAGE = 5
    MODEL_M_IMAGE = 6
    MODEL_R_IMAGE = 7
    RPLIDAR_MEASURE = 8
    STOP_EVENT = 9
    PARKING_EVENT = 10

    @staticmethod
    def from_string(tag_str: str) -> "Tag":
        """
        Convert a string to a Tag enum value.

        Args:
            tag_str (str): The string representation of the tag.

        Returns:
            Tag: The corresponding Tag enum value.
        """
        return map_string_to_enum(tag_str.upper(), Tag)

from_string(tag_str) staticmethod

Convert a string to a Tag enum value.

Parameters:

Name Type Description Default
tag_str str

The string representation of the tag.

required

Returns:

Name Type Description
Tag Tag

The corresponding Tag enum value.

Source code in devices\raspberry_pi_5\src\server\enums.py
23
24
25
26
27
28
29
30
31
32
33
34
@staticmethod
def from_string(tag_str: str) -> "Tag":
    """
    Convert a string to a Tag enum value.

    Args:
        tag_str (str): The string representation of the tag.

    Returns:
        Tag: The corresponding Tag enum value.
    """
    return map_string_to_enum(tag_str.upper(), Tag)

message

Message

A class to represent a server message.
This class is used to encapsulate the message data that will be sent
over the WebSocket connection.

Source code in devices\raspberry_pi_5\src\server\message.py
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
class Message:
    """
    A class to represent a server message.
    This class is used to encapsulate the message data that will be sent
    over the WebSocket connection.
    """

    # Tag separator used to separate the tag from the content in the message string
    TAG_SEPARATOR = ":"

    def __init__(self, tag: Tag, content: str):
        """
        Initialize the Message instance.

        Args:
            tag (Tag): A tag associated with the message, used for categorization or filtering.
            content (str): The content of the message.
        """
        self.content = content
        self.tag = tag

    @classmethod
    def from_string(cls, msg_str: str) -> "Message":
        """
        Create a Message instance from a string representation.

        Args:
            msg_str (str): The string representation of the message.

        Returns:
            Message: A new Message instance created from the string.
        """
        # Split the string into category and content
        parts = msg_str.strip().split(cls.TAG_SEPARATOR, 1)
        if len(parts) != 2:
            raise ValueError("Invalid message format")

        # Convert the tag string to a Tag enum value
        tag = Tag.from_string(parts[0])

        # Create and return the Message object
        return Message(tag, parts[1])

    def __str__(self):
        """
        Return a string representation of the Message instance.
        This is useful for logging or debugging purposes.
        """
        return f"{self.__tag}{self.TAG_SEPARATOR}{self.__content}"

    @property
    def content(self) -> str:
        """
        Get the content of the message.

        Returns:
            str: The content of the message.
        """
        return self.__content

    @content.setter
    def content(self, value: str):
        """
        Set the content of the message.

        Args:
            value (str): The new content for the message.
        """
        is_instance(value, str)
        self.__content = value

    @property
    def tag(self) -> Tag:
        """
        Get the tag of the message.

        Returns:
            Tag: The tag of the message.
        """
        return self.__tag

    @tag.setter
    def tag(self, value: Tag):
        """
        Set the tag of the message.

        Args:
            value (Tag): The new tag for the message.
        """
        is_instance(value, Tag)
        self.__tag = value

content property writable

Get the content of the message.

Returns:

Name Type Description
str str

The content of the message.

tag property writable

Get the tag of the message.

Returns:

Name Type Description
Tag Tag

The tag of the message.

__init__(tag, content)

Initialize the Message instance.

Parameters:

Name Type Description Default
tag Tag

A tag associated with the message, used for categorization or filtering.

required
content str

The content of the message.

required
Source code in devices\raspberry_pi_5\src\server\message.py
15
16
17
18
19
20
21
22
23
24
def __init__(self, tag: Tag, content: str):
    """
    Initialize the Message instance.

    Args:
        tag (Tag): A tag associated with the message, used for categorization or filtering.
        content (str): The content of the message.
    """
    self.content = content
    self.tag = tag

__str__()

Return a string representation of the Message instance.
This is useful for logging or debugging purposes.

Source code in devices\raspberry_pi_5\src\server\message.py
48
49
50
51
52
53
def __str__(self):
    """
    Return a string representation of the Message instance.
    This is useful for logging or debugging purposes.
    """
    return f"{self.__tag}{self.TAG_SEPARATOR}{self.__content}"

from_string(msg_str) classmethod

Create a Message instance from a string representation.

Parameters:

Name Type Description Default
msg_str str

The string representation of the message.

required

Returns:

Name Type Description
Message Message

A new Message instance created from the string.

Source code in devices\raspberry_pi_5\src\server\message.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@classmethod
def from_string(cls, msg_str: str) -> "Message":
    """
    Create a Message instance from a string representation.

    Args:
        msg_str (str): The string representation of the message.

    Returns:
        Message: A new Message instance created from the string.
    """
    # Split the string into category and content
    parts = msg_str.strip().split(cls.TAG_SEPARATOR, 1)
    if len(parts) != 2:
        raise ValueError("Invalid message format")

    # Convert the tag string to a Tag enum value
    tag = Tag.from_string(parts[0])

    # Create and return the Message object
    return Message(tag, parts[1])

multiprocessing

websocket_server_target(debug, messages_queue, parking_event, stop_event, writer_messages_queue, host=HOST, port=PORT)

Target function for a multiprocessing process that handles the WebSocket server.

Parameters:

Name Type Description Default
debug bool

Flag to indicate if the WebSocket server is in debug mode.

required
messages_queue Queue

Queue to broadcast messages through the websockets server.

required
parking_event Event

Event to signal the parking state of the server.

required
stop_event Event

Event to signal when the websockets server should stop.

required
writer_messages_queue Queue

Queue to hold log messages.

required
host str

The host address for the WebSocket server.

HOST
port int

The port number for the WebSocket server.

PORT
Source code in devices\raspberry_pi_5\src\server\multiprocessing.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def websocket_server_target(
    debug: bool,
    messages_queue: Queue,
    parking_event: EventCls,
    stop_event: EventCls,
    writer_messages_queue: Queue,
    host: str = HOST,
    port: int = PORT
) -> None:
    """
    Target function for a multiprocessing process that handles the WebSocket server.

    Args:
        debug (bool): Flag to indicate if the WebSocket server is in debug mode.
        messages_queue (Queue): Queue to broadcast messages through the websockets server.
        parking_event (EventCls): Event to signal the parking state of the server.
        stop_event (EventCls): Event to signal when the websockets server should stop.
        writer_messages_queue (Queue): Queue to hold log messages.
        host (str): The host address for the WebSocket server.
        port (int): The port number for the WebSocket server.
    """
    print(
        "Initializing WebSocketServer in multiprocessing mode. Process ID: ",
        os.getpid()
    )

    # Initialize the websocket server
    server = WebSocketServer(
        debug=debug,
        messages_queue=messages_queue,
        parking_event=parking_event,
        stop_event=stop_event,
        writer_messages_queue=writer_messages_queue,
        host=host,
        port=port
    )

    # Run the websocket server
    server.run()