Saltar a contenido

Code Python File

main() async

Main function to initialize the robot and start the main algorithm.

Source code in devices\raspberry_pi_pico_2w\circuit_python\src\code.py
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
async def main():
    """
    Main function to initialize the robot and start the main algorithm.
    """
    global bno08x, servo, motor, serial_communication, switch

    # Loop to handle exceptions and retry initialization
    while True:
        try:
            # Create tasks for initialization
            bno08x_calibrate = create_task(bno08x.calibrate())
            motor_stop = create_task(motor.stop())
            servo_center = create_task(servo.center())

            # Wait for all initialization tasks to complete
            await gather(bno08x_calibrate, motor_stop, servo_center)

            # Wait for the switch to be pressed
            await switch.wait()

            # Set the exit condition to False
            to_exit = False

            # Get start time to compare with the timeout
            start_time = monotonic()

            while not to_exit:
                # Create the update quaternion and receive serial messages tasks
                update_quaternion_task = create_task(
                    bno08x.update_quaternion()
                )
                receive_serial_task = create_task(
                    serial_communication.receive_messages()
                )

                # Wait for the tasks to complete
                results = await gather(
                    update_quaternion_task,
                    receive_serial_task
                    )
                msgs: list[IncomingMessage] = results[1]
                if len(msgs) == 0:
                    # If no messages were received, check if the timeout has been reached
                    if monotonic() - start_time > RECEIVING_MESSAGE_TIMEOUT:
                        raise TimeoutError(
                            "No messages received within the timeout period."
                        )
                else:
                    # Reset the start time if messages are received
                    start_time = monotonic()

                # Process the received messages
                motor_speed = None
                servo_angle = None
                motor_speed_task = None
                servo_angle_task = None
                for msg in msgs:
                    if msg == SerialCommunication.HEARTBEAT_MESSAGE:
                        continue

                    if msg == SerialCommunication.STOP_MESSAGE:
                        # Set the exit condition to True
                        to_exit = True

                        # Stop the motor and center the servo
                        motor_speed_task = create_task(motor.stop())
                        servo_angle_task = create_task(servo.center())

                        # Send a confirmation message to the serial communication
                        serial_communication.send_confirmation_message()
                        break

                    elif msg.category == IncomingCategory.MOTOR_SPEED:
                        # Set the motor speed
                        motor_speed = float(msg.content)

                    elif msg.category == IncomingCategory.SERVO_ANGLE:
                        # Set the servo angle
                        servo_angle = int(msg.content)

                    else:
                        raise SerialCommunicationError(
                            f"Unknown message: {msg.format_to_send_with_error_message()}"
                            )

                # Add the set motor speed task and set servo angle task if the exit flag is not set
                if not to_exit:
                    if motor_speed is not None:
                        motor_speed_task = create_task(motor.set_speed(motor_speed))

                    if servo_angle is not None:
                        servo_angle_task = create_task(servo.set_angle(servo_angle))

                # Wait for the motor speed and servo angle tasks to complete
                if motor_speed_task is not None:
                    await motor_speed_task
                if servo_angle_task is not None:
                    await servo_angle_task

            # Stop the serial communication when exiting the loop
            await serial_communication.stop()

        except Exception as e:
            # Get the traceback as string
            buf = StringIO()
            print_exception(e, e, e.__traceback__, file=buf)
            serial_communication.send_buffer_message(OutgoingCategory.ERROR, buf)

            # Set the speed to 0 and center the servo in case of an exception
            await motor.stop()
            await servo.center()