Main function to initialize the robot and start the main algorithm.
Source code in devices\raspberry_pi_pico_2w\circuit_python\src\code.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175 | async def main():
"""
Main function to initialize the robot and start the main algorithm.
"""
global bno08x, servo, motor, serial_communication, switch
# Loop to handle exceptions and retry initialization
while True:
try:
# Create tasks for initialization
bno08x_calibrate = create_task(bno08x.calibrate())
motor_stop = create_task(motor.stop())
servo_center = create_task(servo.center())
# Wait for all initialization tasks to complete
await gather(bno08x_calibrate, motor_stop, servo_center)
# Wait for the switch to be pressed
await switch.wait()
# Set the exit condition to False
to_exit = False
# Get start time to compare with the timeout
start_time = monotonic()
while not to_exit:
# Create the update quaternion and receive serial messages tasks
update_quaternion_task = create_task(
bno08x.update_quaternion()
)
receive_serial_task = create_task(
serial_communication.receive_messages()
)
# Wait for the tasks to complete
results = await gather(
update_quaternion_task,
receive_serial_task
)
msgs: list[IncomingMessage] = results[1]
if len(msgs) == 0:
# If no messages were received, check if the timeout has been reached
if monotonic() - start_time > RECEIVING_MESSAGE_TIMEOUT:
raise TimeoutError(
"No messages received within the timeout period."
)
else:
# Reset the start time if messages are received
start_time = monotonic()
# Process the received messages
motor_speed = None
servo_angle = None
motor_speed_task = None
servo_angle_task = None
for msg in msgs:
if msg == SerialCommunication.HEARTBEAT_MESSAGE:
continue
if msg == SerialCommunication.STOP_MESSAGE:
# Set the exit condition to True
to_exit = True
# Stop the motor and center the servo
motor_speed_task = create_task(motor.stop())
servo_angle_task = create_task(servo.center())
# Send a confirmation message to the serial communication
serial_communication.send_confirmation_message()
break
elif msg.category == IncomingCategory.MOTOR_SPEED:
# Set the motor speed
motor_speed = float(msg.content)
elif msg.category == IncomingCategory.SERVO_ANGLE:
# Set the servo angle
servo_angle = int(msg.content)
else:
raise SerialCommunicationError(
f"Unknown message: {msg.format_to_send_with_error_message()}"
)
# Add the set motor speed task and set servo angle task if the exit flag is not set
if not to_exit:
if motor_speed is not None:
motor_speed_task = create_task(motor.set_speed(motor_speed))
if servo_angle is not None:
servo_angle_task = create_task(servo.set_angle(servo_angle))
# Wait for the motor speed and servo angle tasks to complete
if motor_speed_task is not None:
await motor_speed_task
if servo_angle_task is not None:
await servo_angle_task
# Stop the serial communication when exiting the loop
await serial_communication.stop()
except Exception as e:
# Get the traceback as string
buf = StringIO()
print_exception(e, e, e.__traceback__, file=buf)
serial_communication.send_buffer_message(OutgoingCategory.ERROR, buf)
# Set the speed to 0 and center the servo in case of an exception
await motor.stop()
await servo.center()
|