Saltar a contenido

Lib Folder

bno08x

BNO08XError

Bases: Exception

Custom exception class for BNO08X errors.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\bno08x.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class BNO08XError(Exception):
    """
    Custom exception class for BNO08X errors.
    """

    def __init__(self, message):
        """
        Initializes the BNO08XError with a custom message.
        """
        super().__init__(message)
        self.message = message

    def __str__(self):
        """
        Returns a string representation of the BNO08XError.
        """
        return f"BNO08X Error: {self.message}"

__init__(message)

Initializes the BNO08XError with a custom message.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\bno08x.py
16
17
18
19
20
21
def __init__(self, message):
    """
    Initializes the BNO08XError with a custom message.
    """
    super().__init__(message)
    self.message = message

__str__()

Returns a string representation of the BNO08XError.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\bno08x.py
23
24
25
26
27
def __str__(self):
    """
    Returns a string representation of the BNO08XError.
    """
    return f"BNO08X Error: {self.message}"

BNO08XHandler

A class to handle BNO08X sensor operations.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\bno08x.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
class BNO08XHandler:
    """
    A class to handle BNO08X sensor operations.
    """
    # Default configuration
    I2C_ADDRESS = 0x4b
    INITIAL_SAMPLES = 10  # Number of samples to gather for initial calibration

    # Delay between readings in seconds
    DELAY = 0.05

    def __init__(
        self,
        i2c: I2C,
        address: int = I2C_ADDRESS,
        serial_communication: SerialCommunication = None,
        is_upside_down: bool = False,
    ):
        """
        Initializes the BNO08X handler with the specified I2C bus and address.

        Args:
            i2c (I2C): The I2C bus to use for communication with the BNO08X sensor.
            address (int): The I2C address of the BNO08X sensor.
            serial_communication (SerialCommunication | None): Optional serial communication handler.
            is_upside_down (bool): If True, the sensor is mounted upside down.
        """
        # Initialize the I2C bus and BNO08X sensor
        self.__bno = BNO08X_I2C(i2c, address=address)
        self.__bno.enable_feature(BNO_REPORT_GYROSCOPE)
        self.__bno.enable_feature(BNO_REPORT_ROTATION_VECTOR)

        # Check the type of serial communication
        self.__serial_communication = serial_communication

        # Check if the sensor is mounted upside down
        self.YAW_FACTOR = -1.0 if is_upside_down else 1.0

        # Set accumulated values to zero
        self.__accumulated_yaw_deg = 0.0
        self.__last_yaw_deg = 0.0
        self.__accumulated_90_deg_turns = 0
        self.__last_segment_count = 0

        # Initialize gyroscope values
        self.__gyro_x_deg, self.__gyro_y_deg, self.__gyro_z_deg = 0.0, 0.0, 0.0

        # Initialize quaternion values
        self.__initial_roll_deg, self.__initial_pitch_deg, self.__initial_yaw_deg = 0.0, 0.0, 0.0
        self.__roll_deg, self.__pitch_deg, self.__yaw_deg = 0.0, 0.0, 0.0

        # Set has been calibrated flag
        self.__calibrated = False

    async def calibrate(self):
        """
        Calibrates the BNO08X sensor by taking initial readings to set the initial orientation.

        Raises:
            BNO08XError: If the sensor is already calibrated.
        """
        # Check if already calibrated
        # if self.__calibrated:
        #     raise BNO08XError("BNO08X sensor is already calibrated.")

        # Set the calibrated flag to True
        self.__calibrated = True

        # Gathering multiple samples to fix errors
        for _ in range(self.INITIAL_SAMPLES):
            await self.__read_quaternion()
            await sleep(self.DELAY)

        # Saving the orientation, this makes the turns variables much smoother to handle
        self.__initial_roll_deg, self.__initial_pitch_deg, self.__initial_yaw_deg = BNO08XHandler.quaternion_to_euler_degrees(
            *self.__quaternion
        )

    @staticmethod
    def quaternion_to_euler_degrees(x: float, y: float, z: float, w: float):
        """
        This function receives the 4 components of the quaternion and calculates the orientation
        """
        # Roll
        sinr_cosp = 2 * (w * x + y * z)
        cosr_cosp = 1 - 2 * (x * x + y * y)
        roll_rad = atan2(sinr_cosp, cosr_cosp)

        # Pitch
        sinp = 2 * (w * y - z * x)

        # Clamp the value to avoid domain errors for asin (should be between -1 and 1)
        if sinp > 1:
            pitch_rad = pi / 2
        elif sinp < -1:
            pitch_rad = -pi / 2
        else:
            pitch_rad = asin(sinp)

        # Yaw
        siny_cosp = 2 * (w * z + x * y)
        cosy_cosp = 1 - 2 * (y * y + z * z)
        yaw_rad = atan2(siny_cosp, cosy_cosp)

        return degrees(roll_rad), degrees(pitch_rad), degrees(yaw_rad)

    @staticmethod
    def gyro_to_degrees(x_rad: float, y_rad: float, z_rad: float):
        """
        Converts gyroscope readings from radians to degrees.
        """
        return (
            degrees(x_rad),
            degrees(y_rad),
            degrees(z_rad)
        )

    @property
    def quaternion(self):
        """
        Returns the current quaternion data from the BNO08X sensor.
        """
        if not hasattr(self, '__quaternion'):
            self.__read_quaternion()
        return self.__quaternion

    @property
    def initial_pitch(self):
        """
        Returns the initial pitch value in degrees.
        """
        return self.__initial_pitch_deg

    @property
    def initial_roll(self):
        """
        Returns the initial roll value in degrees.
        """
        return self.__initial_roll_deg

    @property
    def initial_yaw(self):
        """
        Returns the initial yaw value in degrees.
        """
        return self.__initial_yaw_deg

    @property
    def accumulated_yaw_deg(self):
        """
        Returns the accumulated yaw degrees value since the last reset.
        """
        return self.__accumulated_yaw_deg

    @property
    def accumulated_90_deg_turns(self):
        """
        Returns the accumulated 90-degree turns since the last reset.
        """
        return self.__accumulated_90_deg_turns

    @property
    def roll(self):
        """
        Returns the current roll value in degrees from the BNO08X sensor.
        """
        if not hasattr(self, '__roll_deg'):
            self.update_quaternion()
        return self.__roll_deg

    @property
    def pitch(self):
        """
        Returns the current pitch value in degrees from the BNO08X sensor.
        """
        if not hasattr(self, '__pitch_deg'):
            self.update_quaternion()
        return self.__pitch_deg

    @property
    def yaw(self):
        """
        Returns the current yaw value in degrees from the BNO08X sensor.
        """
        if not hasattr(self, '__yaw_deg'):
            self.update_quaternion()
        return self.__yaw_deg

    @property
    def gyro(self):
        """
        Returns the current gyroscope data in radians from the BNO08X sensor.
        """
        if not hasattr(self, 'gyro'):
            self.__read_gyro()
        return self.__gyro

    @property
    def gyro_x(self):
        """
        Returns the current gyroscope X value in degrees from the BNO08X sensor.
        """
        if not hasattr(self, '__gyro_x_deg'):
            self.update_gyro()
        return self.__gyro_x_deg

    @property
    def gyro_y(self):
        """
        Returns the current gyroscope Y value in degrees from the BNO08X sensor.
        """
        if not hasattr(self, '__gyro_y_deg'):
            self.update_gyro()
        return self.__gyro_y_deg

    @property
    def gyro_z(self):
        """
        Returns the current gyroscope Z value in degrees from the BNO08X sensor.
        """
        if not hasattr(self, '__gyro_z_deg'):
            self.update_gyro()
        return self.__gyro_z_deg

    async def __read_gyro(self):
        """
        Reads the gyroscope data.
        """
        self.__gyro = self.__bno.gyro

        # Adding a delay to ensure the sensor has time to update
        await sleep(self.DELAY)

    async def update_gyro(self):
        """
        Reads the gyroscope data from the BNO08X sensor and updates the gyroscope values in degrees.
        """
        # Updating the gyroscope data
        await self.__read_gyro()

        # Get the current gyroscope values in degrees
        self.__gyro_x_deg, self.__gyro_y_deg, self.__gyro_z_deg = BNO08XHandler.gyro_to_degrees(
            *self.__gyro
        )

    async def __read_quaternion(self):
        """
        Reads the quaternion data from the BNO08X sensor.
        """
        self.__quaternion = self.__bno.quaternion

        # Adding a delay to ensure the sensor has time to update
        await sleep(self.DELAY)

    async def update_quaternion(self):
        """
        Reads the quaternion data from the BNO08X sensor and updates the roll, pitch, and yaw values.
        """
        # Updating the quaternion data
        await self.__read_quaternion()

        # Get the current roll, pitch, and yaw in degrees
        self.__roll_deg, self.__pitch_deg, self.__yaw_deg = BNO08XHandler.quaternion_to_euler_degrees(
            *self.__quaternion
        )

        # If serial communication is enabled, send the yaw degrees message
        if self.__serial_communication:
            self.__serial_communication.send_bno08x_yaw_deg_message(
                self.__yaw_deg
                )

        # Compute relative yaw degrees
        relative_yaw_deg = self.__yaw_deg - self.__initial_yaw_deg
        if relative_yaw_deg > 180:
            relative_yaw_deg -= 360
        elif relative_yaw_deg < -180:
            relative_yaw_deg += 360

        # Calculate the change in yaw degrees since the last update
        delta_raw_yaw_deg = relative_yaw_deg - self.__last_yaw_deg
        if delta_raw_yaw_deg > 180:
            delta_raw_yaw_deg -= 360
        elif delta_raw_yaw_deg < -180:
            delta_raw_yaw_deg += 360

        # Update accumulated yaw and segment count
        self.__accumulated_yaw_deg += delta_raw_yaw_deg
        current_segment_count = int(self.__accumulated_yaw_deg / 90)
        if current_segment_count != self.__last_segment_count:
            self.__accumulated_90_deg_turns += current_segment_count - self.__last_segment_count
            self.__last_segment_count = current_segment_count

            # If serial communication is enabled, send the turn message
            if self.__serial_communication:
                self.__serial_communication.send_bno08x_turns_message(
                    abs(self.__accumulated_90_deg_turns)
                )

        self.__last_yaw_deg = relative_yaw_deg

    @property
    def turns(self):
        """
        Returns the number of 90-degree turns made since the last reset.
        """
        return self.__accumulated_90_deg_turns

accumulated_90_deg_turns property

Returns the accumulated 90-degree turns since the last reset.

accumulated_yaw_deg property

Returns the accumulated yaw degrees value since the last reset.

gyro property

Returns the current gyroscope data in radians from the BNO08X sensor.

gyro_x property

Returns the current gyroscope X value in degrees from the BNO08X sensor.

gyro_y property

Returns the current gyroscope Y value in degrees from the BNO08X sensor.

gyro_z property

Returns the current gyroscope Z value in degrees from the BNO08X sensor.

initial_pitch property

Returns the initial pitch value in degrees.

initial_roll property

Returns the initial roll value in degrees.

initial_yaw property

Returns the initial yaw value in degrees.

pitch property

Returns the current pitch value in degrees from the BNO08X sensor.

quaternion property

Returns the current quaternion data from the BNO08X sensor.

roll property

Returns the current roll value in degrees from the BNO08X sensor.

turns property

Returns the number of 90-degree turns made since the last reset.

yaw property

Returns the current yaw value in degrees from the BNO08X sensor.

__init__(i2c, address=I2C_ADDRESS, serial_communication=None, is_upside_down=False)

Initializes the BNO08X handler with the specified I2C bus and address.

Parameters:

Name Type Description Default
i2c I2C

The I2C bus to use for communication with the BNO08X sensor.

required
address int

The I2C address of the BNO08X sensor.

I2C_ADDRESS
serial_communication SerialCommunication | None

Optional serial communication handler.

None
is_upside_down bool

If True, the sensor is mounted upside down.

False
Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\bno08x.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def __init__(
    self,
    i2c: I2C,
    address: int = I2C_ADDRESS,
    serial_communication: SerialCommunication = None,
    is_upside_down: bool = False,
):
    """
    Initializes the BNO08X handler with the specified I2C bus and address.

    Args:
        i2c (I2C): The I2C bus to use for communication with the BNO08X sensor.
        address (int): The I2C address of the BNO08X sensor.
        serial_communication (SerialCommunication | None): Optional serial communication handler.
        is_upside_down (bool): If True, the sensor is mounted upside down.
    """
    # Initialize the I2C bus and BNO08X sensor
    self.__bno = BNO08X_I2C(i2c, address=address)
    self.__bno.enable_feature(BNO_REPORT_GYROSCOPE)
    self.__bno.enable_feature(BNO_REPORT_ROTATION_VECTOR)

    # Check the type of serial communication
    self.__serial_communication = serial_communication

    # Check if the sensor is mounted upside down
    self.YAW_FACTOR = -1.0 if is_upside_down else 1.0

    # Set accumulated values to zero
    self.__accumulated_yaw_deg = 0.0
    self.__last_yaw_deg = 0.0
    self.__accumulated_90_deg_turns = 0
    self.__last_segment_count = 0

    # Initialize gyroscope values
    self.__gyro_x_deg, self.__gyro_y_deg, self.__gyro_z_deg = 0.0, 0.0, 0.0

    # Initialize quaternion values
    self.__initial_roll_deg, self.__initial_pitch_deg, self.__initial_yaw_deg = 0.0, 0.0, 0.0
    self.__roll_deg, self.__pitch_deg, self.__yaw_deg = 0.0, 0.0, 0.0

    # Set has been calibrated flag
    self.__calibrated = False

__read_gyro() async

Reads the gyroscope data.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\bno08x.py
254
255
256
257
258
259
260
261
async def __read_gyro(self):
    """
    Reads the gyroscope data.
    """
    self.__gyro = self.__bno.gyro

    # Adding a delay to ensure the sensor has time to update
    await sleep(self.DELAY)

__read_quaternion() async

Reads the quaternion data from the BNO08X sensor.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\bno08x.py
275
276
277
278
279
280
281
282
async def __read_quaternion(self):
    """
    Reads the quaternion data from the BNO08X sensor.
    """
    self.__quaternion = self.__bno.quaternion

    # Adding a delay to ensure the sensor has time to update
    await sleep(self.DELAY)

calibrate() async

Calibrates the BNO08X sensor by taking initial readings to set the initial orientation.

Raises:

Type Description
BNO08XError

If the sensor is already calibrated.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\bno08x.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
async def calibrate(self):
    """
    Calibrates the BNO08X sensor by taking initial readings to set the initial orientation.

    Raises:
        BNO08XError: If the sensor is already calibrated.
    """
    # Check if already calibrated
    # if self.__calibrated:
    #     raise BNO08XError("BNO08X sensor is already calibrated.")

    # Set the calibrated flag to True
    self.__calibrated = True

    # Gathering multiple samples to fix errors
    for _ in range(self.INITIAL_SAMPLES):
        await self.__read_quaternion()
        await sleep(self.DELAY)

    # Saving the orientation, this makes the turns variables much smoother to handle
    self.__initial_roll_deg, self.__initial_pitch_deg, self.__initial_yaw_deg = BNO08XHandler.quaternion_to_euler_degrees(
        *self.__quaternion
    )

gyro_to_degrees(x_rad, y_rad, z_rad) staticmethod

Converts gyroscope readings from radians to degrees.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\bno08x.py
136
137
138
139
140
141
142
143
144
145
@staticmethod
def gyro_to_degrees(x_rad: float, y_rad: float, z_rad: float):
    """
    Converts gyroscope readings from radians to degrees.
    """
    return (
        degrees(x_rad),
        degrees(y_rad),
        degrees(z_rad)
    )

quaternion_to_euler_degrees(x, y, z, w) staticmethod

This function receives the 4 components of the quaternion and calculates the orientation

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\bno08x.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
@staticmethod
def quaternion_to_euler_degrees(x: float, y: float, z: float, w: float):
    """
    This function receives the 4 components of the quaternion and calculates the orientation
    """
    # Roll
    sinr_cosp = 2 * (w * x + y * z)
    cosr_cosp = 1 - 2 * (x * x + y * y)
    roll_rad = atan2(sinr_cosp, cosr_cosp)

    # Pitch
    sinp = 2 * (w * y - z * x)

    # Clamp the value to avoid domain errors for asin (should be between -1 and 1)
    if sinp > 1:
        pitch_rad = pi / 2
    elif sinp < -1:
        pitch_rad = -pi / 2
    else:
        pitch_rad = asin(sinp)

    # Yaw
    siny_cosp = 2 * (w * z + x * y)
    cosy_cosp = 1 - 2 * (y * y + z * z)
    yaw_rad = atan2(siny_cosp, cosy_cosp)

    return degrees(roll_rad), degrees(pitch_rad), degrees(yaw_rad)

update_gyro() async

Reads the gyroscope data from the BNO08X sensor and updates the gyroscope values in degrees.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\bno08x.py
263
264
265
266
267
268
269
270
271
272
273
async def update_gyro(self):
    """
    Reads the gyroscope data from the BNO08X sensor and updates the gyroscope values in degrees.
    """
    # Updating the gyroscope data
    await self.__read_gyro()

    # Get the current gyroscope values in degrees
    self.__gyro_x_deg, self.__gyro_y_deg, self.__gyro_z_deg = BNO08XHandler.gyro_to_degrees(
        *self.__gyro
    )

update_quaternion() async

Reads the quaternion data from the BNO08X sensor and updates the roll, pitch, and yaw values.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\bno08x.py
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
async def update_quaternion(self):
    """
    Reads the quaternion data from the BNO08X sensor and updates the roll, pitch, and yaw values.
    """
    # Updating the quaternion data
    await self.__read_quaternion()

    # Get the current roll, pitch, and yaw in degrees
    self.__roll_deg, self.__pitch_deg, self.__yaw_deg = BNO08XHandler.quaternion_to_euler_degrees(
        *self.__quaternion
    )

    # If serial communication is enabled, send the yaw degrees message
    if self.__serial_communication:
        self.__serial_communication.send_bno08x_yaw_deg_message(
            self.__yaw_deg
            )

    # Compute relative yaw degrees
    relative_yaw_deg = self.__yaw_deg - self.__initial_yaw_deg
    if relative_yaw_deg > 180:
        relative_yaw_deg -= 360
    elif relative_yaw_deg < -180:
        relative_yaw_deg += 360

    # Calculate the change in yaw degrees since the last update
    delta_raw_yaw_deg = relative_yaw_deg - self.__last_yaw_deg
    if delta_raw_yaw_deg > 180:
        delta_raw_yaw_deg -= 360
    elif delta_raw_yaw_deg < -180:
        delta_raw_yaw_deg += 360

    # Update accumulated yaw and segment count
    self.__accumulated_yaw_deg += delta_raw_yaw_deg
    current_segment_count = int(self.__accumulated_yaw_deg / 90)
    if current_segment_count != self.__last_segment_count:
        self.__accumulated_90_deg_turns += current_segment_count - self.__last_segment_count
        self.__last_segment_count = current_segment_count

        # If serial communication is enabled, send the turn message
        if self.__serial_communication:
            self.__serial_communication.send_bno08x_turns_message(
                abs(self.__accumulated_90_deg_turns)
            )

    self.__last_yaw_deg = relative_yaw_deg

enums

Challenge

Class to represent the enum challenge messages sent and received from the Raspberry Pi Pico.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\enums.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
class Challenge:
    """
    Class to represent the enum challenge messages sent and received from the Raspberry Pi Pico.
    """

    WITH_OBSTACLES = "with_obstacles"
    WITHOUT_OBSTACLES = "without_obstacles"

    @classmethod
    def from_string(cls, challenge_str: str) -> str:
        """
        Convert a string to a Challenge enum value.

        Args:
            challenge_str (str): The string representation of the challenge.
        Returns:
            str: The corresponding Challenge enum value.
        Raises:
            ValueError: If the challenge string does not match any known challenge.
        """
        challenge_name = challenge_str.lower()
        for challenge in [cls.WITH_OBSTACLES, cls.WITHOUT_OBSTACLES]:
            if challenge_name == challenge:
                return challenge

        raise ValueError(f"Invalid challenge: {challenge_str}")

from_string(challenge_str) classmethod

Convert a string to a Challenge enum value.

Parameters:

Name Type Description Default
challenge_str str

The string representation of the challenge.

required

Returns:
str: The corresponding Challenge enum value.
Raises:
ValueError: If the challenge string does not match any known challenge.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\enums.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
@classmethod
def from_string(cls, challenge_str: str) -> str:
    """
    Convert a string to a Challenge enum value.

    Args:
        challenge_str (str): The string representation of the challenge.
    Returns:
        str: The corresponding Challenge enum value.
    Raises:
        ValueError: If the challenge string does not match any known challenge.
    """
    challenge_name = challenge_str.lower()
    for challenge in [cls.WITH_OBSTACLES, cls.WITHOUT_OBSTACLES]:
        if challenge_name == challenge:
            return challenge

    raise ValueError(f"Invalid challenge: {challenge_str}")

IncomingCategory

Class to represent the enum categories of incoming messages from the Raspberry Pi 5.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\enums.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class IncomingCategory:
    """
    Class to represent the enum categories of incoming messages from the Raspberry Pi 5.
    """
    STATUS = "status"
    MOTOR_SPEED = "motor_speed"
    SERVO_ANGLE = "servo_angle"

    @classmethod
    def from_string(cls, category_str: str) -> str:
        """
        Convert a string to a IncomingCategory enum value.

        Args:
            category_str (str): The string representation of the category.
        Returns:
            str: The corresponding IncomingCategory enum value.
        Raises:
            ValueError: If the category string does not match any known category.
        """
        category_name = category_str.lower()
        for category in [cls.STATUS, cls.MOTOR_SPEED, cls.SERVO_ANGLE]:
            if category_name == category:
                return category

        raise ValueError(f"Invalid incoming category: {category_str}")

from_string(category_str) classmethod

Convert a string to a IncomingCategory enum value.

Parameters:

Name Type Description Default
category_str str

The string representation of the category.

required

Returns:
str: The corresponding IncomingCategory enum value.
Raises:
ValueError: If the category string does not match any known category.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\enums.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@classmethod
def from_string(cls, category_str: str) -> str:
    """
    Convert a string to a IncomingCategory enum value.

    Args:
        category_str (str): The string representation of the category.
    Returns:
        str: The corresponding IncomingCategory enum value.
    Raises:
        ValueError: If the category string does not match any known category.
    """
    category_name = category_str.lower()
    for category in [cls.STATUS, cls.MOTOR_SPEED, cls.SERVO_ANGLE]:
        if category_name == category:
            return category

    raise ValueError(f"Invalid incoming category: {category_str}")

OutgoingCategory

Class to represent the enum categories of outgoing messages to the Raspberry Pi 5.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\enums.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class OutgoingCategory:
    """
    Class to represent the enum categories of outgoing messages to the Raspberry Pi 5.
    """
    CHALLENGE = "challenge"
    STATUS = "status"
    BNO08X_YAW_DEG = "bno08x_yaw_deg"
    BNO08X_TURNS = "bno08x_turns"
    ERROR = "error"
    DEBUG = "debug"

    @classmethod
    def from_string(cls, category_str: str) -> str:
        """
        Convert a string to a OutgoingCategory enum value.

        Args:
            category_str (str): The string representation of the category.
        Returns:
            str: The corresponding OutgoingCategory enum value.
        Raises:
            ValueError: If the category string does not match any known category.
        """
        category_name = category_str.lower()
        for category in [cls.CHALLENGE, cls.STATUS, cls.BNO08X_YAW_DEG,
                         cls.BNO08X_TURNS, cls.ERROR, cls.DEBUG]:
            if category_name == category:
                return category

        raise ValueError(f"Invalid outgoing category: {category_str}")

from_string(category_str) classmethod

Convert a string to a OutgoingCategory enum value.

Parameters:

Name Type Description Default
category_str str

The string representation of the category.

required

Returns:
str: The corresponding OutgoingCategory enum value.
Raises:
ValueError: If the category string does not match any known category.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\enums.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@classmethod
def from_string(cls, category_str: str) -> str:
    """
    Convert a string to a OutgoingCategory enum value.

    Args:
        category_str (str): The string representation of the category.
    Returns:
        str: The corresponding OutgoingCategory enum value.
    Raises:
        ValueError: If the category string does not match any known category.
    """
    category_name = category_str.lower()
    for category in [cls.CHALLENGE, cls.STATUS, cls.BNO08X_YAW_DEG,
                     cls.BNO08X_TURNS, cls.ERROR, cls.DEBUG]:
        if category_name == category:
            return category

    raise ValueError(f"Invalid outgoing category: {category_str}")

Status

Class to represent the enum status messages sent and received to the Raspberry Pi 5.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\enums.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
class Status:
    """
    Class to represent the enum status messages sent and received to the Raspberry Pi 5.
    """
    START = "start"
    STOP = "stop"
    OK = "ok"
    HEARTBEAT = "heartbeat"

    @classmethod
    def from_string(cls, status_str: str) -> str:
        """
        Convert a string to a Status enum value.

        Args:
            status_str (str): The string representation of the status.
        Returns:
            str: The corresponding Status enum value.
        """
        status_name = status_str.lower()
        for status in [cls.START, cls.STOP, cls.OK, cls.HEARTBEAT]:
            if status_name == status:
                return status

        raise ValueError(f"Invalid status: {status_str}")

from_string(status_str) classmethod

Convert a string to a Status enum value.

Parameters:

Name Type Description Default
status_str str

The string representation of the status.

required

Returns:
str: The corresponding Status enum value.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\enums.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
@classmethod
def from_string(cls, status_str: str) -> str:
    """
    Convert a string to a Status enum value.

    Args:
        status_str (str): The string representation of the status.
    Returns:
        str: The corresponding Status enum value.
    """
    status_name = status_str.lower()
    for status in [cls.START, cls.STOP, cls.OK, cls.HEARTBEAT]:
        if status_name == status:
            return status

    raise ValueError(f"Invalid status: {status_str}")

env

Env

Environment variables manager class.

This class provides methods to access environment variables related to debug mode and challenge type.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\env.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
class Env:
    """
    Environment variables manager class.

    This class provides methods to access environment variables related to debug mode and challenge type.
    """

    @staticmethod
    def _check_boolean(value: str) -> bool:
        """
        Check if the given string is a valid boolean representation.

        Args:
            value (str): The string to check.

        Returns:
            bool: True if the string represents a boolean value, otherwise False.
        """
        return value in ("true", "false")

    @staticmethod
    def _check_challenge(value: str) -> bool:
        """
        Check if the given string is a valid challenge type.

        Args:
            value (str): The string to check.

        Returns:
            bool: True if the string represents a valid challenge type, otherwise False.
        """
        return value in (Challenge.WITHOUT_OBSTACLES, Challenge.WITH_OBSTACLES)

    @staticmethod
    def get_movement_mode() -> bool:
        """
        Get the movement mode from the environment variable.

        Returns:
            bool: True if movement mode is enabled, otherwise False.
        """
        value = getenv("MOVEMENT", "false").lower()

        # Check if the value is a valid boolean representation
        if not Env._check_boolean(value):
            raise ValueError(
                f"Invalid value for MOVEMENT: {value}. Expected 'true' or 'false'."
            )

        return value == "true"

    @staticmethod
    def get_debug_mode() -> bool:
        """
        Get the debug mode from the environment variable.

        Returns:
            bool: True if debug mode is enabled, otherwise False.
        """
        value = getenv("DEBUG", "false").lower()

        # Check if the value is a valid boolean representation
        if not Env._check_boolean(value):
            raise ValueError(
                f"Invalid value for DEBUG: {value}. Expected 'true' or 'false'."
            )

        return value == "true"

    @staticmethod
    def get_challenge() -> str:
        """
        Get the challenge type from the environment variable.

        Returns:
            str: The challenge type.
        """
        value = getenv("CHALLENGE", Challenge.WITHOUT_OBSTACLES).lower()

        # Check if the value is a valid challenge type
        if not Env._check_challenge(value):
            raise ValueError(
                f"Invalid value for CHALLENGE: {value}. Expected 'without_obstacles' or 'with_obstacles'."
            )

        return value

get_challenge() staticmethod

Get the challenge type from the environment variable.

Returns:

Name Type Description
str str

The challenge type.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\env.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
@staticmethod
def get_challenge() -> str:
    """
    Get the challenge type from the environment variable.

    Returns:
        str: The challenge type.
    """
    value = getenv("CHALLENGE", Challenge.WITHOUT_OBSTACLES).lower()

    # Check if the value is a valid challenge type
    if not Env._check_challenge(value):
        raise ValueError(
            f"Invalid value for CHALLENGE: {value}. Expected 'without_obstacles' or 'with_obstacles'."
        )

    return value

get_debug_mode() staticmethod

Get the debug mode from the environment variable.

Returns:

Name Type Description
bool bool

True if debug mode is enabled, otherwise False.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\env.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
@staticmethod
def get_debug_mode() -> bool:
    """
    Get the debug mode from the environment variable.

    Returns:
        bool: True if debug mode is enabled, otherwise False.
    """
    value = getenv("DEBUG", "false").lower()

    # Check if the value is a valid boolean representation
    if not Env._check_boolean(value):
        raise ValueError(
            f"Invalid value for DEBUG: {value}. Expected 'true' or 'false'."
        )

    return value == "true"

get_movement_mode() staticmethod

Get the movement mode from the environment variable.

Returns:

Name Type Description
bool bool

True if movement mode is enabled, otherwise False.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\env.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@staticmethod
def get_movement_mode() -> bool:
    """
    Get the movement mode from the environment variable.

    Returns:
        bool: True if movement mode is enabled, otherwise False.
    """
    value = getenv("MOVEMENT", "false").lower()

    # Check if the value is a valid boolean representation
    if not Env._check_boolean(value):
        raise ValueError(
            f"Invalid value for MOVEMENT: {value}. Expected 'true' or 'false'."
        )

    return value == "true"

esc_motor

ESCMotorError

Bases: Exception

Custom exception class for ESC motor errors.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\esc_motor.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class ESCMotorError(Exception):
    """
    Custom exception class for ESC motor errors.
    """

    def __init__(self, message):
        """
        Initializes the ESCMotorError with a custom message.
        """
        super().__init__(message)
        self.message = message

    def __str__(self):
        """
        Returns a string representation of the ESCMotorError.
        """
        return f"ESC Motor Error: {self.message}"

__init__(message)

Initializes the ESCMotorError with a custom message.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\esc_motor.py
16
17
18
19
20
21
def __init__(self, message):
    """
    Initializes the ESCMotorError with a custom message.
    """
    super().__init__(message)
    self.message = message

__str__()

Returns a string representation of the ESCMotorError.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\esc_motor.py
23
24
25
26
27
def __str__(self):
    """
    Returns a string representation of the ESCMotorError.
    """
    return f"ESC Motor Error: {self.message}"

ESCMotorHandler

A class to handle ESC (Electronic Speed Controller) motor operations.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\esc_motor.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
class ESCMotorHandler:
    """
    A class to handle ESC (Electronic Speed Controller) motor operations.
    """
    # Default configuration
    PWM_FREQUENCY = 50
    MIN_PULSE = 1000
    MAX_PULSE = 2000

    # Speed factor to normalize that if positive the motor move forward, and if negative it moves backward
    SPEED_FACTOR = -1.0

    # Speed range
    SPEED_RANGE = (-1.0, 1.0)

    # Common speed values
    SPEED_FAST = 1.0
    SPEED_NORMAL = 0.5
    SPEED_SLOW = 0.25

    # Delay for motor operations
    DELAY = 0.2

    def __init__(
        self, motor_pin: int, frequency: int = PWM_FREQUENCY,
        min_pulse: int = MIN_PULSE, max_pulse: int = MAX_PULSE,
        movement: bool = True, debug: bool = False,
        serial_communication: SerialCommunication = None
    ):
        """
        Initializes the ESC motor handler with the specified parameters.

        Args:
            motor_pin (int): The GPIO pin connected to the ESC motor.
            frequency (int): The PWM frequency for the ESC motor.
            min_pulse (int): Minimum pulse width for the ESC motor.
            max_pulse (int): Maximum pulse width for the ESC motor.
            movement (bool): If True, the motor will be controlled for movement; if False, it will not.
            debug (bool): If True, debug messages will be sent.
            serial_communication (SerialCommunication): An instance of SerialCommunication for sending debug messages.
        """
        # Setup PWM output for the ESC motor
        self.__esc_pwm = PWMOut(motor_pin, duty_cycle=0, frequency=frequency)
        self.__esc_motor = ContinuousServo(
            self.__esc_pwm,
            min_pulse=min_pulse,
            max_pulse=max_pulse
        )

        # Set the movement flag and debug mode
        self.__movement = movement
        self.__debug = debug

        # Set the serial communication instance
        self.__serial_communication = serial_communication

        # Initialize the speed to 0
        self.__speed = None
        self.__esc_motor.throttle = 0
        time.sleep(self.DELAY)

    @classmethod
    def _check_speed_half_range(cls, speed: float):
        """
        Check the speed value to ensure it is within the valid half range for ESC motors.

        Args:
            speed (float): The speed value to check.

        Raises:
            ESCMotorError: If the speed is not within the valid half range.
        """
        if not (0 < speed <= cls.SPEED_RANGE[1]):
            raise ESCMotorError(
                f"Speed must be between 0 and {cls.SPEED_RANGE[1]}"
            )

    @classmethod
    def _check_speed_full_range(cls, speed: float):
        """
        Check the speed value to ensure it is within the valid full range for ESC motors.

        Args:
            speed (float): The speed value to check.

        Raises:
            ESCMotorError: If the speed is not within the valid full range.
        """
        if not (cls.SPEED_RANGE[0] <= speed <= cls.SPEED_RANGE[1]):
            raise ESCMotorError(
                f"Speed must be between {cls.SPEED_RANGE[0]} and {cls.SPEED_RANGE[1]}"
            )

    @property
    def speed(self) -> float:
        """
        Returns the current speed of the ESC motor.

        Returns:
            float: Current speed value between -1.0 (full reverse) and 1.0 (full forward).
        """
        return self.__speed

    async def set_speed(self, speed: float):
        """
        Sets the speed of the ESC motor.

        Args:
            speed (float): Speed value between -1.0 (full reverse) and 1.0 (full forward).
        """
        # Check if the speed is the same as the current speed
        if self.__speed == speed * self.SPEED_FACTOR:
            return

        # Check if the speed is within the full range
        self._check_speed_full_range(speed)
        self.__speed = speed * self.SPEED_FACTOR
        if self.__movement:
            self.__esc_motor.throttle = self.__speed

        # Send the received message
        if self.__debug and self.__serial_communication:
            self.__serial_communication.send_message(
                OutgoingMessage(
                    OutgoingCategory.DEBUG,
                    f"{IncomingCategory.MOTOR_SPEED}={self.__speed}"
                )
            )

        # Add a delay to allow the motor to respond
        await sleep(self.DELAY)

    async def stop(self):
        """
        Sets the speed of the ESC motor to 0.
        """
        await self.set_speed(0.0)

    async def forward(self, speed: float):
        """
        Sets the speed of the ESC motor forward.

        Args:
            speed (float): Speed value between 0 and 1.0 (full forward).
        """
        self._check_speed_half_range(speed)
        await self.set_speed(speed)

    async def backward(self, speed: float):
        """
        Sets the speed of the ESC motor backward.

        Args:
            speed (float): Speed value between 0 and 1.0 (full backward).
        """
        self._check_speed_half_range(speed)
        await self.set_speed(-speed)

speed property

Returns the current speed of the ESC motor.

Returns:

Name Type Description
float float

Current speed value between -1.0 (full reverse) and 1.0 (full forward).

__init__(motor_pin, frequency=PWM_FREQUENCY, min_pulse=MIN_PULSE, max_pulse=MAX_PULSE, movement=True, debug=False, serial_communication=None)

Initializes the ESC motor handler with the specified parameters.

Parameters:

Name Type Description Default
motor_pin int

The GPIO pin connected to the ESC motor.

required
frequency int

The PWM frequency for the ESC motor.

PWM_FREQUENCY
min_pulse int

Minimum pulse width for the ESC motor.

MIN_PULSE
max_pulse int

Maximum pulse width for the ESC motor.

MAX_PULSE
movement bool

If True, the motor will be controlled for movement; if False, it will not.

True
debug bool

If True, debug messages will be sent.

False
serial_communication SerialCommunication

An instance of SerialCommunication for sending debug messages.

None
Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\esc_motor.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def __init__(
    self, motor_pin: int, frequency: int = PWM_FREQUENCY,
    min_pulse: int = MIN_PULSE, max_pulse: int = MAX_PULSE,
    movement: bool = True, debug: bool = False,
    serial_communication: SerialCommunication = None
):
    """
    Initializes the ESC motor handler with the specified parameters.

    Args:
        motor_pin (int): The GPIO pin connected to the ESC motor.
        frequency (int): The PWM frequency for the ESC motor.
        min_pulse (int): Minimum pulse width for the ESC motor.
        max_pulse (int): Maximum pulse width for the ESC motor.
        movement (bool): If True, the motor will be controlled for movement; if False, it will not.
        debug (bool): If True, debug messages will be sent.
        serial_communication (SerialCommunication): An instance of SerialCommunication for sending debug messages.
    """
    # Setup PWM output for the ESC motor
    self.__esc_pwm = PWMOut(motor_pin, duty_cycle=0, frequency=frequency)
    self.__esc_motor = ContinuousServo(
        self.__esc_pwm,
        min_pulse=min_pulse,
        max_pulse=max_pulse
    )

    # Set the movement flag and debug mode
    self.__movement = movement
    self.__debug = debug

    # Set the serial communication instance
    self.__serial_communication = serial_communication

    # Initialize the speed to 0
    self.__speed = None
    self.__esc_motor.throttle = 0
    time.sleep(self.DELAY)

backward(speed) async

Sets the speed of the ESC motor backward.

Parameters:

Name Type Description Default
speed float

Speed value between 0 and 1.0 (full backward).

required
Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\esc_motor.py
178
179
180
181
182
183
184
185
186
async def backward(self, speed: float):
    """
    Sets the speed of the ESC motor backward.

    Args:
        speed (float): Speed value between 0 and 1.0 (full backward).
    """
    self._check_speed_half_range(speed)
    await self.set_speed(-speed)

forward(speed) async

Sets the speed of the ESC motor forward.

Parameters:

Name Type Description Default
speed float

Speed value between 0 and 1.0 (full forward).

required
Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\esc_motor.py
168
169
170
171
172
173
174
175
176
async def forward(self, speed: float):
    """
    Sets the speed of the ESC motor forward.

    Args:
        speed (float): Speed value between 0 and 1.0 (full forward).
    """
    self._check_speed_half_range(speed)
    await self.set_speed(speed)

set_speed(speed) async

Sets the speed of the ESC motor.

Parameters:

Name Type Description Default
speed float

Speed value between -1.0 (full reverse) and 1.0 (full forward).

required
Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\esc_motor.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
async def set_speed(self, speed: float):
    """
    Sets the speed of the ESC motor.

    Args:
        speed (float): Speed value between -1.0 (full reverse) and 1.0 (full forward).
    """
    # Check if the speed is the same as the current speed
    if self.__speed == speed * self.SPEED_FACTOR:
        return

    # Check if the speed is within the full range
    self._check_speed_full_range(speed)
    self.__speed = speed * self.SPEED_FACTOR
    if self.__movement:
        self.__esc_motor.throttle = self.__speed

    # Send the received message
    if self.__debug and self.__serial_communication:
        self.__serial_communication.send_message(
            OutgoingMessage(
                OutgoingCategory.DEBUG,
                f"{IncomingCategory.MOTOR_SPEED}={self.__speed}"
            )
        )

    # Add a delay to allow the motor to respond
    await sleep(self.DELAY)

stop() async

Sets the speed of the ESC motor to 0.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\esc_motor.py
162
163
164
165
166
async def stop(self):
    """
    Sets the speed of the ESC motor to 0.
    """
    await self.set_speed(0.0)

led

LEDHandler

A class to manage the onboard LED of the Raspberry Pi Pico 2W.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\led.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
class LEDHandler:
    """
    A class to manage the onboard LED of the Raspberry Pi Pico 2W.
    """

    def __init__(self, led_pin: int):
        """
        Initializes the LED on the specified GPIO pin.

        Args:
            led_pin (int): The GPIO number where the LED is connected. Default is 25.
        """
        self.__led = DigitalInOut(led_pin)
        self.__led.direction = Direction.OUTPUT

    def on(self):
        """Turns the LED on."""
        self.__led.value = True

    def is_on(self) -> bool:
        """
        Checks if the LED is currently on.

        Returns:
            bool: True if the LED is on, False otherwise.
        """
        return self.__led.value

    def off(self):
        """Turns the LED off."""
        self.__led.value = False

    def is_off(self) -> bool:
        """
        Checks if the LED is currently off.

        Returns:
            bool: True if the LED is off, False otherwise.
        """
        return not self.__led.value

    def toggle(self):
        """Toggles the LED state."""
        self.__led.value = not self.__led.value

    async def blink(self, times: int = 1, delay: float = 0.5):
        """
        Blinks the LED a specified number of times with a delay.

        Args:
            times (int): The number of times to blink the LED. Default is 1.
            delay (float): The delay in seconds between on and off states. Default is 0.5 seconds.
        """
        for _ in range(times):
            self.on()
            await sleep(delay)
            self.off()
            await sleep(delay)

__init__(led_pin)

Initializes the LED on the specified GPIO pin.

Parameters:

Name Type Description Default
led_pin int

The GPIO number where the LED is connected. Default is 25.

required
Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\led.py
11
12
13
14
15
16
17
18
19
def __init__(self, led_pin: int):
    """
    Initializes the LED on the specified GPIO pin.

    Args:
        led_pin (int): The GPIO number where the LED is connected. Default is 25.
    """
    self.__led = DigitalInOut(led_pin)
    self.__led.direction = Direction.OUTPUT

Blinks the LED a specified number of times with a delay.

Parameters:

Name Type Description Default
times int

The number of times to blink the LED. Default is 1.

1
delay float

The delay in seconds between on and off states. Default is 0.5 seconds.

0.5
Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\led.py
51
52
53
54
55
56
57
58
59
60
61
62
63
async def blink(self, times: int = 1, delay: float = 0.5):
    """
    Blinks the LED a specified number of times with a delay.

    Args:
        times (int): The number of times to blink the LED. Default is 1.
        delay (float): The delay in seconds between on and off states. Default is 0.5 seconds.
    """
    for _ in range(times):
        self.on()
        await sleep(delay)
        self.off()
        await sleep(delay)

is_off()

Checks if the LED is currently off.

Returns:

Name Type Description
bool bool

True if the LED is off, False otherwise.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\led.py
38
39
40
41
42
43
44
45
def is_off(self) -> bool:
    """
    Checks if the LED is currently off.

    Returns:
        bool: True if the LED is off, False otherwise.
    """
    return not self.__led.value

is_on()

Checks if the LED is currently on.

Returns:

Name Type Description
bool bool

True if the LED is on, False otherwise.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\led.py
25
26
27
28
29
30
31
32
def is_on(self) -> bool:
    """
    Checks if the LED is currently on.

    Returns:
        bool: True if the LED is on, False otherwise.
    """
    return self.__led.value

off()

Turns the LED off.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\led.py
34
35
36
def off(self):
    """Turns the LED off."""
    self.__led.value = False

on()

Turns the LED on.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\led.py
21
22
23
def on(self):
    """Turns the LED on."""
    self.__led.value = True

toggle()

Toggles the LED state.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\led.py
47
48
49
def toggle(self):
    """Toggles the LED state."""
    self.__led.value = not self.__led.value

message

IncomingMessage

Class to handle the messages received from the Raspberry Pi 5.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\message.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
class IncomingMessage:
    """
    Class to handle the messages received from the Raspberry Pi 5.
    """

    def __init__(self, category: str, content: str):
        """
        Initialize the message class.

        Args:
            category (str): The category of the message.
            content (str): The content of the message.
        """
        self.category = category
        self.content = content

    def __str__(self) -> str:
        """
        String representation of the message.
        """
        return f"{self.__category}{HEADER_SEPARATOR_CHAR}{self.__content}{END_CHAR}"

    def __eq__(self, other: 'IncomingMessage') -> bool:
        """
        Check if two messages are equal.

        Args:
            other (IncomingMessage): The other message to compare with.

        Returns:
            bool: True if the messages are equal, False otherwise.
        """
        return (self.category == other.category and
                self.content == other.content)

    @staticmethod
    def from_string(msg_str: str) -> 'IncomingMessage':
        """
        Create a IncomingMessage object from a string.

        Args:
            msg_str (str): The string representation of the message.

        Returns:
            IncomingMessage: The IncomingMessage object created from the string.
        """
        # Remove the end character if present
        if msg_str.endswith(END_CHAR):
            msg_str = msg_str[:-1]

        # Split the string into category and content
        parts = msg_str.strip().split(HEADER_SEPARATOR_CHAR, 1)
        if len(parts) != 2:
            raise ValueError("Invalid incoming message format")

        # Convert the category string to a Category enum value
        category = IncomingCategory.from_string(parts[0])

        # Create and return the IncomingMessage object
        return IncomingMessage(category, parts[1])

    @property
    def category(self) -> str:
        """
        Property to get the message category.

        Returns:
            str: The category of the message.
        """
        return self.__category

    @category.setter
    def category(self, category: str):
        """
        Setter for the message category.

        Args:
            category (str): The category of the message.
        """
        self.__category = category

    @property
    def content(self) -> str:
        """
        Get the message content.

        Returns:
            str: The content of the message.
        """
        return self.__content

    @content.setter
    def content(self, content: str):
        """
        Setter for the message content.

        Args:
            content (str): The content of the message.
        """
        self.__content = content

    def format_to_send_with_error_message(self) -> str:
        """
        Format the message to send with an error message.

        Returns:
            str: The formatted message string.
        """
        return f"{self.__category}{HEADER_SEPARATOR_CHAR}{self.__content}"

category property writable

Property to get the message category.

Returns:

Name Type Description
str str

The category of the message.

content property writable

Get the message content.

Returns:

Name Type Description
str str

The content of the message.

__eq__(other)

Check if two messages are equal.

Parameters:

Name Type Description Default
other IncomingMessage

The other message to compare with.

required

Returns:

Name Type Description
bool bool

True if the messages are equal, False otherwise.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\message.py
32
33
34
35
36
37
38
39
40
41
42
43
def __eq__(self, other: 'IncomingMessage') -> bool:
    """
    Check if two messages are equal.

    Args:
        other (IncomingMessage): The other message to compare with.

    Returns:
        bool: True if the messages are equal, False otherwise.
    """
    return (self.category == other.category and
            self.content == other.content)

__init__(category, content)

Initialize the message class.

Parameters:

Name Type Description Default
category str

The category of the message.

required
content str

The content of the message.

required
Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\message.py
15
16
17
18
19
20
21
22
23
24
def __init__(self, category: str, content: str):
    """
    Initialize the message class.

    Args:
        category (str): The category of the message.
        content (str): The content of the message.
    """
    self.category = category
    self.content = content

__str__()

String representation of the message.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\message.py
26
27
28
29
30
def __str__(self) -> str:
    """
    String representation of the message.
    """
    return f"{self.__category}{HEADER_SEPARATOR_CHAR}{self.__content}{END_CHAR}"

format_to_send_with_error_message()

Format the message to send with an error message.

Returns:

Name Type Description
str str

The formatted message string.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\message.py
111
112
113
114
115
116
117
118
def format_to_send_with_error_message(self) -> str:
    """
    Format the message to send with an error message.

    Returns:
        str: The formatted message string.
    """
    return f"{self.__category}{HEADER_SEPARATOR_CHAR}{self.__content}"

from_string(msg_str) staticmethod

Create a IncomingMessage object from a string.

Parameters:

Name Type Description Default
msg_str str

The string representation of the message.

required

Returns:

Name Type Description
IncomingMessage IncomingMessage

The IncomingMessage object created from the string.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\message.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@staticmethod
def from_string(msg_str: str) -> 'IncomingMessage':
    """
    Create a IncomingMessage object from a string.

    Args:
        msg_str (str): The string representation of the message.

    Returns:
        IncomingMessage: The IncomingMessage object created from the string.
    """
    # Remove the end character if present
    if msg_str.endswith(END_CHAR):
        msg_str = msg_str[:-1]

    # Split the string into category and content
    parts = msg_str.strip().split(HEADER_SEPARATOR_CHAR, 1)
    if len(parts) != 2:
        raise ValueError("Invalid incoming message format")

    # Convert the category string to a Category enum value
    category = IncomingCategory.from_string(parts[0])

    # Create and return the IncomingMessage object
    return IncomingMessage(category, parts[1])

OutgoingMessage

Class to handle the messages sent to the Raspberry Pi 5.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\message.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
class OutgoingMessage:
    """
    Class to handle the messages sent to the Raspberry Pi 5.
    """

    def __init__(self, category: str, content: str):
        """
        Initialize the message class.

        Args:
            category (str): The category of the message.
            content (str): The content of the message.
        """
        self.category = category
        self.content = content

    def __str__(self) -> str:
        """
        String representation of the message.
        """
        return f"{self.__category}{HEADER_SEPARATOR_CHAR}{self.__content}{END_CHAR}"

    def __eq__(self, other: 'OutgoingMessage') -> bool:
        """
        Check if two messages are equal.

        Args:
            other (OutgoingMessage): The other message to compare with.

        Returns:
            bool: True if the messages are equal, False otherwise.
        """
        return (self.category == other.category and
                self.content == other.content)

    @staticmethod
    def from_string(msg_str: str) -> 'OutgoingMessage':
        """
        Create a OutgoingMessage object from a string.

        Args:
            msg_str (str): The string representation of the message.

        Returns:
            OutgoingMessage: The OutgoingMessage object created from the string.
        """
        # Remove the end character if present
        if msg_str.endswith(END_CHAR):
            msg_str = msg_str[:-1]

        # Split the string into category and content
        parts = msg_str.strip().split(HEADER_SEPARATOR_CHAR, 1)
        if len(parts) != 2:
            raise ValueError("Invalid outgoing message format")

        # Convert the category string to a Category enum value
        category = OutgoingCategory.from_string(parts[0])

        # Create and return the OutgoingMessage object
        return OutgoingMessage(category, parts[1])

    @property
    def category(self) -> str:
        """
        Property to get the message category.

        Returns:
            str: The category of the message.
        """
        return self.__category

    @category.setter
    def category(self, category: str):
        """
        Setter for the message category.

        Args:
            category (str): The category of the message.
        """
        self.__category = category

    @property
    def content(self) -> str:
        """
        Get the message content.

        Returns:
            str: The content of the message.
        """
        return self.__content

    @content.setter
    def content(self, content: str):
        """
        Setter for the message content.

        Args:
            content (str): The content of the message.
        """
        self.__content = content

    def format_to_send_with_error_message(self) -> str:
        """
        Format the message to send with an error message.

        Returns:
            str: The formatted message string.
        """
        return f"{self.__category}{HEADER_SEPARATOR_CHAR}{self.__content}"

category property writable

Property to get the message category.

Returns:

Name Type Description
str str

The category of the message.

content property writable

Get the message content.

Returns:

Name Type Description
str str

The content of the message.

__eq__(other)

Check if two messages are equal.

Parameters:

Name Type Description Default
other OutgoingMessage

The other message to compare with.

required

Returns:

Name Type Description
bool bool

True if the messages are equal, False otherwise.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\message.py
143
144
145
146
147
148
149
150
151
152
153
154
def __eq__(self, other: 'OutgoingMessage') -> bool:
    """
    Check if two messages are equal.

    Args:
        other (OutgoingMessage): The other message to compare with.

    Returns:
        bool: True if the messages are equal, False otherwise.
    """
    return (self.category == other.category and
            self.content == other.content)

__init__(category, content)

Initialize the message class.

Parameters:

Name Type Description Default
category str

The category of the message.

required
content str

The content of the message.

required
Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\message.py
126
127
128
129
130
131
132
133
134
135
def __init__(self, category: str, content: str):
    """
    Initialize the message class.

    Args:
        category (str): The category of the message.
        content (str): The content of the message.
    """
    self.category = category
    self.content = content

__str__()

String representation of the message.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\message.py
137
138
139
140
141
def __str__(self) -> str:
    """
    String representation of the message.
    """
    return f"{self.__category}{HEADER_SEPARATOR_CHAR}{self.__content}{END_CHAR}"

format_to_send_with_error_message()

Format the message to send with an error message.

Returns:

Name Type Description
str str

The formatted message string.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\message.py
222
223
224
225
226
227
228
229
def format_to_send_with_error_message(self) -> str:
    """
    Format the message to send with an error message.

    Returns:
        str: The formatted message string.
    """
    return f"{self.__category}{HEADER_SEPARATOR_CHAR}{self.__content}"

from_string(msg_str) staticmethod

Create a OutgoingMessage object from a string.

Parameters:

Name Type Description Default
msg_str str

The string representation of the message.

required

Returns:

Name Type Description
OutgoingMessage OutgoingMessage

The OutgoingMessage object created from the string.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\message.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
@staticmethod
def from_string(msg_str: str) -> 'OutgoingMessage':
    """
    Create a OutgoingMessage object from a string.

    Args:
        msg_str (str): The string representation of the message.

    Returns:
        OutgoingMessage: The OutgoingMessage object created from the string.
    """
    # Remove the end character if present
    if msg_str.endswith(END_CHAR):
        msg_str = msg_str[:-1]

    # Split the string into category and content
    parts = msg_str.strip().split(HEADER_SEPARATOR_CHAR, 1)
    if len(parts) != 2:
        raise ValueError("Invalid outgoing message format")

    # Convert the category string to a Category enum value
    category = OutgoingCategory.from_string(parts[0])

    # Create and return the OutgoingMessage object
    return OutgoingMessage(category, parts[1])

serial_communication

SerialCommunication

A class to handle serial communication over USB CDC in CircuitPython.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\serial_communication.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
class SerialCommunication:
    """
    A class to handle serial communication over USB CDC in CircuitPython.
    """
    # Default configuration
    TOGGLE_LED_ON_RECEIVE = False
    DATA_PORT_ENABLED = True
    CONSOLE_PORT_ENABLED = True
    CHUNK_SIZE = 64

    # Status outgoing messages
    START_MESSAGE = OutgoingMessage(
        OutgoingCategory.STATUS,
        Status.START
    )
    OUTGOING_OK_MESSAGE = OutgoingMessage(
        OutgoingCategory.STATUS,
        Status.OK
    )

    # Status incoming messages
    STOP_MESSAGE = IncomingMessage(
        IncomingCategory.STATUS,
        Status.STOP
    )
    INCOMING_OK_MESSAGE = IncomingMessage(
        IncomingCategory.STATUS,
        Status.OK
    )
    HEARTBEAT_MESSAGE = IncomingMessage(
        OutgoingCategory.STATUS,
        Status.HEARTBEAT
    )

    # Challenge messages
    CHALLENGE_WITH_OBSTACLES = OutgoingMessage(
        OutgoingCategory.CHALLENGE,
        Challenge.WITH_OBSTACLES
    )
    CHALLENGE_WITHOUT_OBSTACLES = OutgoingMessage(
        OutgoingCategory.CHALLENGE,
        Challenge.WITHOUT_OBSTACLES
    )

    # Confirmation timeout
    CONFIRMATION_TIMEOUT = 5.0

    def __init__(
        self,
        challenge: str,
        console_port_enabled: bool = CONSOLE_PORT_ENABLED,
        data_port_enabled: bool = DATA_PORT_ENABLED,
        led: LEDHandler = None
    ):
        """
        Initialize the SerialCommunication instance.

        Args:
            challenge (str): The challenge type for the robot.
            console_port_enabled (bool): Whether to enable the console port for sending messages.
            data_port_enabled (bool): Whether to enable the data port for receiving messages.
            led (LEDHandler | None): Optional LED handler for toggling the LED on message receive.
        """
        self.__console_port = console if console_port_enabled else None
        self.__data_port = data if data_port_enabled else None
        self.__challenge = challenge
        self.__led = led

    async def receive_messages(self) -> list[IncomingMessage]:
        """
        Receive messages from the USB CDC data stream.

        Returns:
            list[IncomingMessage]: A list of received messages or None if no messages are waiting.
        Raises:
            SerialCommunicationError: If the data port is not enabled or if there is an error in reading messages.
        """
        if not self.__data_port:
            raise SerialCommunicationError("Data port is not enabled.")

        if self.__data_port.in_waiting == 0:
            # Turn off the LED if no data is waiting
            if self.__led and self.__led.is_on():
                self.__led.off()
            return []

        # Turn on the LED to indicate a message has been received
        if self.__led and self.__led.is_off():
            self.__led.on()

        msgs = []
        buffer = b""
        while self.__data_port.in_waiting > 0:
            byte = self.__data_port.read(1)
            if not byte:
                continue
            if byte != END_CHAR.encode("utf-8"):
                buffer += byte
                continue

            try:
                msg_str = buffer.decode("utf-8").strip()
                msg = IncomingMessage.from_string(msg_str)
                msgs.append(msg)

            except Exception as e:
                raise SerialCommunicationError(
                    f"Invalid message format or undecodable bytes: {buffer} ({e})"
                ) from e
            buffer = b""

        return msgs

    def send_message(self, msg: OutgoingMessage):
        """
        Send a message to the USB CDC console stream.

        Args:
            msg (OutgoingMessage): The message to send.
        Raises:
            SerialCommunicationError: If the console port is not enabled or if there is an error in sending the message.
        """
        if not self.__console_port:
            raise SerialCommunicationError("Console port is not enabled.")

        try:
            self.__console_port.write(str(msg).encode("utf-8"))

            # Flush the console port to ensure the message is sent
            self.__console_port.flush()

        except Exception as e:
            raise SerialCommunicationError(f"Error sending message: {e}")

    def send_buffer_message(self, category: str, msg: StringIO):
        """
        Send a message in chunks to the USB CDC console stream.

        Args:
            category (str): The category of the message.
            msg (StringIO): The message to send as a StringIO object.
        Raises:
            SerialCommunicationError: If the console port is not enabled or if there is an error in sending the message.
        """
        if not self.__console_port:
            raise SerialCommunicationError("Console port is not enabled.")

        try:
            # Send the category header
            self.__console_port.write(f"{category}{HEADER_SEPARATOR_CHAR}".encode("utf-8"))

            # Send error message to the serial communication
            msg_str = msg.getvalue()
            for i in range(0, len(msg_str), self.CHUNK_SIZE):
                chunk = msg_str[i:i + self.CHUNK_SIZE] if i + self.CHUNK_SIZE < len(msg_str) else msg_str[i:]
                self.__console_port.write(chunk.encode("utf-8"))

            # Send the end character
            self.__console_port.write(END_CHAR.encode("utf-8"))

            # Flush the console port to ensure the message is sent
            self.__console_port.flush()

        except Exception as e:
            raise SerialCommunicationError(f"Error sending message: {e}")

    def send_confirmation_message(self):
        """
        Sends a confirmation message through the data port.
        """
        self.send_message(self.OUTGOING_OK_MESSAGE)

    async def wait_for_confirmation_message(
        self,
        msg_to_confirm: OutgoingMessage,
        timeout: float = CONFIRMATION_TIMEOUT
    ) -> None:
        """
        Wait for a confirmation message from the console port.

        Args:
            timeout (float): The maximum time to wait for a confirmation message.
            msg_to_confirm (OutgoingMessage): The message to confirm.
        Raises:
            SerialCommunicationError: If the confirmation message is not received within the timeout period.
        """
        start_time = monotonic()
        while monotonic() - start_time < timeout:
            msgs = await self.receive_messages()
            for msg in msgs:
                if msg == self.INCOMING_OK_MESSAGE:
                    return

        raise SerialCommunicationError(
            f"Confirmation message '{msg_to_confirm.format_to_send_with_error_message()}' not received within {timeout} seconds."
        )

    def send_initialization_message(self):
        """
        Send an END_CHAR message to the console port to indicate initialization.
        """
        if not self.__console_port:
            raise SerialCommunicationError("Console port is not enabled.")

        try:
            self.__console_port.write(END_CHAR.encode("utf-8"))

        except Exception as e:
            raise SerialCommunicationError(
                f"Error sending initialization message: {e}"
                )

    async def send_challenge_message(self):
        """
        Send a challenge message to the console port.

        Raises:
            SerialCommunicationError: If the console port is not enabled or if confirmation is not received.
        """
        # Send the challenge message based on the challenge type
        challenge_message = self.CHALLENGE_WITH_OBSTACLES if self.__challenge == Challenge.WITH_OBSTACLES else self.CHALLENGE_WITHOUT_OBSTACLES
        self.send_message(challenge_message)

        # Wait for confirmation of the challenge message
        await self.wait_for_confirmation_message(challenge_message)

    def send_bno08x_yaw_deg_message(self, yaw_deg: float):
        """
        Send a BNO08x yaw degrees message to the console port.

        Args:
            yaw_deg (float): The yaw value to send.
        """
        bno08x_message = OutgoingMessage(
            OutgoingCategory.BNO08X_YAW_DEG,
            str(yaw_deg)
            )
        self.send_message(bno08x_message)

    def send_bno08x_turns_message(self, turns: int):
        """
        Send a BNO08x turns message to the console port.

        Args:
            turns (int): The number of turns to send.
        """
        bno08x_message = OutgoingMessage(
            OutgoingCategory.BNO08X_TURNS,
            str(turns)
        )
        self.send_message(bno08x_message)

    def send_error_message(self, error: Exception):
        """
        Send an error message to the console port.

        Args:
            error (Exception): The error to send.
        """
        error_message = OutgoingMessage(OutgoingCategory.ERROR, str(error))
        self.send_message(error_message)

    async def start(self):
        """
        Send the start message to the console port and wait for confirmation.

        Raises:
            SerialCommunicationError: If the console port is not enabled or if confirmation is not received.
        """
        try:
            # Send the start message
            self.send_message(self.START_MESSAGE)

            # Wait for confirmation of the start message
            await self.wait_for_confirmation_message(self.START_MESSAGE)

        except Exception as e:
            raise e

    async def stop(self):
        """
        Close the serial communication.
        """
        if self.__console_port:
            self.__console_port.deinit()

        if self.__data_port:
            self.__data_port.deinit()

        # Turn off the LED if it exists
        if self.__led:
            self.__led.off()

__init__(challenge, console_port_enabled=CONSOLE_PORT_ENABLED, data_port_enabled=DATA_PORT_ENABLED, led=None)

Initialize the SerialCommunication instance.

Parameters:

Name Type Description Default
challenge str

The challenge type for the robot.

required
console_port_enabled bool

Whether to enable the console port for sending messages.

CONSOLE_PORT_ENABLED
data_port_enabled bool

Whether to enable the data port for receiving messages.

DATA_PORT_ENABLED
led LEDHandler | None

Optional LED handler for toggling the LED on message receive.

None
Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\serial_communication.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def __init__(
    self,
    challenge: str,
    console_port_enabled: bool = CONSOLE_PORT_ENABLED,
    data_port_enabled: bool = DATA_PORT_ENABLED,
    led: LEDHandler = None
):
    """
    Initialize the SerialCommunication instance.

    Args:
        challenge (str): The challenge type for the robot.
        console_port_enabled (bool): Whether to enable the console port for sending messages.
        data_port_enabled (bool): Whether to enable the data port for receiving messages.
        led (LEDHandler | None): Optional LED handler for toggling the LED on message receive.
    """
    self.__console_port = console if console_port_enabled else None
    self.__data_port = data if data_port_enabled else None
    self.__challenge = challenge
    self.__led = led

receive_messages() async

Receive messages from the USB CDC data stream.

Returns:

Type Description
list[IncomingMessage]

list[IncomingMessage]: A list of received messages or None if no messages are waiting.

Raises:
SerialCommunicationError: If the data port is not enabled or if there is an error in reading messages.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\serial_communication.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
async def receive_messages(self) -> list[IncomingMessage]:
    """
    Receive messages from the USB CDC data stream.

    Returns:
        list[IncomingMessage]: A list of received messages or None if no messages are waiting.
    Raises:
        SerialCommunicationError: If the data port is not enabled or if there is an error in reading messages.
    """
    if not self.__data_port:
        raise SerialCommunicationError("Data port is not enabled.")

    if self.__data_port.in_waiting == 0:
        # Turn off the LED if no data is waiting
        if self.__led and self.__led.is_on():
            self.__led.off()
        return []

    # Turn on the LED to indicate a message has been received
    if self.__led and self.__led.is_off():
        self.__led.on()

    msgs = []
    buffer = b""
    while self.__data_port.in_waiting > 0:
        byte = self.__data_port.read(1)
        if not byte:
            continue
        if byte != END_CHAR.encode("utf-8"):
            buffer += byte
            continue

        try:
            msg_str = buffer.decode("utf-8").strip()
            msg = IncomingMessage.from_string(msg_str)
            msgs.append(msg)

        except Exception as e:
            raise SerialCommunicationError(
                f"Invalid message format or undecodable bytes: {buffer} ({e})"
            ) from e
        buffer = b""

    return msgs

send_bno08x_turns_message(turns)

Send a BNO08x turns message to the console port.

Parameters:

Name Type Description Default
turns int

The number of turns to send.

required
Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\serial_communication.py
270
271
272
273
274
275
276
277
278
279
280
281
def send_bno08x_turns_message(self, turns: int):
    """
    Send a BNO08x turns message to the console port.

    Args:
        turns (int): The number of turns to send.
    """
    bno08x_message = OutgoingMessage(
        OutgoingCategory.BNO08X_TURNS,
        str(turns)
    )
    self.send_message(bno08x_message)

send_bno08x_yaw_deg_message(yaw_deg)

Send a BNO08x yaw degrees message to the console port.

Parameters:

Name Type Description Default
yaw_deg float

The yaw value to send.

required
Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\serial_communication.py
257
258
259
260
261
262
263
264
265
266
267
268
def send_bno08x_yaw_deg_message(self, yaw_deg: float):
    """
    Send a BNO08x yaw degrees message to the console port.

    Args:
        yaw_deg (float): The yaw value to send.
    """
    bno08x_message = OutgoingMessage(
        OutgoingCategory.BNO08X_YAW_DEG,
        str(yaw_deg)
        )
    self.send_message(bno08x_message)

send_buffer_message(category, msg)

Send a message in chunks to the USB CDC console stream.

Parameters:

Name Type Description Default
category str

The category of the message.

required
msg StringIO

The message to send as a StringIO object.

required

Raises:
SerialCommunicationError: If the console port is not enabled or if there is an error in sending the message.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\serial_communication.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def send_buffer_message(self, category: str, msg: StringIO):
    """
    Send a message in chunks to the USB CDC console stream.

    Args:
        category (str): The category of the message.
        msg (StringIO): The message to send as a StringIO object.
    Raises:
        SerialCommunicationError: If the console port is not enabled or if there is an error in sending the message.
    """
    if not self.__console_port:
        raise SerialCommunicationError("Console port is not enabled.")

    try:
        # Send the category header
        self.__console_port.write(f"{category}{HEADER_SEPARATOR_CHAR}".encode("utf-8"))

        # Send error message to the serial communication
        msg_str = msg.getvalue()
        for i in range(0, len(msg_str), self.CHUNK_SIZE):
            chunk = msg_str[i:i + self.CHUNK_SIZE] if i + self.CHUNK_SIZE < len(msg_str) else msg_str[i:]
            self.__console_port.write(chunk.encode("utf-8"))

        # Send the end character
        self.__console_port.write(END_CHAR.encode("utf-8"))

        # Flush the console port to ensure the message is sent
        self.__console_port.flush()

    except Exception as e:
        raise SerialCommunicationError(f"Error sending message: {e}")

send_challenge_message() async

Send a challenge message to the console port.

Raises:

Type Description
SerialCommunicationError

If the console port is not enabled or if confirmation is not received.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\serial_communication.py
243
244
245
246
247
248
249
250
251
252
253
254
255
async def send_challenge_message(self):
    """
    Send a challenge message to the console port.

    Raises:
        SerialCommunicationError: If the console port is not enabled or if confirmation is not received.
    """
    # Send the challenge message based on the challenge type
    challenge_message = self.CHALLENGE_WITH_OBSTACLES if self.__challenge == Challenge.WITH_OBSTACLES else self.CHALLENGE_WITHOUT_OBSTACLES
    self.send_message(challenge_message)

    # Wait for confirmation of the challenge message
    await self.wait_for_confirmation_message(challenge_message)

send_confirmation_message()

Sends a confirmation message through the data port.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\serial_communication.py
197
198
199
200
201
def send_confirmation_message(self):
    """
    Sends a confirmation message through the data port.
    """
    self.send_message(self.OUTGOING_OK_MESSAGE)

send_error_message(error)

Send an error message to the console port.

Parameters:

Name Type Description Default
error Exception

The error to send.

required
Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\serial_communication.py
283
284
285
286
287
288
289
290
291
def send_error_message(self, error: Exception):
    """
    Send an error message to the console port.

    Args:
        error (Exception): The error to send.
    """
    error_message = OutgoingMessage(OutgoingCategory.ERROR, str(error))
    self.send_message(error_message)

send_initialization_message()

Send an END_CHAR message to the console port to indicate initialization.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\serial_communication.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
def send_initialization_message(self):
    """
    Send an END_CHAR message to the console port to indicate initialization.
    """
    if not self.__console_port:
        raise SerialCommunicationError("Console port is not enabled.")

    try:
        self.__console_port.write(END_CHAR.encode("utf-8"))

    except Exception as e:
        raise SerialCommunicationError(
            f"Error sending initialization message: {e}"
            )

send_message(msg)

Send a message to the USB CDC console stream.

Parameters:

Name Type Description Default
msg OutgoingMessage

The message to send.

required

Raises:
SerialCommunicationError: If the console port is not enabled or if there is an error in sending the message.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\serial_communication.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def send_message(self, msg: OutgoingMessage):
    """
    Send a message to the USB CDC console stream.

    Args:
        msg (OutgoingMessage): The message to send.
    Raises:
        SerialCommunicationError: If the console port is not enabled or if there is an error in sending the message.
    """
    if not self.__console_port:
        raise SerialCommunicationError("Console port is not enabled.")

    try:
        self.__console_port.write(str(msg).encode("utf-8"))

        # Flush the console port to ensure the message is sent
        self.__console_port.flush()

    except Exception as e:
        raise SerialCommunicationError(f"Error sending message: {e}")

start() async

Send the start message to the console port and wait for confirmation.

Raises:

Type Description
SerialCommunicationError

If the console port is not enabled or if confirmation is not received.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\serial_communication.py
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
async def start(self):
    """
    Send the start message to the console port and wait for confirmation.

    Raises:
        SerialCommunicationError: If the console port is not enabled or if confirmation is not received.
    """
    try:
        # Send the start message
        self.send_message(self.START_MESSAGE)

        # Wait for confirmation of the start message
        await self.wait_for_confirmation_message(self.START_MESSAGE)

    except Exception as e:
        raise e

stop() async

Close the serial communication.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\serial_communication.py
310
311
312
313
314
315
316
317
318
319
320
321
322
async def stop(self):
    """
    Close the serial communication.
    """
    if self.__console_port:
        self.__console_port.deinit()

    if self.__data_port:
        self.__data_port.deinit()

    # Turn off the LED if it exists
    if self.__led:
        self.__led.off()

wait_for_confirmation_message(msg_to_confirm, timeout=CONFIRMATION_TIMEOUT) async

Wait for a confirmation message from the console port.

Parameters:

Name Type Description Default
timeout float

The maximum time to wait for a confirmation message.

CONFIRMATION_TIMEOUT
msg_to_confirm OutgoingMessage

The message to confirm.

required

Raises:
SerialCommunicationError: If the confirmation message is not received within the timeout period.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\serial_communication.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
async def wait_for_confirmation_message(
    self,
    msg_to_confirm: OutgoingMessage,
    timeout: float = CONFIRMATION_TIMEOUT
) -> None:
    """
    Wait for a confirmation message from the console port.

    Args:
        timeout (float): The maximum time to wait for a confirmation message.
        msg_to_confirm (OutgoingMessage): The message to confirm.
    Raises:
        SerialCommunicationError: If the confirmation message is not received within the timeout period.
    """
    start_time = monotonic()
    while monotonic() - start_time < timeout:
        msgs = await self.receive_messages()
        for msg in msgs:
            if msg == self.INCOMING_OK_MESSAGE:
                return

    raise SerialCommunicationError(
        f"Confirmation message '{msg_to_confirm.format_to_send_with_error_message()}' not received within {timeout} seconds."
    )

SerialCommunicationError

Bases: Exception

Custom exception class for serial communication errors.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\serial_communication.py
18
19
20
21
22
23
24
25
26
27
28
class SerialCommunicationError(Exception):
    """
    Custom exception class for serial communication errors.
    """

    def __init__(self, msg: str):
        super().__init__(msg)
        self.msg = msg

    def __str__(self):
        return f"SerialCommunicationError: {self.msg}"

servo

ServoError

Bases: Exception

Custom exception class for servo motor errors.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\servo.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class ServoError(Exception):
    """
    Custom exception class for servo motor errors.
    """

    def __init__(self, message):
        """
        Initializes the ServoError with a custom message.
        """
        super().__init__(message)
        self.message = message

    def __str__(self):
        """
        Returns a string representation of the ServoError.
        """
        return f"Servo Error: {self.message}"

__init__(message)

Initializes the ServoError with a custom message.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\servo.py
17
18
19
20
21
22
def __init__(self, message):
    """
    Initializes the ServoError with a custom message.
    """
    super().__init__(message)
    self.message = message

__str__()

Returns a string representation of the ServoError.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\servo.py
24
25
26
27
28
def __str__(self):
    """
    Returns a string representation of the ServoError.
    """
    return f"Servo Error: {self.message}"

ServoHandler

A class to handle servo motor operations.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\servo.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
class ServoHandler:
    """
    A class to handle servo motor operations.
    """
    # Default configuration
    PWM_FREQUENCY = 50
    MIN_PULSE = 500
    MAX_PULSE = 2500
    ACTUATION_RANGE = 180
    CENTER_ANGLE = 75
    LEFT_LIMIT = -(ACTUATION_RANGE - CENTER_ANGLE)
    RIGHT_LIMIT = (ACTUATION_RANGE - (ACTUATION_RANGE - CENTER_ANGLE))

    # Angle factor to normalize that when subtracting the servo moves to the left, and adding moves to the right
    ANGLE_FACTOR = -1

    # Common angle values
    BIG_TURN_ANGLE = 35
    MEDIUM_TURN_ANGLE = 25
    SMALL_TURN_ANGLE = 15

    # Delay
    DELAY = 0.05

    def __init__(
        self, servo_pin: int, frequency: int = PWM_FREQUENCY,
        min_pulse: int = MIN_PULSE, max_pulse: int = MAX_PULSE,
        actuation_range: int = ACTUATION_RANGE, movement: bool = True,
        debug: bool = False, serial_communication: SerialCommunication = None
    ):
        """
        Initializes the servo handler with the specified parameters.

        Args:
            servo_pin (int): The GPIO pin for the servo motor.
            frequency (int): The PWM frequency for the servo motor.
            min_pulse (int): The minimum pulse width in microseconds.
            max_pulse (int): The maximum pulse width in microseconds.
            actuation_range (int): The range of motion of the servo in degrees.
            movement (bool): If True, the servo will be controlled for movement; if False, it will not.
            debug (bool): If True, debug messages will be sent.
            serial_communication (SerialCommunication): An instance of SerialCommunication for sending debug messages.
        """
        # Setup PWM output for the servo motor
        self.__servo_pwm = PWMOut(servo_pin, duty_cycle=0, frequency=frequency)
        self.__servo_motor = Servo(
            self.__servo_pwm, actuation_range=actuation_range,
            min_pulse=min_pulse, max_pulse=max_pulse
        )

        # Set the movement flag and debug mode
        self.__movement = movement
        self.__debug = debug

        # Set the serial communication instance
        self.__serial_communication = serial_communication

        # Set the servo to center position
        self.__angle = None
        self.__servo_motor.angle = self.CENTER_ANGLE
        time.sleep(self.DELAY)

    @staticmethod
    def _check_angle(angle: int):
        """
        Checks if the angle is within the valid range.

        Args:
            angle (int): Angle value to check.

        Raises:
            ServoError: If the angle is not within the valid range.
        """
        if not 0 <= angle <= ServoHandler.ACTUATION_RANGE:
            raise ServoError(
                f"Angle must be between 0 and {ServoHandler.ACTUATION_RANGE} degrees"
            )

    @property
    def angle(self) -> int:
        """
        Returns the current angle of the servo motor.

        Returns:
            int: The current angle of the servo motor.
        """
        return self.__angle

    async def set_angle(self, angle: int):
        """
        Sets the angle of the servo motor.

        Args:
            angle (int): Angle value between 0 and the actuation range.
        """
        # Check if the angle is the same as the current angle
        if angle == self.__angle:
            return

        # Check if the angle is within the valid range
        self._check_angle(angle)
        self.__angle = angle
        if self.__movement:
            self.__servo_motor.angle = self.__angle
        if self.__debug and self.__serial_communication:
            self.__serial_communication.send_message(
                OutgoingMessage(
                    OutgoingCategory.DEBUG,
                    f"{IncomingCategory.SERVO_ANGLE}={self.__angle}"
                )
            )

        # Add a small delay to allow the servo to move
        await sleep(self.DELAY)

    async def set_angle_relative_to_center(self, relative_angle: int):
        """
        Sets the angle of the servo motor relative to the center position.

        Args:
            relative_angle (int): Relative angle value between -90 and 90 degrees.

        Raises:
            ServoError: If the relative angle is not within the left and right limits.
        """
        if not self.LEFT_LIMIT <= relative_angle * self.ANGLE_FACTOR <= self.RIGHT_LIMIT:
            raise ServoError(
                f"Relative angle must be between {self.LEFT_LIMIT} and {self.RIGHT_LIMIT} degrees"
            )

        await self.set_angle(
            self.CENTER_ANGLE + relative_angle * self.ANGLE_FACTOR
        )

    async def center(self):
        """
        Centers the servo motor to the middle position.
        """
        await self.set_angle(self.CENTER_ANGLE)

    async def right(self, angle):
        """
        Sets the servo motor to the right by a specified angle.

        Args:
            angle (int): Angle value to move the servo to the right, must be between 0 and right limit.

        Raises:
            ServoError: If the angle is not within the right limit.
        """
        if not 0 < angle <= self.RIGHT_LIMIT:
            raise ServoError(
                f"Angle must be between 0 and {self.RIGHT_LIMIT} degrees for right movement"
            )

        await self.set_angle(self.CENTER_ANGLE + angle * self.ANGLE_FACTOR)

    async def left(self, angle):
        """
        Sets the servo motor to the left by a specified angle.

        Args:
            angle (int): Angle value to move the servo to the left, must be between 0 and left limit.

        Raises:
            ServoError: If the angle is not within the left limit.
        """
        if not 0 < angle <= abs(self.LEFT_LIMIT):
            raise ServoError(
                f"Angle must be between 0 and {abs(self.LEFT_LIMIT)} degrees for left movement"
            )

        await self.set_angle(self.CENTER_ANGLE - angle * self.ANGLE_FACTOR)

    def is_turning(self):
        """
        Checks if the servo motor is currently turning.

        Returns:
            bool: True if the servo is not centered, False otherwise.
        """
        return self.__angle != self.CENTER_ANGLE

angle property

Returns the current angle of the servo motor.

Returns:

Name Type Description
int int

The current angle of the servo motor.

__init__(servo_pin, frequency=PWM_FREQUENCY, min_pulse=MIN_PULSE, max_pulse=MAX_PULSE, actuation_range=ACTUATION_RANGE, movement=True, debug=False, serial_communication=None)

Initializes the servo handler with the specified parameters.

Parameters:

Name Type Description Default
servo_pin int

The GPIO pin for the servo motor.

required
frequency int

The PWM frequency for the servo motor.

PWM_FREQUENCY
min_pulse int

The minimum pulse width in microseconds.

MIN_PULSE
max_pulse int

The maximum pulse width in microseconds.

MAX_PULSE
actuation_range int

The range of motion of the servo in degrees.

ACTUATION_RANGE
movement bool

If True, the servo will be controlled for movement; if False, it will not.

True
debug bool

If True, debug messages will be sent.

False
serial_communication SerialCommunication

An instance of SerialCommunication for sending debug messages.

None
Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\servo.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def __init__(
    self, servo_pin: int, frequency: int = PWM_FREQUENCY,
    min_pulse: int = MIN_PULSE, max_pulse: int = MAX_PULSE,
    actuation_range: int = ACTUATION_RANGE, movement: bool = True,
    debug: bool = False, serial_communication: SerialCommunication = None
):
    """
    Initializes the servo handler with the specified parameters.

    Args:
        servo_pin (int): The GPIO pin for the servo motor.
        frequency (int): The PWM frequency for the servo motor.
        min_pulse (int): The minimum pulse width in microseconds.
        max_pulse (int): The maximum pulse width in microseconds.
        actuation_range (int): The range of motion of the servo in degrees.
        movement (bool): If True, the servo will be controlled for movement; if False, it will not.
        debug (bool): If True, debug messages will be sent.
        serial_communication (SerialCommunication): An instance of SerialCommunication for sending debug messages.
    """
    # Setup PWM output for the servo motor
    self.__servo_pwm = PWMOut(servo_pin, duty_cycle=0, frequency=frequency)
    self.__servo_motor = Servo(
        self.__servo_pwm, actuation_range=actuation_range,
        min_pulse=min_pulse, max_pulse=max_pulse
    )

    # Set the movement flag and debug mode
    self.__movement = movement
    self.__debug = debug

    # Set the serial communication instance
    self.__serial_communication = serial_communication

    # Set the servo to center position
    self.__angle = None
    self.__servo_motor.angle = self.CENTER_ANGLE
    time.sleep(self.DELAY)

center() async

Centers the servo motor to the middle position.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\servo.py
165
166
167
168
169
async def center(self):
    """
    Centers the servo motor to the middle position.
    """
    await self.set_angle(self.CENTER_ANGLE)

is_turning()

Checks if the servo motor is currently turning.

Returns:

Name Type Description
bool

True if the servo is not centered, False otherwise.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\servo.py
205
206
207
208
209
210
211
212
def is_turning(self):
    """
    Checks if the servo motor is currently turning.

    Returns:
        bool: True if the servo is not centered, False otherwise.
    """
    return self.__angle != self.CENTER_ANGLE

left(angle) async

Sets the servo motor to the left by a specified angle.

Parameters:

Name Type Description Default
angle int

Angle value to move the servo to the left, must be between 0 and left limit.

required

Raises:

Type Description
ServoError

If the angle is not within the left limit.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\servo.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
async def left(self, angle):
    """
    Sets the servo motor to the left by a specified angle.

    Args:
        angle (int): Angle value to move the servo to the left, must be between 0 and left limit.

    Raises:
        ServoError: If the angle is not within the left limit.
    """
    if not 0 < angle <= abs(self.LEFT_LIMIT):
        raise ServoError(
            f"Angle must be between 0 and {abs(self.LEFT_LIMIT)} degrees for left movement"
        )

    await self.set_angle(self.CENTER_ANGLE - angle * self.ANGLE_FACTOR)

right(angle) async

Sets the servo motor to the right by a specified angle.

Parameters:

Name Type Description Default
angle int

Angle value to move the servo to the right, must be between 0 and right limit.

required

Raises:

Type Description
ServoError

If the angle is not within the right limit.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\servo.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
async def right(self, angle):
    """
    Sets the servo motor to the right by a specified angle.

    Args:
        angle (int): Angle value to move the servo to the right, must be between 0 and right limit.

    Raises:
        ServoError: If the angle is not within the right limit.
    """
    if not 0 < angle <= self.RIGHT_LIMIT:
        raise ServoError(
            f"Angle must be between 0 and {self.RIGHT_LIMIT} degrees for right movement"
        )

    await self.set_angle(self.CENTER_ANGLE + angle * self.ANGLE_FACTOR)

set_angle(angle) async

Sets the angle of the servo motor.

Parameters:

Name Type Description Default
angle int

Angle value between 0 and the actuation range.

required
Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\servo.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
async def set_angle(self, angle: int):
    """
    Sets the angle of the servo motor.

    Args:
        angle (int): Angle value between 0 and the actuation range.
    """
    # Check if the angle is the same as the current angle
    if angle == self.__angle:
        return

    # Check if the angle is within the valid range
    self._check_angle(angle)
    self.__angle = angle
    if self.__movement:
        self.__servo_motor.angle = self.__angle
    if self.__debug and self.__serial_communication:
        self.__serial_communication.send_message(
            OutgoingMessage(
                OutgoingCategory.DEBUG,
                f"{IncomingCategory.SERVO_ANGLE}={self.__angle}"
            )
        )

    # Add a small delay to allow the servo to move
    await sleep(self.DELAY)

set_angle_relative_to_center(relative_angle) async

Sets the angle of the servo motor relative to the center position.

Parameters:

Name Type Description Default
relative_angle int

Relative angle value between -90 and 90 degrees.

required

Raises:

Type Description
ServoError

If the relative angle is not within the left and right limits.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\servo.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
async def set_angle_relative_to_center(self, relative_angle: int):
    """
    Sets the angle of the servo motor relative to the center position.

    Args:
        relative_angle (int): Relative angle value between -90 and 90 degrees.

    Raises:
        ServoError: If the relative angle is not within the left and right limits.
    """
    if not self.LEFT_LIMIT <= relative_angle * self.ANGLE_FACTOR <= self.RIGHT_LIMIT:
        raise ServoError(
            f"Relative angle must be between {self.LEFT_LIMIT} and {self.RIGHT_LIMIT} degrees"
        )

    await self.set_angle(
        self.CENTER_ANGLE + relative_angle * self.ANGLE_FACTOR
    )

switch

SwitchHandler

A class to handle a switch connected to a Raspberry Pi Pico.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\switch.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
class SwitchHandler:
    """
    A class to handle a switch connected to a Raspberry Pi Pico.
    """
    # Default configuration
    DELAY = 0.01

    def __init__(
        self,
        serial_communication: SerialCommunication,
        switch_pin: int,
        led: LEDHandler = None
    ):
        """
        Initializes the switch handler with the specified pin.

        Args:
            serial_communication (SerialCommunication): Serial communication handler.
            switch_pin (int): The GPIO number where the switch is connected.
            led (LEDHandler | None): Optional LED handler for visual feedback when the switch is pressed.
        """
        # Set up the switch pin as input with pull-up resistor
        self.__switch = DigitalInOut(switch_pin)
        self.__switch.direction = Direction.INPUT
        self.__switch.pull = Pull.UP

        # If serial communication is provided, set it
        self.__serial_communication = serial_communication

        # If LED handler is provided, set it
        self.__led = led

    async def wait(self):
        """
        Waits for the switch to be pressed.

        This method blocks until the switch is pressed (i.e., the pin reads LOW).
        """
        # Check if the switch is already pressed
        if not self.__switch.value:
            while not self.__switch.value:
                await sleep(self.DELAY)

        while self.__switch.value:
            await sleep(self.DELAY)

        # Send initialization message
        self.__serial_communication.send_initialization_message()

        # Send challenge message
        await self.__serial_communication.send_challenge_message()

        # Create the tasks to signal the start of the robot's operation
        start_tasks = [create_task(self.__serial_communication.start())]

        # Blink the LED if provided
        if self.__led:
            start_tasks.append(create_task(self.__led.blink()))

        # Wait for all start tasks to complete
        await gather(*start_tasks)

__init__(serial_communication, switch_pin, led=None)

Initializes the switch handler with the specified pin.

Parameters:

Name Type Description Default
serial_communication SerialCommunication

Serial communication handler.

required
switch_pin int

The GPIO number where the switch is connected.

required
led LEDHandler | None

Optional LED handler for visual feedback when the switch is pressed.

None
Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\switch.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def __init__(
    self,
    serial_communication: SerialCommunication,
    switch_pin: int,
    led: LEDHandler = None
):
    """
    Initializes the switch handler with the specified pin.

    Args:
        serial_communication (SerialCommunication): Serial communication handler.
        switch_pin (int): The GPIO number where the switch is connected.
        led (LEDHandler | None): Optional LED handler for visual feedback when the switch is pressed.
    """
    # Set up the switch pin as input with pull-up resistor
    self.__switch = DigitalInOut(switch_pin)
    self.__switch.direction = Direction.INPUT
    self.__switch.pull = Pull.UP

    # If serial communication is provided, set it
    self.__serial_communication = serial_communication

    # If LED handler is provided, set it
    self.__led = led

wait() async

Waits for the switch to be pressed.

This method blocks until the switch is pressed (i.e., the pin reads LOW).

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\switch.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
async def wait(self):
    """
    Waits for the switch to be pressed.

    This method blocks until the switch is pressed (i.e., the pin reads LOW).
    """
    # Check if the switch is already pressed
    if not self.__switch.value:
        while not self.__switch.value:
            await sleep(self.DELAY)

    while self.__switch.value:
        await sleep(self.DELAY)

    # Send initialization message
    self.__serial_communication.send_initialization_message()

    # Send challenge message
    await self.__serial_communication.send_challenge_message()

    # Create the tasks to signal the start of the robot's operation
    start_tasks = [create_task(self.__serial_communication.start())]

    # Blink the LED if provided
    if self.__led:
        start_tasks.append(create_task(self.__led.blink()))

    # Wait for all start tasks to complete
    await gather(*start_tasks)

utils

is_instance(obj, class_or_tuple)

Check if the object is an instance of the specified class or tuple of classes.

Parameters:

Name Type Description Default
obj object

The object to check.

required
class_or_tuple Union[type, tuple[Any, ...]]

The class or tuple of classes to check against.

required

Raises:

Type Description
TypeError

If the object is not an instance of the specified class or tuple of classes.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\utils.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
def is_instance(
    obj: object,
    class_or_tuple: Union[type, tuple[Any, ...]]
) -> None:
    """
    Check if the object is an instance of the specified class or tuple of classes.

    Args:
        obj (object): The object to check.
        class_or_tuple (Union[type, tuple[Any, ...]]): The class or tuple of classes to check against.

    Raises:
        TypeError: If the object is not an instance of the specified class or tuple of classes.
    """
    if not isinstance(obj, class_or_tuple):
        raise TypeError(
            f"Expected type {class_or_tuple}, got {type(obj)} for object {obj}"
        )

vl53l0x

VL53L0XError

Bases: Exception

Custom exception class for VL53L0X errors.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\vl53l0x.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class VL53L0XError(Exception):
    """
    Custom exception class for VL53L0X errors.
    """

    def __init__(self, message):
        """
        Initializes the VL53L0XError with a custom message.
        """
        super().__init__(message)
        self.message = message

    def __str__(self):
        """
        Returns a string representation of the VL53L0XError.
        """
        return f"VL53L0X Error: {self.message}"

__init__(message)

Initializes the VL53L0XError with a custom message.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\vl53l0x.py
13
14
15
16
17
18
def __init__(self, message):
    """
    Initializes the VL53L0XError with a custom message.
    """
    super().__init__(message)
    self.message = message

__str__()

Returns a string representation of the VL53L0XError.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\vl53l0x.py
20
21
22
23
24
def __str__(self):
    """
    Returns a string representation of the VL53L0XError.
    """
    return f"VL53L0X Error: {self.message}"

VL53L0XHandler

This class handles the initialization and reading of multiple VL53L0X ToF sensors.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\vl53l0x.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
class VL53L0XHandler:
    """
    This class handles the initialization and reading of multiple VL53L0X ToF sensors.
    """
    # Default configuration
    SHORT_SETUP_DELAY = 0.05  # Short delay for sensor power-up
    SETUP_DELAY = 0.1  # Delay to ensure all sensors are off before starting
    MEASUREMENT_LIMIT_MM = 3000  # Maximum distance for ToF measurement in mm
    MEASUREMENT_TIMING_BUDGET = 100000  # Timing budget for ToF measurement in microseconds
    SENSOR_DELAY = 0.05
    START_NEW_I2C_ADDRESS = 0x30

    def __init__(
        self,
        i2c: I2C,
        xshut_pins: tuple[int, ...]
    ):
        """
        Initializes the VL53L0XHandler with default settings.

        Args:
            i2c (I2C): The I2C bus instance to communicate with the sensors.
            xshut_pins (tuple): A tuple of GPIO pins used to control the XSHUT lines of the sensors.

        Raises:
            VL53L0XError: If there is an error initializing the sensors.
        """
        # Initialize XSHUT list, sensors and measures list
        self.__xshut = []
        self.__sensors = []
        self.__sensors_measures = [None for _ in range(len(xshut_pins))]

        # Fill the XSHUT list with DigitalInOut objects for each pin
        for i, pin in enumerate(xshut_pins):
            xshut = DigitalInOut(pin)
            xshut.direction = Direction.OUTPUT
            xshut.value = False
            self.__xshut.append(xshut)
            sleep(self.SETUP_DELAY)

            try:
                # Initialize the sensor, should be at 0x29
                sensor = VL53L0X(i2c)

                # Change the I2C address of the current sensor to a new unique address
                sensor.set_address(self.START_NEW_I2C_ADDRESS + i)
                self.__sensors.append(sensor)

                # Set the ToF measurement timing budget
                sensor.measurement_timing_budget = self.MEASUREMENT_TIMING_BUDGET

                # Small synchronous pause before activating next sensor
                sleep(self.SHORT_SETUP_DELAY)

            except ValueError as e:
                raise VL53L0XError(
                    f"Failed to initialize sensor on pin {pin}: {e}"
                )

    async def multiple_tof_sensors_reading(self):
        """
        Asynchronously reads the distance from multiple VL53L0X ToF sensors.

        Raises:
            VL53L0XError: If there is an error reading from any sensor.
        """
        for i, sensor in enumerate(self.__sensors):
            try:
                # Read the distance from the sensor
                distance_mm = sensor.range
                if distance_mm is None or distance_mm < 0:
                    pass
                if distance_mm >= self.MEASUREMENT_LIMIT_MM:
                    distance_cm = float('inf')
                else:
                    distance_cm = distance_mm // 10  # Convert to centimeters

                # Store the distance in the measures list
                self.__sensors_measures[i] = distance_cm

                sleep(self.SENSOR_DELAY)

            except Exception as e:
                raise VL53L0XError(
                    f"Error reading sensor on pin {self.__xshut[i].pin}: {e}"
                )

__init__(i2c, xshut_pins)

Initializes the VL53L0XHandler with default settings.

Parameters:

Name Type Description Default
i2c I2C

The I2C bus instance to communicate with the sensors.

required
xshut_pins tuple

A tuple of GPIO pins used to control the XSHUT lines of the sensors.

required

Raises:

Type Description
VL53L0XError

If there is an error initializing the sensors.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\vl53l0x.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def __init__(
    self,
    i2c: I2C,
    xshut_pins: tuple[int, ...]
):
    """
    Initializes the VL53L0XHandler with default settings.

    Args:
        i2c (I2C): The I2C bus instance to communicate with the sensors.
        xshut_pins (tuple): A tuple of GPIO pins used to control the XSHUT lines of the sensors.

    Raises:
        VL53L0XError: If there is an error initializing the sensors.
    """
    # Initialize XSHUT list, sensors and measures list
    self.__xshut = []
    self.__sensors = []
    self.__sensors_measures = [None for _ in range(len(xshut_pins))]

    # Fill the XSHUT list with DigitalInOut objects for each pin
    for i, pin in enumerate(xshut_pins):
        xshut = DigitalInOut(pin)
        xshut.direction = Direction.OUTPUT
        xshut.value = False
        self.__xshut.append(xshut)
        sleep(self.SETUP_DELAY)

        try:
            # Initialize the sensor, should be at 0x29
            sensor = VL53L0X(i2c)

            # Change the I2C address of the current sensor to a new unique address
            sensor.set_address(self.START_NEW_I2C_ADDRESS + i)
            self.__sensors.append(sensor)

            # Set the ToF measurement timing budget
            sensor.measurement_timing_budget = self.MEASUREMENT_TIMING_BUDGET

            # Small synchronous pause before activating next sensor
            sleep(self.SHORT_SETUP_DELAY)

        except ValueError as e:
            raise VL53L0XError(
                f"Failed to initialize sensor on pin {pin}: {e}"
            )

multiple_tof_sensors_reading() async

Asynchronously reads the distance from multiple VL53L0X ToF sensors.

Raises:

Type Description
VL53L0XError

If there is an error reading from any sensor.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\vl53l0x.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
async def multiple_tof_sensors_reading(self):
    """
    Asynchronously reads the distance from multiple VL53L0X ToF sensors.

    Raises:
        VL53L0XError: If there is an error reading from any sensor.
    """
    for i, sensor in enumerate(self.__sensors):
        try:
            # Read the distance from the sensor
            distance_mm = sensor.range
            if distance_mm is None or distance_mm < 0:
                pass
            if distance_mm >= self.MEASUREMENT_LIMIT_MM:
                distance_cm = float('inf')
            else:
                distance_cm = distance_mm // 10  # Convert to centimeters

            # Store the distance in the measures list
            self.__sensors_measures[i] = distance_cm

            sleep(self.SENSOR_DELAY)

        except Exception as e:
            raise VL53L0XError(
                f"Error reading sensor on pin {self.__xshut[i].pin}: {e}"
            )

wifi

WifiError

Bases: Exception

Custom exception for Wi-Fi errors.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\wifi.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class WifiError(Exception):
    """
    Custom exception for Wi-Fi errors.
    """

    def __init__(self, message):
        """
        Initializes the WifiError with a custom message.
        """
        super().__init__(message)
        self.message = message

    def __str__(self):
        """
        Returns a string representation of the WifiError.
        """
        return f"Wi-Fi Error: {self.message}"

__init__(message)

Initializes the WifiError with a custom message.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\wifi.py
15
16
17
18
19
20
def __init__(self, message):
    """
    Initializes the WifiError with a custom message.
    """
    super().__init__(message)
    self.message = message

__str__()

Returns a string representation of the WifiError.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\wifi.py
22
23
24
25
26
def __str__(self):
    """
    Returns a string representation of the WifiError.
    """
    return f"Wi-Fi Error: {self.message}"

WifiHandler

A class to handle Wi-Fi connection and UDP socket operations.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\wifi.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
class WifiHandler:
    """
    A class to handle Wi-Fi connection and UDP socket operations.
    """
    # Default configuration
    ATTEMPTS = 5
    ATTEMPT_DELAY = 5
    SSID = getenv("WIFI_SSID", "default_ssid")
    PASSWORD = getenv("WIFI_PASSWORD", "default_password")
    TARGET_IP = getenv("SOCKET_SERVER_IP", "default_target_ip")
    TARGET_PORT = int(getenv("SOCKET_SERVER_PORT", 12345))

    def __init__(self, ssid: str = SSID, password: str = PASSWORD):
        """
        Initializes the WifiHandler with default values.
        """
        # Set the Wi-Fi credentials
        self.__ssid = ssid
        self.__password = password

        # Initialize attributes
        self.__pool = None
        self.__udp_socket = None
        self.__ipv4_address = None
        self.__ipv4_gateway = None
        self.__ipv4_dns = None

    async def connect(self, attempts: int = ATTEMPTS):
        """
        Connect to WI-FI using credentials from environment variables.
        """
        # Initialize Wi-Fi status
        counter = 0
        while not self.__ipv4_address and counter < attempts:
            try:
                radio.connect(self.__ssid, self.__password)
                self.__ipv4_address = radio.ipv4_address
                self.__ipv4_gateway = radio.ipv4_gateway
                self.__ipv4_dns = radio.ipv4_dns

            except Exception:
                pass

            sleep(self.ATTEMPT_DELAY)
            counter += 1

        if not self.__ipv4_address:
            raise WifiError(
                "Failed to connect to Wi-Fi after multiple attempts."
            )

    def create_socket(self):
        """
        Create a socket pool for creating sockets.
        """
        if not self.__ipv4_address:
            raise WifiError(
                "Wi-Fi is not connected. Cannot create socket pool."
            )

        try:
            self.__pool = SocketPool(radio)

        except Exception as e:
            raise WifiError(f"Error creating socket pool: {e}")

    def create_udp_socket(self):
        """
        Create UDP socket.
        """
        if not self.__pool:
            raise WifiError(
                "Socket pool is not available. Cannot create UDP socket."
            )

        try:
            self.__udp_socket = self.__pool.socket(
                self.__pool.AF_INET,
                self.__pool.SOCK_DGRAM
            )
            self.__udp_socket.setblocking(False)

        except Exception as e:
            raise WifiError(f"Error creating UDP socket: {e}")

    def close_udp_socket(self):
        """
        Close UDP socket.
        """
        if not self.__udp_socket:
            raise WifiError(
                "UDP socket is not available. Cannot close UDP socket."
            )

        try:
            self.__udp_socket.close()

        except Exception as e:
            print(f"Error closing UDP socket: {e}")

    async def send_udp_message(
        self,
        message: str,
        target_ip: str = TARGET_IP,
        target_port: int = TARGET_PORT
    ):
        """
        Send message over UDP

        Args:
            message (str): The message to send.
            target_ip (str): The target IP address.
            target_port (int): The target port number.
        """
        if not self.__udp_socket:
            raise WifiError("UDP socket is not created. Cannot send message.")

        # Validate target ip and port
        if not target_ip or not target_port:
            raise WifiError(
                "Target host or port is not set. Cannot send message."
            )
        ip_address(target_ip)

        try:
            self.__udp_socket.sendto(
                message.encode('utf-8'),
                (target_ip, target_port)
            )

        except OSError as e:
            print(f"Error sending message: {e}")

        except Exception as e:
            print(f"Unexpected error: {e}")

    def __del__(self):
        """
        Destructor to close the WI-FI connection and clean up resources.
        """
        if self.__udp_socket:
            self.close_udp_socket()

        if self.__pool:
            del self.__pool

        if self.__ipv4_address:
            radio.disconnect()

__del__()

Destructor to close the WI-FI connection and clean up resources.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\wifi.py
165
166
167
168
169
170
171
172
173
174
175
176
def __del__(self):
    """
    Destructor to close the WI-FI connection and clean up resources.
    """
    if self.__udp_socket:
        self.close_udp_socket()

    if self.__pool:
        del self.__pool

    if self.__ipv4_address:
        radio.disconnect()

__init__(ssid=SSID, password=PASSWORD)

Initializes the WifiHandler with default values.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\wifi.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def __init__(self, ssid: str = SSID, password: str = PASSWORD):
    """
    Initializes the WifiHandler with default values.
    """
    # Set the Wi-Fi credentials
    self.__ssid = ssid
    self.__password = password

    # Initialize attributes
    self.__pool = None
    self.__udp_socket = None
    self.__ipv4_address = None
    self.__ipv4_gateway = None
    self.__ipv4_dns = None

close_udp_socket()

Close UDP socket.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\wifi.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def close_udp_socket(self):
    """
    Close UDP socket.
    """
    if not self.__udp_socket:
        raise WifiError(
            "UDP socket is not available. Cannot close UDP socket."
        )

    try:
        self.__udp_socket.close()

    except Exception as e:
        print(f"Error closing UDP socket: {e}")

connect(attempts=ATTEMPTS) async

Connect to WI-FI using credentials from environment variables.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\wifi.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
async def connect(self, attempts: int = ATTEMPTS):
    """
    Connect to WI-FI using credentials from environment variables.
    """
    # Initialize Wi-Fi status
    counter = 0
    while not self.__ipv4_address and counter < attempts:
        try:
            radio.connect(self.__ssid, self.__password)
            self.__ipv4_address = radio.ipv4_address
            self.__ipv4_gateway = radio.ipv4_gateway
            self.__ipv4_dns = radio.ipv4_dns

        except Exception:
            pass

        sleep(self.ATTEMPT_DELAY)
        counter += 1

    if not self.__ipv4_address:
        raise WifiError(
            "Failed to connect to Wi-Fi after multiple attempts."
        )

create_socket()

Create a socket pool for creating sockets.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\wifi.py
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def create_socket(self):
    """
    Create a socket pool for creating sockets.
    """
    if not self.__ipv4_address:
        raise WifiError(
            "Wi-Fi is not connected. Cannot create socket pool."
        )

    try:
        self.__pool = SocketPool(radio)

    except Exception as e:
        raise WifiError(f"Error creating socket pool: {e}")

create_udp_socket()

Create UDP socket.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\wifi.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def create_udp_socket(self):
    """
    Create UDP socket.
    """
    if not self.__pool:
        raise WifiError(
            "Socket pool is not available. Cannot create UDP socket."
        )

    try:
        self.__udp_socket = self.__pool.socket(
            self.__pool.AF_INET,
            self.__pool.SOCK_DGRAM
        )
        self.__udp_socket.setblocking(False)

    except Exception as e:
        raise WifiError(f"Error creating UDP socket: {e}")

send_udp_message(message, target_ip=TARGET_IP, target_port=TARGET_PORT) async

Send message over UDP

Parameters:

Name Type Description Default
message str

The message to send.

required
target_ip str

The target IP address.

TARGET_IP
target_port int

The target port number.

TARGET_PORT
Source code in devices\raspberry_pi_pico_2w\circuit_python\src\lib\wifi.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
async def send_udp_message(
    self,
    message: str,
    target_ip: str = TARGET_IP,
    target_port: int = TARGET_PORT
):
    """
    Send message over UDP

    Args:
        message (str): The message to send.
        target_ip (str): The target IP address.
        target_port (int): The target port number.
    """
    if not self.__udp_socket:
        raise WifiError("UDP socket is not created. Cannot send message.")

    # Validate target ip and port
    if not target_ip or not target_port:
        raise WifiError(
            "Target host or port is not set. Cannot send message."
        )
    ip_address(target_ip)

    try:
        self.__udp_socket.sendto(
            message.encode('utf-8'),
            (target_ip, target_port)
        )

    except OSError as e:
        print(f"Error sending message: {e}")

    except Exception as e:
        print(f"Unexpected error: {e}")