Skip to content

Serial Communication Module

SerialCommunication

Bases: SerialCommunicationABC, LoggerConsumerProtocol

Class to handle the serial communication through USB.

Source code in devices\raspberry_pi_5\src\serial_communication\__init__.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
class SerialCommunication(SerialCommunicationABC, LoggerConsumerProtocol):
    """
    Class to handle the serial communication through USB.

    It builds one Receiver and one Sender that share the same start/stop
    events and queues, and runs each of them on its own thread.
    """

    # Logger configuration
    LOGGER_TAG = "SerialCommunication"

    def __init__(
        self,
        debug: bool,
        challenge: ValueCls,
        start_event: EventCls,
        stop_event: EventCls,
        bno08x_yaw_deg: ValueCls,
        bno08x_turns: ValueCls,
        sender_messages_queue: Queue,
        writer_messages_queue: Queue,
        server_messages_queue: Optional[Queue] = None,
        console_ports: Optional[List[str]] = RASPBERRY_PI_PICO_CONSOLE_PORTS,
        data_ports: Optional[List[str]] = RASPBERRY_PI_PICO_DATA_PORTS,
        baudrate: Optional[int] = RASPBERRY_PI_PICO_BAUDRATE
    ):
        """
        Initialize the serial communication class.

        Args:
            debug (bool): Debug flag forwarded to both the receiver and the sender.
            challenge (ValueCls): Shared value to hold the current challenge.
            start_event (EventCls): Event to signal when the serial communication has started.
            stop_event (EventCls): Event to signal when the serial communication should stop sending and receiving messages.
            bno08x_yaw_deg (ValueCls): Shared value for the BNO08X yaw angle in degrees.
            bno08x_turns (ValueCls): Shared value for the BNO08X turns.
            sender_messages_queue (Queue): Queue to hold outgoing messages of the serial port.
            writer_messages_queue (Queue): Queue to hold log messages.
            server_messages_queue (Optional[Queue]): Queue to broadcast the messages through the websockets server.
            console_ports (Optional[List[str]]): List of serial ports used for receiving data from Pico.
            data_ports (Optional[List[str]]): List of serial ports used for sending data to Pico.
            baudrate (Optional[int]): Baud rate for the serial communication.
        """
        # Initialize the events; the two stop-handshake events are created
        # here and handed to both the receiver and the sender
        self.__stop_sent_event = Event()
        self.__stop_confirmation_event = Event()
        self.__stop_event = stop_event

        # Initialize the serial communication receiver
        self.__serial_receiver = Receiver(
            debug=debug,
            challenge=challenge,
            start_event=start_event,
            stop_sent_event=self.__stop_sent_event,
            stop_confirmation_event=self.__stop_confirmation_event,
            stop_event=stop_event,
            bno08x_yaw_deg=bno08x_yaw_deg,
            bno08x_turns=bno08x_turns,
            sender_messages_queue=sender_messages_queue,
            writer_messages_queue=writer_messages_queue,
            server_messages_queue=server_messages_queue,
            console_ports=console_ports,
            baudrate=baudrate
        )

        # Initialize the serial communication sender
        self.__serial_sender = Sender(
            debug=debug,
            start_event=start_event,
            stop_sent_event=self.__stop_sent_event,
            stop_confirmation_event=self.__stop_confirmation_event,
            stop_event=stop_event,
            messages_queue=sender_messages_queue,
            writer_messages_queue=writer_messages_queue,
            server_messages_queue=server_messages_queue,
            data_ports=data_ports,
            baudrate=baudrate
        )

        # Initialize the logger
        self.__logger = Logger(writer_messages_queue, self.LOGGER_TAG)

        # Initialize the threads
        self.__receiving_thread = None
        self.__sending_thread = None

    @final
    @property
    def logger(self) -> Logger:
        """
        Get the logger instance for the SerialCommunication.

        Returns:
            Logger: The logger instance.
        """
        return self.__logger

    @final
    @ignore_sigint
    def run(self) -> None:
        """
        Run the receiver and the sender on their own threads and wait for both.

        Raises:
            Exception: Any exception raised by this method itself is re-raised
                after the stop event has been set.
        """
        try:
            # Create the receiving thread
            self.__logger.info(
                "Starting the serial communication receiver thread."
            )
            self.__receiving_thread = Thread(
                target=self.__serial_receiver.run
            )
            self.__receiving_thread.start()

            # Create the sending thread
            self.__logger.info(
                "Starting the serial communication sender thread."
            )
            self.__sending_thread = Thread(target=self.__serial_sender.run)
            self.__sending_thread.start()

            # Wait for the receiving thread to finish
            self.__receiving_thread.join()
            self.__receiving_thread = None

            # Wait for the sending thread to finish
            self.__sending_thread.join()
            self.__sending_thread = None

        except Exception:
            # Set the stop event so the receiver and the sender are told to
            # stop, then re-raise with the original traceback untouched
            self.__stop_event.set()
            raise

    def __del__(self):
        """
        Destructor to clean up resources when the SerialCommunication instance is deleted.

        Python may call this on an instance whose ``__init__`` raised part-way
        through, so every attribute is looked up defensively instead of
        assuming it exists.
        """
        # The attributes are name-mangled, hence the explicit class prefix
        stop_event = getattr(self, "_SerialCommunication__stop_event", None)
        if stop_event is not None:
            stop_event.set()

        # Log
        logger = getattr(self, "_SerialCommunication__logger", None)
        if logger is not None:
            logger.info(
                "Instance is being deleted. Resources will be cleaned up."
            )

__del__()

Destructor to clean up resources when the SerialCommunication instance is deleted.

Source code in devices\raspberry_pi_5\src\serial_communication\__init__.py
141
142
143
144
145
146
147
148
149
150
def __del__(self):
    """
    Destructor to clean up resources when the SerialCommunication instance is deleted.

    Python may call this on an instance whose ``__init__`` raised part-way
    through, so every attribute is looked up defensively instead of
    assuming it exists.
    """
    # The attributes are name-mangled, hence the explicit class prefix
    stop_event = getattr(self, "_SerialCommunication__stop_event", None)
    if stop_event is not None:
        stop_event.set()

    # Log
    logger = getattr(self, "_SerialCommunication__logger", None)
    if logger is not None:
        logger.info(
            "Instance is being deleted. Resources will be cleaned up."
        )

__init__(debug, challenge, start_event, stop_event, bno08x_yaw_deg, bno08x_turns, sender_messages_queue, writer_messages_queue, server_messages_queue=None, console_ports=RASPBERRY_PI_PICO_CONSOLE_PORTS, data_ports=RASPBERRY_PI_PICO_DATA_PORTS, baudrate=RASPBERRY_PI_PICO_BAUDRATE)

Initialize the serial communication class.

Parameters:

Name Type Description Default
debug bool

Flag to indicate if the receiver is in debug mode.

required
challenge Value

Shared value to hold the current challenge.

required
start_event Event

Event to signal when the serial communication has started.

required
stop_event Event

Event to signal when the serial communication should stop sending and receiving messages.

required
bno08x_yaw_deg Value

Shared value for the BNO08X yaw angle in degrees.

required
bno08x_turns Value

Shared value for the BNO08X turns.

required
sender_messages_queue Queue

Queue to hold outgoing messages of the serial port.

required
writer_messages_queue Queue

Queue to hold log messages.

required
server_messages_queue Optional[Queue]

Queue to broadcast the messages through the websockets server.

None
console_ports Optional[List[str]]

List of serial ports used for receiving data from Pico.

RASPBERRY_PI_PICO_CONSOLE_PORTS
data_ports Optional[List[str]]

List of serial ports used for sending data to Pico.

RASPBERRY_PI_PICO_DATA_PORTS
baudrate Optional[int]

Baud rate for the serial communication.

RASPBERRY_PI_PICO_BAUDRATE
Source code in devices\raspberry_pi_5\src\serial_communication\__init__.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def __init__(
    self,
    debug: bool,
    challenge: ValueCls,
    start_event: EventCls,
    stop_event: EventCls,
    bno08x_yaw_deg: ValueCls,
    bno08x_turns: ValueCls,
    sender_messages_queue: Queue,
    writer_messages_queue: Queue,
    server_messages_queue: Optional[Queue] = None,
    console_ports: Optional[List[str]] = RASPBERRY_PI_PICO_CONSOLE_PORTS,
    data_ports: Optional[List[str]] = RASPBERRY_PI_PICO_DATA_PORTS,
    baudrate: Optional[int] = RASPBERRY_PI_PICO_BAUDRATE
):
    """
    Build the receiver, the sender and the logger used by the serial communication.

    Args:
        debug (bool): Debug flag forwarded to both the receiver and the sender.
        challenge (ValueCls): Shared value to hold the current challenge.
        start_event (EventCls): Event to signal when the serial communication has started.
        stop_event (EventCls): Event to signal when the serial communication should stop sending and receiving messages.
        bno08x_yaw_deg (ValueCls): Shared value for the BNO08X yaw angle in degrees.
        bno08x_turns (ValueCls): Shared value for the BNO08X turns.
        sender_messages_queue (Queue): Queue to hold outgoing messages of the serial port.
        writer_messages_queue (Queue): Queue to hold log messages.
        server_messages_queue (Optional[Queue]): Queue to broadcast the messages through the websockets server.
        console_ports (Optional[List[str]]): List of serial ports used for receiving data from Pico.
        data_ports (Optional[List[str]]): List of serial ports used for sending data to Pico.
        baudrate (Optional[int]): Baud rate for the serial communication.
    """
    # Stop-handshake events are created here; the stop event comes from the caller
    self.__stop_sent_event = Event()
    self.__stop_confirmation_event = Event()
    self.__stop_event = stop_event

    # Keyword arguments the receiver and the sender have in common
    common_kwargs = {
        "debug": debug,
        "start_event": start_event,
        "stop_sent_event": self.__stop_sent_event,
        "stop_confirmation_event": self.__stop_confirmation_event,
        "stop_event": stop_event,
        "writer_messages_queue": writer_messages_queue,
        "server_messages_queue": server_messages_queue,
        "baudrate": baudrate,
    }

    # Receiver: reads from the console ports
    self.__serial_receiver = Receiver(
        challenge=challenge,
        bno08x_yaw_deg=bno08x_yaw_deg,
        bno08x_turns=bno08x_turns,
        sender_messages_queue=sender_messages_queue,
        console_ports=console_ports,
        **common_kwargs
    )

    # Sender: consumes the outgoing queue and writes to the data ports
    self.__serial_sender = Sender(
        messages_queue=sender_messages_queue,
        data_ports=data_ports,
        **common_kwargs
    )

    # Logger writing to the shared log queue under this class' tag
    self.__logger = Logger(writer_messages_queue, self.LOGGER_TAG)

    # Worker threads are only created once run() is called
    self.__receiving_thread = None
    self.__sending_thread = None

abstracts

DispatcherABC

Abstract class for a dispatcher that handles incoming and outgoing messages.

Source code in devices\raspberry_pi_5\src\serial_communication\abstracts.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
class DispatcherABC(ABC):
    """
    Abstract class for a dispatcher that handles incoming and outgoing messages.

    Inherits from ABC so that the ``@abstractmethod`` markers below are
    actually enforced: a subclass that does not override all of them cannot
    be instantiated.
    """

    @abstractmethod
    def _send_message(self, msg: OutgoingMessage) -> None:
        """
        Put a message in the outgoing messages queue.

        Args:
            msg (OutgoingMessage): The message to put in the queue.
        """
        pass

    @abstractmethod
    def send_motor_speed_message(self, speed: float) -> None:
        """
        Send the motor speed to the serial port.

        Args:
            speed (float): The speed of the motor.
        """
        pass

    @abstractmethod
    def send_servo_angle_message(self, angle: float) -> None:
        """
        Send the servo angle to the serial port.

        Args:
            angle (float): The angle of the servo.
        """
        pass

    @abstractmethod
    def send_confirmation_message(self) -> None:
        """
        Send a confirmation message to the serial port.
        """
        pass

    @abstractmethod
    def send_stop_message(self) -> None:
        """
        Send a stop message to the serial port.
        """
        pass

send_confirmation_message() abstractmethod

Send a confirmation message to the serial port.

Source code in devices\raspberry_pi_5\src\serial_communication\abstracts.py
182
183
184
185
186
187
@abstractmethod
def send_confirmation_message(self) -> None:
    """
    Queue a confirmation message for the serial port.

    Abstract: concrete dispatchers provide the implementation.
    """

send_motor_speed_message(speed) abstractmethod

Send the motor speed to the serial port.

Parameters:

Name Type Description Default
speed float

The speed of the motor.

required
Source code in devices\raspberry_pi_5\src\serial_communication\abstracts.py
162
163
164
165
166
167
168
169
170
@abstractmethod
def send_motor_speed_message(self, speed: float) -> None:
    """
    Queue a motor speed message for the serial port.

    Abstract: concrete dispatchers provide the implementation.

    Args:
        speed (float): The speed of the motor.
    """

send_servo_angle_message(angle) abstractmethod

Send the servo angle to the serial port.

Parameters:

Name Type Description Default
angle float

The angle of the servo.

required
Source code in devices\raspberry_pi_5\src\serial_communication\abstracts.py
172
173
174
175
176
177
178
179
180
@abstractmethod
def send_servo_angle_message(self, angle: float) -> None:
    """
    Queue a servo angle message for the serial port.

    Abstract: concrete dispatchers provide the implementation.

    Args:
        angle (float): The angle of the servo.
    """

send_stop_message() abstractmethod

Send a stop message to the serial port.

Source code in devices\raspberry_pi_5\src\serial_communication\abstracts.py
189
190
191
192
193
194
@abstractmethod
def send_stop_message(self) -> None:
    """
    Queue a stop message for the serial port.

    Abstract: concrete dispatchers provide the implementation.
    """

ReceiverABC

Bases: ABC

Receiver abstract class to handle serial communication with the Raspberry Pi Pico.

Source code in devices\raspberry_pi_5\src\serial_communication\abstracts.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
class ReceiverABC(ABC):
    """
    Interface of the receiving side of the serial link with the Raspberry Pi Pico.

    Every member is abstract; concrete receivers must override all of them.
    """

    @abstractmethod
    def logger(self) -> Logger:
        """
        Return the logger used by the Receiver.

        Returns:
            Logger: The logger instance.
        """

    @abstractmethod
    def _open_port(self, port: str) -> None:
        """
        Open the given console port for communication.

        Args:
            port (str): The console port to open.

        Raises:
            RuntimeError: If the console port cannot be opened.
        """

    @abstractmethod
    def _start(self) -> None:
        """
        Start the serial communication.

        Raises:
            RuntimeError: If the serial port cannot be opened.
        """

    @abstractmethod
    def _stop(self) -> None:
        """
        Stop the serial communication.
        """

    @abstractmethod
    def _receive_latest_message(self) -> IncomingMessage | None:
        """
        Read the latest message from the serial port.

        Returns:
            IncomingMessage | None: The latest incoming message or None if no message is available.
        """

    @abstractmethod
    def run(self) -> None:
        """
        Receive messages from the serial port.

        Raises:
            RuntimeError: If an error message is received or if the confirmation message is not received within a timeout.
        """

logger() abstractmethod

Get the logger instance for the Receiver.

Returns:

Name Type Description
Logger Logger

The logger instance.

Source code in devices\raspberry_pi_5\src\serial_communication\abstracts.py
35
36
37
38
39
40
41
42
43
@abstractmethod
def logger(self) -> Logger:
    """
    Return the logger used by the Receiver.

    Returns:
        Logger: The logger instance.
    """

run() abstractmethod

Handler to receive messages from the serial port.

Raises:

Type Description
RuntimeError

If an error message is received or if the confirmation message is not received within a timeout.

Source code in devices\raspberry_pi_5\src\serial_communication\abstracts.py
84
85
86
87
88
89
90
91
92
@abstractmethod
def run(self) -> None:
    """
    Receive messages from the serial port.

    Raises:
        RuntimeError: If an error message is received or if the confirmation message is not received within a timeout.
    """

SenderABC

Bases: ABC

Sender abstract class to handle sending messages through serial communication.

Source code in devices\raspberry_pi_5\src\serial_communication\abstracts.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
class SenderABC(ABC):
    """
    Interface of the sending side of the serial link.

    Every member is abstract; concrete senders must override all of them.
    """

    @abstractmethod
    def logger(self) -> Logger:
        """
        Return the logger used by the Sender.

        Returns:
            Logger: The logger instance.
        """

    @abstractmethod
    def _open_port(self, port: str) -> None:
        """
        Open the given data port for serial communication.

        Args:
            port (str): The data port to open.

        Raises:
            RuntimeError: If the data port cannot be opened.
        """

    @abstractmethod
    def _send_latest_message(self) -> None:
        """
        Send the latest message from the outgoing messages queue.
        """

    @abstractmethod
    def _send_confirmation_message(self) -> None:
        """
        Send a confirmation message to the serial port.
        """

    @abstractmethod
    def run(self) -> None:
        """
        Send messages to the serial port.

        Raises:
            RuntimeError: If an error occurs while sending a message or if the confirmation message is not received within a timeout.
        """

logger() abstractmethod

Get the logger instance for the Sender.

Returns:

Name Type Description
Logger Logger

The logger instance.

Source code in devices\raspberry_pi_5\src\serial_communication\abstracts.py
100
101
102
103
104
105
106
107
108
@abstractmethod
def logger(self) -> Logger:
    """
    Return the logger used by the Sender.

    Returns:
        Logger: The logger instance.
    """

run() abstractmethod

Handler to send messages to the serial port.

Raises:

Type Description
RuntimeError

If an error occurs while sending a message or if the confirmation message is not received within a timeout.

Source code in devices\raspberry_pi_5\src\serial_communication\abstracts.py
136
137
138
139
140
141
142
143
144
@abstractmethod
def run(self) -> None:
    """
    Send messages to the serial port.

    Raises:
        RuntimeError: If an error occurs while sending a message or if the confirmation message is not received within a timeout.
    """

SerialCommunicationABC

Bases: ABC

Abstract class to handle the serial communication through USB.

Source code in devices\raspberry_pi_5\src\serial_communication\abstracts.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class SerialCommunicationABC(ABC):
    """
    Abstract class to handle the serial communication through USB.
    """

    @property
    @abstractmethod
    def logger(self) -> Logger:
        """
        Get the logger instance for the SerialCommunication.

        Declared as an abstract property because the concrete
        SerialCommunication class exposes ``logger`` as a read-only property.

        Returns:
            Logger: The logger instance.
        """
        pass

    @abstractmethod
    def run(self) -> None:
        """
        Run the serial communication by creating threads for receiving and sending messages.
        """
        pass

logger() abstractmethod

Get the logger instance for the SerialCommunication.

Returns:

Name Type Description
Logger Logger

The logger instance.

Source code in devices\raspberry_pi_5\src\serial_communication\abstracts.py
12
13
14
15
16
17
18
19
20
@property
@abstractmethod
def logger(self) -> Logger:
    """
    Get the logger instance for the SerialCommunication.

    Declared as an abstract property because the concrete
    SerialCommunication class exposes ``logger`` as a read-only property.

    Returns:
        Logger: The logger instance.
    """
    pass

run() abstractmethod

Run the serial communication by creating threads for receiving and sending messages.

Source code in devices\raspberry_pi_5\src\serial_communication\abstracts.py
22
23
24
25
26
27
@abstractmethod
def run(self) -> None:
    """
    Start the serial communication: create the receiving and sending threads.

    Abstract: the concrete serial communication class provides the implementation.
    """

dispatcher

Dispatcher

Bases: DispatcherABC

Class for a dispatcher that handles incoming and outgoing messages.

Source code in devices\raspberry_pi_5\src\serial_communication\dispatcher.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class Dispatcher(DispatcherABC):
    """
    Dispatcher that builds outgoing messages and puts them on the serial queue.
    """

    def __init__(self, serial_messages_queue: Queue) -> None:
        """
        Store the queue that outgoing messages are put on.

        Args:
            serial_messages_queue (Queue): Queue to hold outgoing messages to the serial port.
        """
        self.__serial_messages_queue = serial_messages_queue

    @final
    def _send_message(self, msg: OutgoingMessage) -> None:
        """
        Check the message type and put the message on the outgoing queue.

        Args:
            msg (OutgoingMessage): The message to put in the queue.
        """
        # NOTE(review): the result of is_instance is discarded, so it
        # presumably raises on a type mismatch — confirm against the helper
        is_instance(msg, OutgoingMessage)
        self.__serial_messages_queue.put(msg)

    @final
    def send_motor_speed_message(self, speed: float) -> None:
        """
        Queue a MOTOR_SPEED message whose content is ``str(speed)``.

        Args:
            speed (float): The speed of the motor.
        """
        self._send_message(
            OutgoingMessage(OutgoingCategory.MOTOR_SPEED, str(speed))
        )

    @final
    def send_servo_angle_message(self, angle: float) -> None:
        """
        Queue a SERVO_ANGLE message whose content is ``str(angle)``.

        Args:
            angle (float): The angle of the servo.
        """
        self._send_message(
            OutgoingMessage(OutgoingCategory.SERVO_ANGLE, str(angle))
        )

    @final
    def send_confirmation_message(self) -> None:
        """
        Queue the predefined OUTGOING_OK_MESSAGE.
        """
        self._send_message(OUTGOING_OK_MESSAGE)

    @final
    def send_stop_message(self) -> None:
        """
        Queue the predefined STOP_MESSAGE.
        """
        self._send_message(STOP_MESSAGE)

__init__(serial_messages_queue)

Initializes the Dispatcher class.

Parameters:

Name Type Description Default
serial_messages_queue Queue

Queue to hold outgoing messages to the serial port.

required
Source code in devices\raspberry_pi_5\src\serial_communication\dispatcher.py
16
17
18
19
20
21
22
23
24
25
26
27
def __init__(self, serial_messages_queue: Queue) -> None:
    """
    Store the queue that outgoing messages are put on.

    Args:
        serial_messages_queue (Queue): Queue to hold outgoing messages to the serial port.
    """
    self.__serial_messages_queue = serial_messages_queue

enums

IncomingCategory

Bases: Enum

Enum to represent the categories of incoming messages from the Raspberry Pi Pico 2W.

Source code in devices\raspberry_pi_5\src\serial_communication\enums.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@unique
class IncomingCategory(Enum):
    """
    Categories of the messages received from the Raspberry Pi Pico 2W.
    """
    CHALLENGE = 1
    STATUS = 2
    BNO08X_YAW_DEG = 3
    BNO08X_TURNS = 4
    ERROR = 5
    DEBUG = 6

    @property
    def parsed_name(self) -> str:
        """
        Lowercase form of the member name.

        Returns:
            str: The category name in lowercase.
        """
        return self.name.lower()

    @classmethod
    def from_string(cls, category_str: str) -> 'IncomingCategory':
        """
        Look up the IncomingCategory member named by a string.

        The string is upper-cased before being handed to ``map_string_to_enum``.

        Args:
            category_str (str): The string representation of the incoming category.

        Returns:
            IncomingCategory: The corresponding IncomingCategory enum value.
        """
        member_name = category_str.upper()
        return map_string_to_enum(member_name, cls)

parsed_name property

Get the category name in lowercase.

Returns:

Name Type Description
str str

The category name in lowercase.

from_string(category_str) classmethod

Convert a string to an IncomingCategory enum value.

Parameters:

Name Type Description Default
category_str str

The string representation of the incoming category.

required

Returns:

Name Type Description
IncomingCategory IncomingCategory

The corresponding IncomingCategory enum value.

Source code in devices\raspberry_pi_5\src\serial_communication\enums.py
28
29
30
31
32
33
34
35
36
37
38
@classmethod
def from_string(cls, category_str: str) -> 'IncomingCategory':
    """
    Look up the IncomingCategory member named by a string.

    The string is upper-cased before being handed to ``map_string_to_enum``.

    Args:
        category_str (str): The string representation of the incoming category.

    Returns:
        IncomingCategory: The corresponding IncomingCategory enum value.
    """
    member_name = category_str.upper()
    return map_string_to_enum(member_name, cls)

OutgoingCategory

Bases: Enum

Enum to represent the categories of outgoing messages sent to the Raspberry Pi Pico 2W.

Source code in devices\raspberry_pi_5\src\serial_communication\enums.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
@unique
class OutgoingCategory(Enum):
    """
    Categories of the messages sent to the Raspberry Pi Pico 2W.
    """
    STATUS = 1
    SERVO_ANGLE = 2
    MOTOR_SPEED = 3

    @property
    def parsed_name(self) -> str:
        """
        Lowercase form of the member name.

        Returns:
            str: The category name in lowercase.
        """
        return self.name.lower()

    @classmethod
    def from_string(cls, category_str: str) -> 'OutgoingCategory':
        """
        Look up the OutgoingCategory member named by a string.

        The string is upper-cased before being handed to ``map_string_to_enum``.

        Args:
            category_str (str): The string representation of the outgoing category.

        Returns:
            OutgoingCategory: The corresponding OutgoingCategory enum value.
        """
        member_name = category_str.upper()
        return map_string_to_enum(member_name, cls)

parsed_name property

Get the category name in lowercase.

Returns:

Name Type Description
str str

The category name in lowercase.

from_string(category_str) classmethod

Convert a string to an OutgoingCategory enum value.

Parameters:

Name Type Description Default
category_str str

The string representation of the outgoing category.

required

Returns:

Name Type Description
OutgoingCategory OutgoingCategory

The corresponding OutgoingCategory enum value.

Source code in devices\raspberry_pi_5\src\serial_communication\enums.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
@classmethod
def from_string(cls, category_str: str) -> 'OutgoingCategory':
    """
    Look up the OutgoingCategory member named by a string.

    The string is upper-cased before being handed to ``map_string_to_enum``.

    Args:
        category_str (str): The string representation of the outgoing category.

    Returns:
        OutgoingCategory: The corresponding OutgoingCategory enum value.
    """
    member_name = category_str.upper()
    return map_string_to_enum(member_name, cls)

Status

Bases: Enum

Enum to represent the status messages sent and received from the Raspberry Pi Pico 2W.

Source code in devices\raspberry_pi_5\src\serial_communication\enums.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
@unique
class Status(Enum):
    """
    Status values exchanged with the Raspberry Pi Pico 2W.
    """
    START = 1
    STOP = 2
    OK = 3
    HEARTBEAT = 4

    @property
    def parsed_name(self) -> str:
        """
        Lowercase form of the member name.

        Returns:
            str: The status name in lowercase.
        """
        return self.name.lower()

    @classmethod
    def from_string(cls, status_str: str) -> 'Status':
        """
        Look up the Status member named by a string.

        The string is upper-cased before being handed to ``map_string_to_enum``.

        Args:
            status_str (str): The string representation of the status.

        Returns:
            Status: The corresponding Status enum value.
        """
        member_name = status_str.upper()
        return map_string_to_enum(member_name, cls)

parsed_name property

Get the status name in lowercase.

Returns:

Name Type Description
str str

The status name in lowercase.

from_string(status_str) classmethod

Convert a string to a Status enum value.

Parameters:

Name Type Description Default
status_str str

The string representation of the status.

required

Returns:

Name Type Description
Status Status

The corresponding Status enum value.

Source code in devices\raspberry_pi_5\src\serial_communication\enums.py
61
62
63
64
65
66
67
68
69
70
71
@classmethod
def from_string(cls, status_str: str) -> 'Status':
    """
    Look up the Status member named by a string.

    The string is upper-cased before being handed to ``map_string_to_enum``.

    Args:
        status_str (str): The string representation of the status.

    Returns:
        Status: The corresponding Status enum value.
    """
    member_name = status_str.upper()
    return map_string_to_enum(member_name, cls)

message

IncomingMessage

Class to handle the messages received from the Raspberry Pi Pico 2W.

Source code in devices\raspberry_pi_5\src\serial_communication\message.py
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
class IncomingMessage:
    """
    Class to handle the messages received from the Raspberry Pi Pico 2W.

    A message is a category plus a text content. Its string form is
    ``<category.parsed_name><HEADER_SEPARATOR_CHAR><content><END_CHAR>``,
    which is the same layout that ``from_string`` parses.
    """

    def __init__(self, category: IncomingCategory, content: str):
        """
        Initialize the incoming message class.

        Args:
            category (IncomingCategory): The category of the message.
            content (str): The content of the message.
        """
        # Assign through the property setters so both values get type-checked
        self.category = category
        self.content = content

    def __str__(self) -> str:
        """
        String representation of the message.

        Returns:
            str: Category name, header separator, content and end character,
                concatenated in that order.
        """
        return f"{self.__category.parsed_name}{HEADER_SEPARATOR_CHAR}{self.__content}{END_CHAR}"

    def __eq__(self, other: object) -> bool:
        """
        Check equality of two IncomingMessage objects.

        Args:
            other (object): The object to compare with.
        Returns:
            bool: True if both messages have the same category and content,
                False otherwise. NotImplemented is returned for objects that
                are not IncomingMessage instances, so Python falls back to
                its default comparison instead of raising AttributeError.
        """
        if not isinstance(other, IncomingMessage):
            return NotImplemented
        return self.category == other.category and self.content == other.content

    @staticmethod
    def from_string(msg_str: str) -> 'IncomingMessage':
        """
        Create an IncomingMessage object from a string.

        Args:
            msg_str (str): The string representation of the message.
        Returns:
            IncomingMessage: The message object created from the string.
        Raises:
            ValueError: If the string does not contain the header separator
                or if the category is not a valid IncomingCategory.
        """
        # Remove the end marker if present (works for any marker length;
        # the emptiness guard avoids slicing with -0, which would erase the string)
        if END_CHAR and msg_str.endswith(END_CHAR):
            msg_str = msg_str[:-len(END_CHAR)]

        # Split only on the first separator so the content may contain it
        parts = msg_str.strip().split(HEADER_SEPARATOR_CHAR, 1)
        if len(parts) != 2:
            raise ValueError("Invalid incoming message format")

        # Convert the category string to a Category enum value
        try:
            category = IncomingCategory.from_string(parts[0])

        except ValueError as err:
            # Chain the original error so the root cause stays in the traceback
            raise ValueError(
                f"Invalid category in incoming message: {parts[0]}"
            ) from err

        # Create and return the Message object
        return IncomingMessage(category, parts[1])

    @property
    def category(self) -> IncomingCategory:
        """
        Property to get the message category.

        Returns:
            IncomingCategory: The category of the message.
        """
        return self.__category

    @category.setter
    def category(self, category: IncomingCategory):
        """
        Setter for the message category.

        Args:
            category (IncomingCategory): The category of the message.
        """
        # Type check via the project helper
        # NOTE(review): is_instance presumably raises on a mismatch - confirm
        is_instance(category, IncomingCategory)
        self.__category = category

    @property
    def content(self) -> str:
        """
        Get the message content.

        Returns:
            str: The content of the message.
        """
        return self.__content

    @content.setter
    def content(self, content: str):
        """
        Setter for the message content.

        Args:
            content (str): The content of the message.
        """
        # Type check via the project helper
        # NOTE(review): is_instance presumably raises on a mismatch - confirm
        is_instance(content, str)
        self.__content = content

    def is_start(self) -> bool:
        """
        Check if the message is a start message.

        Returns:
            bool: True if the category is STATUS and the content equals the
                parsed name of Status.START, False otherwise.
        """
        return self.category == IncomingCategory.STATUS and self.content == Status.START.parsed_name

    def is_challenge(self) -> bool:
        """
        Check if the message is a challenge message.

        Returns:
            bool: True if the message is a challenge message, False otherwise.
        """
        return self.category == IncomingCategory.CHALLENGE

    def is_error(self) -> bool:
        """
        Check if the message is an error message.

        Returns:
            bool: True if the message is an error message, False otherwise.
        """
        return self.category == IncomingCategory.ERROR

    def is_confirmation(self) -> bool:
        """
        Check if the message is a confirmation message.

        Returns:
            bool: True if the category is STATUS and the content equals the
                parsed name of Status.OK, False otherwise.
        """
        return self.category == IncomingCategory.STATUS and self.content == Status.OK.parsed_name

    def is_bno08x_yaw_deg(self) -> bool:
        """
        Check if the message is a BNO08X yaw degrees message.

        Returns:
            bool: True if the message is a BNO08X yaw degrees message, False otherwise.
        """
        return self.category == IncomingCategory.BNO08X_YAW_DEG

    def is_bno08x_turns(self) -> bool:
        """
        Check if the message is a BNO08X turns message.

        Returns:
            bool: True if the message is a BNO08X turns message, False otherwise.
        """
        return self.category == IncomingCategory.BNO08X_TURNS

    def is_debug(self) -> bool:
        """
        Check if the message is a debug message.

        Returns:
            bool: True if the message is a debug message, False otherwise.
        """
        return self.category == IncomingCategory.DEBUG

category property writable

Property to get the message category.

Returns:

Name Type Description
IncomingCategory IncomingCategory

The category of the message.

content property writable

Get the message content.

Returns:

Name Type Description
str str

The content of the message.

__eq__(other)

Check equality of two IncomingMessage objects.

Parameters:

Name Type Description Default
other IncomingMessage

The other IncomingMessage object to compare with.

required

Returns:
bool: True if both messages have the same category and content, False otherwise.

Source code in devices\raspberry_pi_5\src\serial_communication\message.py
28
29
30
31
32
33
34
35
36
37
def __eq__(self, other: object) -> bool:
    """
    Check equality of two IncomingMessage objects.

    Args:
        other (object): The object to compare with.
    Returns:
        bool: True if both messages have the same category and content,
            False otherwise. NotImplemented is returned for objects that
            are not IncomingMessage instances, so Python falls back to its
            default comparison instead of raising AttributeError.
    """
    if not isinstance(other, IncomingMessage):
        return NotImplemented
    return self.category == other.category and self.content == other.content

__init__(category, content)

Initialize the incoming message class.

Parameters:

Name Type Description Default
category IncomingCategory

The category of the message.

required
content str

The content of the message.

required
Source code in devices\raspberry_pi_5\src\serial_communication\message.py
11
12
13
14
15
16
17
18
19
20
def __init__(self, category: IncomingCategory, content: str):
    """
    Build an incoming message from its two parts.

    Both values are stored by assigning to the ``category`` and ``content``
    attributes, category first.

    Args:
        category (IncomingCategory): Category the message belongs to.
        content (str): Text carried by the message.
    """
    # Tuple assignment binds the targets left to right: category, then content
    self.category, self.content = category, content

__str__()

String representation of the message.

Source code in devices\raspberry_pi_5\src\serial_communication\message.py
22
23
24
25
26
def __str__(self) -> str:
    """
    Render the message as text.

    Returns:
        str: Category parsed name, header separator, content and end
            character, concatenated in that order.
    """
    header = self.__category.parsed_name
    body = self.__content
    return f"{header}{HEADER_SEPARATOR_CHAR}{body}{END_CHAR}"

from_string(msg_str) staticmethod

Create an IncomingMessage object from a string.

Parameters:

Name Type Description Default
msg_str str

The string representation of the message.

required

Returns:
IncomingMessage: The IncomingMessage object created from the string.
Raises:
ValueError: If the string does not match the expected format.

Source code in devices\raspberry_pi_5\src\serial_communication\message.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@staticmethod
def from_string(msg_str: str) -> 'IncomingMessage':
    """
    Create an IncomingMessage object from a string.

    Args:
        msg_str (str): The string representation of the message.
    Returns:
        IncomingMessage: The message object created from the string.
    Raises:
        ValueError: If the string does not contain the header separator
            or if the category is not a valid IncomingCategory.
    """
    # Remove the end marker if present (works for any marker length;
    # the emptiness guard avoids slicing with -0, which would erase the string)
    if END_CHAR and msg_str.endswith(END_CHAR):
        msg_str = msg_str[:-len(END_CHAR)]

    # Split only on the first separator so the content may contain it
    parts = msg_str.strip().split(HEADER_SEPARATOR_CHAR, 1)
    if len(parts) != 2:
        raise ValueError("Invalid incoming message format")

    # Convert the category string to a Category enum value
    try:
        category = IncomingCategory.from_string(parts[0])

    except ValueError as err:
        # Chain the original error so the root cause stays in the traceback
        raise ValueError(
            f"Invalid category in incoming message: {parts[0]}"
        ) from err

    # Create and return the Message object
    return IncomingMessage(category, parts[1])

is_bno08x_turns()

Check if the message is a BNO08X turns message.

Returns:

Name Type Description
bool bool

True if the message is a BNO08X turns message, False otherwise.

Source code in devices\raspberry_pi_5\src\serial_communication\message.py
161
162
163
164
165
166
167
168
def is_bno08x_turns(self) -> bool:
    """
    Tell whether this message belongs to the BNO08X turns category.

    Returns:
        bool: True when the category equals IncomingCategory.BNO08X_TURNS,
            False otherwise.
    """
    current_category = self.category
    return current_category == IncomingCategory.BNO08X_TURNS

is_bno08x_yaw_deg()

Check if the message is a BNO08X yaw degrees message.

Returns:

Name Type Description
bool bool

True if the message is a BNO08X yaw degrees message, False otherwise.

Source code in devices\raspberry_pi_5\src\serial_communication\message.py
152
153
154
155
156
157
158
159
def is_bno08x_yaw_deg(self) -> bool:
    """
    Tell whether this message belongs to the BNO08X yaw degrees category.

    Returns:
        bool: True when the category equals IncomingCategory.BNO08X_YAW_DEG,
            False otherwise.
    """
    current_category = self.category
    return current_category == IncomingCategory.BNO08X_YAW_DEG

is_challenge()

Check if the message is a challenge message.

Returns:

Name Type Description
bool bool

True if the message is a challenge message, False otherwise.

Source code in devices\raspberry_pi_5\src\serial_communication\message.py
125
126
127
128
129
130
131
132
def is_challenge(self) -> bool:
    """
    Tell whether this message belongs to the challenge category.

    Returns:
        bool: True when the category equals IncomingCategory.CHALLENGE,
            False otherwise.
    """
    current_category = self.category
    return current_category == IncomingCategory.CHALLENGE

is_confirmation()

Check if the message is a confirmation message.

Returns:

Name Type Description
bool bool

True if the message is a confirmation message, False otherwise.

Source code in devices\raspberry_pi_5\src\serial_communication\message.py
143
144
145
146
147
148
149
150
def is_confirmation(self) -> bool:
    """
    Tell whether this message is a confirmation.

    Returns:
        bool: True when the category is IncomingCategory.STATUS and the
            content equals the parsed name of Status.OK, False otherwise.
    """
    # Guard clause: anything outside the STATUS category is not a confirmation
    if not (self.category == IncomingCategory.STATUS):
        return False
    return self.content == Status.OK.parsed_name

is_debug()

Check if the message is a debug message.

Returns:

Name Type Description
bool bool

True if the message is a debug message, False otherwise.

Source code in devices\raspberry_pi_5\src\serial_communication\message.py
170
171
172
173
174
175
176
177
def is_debug(self) -> bool:
    """
    Tell whether this message belongs to the debug category.

    Returns:
        bool: True when the category equals IncomingCategory.DEBUG,
            False otherwise.
    """
    current_category = self.category
    return current_category == IncomingCategory.DEBUG

is_error()

Check if the message is an error message.

Returns:

Name Type Description
bool bool

True if the message is an error message, False otherwise.

Source code in devices\raspberry_pi_5\src\serial_communication\message.py
134
135
136
137
138
139
140
141
def is_error(self) -> bool:
    """
    Tell whether this message belongs to the error category.

    Returns:
        bool: True when the category equals IncomingCategory.ERROR,
            False otherwise.
    """
    current_category = self.category
    return current_category == IncomingCategory.ERROR

is_start()

Check if the message is a start message.

Returns:

Name Type Description
bool bool

True if the message is a start message, False otherwise.

Source code in devices\raspberry_pi_5\src\serial_communication\message.py
116
117
118
119
120
121
122
123
def is_start(self) -> bool:
    """
    Tell whether this message is a start message.

    Returns:
        bool: True when the category is IncomingCategory.STATUS and the
            content equals the parsed name of Status.START, False otherwise.
    """
    # Guard clause: anything outside the STATUS category is not a start message
    if not (self.category == IncomingCategory.STATUS):
        return False
    return self.content == Status.START.parsed_name

OutgoingMessage

Class to handle the messages sent to the Raspberry Pi Pico 2W.

Source code in devices\raspberry_pi_5\src\serial_communication\message.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
class OutgoingMessage:
    """
    Class to handle the messages sent to the Raspberry Pi Pico 2W.

    A message is a category plus a text content. Its string form is
    ``<category.parsed_name><HEADER_SEPARATOR_CHAR><content><END_CHAR>``,
    which is the same layout that ``from_string`` parses.
    """

    def __init__(self, category: OutgoingCategory, content: str):
        """
        Initialize the outgoing message class.

        Args:
            category (OutgoingCategory): The category of the message.
            content (str): The content of the message.
        """
        # Assign through the property setters so both values get type-checked
        self.category = category
        self.content = content

    def __str__(self) -> str:
        """
        String representation of the message.

        Returns:
            str: Category name, header separator, content and end character,
                concatenated in that order.
        """
        return f"{self.__category.parsed_name}{HEADER_SEPARATOR_CHAR}{self.__content}{END_CHAR}"

    def __eq__(self, other: object) -> bool:
        """
        Check equality of two OutgoingMessage objects.

        Args:
            other (object): The object to compare with.
        Returns:
            bool: True if both messages have the same category and content,
                False otherwise. NotImplemented is returned for objects that
                are not OutgoingMessage instances, so Python falls back to
                its default comparison instead of raising AttributeError.
        """
        if not isinstance(other, OutgoingMessage):
            return NotImplemented
        return self.category == other.category and self.content == other.content

    @staticmethod
    def from_string(msg_str: str) -> 'OutgoingMessage':
        """
        Create an OutgoingMessage object from a string.

        Args:
            msg_str (str): The string representation of the message.
        Returns:
            OutgoingMessage: The message object created from the string.
        Raises:
            ValueError: If the string does not contain the header separator,
                or if the category lookup raises ValueError.
        """
        # Remove the end marker if present (works for any marker length;
        # the emptiness guard avoids slicing with -0, which would erase the string)
        if END_CHAR and msg_str.endswith(END_CHAR):
            msg_str = msg_str[:-len(END_CHAR)]

        # Split only on the first separator so the content may contain it
        parts = msg_str.strip().split(HEADER_SEPARATOR_CHAR, 1)
        if len(parts) != 2:
            raise ValueError("Invalid outgoing message format")

        # Convert the category string to a Category enum value, reporting the
        # offending text the same way IncomingMessage.from_string does
        try:
            category = OutgoingCategory.from_string(parts[0])

        except ValueError as err:
            raise ValueError(
                f"Invalid category in outgoing message: {parts[0]}"
            ) from err

        # Create and return the Message object
        return OutgoingMessage(category, parts[1])

    @property
    def category(self) -> OutgoingCategory:
        """
        Property to get the message category.

        Returns:
            OutgoingCategory: The category of the message.
        """
        return self.__category

    @category.setter
    def category(self, category: OutgoingCategory):
        """
        Setter for the message category.

        Args:
            category (OutgoingCategory): The category of the message.
        """
        # Type check via the project helper
        # NOTE(review): is_instance presumably raises on a mismatch - confirm
        is_instance(category, OutgoingCategory)
        self.__category = category

    @property
    def content(self) -> str:
        """
        Get the message content.

        Returns:
            str: The content of the message.
        """
        return self.__content

    @content.setter
    def content(self, content: str):
        """
        Setter for the message content.

        Args:
            content (str): The content of the message.
        """
        # Type check via the project helper
        # NOTE(review): is_instance presumably raises on a mismatch - confirm
        is_instance(content, str)
        self.__content = content

category property writable

Property to get the message category.

Returns:

Name Type Description
OutgoingCategory OutgoingCategory

The category of the message.

content property writable

Get the message content.

Returns:

Name Type Description
str str

The content of the message.

__eq__(other)

Check equality of two OutgoingMessage objects.

Parameters:

Name Type Description Default
other OutgoingMessage

The other OutgoingMessage object to compare with.

required

Returns:
bool: True if both messages have the same category and content, False otherwise.

Source code in devices\raspberry_pi_5\src\serial_communication\message.py
202
203
204
205
206
207
208
209
210
211
def __eq__(self, other: object) -> bool:
    """
    Check equality of two OutgoingMessage objects.

    Args:
        other (object): The object to compare with.
    Returns:
        bool: True if both messages have the same category and content,
            False otherwise. NotImplemented is returned for objects that
            are not OutgoingMessage instances, so Python falls back to its
            default comparison instead of raising AttributeError.
    """
    if not isinstance(other, OutgoingMessage):
        return NotImplemented
    return self.category == other.category and self.content == other.content

__init__(category, content)

Initialize the outgoing message class.

Parameters:

Name Type Description Default
category OutgoingCategory

The category of the message.

required
content str

The content of the message.

required
Source code in devices\raspberry_pi_5\src\serial_communication\message.py
185
186
187
188
189
190
191
192
193
194
def __init__(self, category: OutgoingCategory, content: str):
    """
    Build an outgoing message from its two parts.

    Both values are stored by assigning to the ``category`` and ``content``
    attributes, category first.

    Args:
        category (OutgoingCategory): Category the message belongs to.
        content (str): Text carried by the message.
    """
    # Tuple assignment binds the targets left to right: category, then content
    self.category, self.content = category, content

__str__()

String representation of the message.

Source code in devices\raspberry_pi_5\src\serial_communication\message.py
196
197
198
199
200
def __str__(self) -> str:
    """
    Render the message as text.

    Returns:
        str: Category parsed name, header separator, content and end
            character, concatenated in that order.
    """
    header = self.__category.parsed_name
    body = self.__content
    return f"{header}{HEADER_SEPARATOR_CHAR}{body}{END_CHAR}"

from_string(msg_str) staticmethod

Create an OutgoingMessage object from a string.

Parameters:

Name Type Description Default
msg_str str

The string representation of the message.

required

Returns:
OutgoingMessage: The OutgoingMessage object created from the string.

Source code in devices\raspberry_pi_5\src\serial_communication\message.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
@staticmethod
def from_string(msg_str: str) -> 'OutgoingMessage':
    """
    Create an OutgoingMessage object from a string.

    Args:
        msg_str (str): The string representation of the message.
    Returns:
        OutgoingMessage: The message object created from the string.
    Raises:
        ValueError: If the string does not contain the header separator,
            or if the category lookup raises ValueError.
    """
    # Remove the end marker if present (works for any marker length;
    # the emptiness guard avoids slicing with -0, which would erase the string)
    if END_CHAR and msg_str.endswith(END_CHAR):
        msg_str = msg_str[:-len(END_CHAR)]

    # Split only on the first separator so the content may contain it
    parts = msg_str.strip().split(HEADER_SEPARATOR_CHAR, 1)
    if len(parts) != 2:
        raise ValueError("Invalid outgoing message format")

    # Convert the category string to a Category enum value, reporting the
    # offending text the same way IncomingMessage.from_string does
    try:
        category = OutgoingCategory.from_string(parts[0])

    except ValueError as err:
        raise ValueError(
            f"Invalid category in outgoing message: {parts[0]}"
        ) from err

    # Create and return the Message object
    return OutgoingMessage(category, parts[1])

multiprocessing

serial_communication_target(debug, challenge, start_event, stop_event, bno08x_yaw_deg, bno08x_turns, sender_messages_queue, writer_messages_queue, server_messages_queue=None, console_ports=RASPBERRY_PI_PICO_CONSOLE_PORTS, data_ports=RASPBERRY_PI_PICO_DATA_PORTS, baudrate=RASPBERRY_PI_PICO_BAUDRATE)

Target function for a multiprocessing process that handles the serial
communication.

Parameters:

Name Type Description Default
debug bool

Flag to indicate if the receiver is in debug mode.

required
challenge Value

Shared value to hold the current challenge.

required
start_event Event

Event to signal when the serial communication has started.

required
stop_event Event

Event to signal when the serial communication should stop sending and receiving messages.

required
bno08x_yaw_deg Value

Shared value for the BNO08X yaw angle in degrees.

required
bno08x_turns Value

Shared value for the BNO08X turns.

required
sender_messages_queue Queue

Queue to hold outgoing messages of the serial port.

required
writer_messages_queue Queue

Queue to hold log messages.

required
server_messages_queue Optional[Queue]

Queue to broadcast the messages through the websockets server.

None
console_ports Optional[list[str]]

List of serial ports used for receiving data from Pico.

RASPBERRY_PI_PICO_CONSOLE_PORTS
data_ports Optional[list[str]]

List of serial ports used for sending data to Pico.

RASPBERRY_PI_PICO_DATA_PORTS
baudrate Optional[int]

Baud rate for the serial communication.

RASPBERRY_PI_PICO_BAUDRATE
Source code in devices\raspberry_pi_5\src\serial_communication\multiprocessing.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def serial_communication_target(
    debug: bool,
    challenge: ValueCls,
    start_event: EventCls,
    stop_event: EventCls,
    bno08x_yaw_deg: ValueCls,
    bno08x_turns: ValueCls,
    sender_messages_queue: Queue,
    writer_messages_queue: Queue,
    server_messages_queue: Optional[Queue] = None,
    console_ports: Optional[list[str]] = RASPBERRY_PI_PICO_CONSOLE_PORTS,
    data_ports: Optional[list[str]] = RASPBERRY_PI_PICO_DATA_PORTS,
    baudrate: Optional[int] = RASPBERRY_PI_PICO_BAUDRATE
) -> None:
    """
    Entry point meant to be used as the target of a multiprocessing process.

    It prints the current process ID, builds a SerialCommunication object
    from the received arguments and calls its ``run`` method.

    Args:
        debug (bool): Flag to indicate if the receiver is in debug mode.
        challenge (ValueCls): Shared value to hold the current challenge.
        start_event (EventCls): Event to signal when the serial communication has started.
        stop_event (EventCls): Event to signal when the serial communication should stop sending and receiving messages.
        bno08x_yaw_deg (ValueCls): Shared value for the BNO08X yaw angle in degrees.
        bno08x_turns (ValueCls): Shared value for the BNO08X turns.
        sender_messages_queue (Queue): Queue to hold outgoing messages of the serial port.
        writer_messages_queue (Queue): Queue to hold log messages.
        server_messages_queue (Optional[Queue]): Queue to broadcast the messages through the websockets server.
        console_ports (Optional[list[str]]): List of serial ports used for receiving data from Pico.
        data_ports (Optional[list[str]]): List of serial ports used for sending data to Pico.
        baudrate (Optional[int]): Baud rate for the serial communication.
    """
    # Announce which process hosts the serial communication
    process_id = os.getpid()
    print(
        "Initializing SerialCommunication in multiprocessing mode. Process ID: ",
        process_id
    )

    # Collect the constructor arguments; every one is forwarded by keyword
    communication_kwargs = {
        "debug": debug,
        "challenge": challenge,
        "start_event": start_event,
        "stop_event": stop_event,
        "sender_messages_queue": sender_messages_queue,
        "writer_messages_queue": writer_messages_queue,
        "bno08x_yaw_deg": bno08x_yaw_deg,
        "bno08x_turns": bno08x_turns,
        "server_messages_queue": server_messages_queue,
        "console_ports": console_ports,
        "data_ports": data_ports,
        "baudrate": baudrate,
    }

    # Build the serial communication handler and run it
    SerialCommunication(**communication_kwargs).run()

receiver

Receiver

Bases: ReceiverABC, LoggerConsumerProtocol

Receiver class to handle serial communication with the Raspberry Pi Pico.

Source code in devices\raspberry_pi_5\src\serial_communication\receiver.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
class Receiver(ReceiverABC, LoggerConsumerProtocol):
    """
    Receiver class to handle serial communication with the Raspberry Pi Pico.

    Reads messages from the console serial port, updates the shared challenge
    and BNO08X values, sets the start event, and confirms the stop message
    during shutdown.
    """

    # Logger configuration
    LOGGER_TAG = "SerialCommunicationReceiver"

    # Incoming delay: seconds slept when no bytes are waiting on the port
    INCOMING_DELAY = 0.01

    # Confirmation timeout (seconds) and the matching number of polling attempts
    # NOTE(review): true division makes CONFIRMATION_ATTEMPTS a float (500.0),
    # although it is the default of a parameter annotated as `int`.
    CONFIRMATION_TIMEOUT = 5.0
    CONFIRMATION_ATTEMPTS = CONFIRMATION_TIMEOUT / INCOMING_DELAY

    # Read timeout (seconds) passed as `timeout` when the serial port is opened
    READ_TIMEOUT = 0.5

    def __init__(
        self,
        debug: bool,
        challenge: ValueCls,
        start_event: EventCls,
        stop_sent_event: EventCls,
        stop_confirmation_event: EventCls,
        stop_event: EventCls,
        bno08x_yaw_deg: ValueCls,
        bno08x_turns: ValueCls,
        sender_messages_queue: Queue,
        writer_messages_queue: Queue,
        server_messages_queue: Optional[Queue] = None,
        console_ports: Optional[List[str]] = RASPBERRY_PI_PICO_CONSOLE_PORTS,
        baudrate: Optional[int] = RASPBERRY_PI_PICO_BAUDRATE
    ):
        """
        Set up the receiver state. No serial port is opened here.

        Args:
            debug (bool): Flag to indicate if the receiver is in debug mode.
            challenge (ValueCls): Shared value to hold the current challenge.
            start_event (EventCls): Event to signal when the serial communication has started.
            stop_sent_event (EventCls): Event to signal when the stop message has been sent.
            stop_confirmation_event (EventCls): Event to signal when the stop message has been confirmed.
            stop_event (EventCls): Event to signal when the serial communication should stop sending and receiving messages.
            bno08x_yaw_deg (ValueCls): Shared value for the BNO08X yaw angle in degrees.
            bno08x_turns (ValueCls): Shared value for the BNO08X turns.
            sender_messages_queue (Queue): Queue to hold outgoing messages of the serial port.
            writer_messages_queue (Queue): Queue to hold log messages.
            server_messages_queue (Optional[Queue]): Queue to broadcast the messages through the websockets server.
            console_ports (Optional[List[str]]): List of serial ports used for receiving data from Pico.
            baudrate (Optional[int]): Baud rate for the serial communication.
        """
        # Debug flag
        self.__debug = debug

        # Shared values written while messages are processed
        self.__challenge = challenge
        self.__bno08x_yaw_deg = bno08x_yaw_deg
        self.__bno08x_turns = bno08x_turns

        # Events handed in by the caller
        self.__start_event = start_event
        self.__stop_sent_event = stop_sent_event
        self.__stop_confirmation_event = stop_confirmation_event
        self.__stop_event = stop_event

        # Events owned by this instance
        self.__started_event = Event()
        self.__stop_waiting_confirmation_event = Event()
        self.__deleted_event = Event()

        # Logger writing to the shared log queue
        self.__logger = Logger(writer_messages_queue, tag=self.LOGGER_TAG, debug=debug)

        # Dispatcher used to queue outgoing serial messages
        self.__serial_dispatcher = SerialDispatcher(sender_messages_queue)

        # Server dispatcher exists only when a server queue was supplied
        if server_messages_queue:
            self.__server_dispatcher = ServerDispatcher(
                server_messages_queue,
                writer_messages_queue
            )
        else:
            self.__server_dispatcher = None

        # Reentrant lock guarding start/stop
        self.__rlock = RLock()

        # Validate and store the candidate console ports
        is_instance(console_ports, List)
        self.__console_ports = console_ports

        # Validate and store the baud rate
        is_instance(baudrate, int)
        self.__baudrate = baudrate

        # Nothing is open yet; both are filled in when a port is opened
        self.__console_port = None
        self.__console_serial = None

    @final
    @property
    def logger(self) -> Logger:
        """
        Logger instance created for this receiver.
        """
        return self.__logger

    @final
    def _open_port(self, port: str) -> None:
        """
        Open the console serial port and remember it on the instance.

        The instance attributes are only updated once the port has been
        opened and flushed successfully, so a failed attempt leaves no
        half-initialised state behind.

        Args:
            port (str): Name of the serial port to open.
        Raises:
            RuntimeError: If the port cannot be opened or flushed. The
                original exception is chained as the cause.
        """
        serial = None
        try:
            # Create a new Serial instance for the console port
            serial = Serial(port, self.__baudrate, timeout=self.READ_TIMEOUT)
            serial.flush()

        except Exception as e:
            # Do not leak a handle that was opened but could not be flushed
            if serial is not None:
                try:
                    serial.close()
                except Exception:
                    # Best effort: the original error is the one worth reporting
                    pass
            raise RuntimeError(f"Error opening console port {port}: {e}") from e

        self.__console_serial = serial
        self.__console_port = port

    @final
    def _start(self) -> None:
        """
        Mark the receiver as started and open the first console port that works.

        Every configured port is tried on each of CONNECTION_ATTEMPTS rounds,
        sleeping ATTEMPTS_DELAY between rounds.

        Raises:
            RuntimeError: If the stop event is already set, if the receiver is
                already running, or if no console port could be opened. In the
                last case the most recent open failure is chained as the cause.
        """
        with self.__rlock:
            # Check if the stop event is set
            if self.__stop_event.is_set():
                raise RuntimeError(
                    "Stop event is set. Serial communication receiver will not run."
                )

            # Check if the serial communication receiver is already running
            if self.__started_event.is_set():
                raise RuntimeError(
                    "Serial communication receiver is already running. Cannot start again."
                )

            # Set the started event
            self.__started_event.set()

        # Open the console port
        last_error = None
        for i in range(CONNECTION_ATTEMPTS):
            for port in self.__console_ports:
                try:
                    self._open_port(port)

                except Exception as e:
                    # Keep trying the other ports, but keep the reason visible
                    last_error = e
                    self.__logger.debug(
                        f"Could not open console port {port} (attempt {i + 1}): {e}"
                    )
                    continue

                # Log
                self.__logger.info(
                    f"Console port opened on {self.__console_port} after {i + 1} {'attempts' if i != 0 else 'attempt'}."
                )
                return

            sleep(ATTEMPTS_DELAY)

        raise RuntimeError(
            f"Failed to open console port after {CONNECTION_ATTEMPTS} attempts."
        ) from last_error


    @final
    def _stop(self) -> None:
        """
        Stop the receiver and close the console serial port.

        If the receiver was started, wait for the stop-sent event, read the
        confirmation of the stop message from the port and, on success, set
        the stop confirmation event. Errors in that sequence are logged and
        not raised. The stop event is then set, the deleted event is cleared
        and the port is closed if it is open.
        """
        with self.__rlock:
            try:
                # Check if the start event is set
                if self.__started_event.is_set():
                    # Clear the started event
                    self.__started_event.clear()

                    # Wait for the stop sent event to be set
                    if not self.__stop_sent_event.wait(timeout=STOP_TIMEOUT):
                        self.__logger.warning(
                            "Stop sent event not set within timeout. "
                        )
                    else:
                        # Log the stop sent
                        self.__logger.info(
                            "Stop sent event set."
                        )

                    # Clear the stop sent event
                    self.__stop_sent_event.clear()

                    # While this flag is set, reading keeps working even though
                    # the stop event may already be set
                    self.__stop_waiting_confirmation_event.set()
                    try:
                        # Wait for the stop confirmation message
                        self._wait_confirmation_message(STOP_MESSAGE, attempts=STOP_TIMEOUT/self.INCOMING_DELAY)

                        # Set the stop confirmation event (only on success)
                        self.__stop_confirmation_event.set()

                    finally:
                        # Always clear the flag, even when the confirmation
                        # never arrives, so it cannot stay set after a failure
                        self.__stop_waiting_confirmation_event.clear()

            except Exception as e:
                # Log the error
                self.__logger.error(
                    f"Error while stopping the serial communication receiver: {e}"
                )

            # Set the stop event
            self.__stop_event.set()

            # Clear the deleted event
            self.__deleted_event.clear()

            # Close the console serial port
            if self.__console_serial and self.__console_serial.is_open:
                self.__logger.info(
                    f"Closing console serial port: {self.__console_port}"
                )
                self.__console_serial.close()
                self.__console_serial = None

        # Log
        self.__logger.info("Stopped.")

    @final
    def _receive_latest_message(self) -> (IncomingMessage | None):
        """
        Read one END_CHAR-terminated message from the console port.

        Bytes are accumulated and decoded once per message, so characters
        encoded as several bytes are not lost (decoding byte by byte with
        errors="ignore" would drop them).

        Returns:
            The parsed message, or None when no bytes are waiting (after
            sleeping INCOMING_DELAY), when reading was interrupted by the
            stop/deleted events, when the message is empty, or when it is a
            debug message (which is only logged).
        """
        if self.__console_serial.in_waiting == 0:
            sleep(self.INCOMING_DELAY)
            return None

        # Read raw bytes until the terminator; keep reading during the stop
        # handshake even though the stop event may already be set
        terminator = END_CHAR.encode(ENCODE)
        raw = bytearray()
        while (not self.__stop_event.is_set() and not self.__deleted_event.is_set()) or self.__stop_waiting_confirmation_event.is_set():
            chunk = self.__console_serial.read(1)
            if not chunk:
                # Read timed out without data; re-check the loop condition
                continue
            if chunk == terminator:
                break
            raw += chunk

        # Check if the stop event is set or the deleted event is set
        if (self.__stop_event.is_set() or self.__deleted_event.is_set()) and not self.__stop_waiting_confirmation_event.is_set():
            return None

        # Decode the whole message at once, skipping undecodable bytes
        buffer = raw.decode(ENCODE, errors="ignore")

        # If the buffer is empty, return None
        if not buffer:
            return None

        # Strip leading and trailing whitespace
        msg_str = buffer.strip()

        # Get the message from the string
        msg = IncomingMessage.from_string(msg_str)

        # Debug messages are only logged, never returned
        if msg.is_debug():
            self.__logger.debug(
                f"Received debug message: {msg.content}"
            )
            return None

        # If the server is set, send the message to the server
        if self.__server_dispatcher:
            self.__server_dispatcher.broadcast_serial_incoming_message(
                msg_str
            )

        return msg

    def _wait_confirmation_message(
        self,
        msg_to_confirm: OutgoingMessage,
        attempts: int = CONFIRMATION_ATTEMPTS
    ) -> None:
        """
        Wait for a confirmation message from the serial port.

        Args:
            msg_to_confirm (OutgoingMessage): The message to confirm.
            attempts (int): The number of attempts to wait for the confirmation message.
        Raises:
            RuntimeError: If the confirmation message is not received within the timeout.
        """
        # Log
        self.__logger.debug(
            f"Waiting confirmation message for: {msg_to_confirm}"
        )

        # Wait for the confirmation message
        i = 0
        while i < attempts:
            msg = self._receive_latest_message()
            if msg is None:
                i += 1
                continue

            if msg.is_error():
                raise RuntimeError(
                    f"Received error message: {msg.content}"
                )
            elif msg.is_confirmation():
                # Log the confirmation message
                self.__logger.debug(
                    f"Received confirmation message: {msg.content}"
                )
                return

            else:
                # Log the received message
                self.__logger.debug(
                    f"Received message while waiting for confirmation: {msg}"
                )

        raise RuntimeError(
            f"Confirmation message for {msg_to_confirm} not received within timeout."
        )

    @final
    @log_on_error()
    def run(self) -> None:
        """
        Run the receiver until it is stopped.

        Opens the console port, waits for an initial END_CHAR from the Pico,
        waits for the challenge and start messages, and then processes
        incoming messages (BNO08X yaw and turns) until the stop or deleted
        event is set.

        The receiver is stopped exactly once on the way out, whether the loop
        ends normally or an exception is raised.

        Raises:
            RuntimeError: If the receiver cannot start, an error message is
                received, an invalid message arrives before the start
                message, or the start message arrives before a challenge has
                been set.
        """
        try:
            # Start the serial communication receiver
            self._start()

            # Wait for the first END_CHAR message to be received to ensure the serial port is ready
            self.__logger.info(
                f"Waiting for initial {repr(END_CHAR)} message to confirm serial communication is ready..."
            )
            while not self.__stop_event.is_set() and not self.__deleted_event.is_set():
                if self.__console_serial.in_waiting == 0:
                    sleep(self.INCOMING_DELAY)
                    continue

                # Read a single character from the console
                char = self.__console_serial.read(1).decode(
                    ENCODE,
                    errors="ignore"
                )
                if not char:
                    continue
                if char == END_CHAR:
                    break
            if self.__stop_event.is_set() or self.__deleted_event.is_set():
                # Nothing else to do; the finally clause stops the receiver
                return

            # Log
            self.__logger.info(
                f"Received initial {repr(END_CHAR)} message. Serial communication is ready."
            )

            # Wait for the start message
            self.__logger.info(
                "Waiting for start event..."
            )
            while not self.__stop_event.is_set() and not self.__deleted_event.is_set():
                try:
                    msg = self._receive_latest_message()
                    # If no message is received, continue to wait
                    if msg is None:
                        continue

                except ValueError as e:
                    # Before the start message, an invalid message is fatal
                    raise RuntimeError(
                        f"Received invalid message, may be garbage data: '{e}'"
                    ) from e

                if msg.is_error():
                    raise RuntimeError(
                        f"Received error message: '{msg.content}'"
                    )

                elif msg.is_challenge():
                    # Log
                    self.__logger.info("Received challenge message.")

                    # Send a confirmation message
                    self.__serial_dispatcher.send_confirmation_message()

                    # Store the challenge in the shared value
                    with self.__challenge.get_lock():
                        self.__challenge.value = Challenge.from_string(
                            msg.content
                        ).as_char

                    # Continue to wait for the start event
                    continue

                elif msg.is_start():
                    # Log
                    self.__logger.info("Received start event.")

                    # Check if the challenge is set
                    if self.__challenge.value == Challenge.NONE.as_char:
                        raise RuntimeError(
                            "Challenge not set. Stopping communication."
                        )

                    # Send a confirmation message
                    self.__serial_dispatcher.send_confirmation_message()

                    # Set the start event
                    self.__start_event.set()
                    break

                else:
                    # Log the received message
                    self.__logger.debug(
                        f"Received message while waiting for start event: {msg}"
                    )

            while not self.__stop_event.is_set() and not self.__deleted_event.is_set():
                try:
                    msg = self._receive_latest_message()
                    if msg is None:
                        continue

                except ValueError as e:
                    # May signal a bad code on the Pico or garbage data
                    self.__logger.warning(
                        f"Received invalid message error, skipping: '{e}'"
                    )
                    continue

                if msg.is_error():
                    raise RuntimeError(
                        f"Received error message: '{msg.content}'"
                    )

                elif msg.is_bno08x_yaw_deg():
                    # Log
                    self.__logger.debug(
                        f"Received BNO08X yaw degrees message: {msg.content}"
                    )

                    # Update the BNO08X yaw angle
                    # NOTE(review): a non-numeric payload raises ValueError here
                    # and ends the receiver; confirm that is intended.
                    with self.__bno08x_yaw_deg.get_lock():
                        self.__bno08x_yaw_deg.value = float(msg.content)

                elif msg.is_bno08x_turns():
                    # Log
                    self.__logger.debug(
                        f"Received BNO08X turns message: {msg.content}"
                    )

                    # Update the BNO08X turns
                    with self.__bno08x_turns.get_lock():
                        self.__bno08x_turns.value = int(msg.content)

                else:
                    # Log the received message
                    self.__logger.debug(
                        f"Received message: {msg}"
                    )

        finally:
            # Single stop path: runs once for normal exit, early return and
            # errors alike, so a failing stop is never retried by a handler
            self._stop()

    def __del__(self) -> None:
        """
        Destructor to clean up resources when the receiver is no longer needed.
        """
        # Set the deleted event
        self.__deleted_event.set()

        # Log the deletion
        self.__logger.info(
            "Instance will be deleted. Resources will be cleaned up."
            )

__del__()

Destructor to clean up resources when the receiver is no longer needed.

Source code in devices\raspberry_pi_5\src\serial_communication\receiver.py
489
490
491
492
493
494
495
496
497
498
499
def __del__(self) -> None:
    """
    Destructor to clean up resources when the receiver is no longer needed.

    Sets the deleted event and logs that the instance is being deleted.
    """
    # Set the deleted event
    self.__deleted_event.set()

    # Log the deletion
    self.__logger.info(
        "Instance will be deleted. Resources will be cleaned up."
        )

__init__(debug, challenge, start_event, stop_sent_event, stop_confirmation_event, stop_event, bno08x_yaw_deg, bno08x_turns, sender_messages_queue, writer_messages_queue, server_messages_queue=None, console_ports=RASPBERRY_PI_PICO_CONSOLE_PORTS, baudrate=RASPBERRY_PI_PICO_BAUDRATE)

Initialize the Receiver class.

Parameters:

Name Type Description Default
debug bool

Flag to indicate if the receiver is in debug mode.

required
challenge Value

Shared value to hold the current challenge.

required
start_event Event

Event to signal when the serial communication has started.

required
stop_sent_event Event

Event to signal when the stop message has been sent.

required
stop_confirmation_event Event

Event to signal when the stop message has been confirmed.

required
stop_event Event

Event to signal when the serial communication should stop sending and receiving messages.

required
bno08x_yaw_deg Value

Shared value for the BNO08X yaw angle in degrees.

required
bno08x_turns Value

Shared value for the BNO08X turns.

required
sender_messages_queue Queue

Queue to hold outgoing messages of the serial port.

required
writer_messages_queue Queue

Queue to hold log messages.

required
server_messages_queue Optional[Queue]

Queue to broadcast the messages through the websockets server.

None
console_ports Optional[List[str]]

List of serial ports used for receiving data from Pico.

RASPBERRY_PI_PICO_CONSOLE_PORTS
baudrate Optional[int]

Baud rate for the serial communication.

RASPBERRY_PI_PICO_BAUDRATE
Source code in devices\raspberry_pi_5\src\serial_communication\receiver.py
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def __init__(
    self,
    debug: bool,
    challenge: ValueCls,
    start_event: EventCls,
    stop_sent_event: EventCls,
    stop_confirmation_event: EventCls,
    stop_event: EventCls,
    bno08x_yaw_deg: ValueCls,
    bno08x_turns: ValueCls,
    sender_messages_queue: Queue,
    writer_messages_queue: Queue,
    server_messages_queue: Optional[Queue] = None,
    console_ports: Optional[List[str]] = RASPBERRY_PI_PICO_CONSOLE_PORTS,
    baudrate: Optional[int] = RASPBERRY_PI_PICO_BAUDRATE
):
    """
    Initialize the Receiver class.

    Only state is set up here; the console serial port starts as None.

    Args:
        debug (bool): Flag to indicate if the receiver is in debug mode.
        challenge (ValueCls): Shared value to hold the current challenge.
        start_event (EventCls): Event to signal when the serial communication has started.
        stop_sent_event (EventCls): Event to signal when the stop message has been sent.
        stop_confirmation_event (EventCls): Event to signal when the stop message has been confirmed.
        stop_event (EventCls): Event to signal when the serial communication should stop sending and receiving messages.
        bno08x_yaw_deg (ValueCls): Shared value for the BNO08X yaw angle in degrees.
        bno08x_turns (ValueCls): Shared value for the BNO08X turns.
        sender_messages_queue (Queue): Queue to hold outgoing messages of the serial port.
        writer_messages_queue (Queue): Queue to hold log messages.
        server_messages_queue (Optional[Queue]): Queue to broadcast the messages through the websockets server.
        console_ports (Optional[List[str]]): List of serial ports used for receiving data from Pico.
        baudrate (Optional[int]): Baud rate for the serial communication.
    """
    # Initialize the debug flag
    self.__debug = debug

    # Initialize the queues, values and events
    # (started, stop-waiting-confirmation and deleted events are created here;
    # the others are the ones passed in by the caller)
    self.__challenge = challenge
    self.__bno08x_yaw_deg = bno08x_yaw_deg
    self.__bno08x_turns = bno08x_turns
    self.__start_event = start_event
    self.__started_event = Event()
    self.__stop_sent_event = stop_sent_event
    self.__stop_confirmation_event = stop_confirmation_event
    self.__stop_waiting_confirmation_event = Event()
    self.__stop_event = stop_event
    self.__deleted_event = Event()

    # Initialize the logger
    self.__logger = Logger(
        writer_messages_queue,
        tag=self.LOGGER_TAG,
        debug=self.__debug
    )

    # Initialize the serial dispatcher
    self.__serial_dispatcher = SerialDispatcher(sender_messages_queue)

    # Initialize the server dispatcher (None when no server queue is given)
    self.__server_dispatcher = ServerDispatcher(
        server_messages_queue,
        writer_messages_queue
    ) if server_messages_queue else None

    # Initialize the reentrant lock
    self.__rlock = RLock()

    # Check the type of console ports
    is_instance(console_ports, List)
    self.__console_ports = console_ports

    # Check the type of baudrate
    is_instance(baudrate, int)
    self.__baudrate = baudrate

    # Initialize the console serial port (nothing is open yet)
    self.__console_port = None
    self.__console_serial = None

sender

Sender

Bases: SenderABC, LoggerConsumerProtocol

Sender class to handle serial communication with the Raspberry Pi Pico.

Source code in devices\raspberry_pi_5\src\serial_communication\sender.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
class Sender(SenderABC, LoggerConsumerProtocol):
    """
    Sender class to handle serial communication with the Raspberry Pi Pico.

    Takes outgoing messages from a queue and writes them to the data serial
    port; sends the stop message during shutdown.
    """

    # Logger configuration
    LOGGER_TAG = "SerialCommunicationSender"

    # Outgoing wait timeout: seconds to wait on the outgoing queue per poll
    OUTGOING_WAIT_TIMEOUT = 0.1

    # Write timeout (seconds) passed as `write_timeout` when the port is opened
    WRITE_TIMEOUT = 0.5

    def __init__(
        self,
        debug: bool,
        start_event: EventCls,
        stop_sent_event: EventCls,
        stop_confirmation_event: EventCls,
        stop_event: EventCls,
        messages_queue: Queue,
        writer_messages_queue: Queue,
        server_messages_queue: Optional[Queue] = None,
        data_ports: Optional[list[str]] = RASPBERRY_PI_PICO_DATA_PORTS,
        baudrate: Optional[int] = RASPBERRY_PI_PICO_BAUDRATE
    ):
        """
        Set up the sender state. No serial port is opened here.

        Args:
            debug (bool): Flag to indicate if the sender is in debug mode.
            start_event (EventCls): Event to signal when the serial communication has started.
            stop_sent_event (EventCls): Event to signal when the stop message has been sent.
            stop_confirmation_event (EventCls): Event to signal when the stop message has been confirmed.
            stop_event (EventCls): Event to signal when the serial communication should stop sending and receiving messages.
            messages_queue (Queue): Queue to hold outgoing messages of the serial port.
            writer_messages_queue (Queue): Queue to hold log messages.
            server_messages_queue (Optional[Queue]): Queue to broadcast the messages through the websockets server.
            data_ports (Optional[list[str]]): List of serial ports used for sending data to Pico.
            baudrate (Optional[int]): Baud rate for the serial communication.
        """
        # Debug flag
        self.__debug = debug

        # Events and queue handed in by the caller
        self.__start_event = start_event
        self.__stop_event = stop_event
        self.__stop_sent_event = stop_sent_event
        self.__stop_confirmation_event = stop_confirmation_event
        self.__messages_queue = messages_queue

        # Events owned by this instance
        self.__started_event = Event()
        self.__deleted_event = Event()

        # Logger writing to the shared log queue
        self.__logger = Logger(writer_messages_queue, tag=self.LOGGER_TAG, debug=debug)

        # Server dispatcher exists only when a server queue was supplied
        if server_messages_queue:
            self.__server_dispatcher = ServerDispatcher(
                server_messages_queue,
                writer_messages_queue
            )
        else:
            self.__server_dispatcher = None

        # Reentrant lock guarding start/stop
        self.__rlock = RLock()

        # Validate and store the candidate data ports
        is_instance(data_ports, list)
        self.__data_ports = data_ports

        # Validate and store the baud rate
        is_instance(baudrate, int)
        self.__baudrate = baudrate

        # Nothing is open yet; both are filled in when a port is opened
        self.__data_port = None
        self.__data_serial = None

    @final
    @property
    def logger(self) -> Logger:
        """
        Logger instance created for this sender.
        """
        return self.__logger

    @final
    def _open_port(self, port: str) -> None:
        """
        Open the data serial port and remember it on the instance.

        The instance attributes are only updated once the port has been
        opened and flushed successfully, so a failed attempt leaves no
        half-initialised state behind.

        Args:
            port (str): Name of the serial port to open.
        Raises:
            RuntimeError: If the port cannot be opened or flushed. The
                original exception is chained as the cause.
        """
        serial = None
        try:
            # Create a new Serial instance for the data port
            serial = Serial(port, self.__baudrate, write_timeout=self.WRITE_TIMEOUT)
            serial.flush()

        except Exception as e:
            # Do not leak a handle that was opened but could not be flushed
            if serial is not None:
                try:
                    serial.close()
                except Exception:
                    # Best effort: the original error is the one worth reporting
                    pass
            raise RuntimeError(f"Error opening data port {port}: {e}") from e

        self.__data_serial = serial
        self.__data_port = port

    @final
    def _start(self) -> None:
        """
        Mark the sender as started and open the first data port that works.

        Every configured port is tried on each of CONNECTION_ATTEMPTS rounds,
        sleeping ATTEMPTS_DELAY between rounds.

        Raises:
            RuntimeError: If the stop event is already set, if the sender is
                already running, or if no data port could be opened. In the
                last case the most recent open failure is chained as the cause.
        """
        with self.__rlock:
            # Check if the stop event is set
            if self.__stop_event.is_set():
                raise RuntimeError(
                    "Stop event is set. Serial communication sender will not run."
                )

            # Check if the serial communication sender is already running
            if self.__started_event.is_set():
                raise RuntimeError(
                    "Serial communication sender is already running. Cannot start again."
                )

            # Set the started event
            self.__started_event.set()

        # Open the data port
        last_error = None
        for i in range(CONNECTION_ATTEMPTS):
            for port in self.__data_ports:
                try:
                    self._open_port(port)

                except Exception as e:
                    # Keep trying the other ports, but keep the reason visible
                    last_error = e
                    self.__logger.debug(
                        f"Could not open data port {port} (attempt {i + 1}): {e}"
                    )
                    continue

                # Log
                self.__logger.info(
                    f"Data port opened on {self.__data_port} after {i + 1} {'attempts' if i != 0 else 'attempt'}."
                )
                return

            sleep(ATTEMPTS_DELAY)

        raise RuntimeError(
            f"Failed to open data port after {CONNECTION_ATTEMPTS} attempts."
        ) from last_error


    @final
    def _stop(self) -> None:
        """
        Stop the sender and close the data serial port.

        If the sender was started, send the stop message, set the stop-sent
        event and wait up to STOP_TIMEOUT for the stop confirmation event.
        Errors in that sequence are logged and not raised. The stop event is
        then set, the deleted event is cleared and the port is closed if it
        is open.
        """
        with self.__rlock:
            try:
                if self.__started_event.is_set():
                    # No longer running
                    self.__started_event.clear()

                    # Tell the Pico to stop, then announce that it was sent
                    self._send_message(STOP_MESSAGE)
                    self.__stop_sent_event.set()

                    # Give the confirmation a bounded amount of time
                    confirmed = self.__stop_confirmation_event.wait(timeout=STOP_TIMEOUT)
                    if confirmed:
                        self.logger.info(
                            "Stop confirmation event set."
                        )
                    else:
                        self.__logger.warning(
                            "Stop confirmation event not set within the timeout."
                        )

                    # Reset the confirmation event
                    self.__stop_confirmation_event.clear()

            except Exception as e:
                self.__logger.error(
                    f"Error while stopping the serial communication sender: {e}"
                )

            # Signal the stop and reset the deleted flag
            self.__stop_event.set()
            self.__deleted_event.clear()

            # Close the data serial port if it is still open
            port = self.__data_serial
            if port and port.is_open:
                self.__logger.info(
                    f"Closing data serial port: {self.__data_port}"
                )
                port.close()
                self.__data_serial = None

        self.__logger.info("Stopped.")

    def _send_message(
        self,
        msg: OutgoingMessage
    ) -> None:
        """
        Log a message, write its encoded text to the data port and flush.

        Args:
            msg (OutgoingMessage): Message to send; its `str()` form is written.
        """
        text = str(msg)
        self.__logger.debug(f"Sending message: {text}")

        # Write the encoded text, then flush so it is sent right away
        port = self.__data_serial
        port.write(text.encode(ENCODE))
        port.flush()

    @final
    def _send_confirmation_message(self) -> None:
        """
        Send the OUTGOING_OK_MESSAGE through the data port.
        """
        self._send_message(OUTGOING_OK_MESSAGE)

    @final
    def _send_latest_message(self) -> OutgoingMessage | None:
        try:
            # Get the message from the queue
            msg = self.__messages_queue.get(
                timeout=self.OUTGOING_WAIT_TIMEOUT
            )

        except Empty:
            # Sent heartbeat message if the queue is empty
            if self.__start_event.is_set():
                self._send_message(HEARTBEAT_MESSAGE)
            return None

        # Send the message to the serial port
        self._send_message(msg)

        # If the server is set, send the message to the server
        self.__server_dispatcher.broadcast_serial_outgoing_message(
            str(msg).split(END_CHAR, 1)[0]
        ) if self.__server_dispatcher else None

    @final
    @log_on_error()
    def run(self) -> None:
        try:
            # Start the serial communication sender
            self._start()

            while not self.__stop_event.is_set() and not self.__deleted_event.is_set():
                self._send_latest_message()

            # Stop
            self._stop()

        except Exception as e:
            # Stop the serial communication in case of an exception
            self._stop()
            raise e

    def __del__(self) -> None:
        """
        Destructor to clean up resources when the sender is no longer needed.
        """
        # Set the deleted event
        self.__deleted_event.set()

        # Log the deletion
        self.__logger.info(
            "Instance will be deleted. Resources will be cleaned up."
            )

__del__()

Destructor to clean up resources when the sender is no longer needed.

Source code in devices\raspberry_pi_5\src\serial_communication\sender.py
280
281
282
283
284
285
286
287
288
289
290
def __del__(self) -> None:
    """
    Flag the instance as deleted and log it when the object is finalized.
    """
    # Raise the deleted flag first
    self.__deleted_event.set()

    # Record the deletion in the log
    deletion_notice = (
        "Instance will be deleted. Resources will be cleaned up."
    )
    self.__logger.info(deletion_notice)

__init__(debug, start_event, stop_sent_event, stop_confirmation_event, stop_event, messages_queue, writer_messages_queue, server_messages_queue=None, data_ports=RASPBERRY_PI_PICO_DATA_PORTS, baudrate=RASPBERRY_PI_PICO_BAUDRATE)

Initialize the sender.

Parameters:

Name Type Description Default
debug bool

Flag to indicate if the sender is in debug mode.

required
start_event Event

Event to signal when the serial communication has started.

required
stop_sent_event Event

Event to signal when the stop message has been sent.

required
stop_confirmation_event Event

Event to signal when the stop message has been confirmed.

required
stop_event Event

Event to signal when the serial communication should stop sending and receiving messages.

required
messages_queue Queue

Queue to hold outgoing messages of the serial port.

required
writer_messages_queue Queue

Queue to hold log messages.

required
server_messages_queue Optional[Queue]

Queue to broadcast the messages through the websockets server.

None
data_ports Optional[list[str]]

List of serial ports used for sending data to Pico.

RASPBERRY_PI_PICO_DATA_PORTS
baudrate Optional[int]

Baud rate for the serial communication.

RASPBERRY_PI_PICO_BAUDRATE
Source code in devices\raspberry_pi_5\src\serial_communication\sender.py
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def __init__(
    self,
    debug: bool,
    start_event: EventCls,
    stop_sent_event: EventCls,
    stop_confirmation_event: EventCls,
    stop_event: EventCls,
    messages_queue: Queue,
    writer_messages_queue: Queue,
    server_messages_queue: Optional[Queue] = None,
    data_ports: Optional[list[str]] = RASPBERRY_PI_PICO_DATA_PORTS,
    baudrate: Optional[int] = RASPBERRY_PI_PICO_BAUDRATE
):
    """
    Initialize the sender.

    Stores the shared events and queues, creates the logger, the optional
    server dispatcher and the reentrant lock, checks the port list and the
    baud rate with `is_instance`, and leaves the data serial port unset.

    Args:
        debug (bool): Flag to indicate if the sender is in debug mode.
        start_event (EventCls): Event to signal when the serial communication has started.
        stop_sent_event (EventCls): Event to signal when the stop message has been sent.
        stop_confirmation_event (EventCls): Event to signal when the stop message has been confirmed.
        stop_event (EventCls): Event to signal when the serial communication should stop sending and receiving messages.
        messages_queue (Queue): Queue to hold outgoing messages of the serial port.
        writer_messages_queue (Queue): Queue to hold log messages.
        server_messages_queue (Optional[Queue]): Queue to broadcast the messages through the websockets server.
        data_ports (Optional[list[str]]): List of serial ports used for sending data to Pico.
        baudrate (Optional[int]): Baud rate for the serial communication.
    """
    # Debug flag
    self.__debug = debug

    # Queue of outgoing messages
    self.__messages_queue = messages_queue

    # Events handed in by the caller
    self.__start_event = start_event
    self.__stop_event = stop_event
    self.__stop_sent_event = stop_sent_event
    self.__stop_confirmation_event = stop_confirmation_event

    # Events owned by this instance
    self.__started_event = Event()
    self.__deleted_event = Event()

    # Logger writing through the log messages queue
    self.__logger = Logger(
        writer_messages_queue,
        tag=self.LOGGER_TAG,
        debug=debug
    )

    # Server dispatcher, only when a server messages queue was provided
    if server_messages_queue:
        self.__server_dispatcher = ServerDispatcher(
            server_messages_queue,
            writer_messages_queue
        )
    else:
        self.__server_dispatcher = None

    # Reentrant lock
    self.__rlock = RLock()

    # Validate and store the data ports
    # NOTE(review): `is_instance` is defined elsewhere; presumably it raises on a type mismatch -- confirm
    is_instance(data_ports, list)
    self.__data_ports = data_ports

    # Validate and store the baud rate
    is_instance(baudrate, int)
    self.__baudrate = baudrate

    # No data serial port is selected or opened yet
    self.__data_port = None
    self.__data_serial = None