Saltar a contenido

RPLiDAR Module

RPLidar

Bases: RPLidarABC, LoggerConsumerProtocol

Class to handle RPLidar operations.

Source code in devices\raspberry_pi_5\src\rplidar\__init__.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
class RPLidar(RPLidarABC, LoggerConsumerProtocol):
    """
    Class to handle RPLidar operations.
    """

    # Logger configuration
    LOGGER_TAG = "RPLidar"

    # Process wait timeout
    PROCESS_WAIT_TIMEOUT = 5

    # Wait timeout
    WAIT_TIMEOUT = 0.1

    # Wait timeout for the start event
    START_WAIT_TIMEOUT = 0.1

    def __init__(
        self,
        debug: bool,
        update_measures_event: EventCls,
        measures_queue: Queue,
        start_event: EventCls,
        stop_event: EventCls,
        writer_messages_queue: Queue,
        server_messages_queue: Optional[Queue] = None,
        baudrate: int = RPLIDAR_C1_BAUDRATE,
        port: str = RPLIDAR_C1_PORT,
        is_upside_down: bool = True,
        angle_rotation: Optional[float] = None
    ):
        """
        Initialize the RPLidar.

        Args:
            debug (bool): Flag to indicate if the RPLidar is in debug mode.
            update_measures_event (EventCls): Event to signal when the RPLidar should update measures.
            measures_queue (Queue): Queue to hold the measures from the RPLidar.
            start_event (EventCls): Event to signal when the RPLidar should start.
            stop_event (EventCls): Event to signal when the RPLidar should stop.
            writer_messages_queue (Queue): Queue to hold log messages.
            server_messages_queue (Optional[Queue]): Queue to broadcast messages through the websockets server.
            baudrate (int): Baud rate for the serial communication.
            port (str): SerialCommunication port for the RPLidar.
            is_upside_down (bool): If True, the RPLidar is upside down, and angles will be adjusted accordingly.
            angle_rotation (Optional[float]): Optional angle rotation to apply to the measures.
        """
        # Initialize the debug flag
        self.__debug = debug

        # Initialize the queues and events
        self.__update_measures_event = update_measures_event
        self.__measures_queue = measures_queue
        self.__started_event = Event()
        self.__start_event = start_event
        self.__deleted_event = Event()
        self.__stop_event = stop_event

        # Initialize the logger
        self.__logger = Logger(
            writer_messages_queue,
            tag=self.LOGGER_TAG,
            debug=self.__debug
            )

        # Initialize the server dispatcher
        self.__server_dispatcher = WebSocketServerDispatcher(
            server_messages_queue,
            writer_messages_queue
        ) if server_messages_queue else None

        # Create the reentrant lock
        self.__rlock = RLock()

        # Check the type of baudrate
        is_instance(baudrate, int)
        self.__baudrate = baudrate

        # Check the type of the port
        is_instance(port, str)
        self.__port = port

        # Check the type of is_upside_down
        is_instance(is_upside_down, bool)
        self.__is_upside_down = is_upside_down

        # Check the type of angle translation
        is_instance(angle_rotation, (float, NoneType))
        self.__angle_translation = angle_rotation

        # Initialize measures dictionary
        self.__measures = {angle: Measure(angle * 1.0, 0.0, 0) for angle in
                           range(360)}

        # Messages counter
        self.__messages_counter = 0

        # Initialize the process
        self.__process = None

        # Initialize the rotation flag
        self.__rotation = False

        # Initialize the listener thread for measures updates
        self.__update_measures_listener_thread = None

    @final
    @property
    def logger(self) -> Logger:
        return self.__logger

    @final
    def _read_output(self):
        if not self.__process:
            return

        # Read a line from the process output
        line = self.__process.stdout.readline()
        if not line:
            return

            # Check if it's one of the first 6 messages
        if self.__messages_counter < 6:
            self.__messages_counter += 1
            return

        # Check if the listener thread for measures updates is running
        if not self.__update_measures_listener_thread:
            # Create and start the listener thread
            self.__update_measures_listener_thread = Thread(
                target=self._update_measures_event_listener,
            )
            self.__update_measures_listener_thread.start()

        # Strip the line to remove leading/trailing whitespace
        parsed_line = line.strip()

        # Split the line by spaces
        parts = parsed_line.split()

        # Ignore if there are not enough parts
        if len(parts) < 6:
            return

        # Check if it's the last measure of a full rotation
        rotation = len(parts) == 7
        if rotation:
            parts = parts[1:]

            # Log the end of the rotation
            self.__logger.info("Full rotation completed.")

        # Get the angle, distance and quality
        angle = float(parts[1])
        distance = float(parts[3])
        quality = int(parts[5])

        # Check the quality
        if quality == 0:
            if rotation:
                self.__rotation = True
            return

        # Check if the distance is within the maximum limit
        if distance < 0 or distance > MAX_DISTANCE_LIMIT:
            if rotation:
                self.__rotation = True
            return

        # Check if the angle is within the valid range
        angle = angle - 360 if angle >= 360 else angle

        # Adjust the angle if the RPLidar is upside down
        angle = 360 - angle if self.__is_upside_down else angle

        # Apply angle translation if provided
        if self.__angle_translation:
            angle += self.__angle_translation

            if angle >= 360:
                angle -= 360
            elif angle < 0:
                angle += 360

        # Floor the angle to a float with no decimal places
        angle = round(angle, 0)

        # Normalize angle to be between 0 and 359
        normalize_angle = int(angle) % 360

        # Check if the angle is already in the distances dictionary
        measure = self.__measures.get(normalize_angle, None)
        if measure and abs(measure.distance - distance) < DISTANCE_DIFF:
            if rotation:
                self.__rotation = True
            return

        measure = Measure(normalize_angle, distance, quality)
        self.__measures[normalize_angle] = measure

        # Send the measure to the server
        self.__server_dispatcher.broadcast_rplidar_measure(
            measure
        ) if self.__server_dispatcher else None

        # Check if the rotation is complete
        if rotation or self.__rotation:
            self.__rotation = False

        # Log
        # self.__logger.debug("RPLidar measure: " + str(measure))

        # Increment the messages counter
        self.__messages_counter += 1

    def _update_measures_event_listener(self):
        """
        Listen for the update measures event and process the measures.
        """
        # Start the listener thread
        self.__logger.info("Starting the measures update listener thread...")
        while not self.__stop_event.is_set() and not self.__deleted_event.is_set():
            # Wait for the update measures event to be set
            update = self.__update_measures_event.wait(
                timeout=self.WAIT_TIMEOUT
                )
            if not update:
                continue

            # Put the measure in the measures queue
            self.__measures_queue.put(self.__measures)

            # Clear the update measures event
            self.__update_measures_event.clear()

    @final
    def _start(self) -> None:
        with self.__rlock:
            # Check if the stop event is set
            if self.__stop_event.is_set():
                raise RuntimeError(
                    "Stop event is set. RPLidar will not run."
                )

            # Check if the RPLidar is already running
            if self.__started_event.is_set():
                raise RuntimeError(
                    "RPLidar is already running. Cannot start again."
                )

            # Set the started event to signal that the RPLidar has started
            self.__started_event.set()

        # Log
        self.__logger.info("Initialized.")

    def _stop(self) -> None:
        # Ensure the process is cleaned up even if an error occurs
        if self.__process and self.__process.poll() is None:
            # Terminate the process gracefully
            self.__process.terminate()
            self.__process.wait(timeout=self.PROCESS_WAIT_TIMEOUT)

            # If the process is still running, kill it
            if self.__process.poll() is None:
                self.__process.kill()

            # Wait for the process to finish
            self.__process.wait()
            self.__process = None

        # Wait for the listener for measures updates to finish
        if self.__update_measures_listener_thread:
            self.__update_measures_listener_thread.join()
            self.__update_measures_listener_thread = None

        with self.__rlock:
            # Clear the started event
            self.__started_event.clear()

            # Clear the deleted event
            self.__deleted_event.clear()

            # Reset the messages counter
            self.__messages_counter = 0

        # Log the stop message
        self.__logger.info("Stopped.")

    @final
    @ignore_sigint
    @log_on_error()
    def run(self):
        # Start the RPLidar
        self._start()

        # Wait for the start event to be set
        self.__logger.info("Waiting for the start event...")
        while not self.__stop_event.is_set() and not self.__deleted_event.is_set():
            if self.__start_event.wait(timeout=self.START_WAIT_TIMEOUT):
                break
        if self.__stop_event.is_set() or self.__deleted_event.is_set():
            # Stop the RPLidar if the stop or deleted event is set
            self._stop()
            return
        self.__logger.info("Started.")

        command = [
            ULTRA_SIMPLE_PATH,
            "--channel",
            "--serial",
            self.__port,
            str(self.__baudrate)
        ]

        try:
            self.__process = subprocess.Popen(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,  # Decode output as text
                bufsize=1,  # Line-buffered output
                universal_newlines=True  # Handles different newline characters
            )

        except FileNotFoundError:
            raise ValueError(
                f"The RPLidar ultra_simple executable was not found at {ULTRA_SIMPLE_PATH}. Please ensure it is installed correctly."
            )

        except Exception as e:
            raise RuntimeError(
                f"An error occurred while starting the RPLidar process: {e}"
            )

        try:
            # Read the output in a loop until the process ends or stop event is set
            while self.__process.poll() is None and not self.__stop_event.is_set() and not self.__deleted_event.is_set():
                self._read_output()

            # Stop the RPLidar process
            self._stop()

        except Exception as e:
            # Stop the RPLidar in case of an exception
            self._stop()
            raise e

    def __del__(self):
        """
        Destructor to clean up resources when the RPLidar is no longer needed.
        """
        self.__deleted_event.set()

        # Log
        self.__logger.info(
            "Instance is being deleted. Resources will be cleaned up."
        )

__del__()

Destructor to clean up resources when the RPLidar is no longer needed.

Source code in devices\raspberry_pi_5\src\rplidar\__init__.py
373
374
375
376
377
378
379
380
381
382
def __del__(self):
    """
    Destructor to clean up resources when the RPLidar is no longer needed.
    """
    self.__deleted_event.set()

    # Log
    self.__logger.info(
        "Instance is being deleted. Resources will be cleaned up."
    )

__init__(debug, update_measures_event, measures_queue, start_event, stop_event, writer_messages_queue, server_messages_queue=None, baudrate=RPLIDAR_C1_BAUDRATE, port=RPLIDAR_C1_PORT, is_upside_down=True, angle_rotation=None)

Initialize the RPLidar.

Parameters:

Name Type Description Default
debug bool

Flag to indicate if the RPLidar is in debug mode.

required
update_measures_event Event

Event to signal when the RPLidar should update measures.

required
measures_queue Queue

Queue to hold the measures from the RPLidar.

required
start_event Event

Event to signal when the RPLidar should start.

required
stop_event Event

Event to signal when the RPLidar should stop.

required
writer_messages_queue Queue

Queue to hold log messages.

required
server_messages_queue Optional[Queue]

Queue to broadcast messages through the websockets server.

None
baudrate int

Baud rate for the serial communication.

RPLIDAR_C1_BAUDRATE
port str

SerialCommunication port for the RPLidar.

RPLIDAR_C1_PORT
is_upside_down bool

If True, the RPLidar is upside down, and angles will be adjusted accordingly.

True
angle_rotation Optional[float]

Optional angle rotation to apply to the measures.

None
Source code in devices\raspberry_pi_5\src\rplidar\__init__.py
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def __init__(
    self,
    debug: bool,
    update_measures_event: EventCls,
    measures_queue: Queue,
    start_event: EventCls,
    stop_event: EventCls,
    writer_messages_queue: Queue,
    server_messages_queue: Optional[Queue] = None,
    baudrate: int = RPLIDAR_C1_BAUDRATE,
    port: str = RPLIDAR_C1_PORT,
    is_upside_down: bool = True,
    angle_rotation: Optional[float] = None
):
    """
    Initialize the RPLidar.

    Args:
        debug (bool): Flag to indicate if the RPLidar is in debug mode.
        update_measures_event (EventCls): Event to signal when the RPLidar should update measures.
        measures_queue (Queue): Queue to hold the measures from the RPLidar.
        start_event (EventCls): Event to signal when the RPLidar should start.
        stop_event (EventCls): Event to signal when the RPLidar should stop.
        writer_messages_queue (Queue): Queue to hold log messages.
        server_messages_queue (Optional[Queue]): Queue to broadcast messages through the websockets server.
        baudrate (int): Baud rate for the serial communication.
        port (str): SerialCommunication port for the RPLidar.
        is_upside_down (bool): If True, the RPLidar is upside down, and angles will be adjusted accordingly.
        angle_rotation (Optional[float]): Optional angle rotation to apply to the measures.
    """
    # Initialize the debug flag
    self.__debug = debug

    # Initialize the queues and events
    self.__update_measures_event = update_measures_event
    self.__measures_queue = measures_queue
    self.__started_event = Event()
    self.__start_event = start_event
    self.__deleted_event = Event()
    self.__stop_event = stop_event

    # Initialize the logger
    self.__logger = Logger(
        writer_messages_queue,
        tag=self.LOGGER_TAG,
        debug=self.__debug
        )

    # Initialize the server dispatcher
    self.__server_dispatcher = WebSocketServerDispatcher(
        server_messages_queue,
        writer_messages_queue
    ) if server_messages_queue else None

    # Create the reentrant lock
    self.__rlock = RLock()

    # Check the type of baudrate
    is_instance(baudrate, int)
    self.__baudrate = baudrate

    # Check the type of the port
    is_instance(port, str)
    self.__port = port

    # Check the type of is_upside_down
    is_instance(is_upside_down, bool)
    self.__is_upside_down = is_upside_down

    # Check the type of angle translation
    is_instance(angle_rotation, (float, NoneType))
    self.__angle_translation = angle_rotation

    # Initialize measures dictionary
    self.__measures = {angle: Measure(angle * 1.0, 0.0, 0) for angle in
                       range(360)}

    # Messages counter
    self.__messages_counter = 0

    # Initialize the process
    self.__process = None

    # Initialize the rotation flag
    self.__rotation = False

    # Initialize the listener thread for measures updates
    self.__update_measures_listener_thread = None

abstracts

RPLidarABC

Bases: ABC

Abstract class to handle RPLidar operations.

Source code in devices\raspberry_pi_5\src\rplidar\abstracts.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class RPLidarABC(ABC):
    """
    Abstract class to handle RPLidar operations.
    """

    @abstractmethod
    def logger(self) -> Logger:
        """
        Get the logger instance for the RPLidar.

        Returns:
            Logger: The logger instance.
        """
        pass

    @abstractmethod
    def _read_output(self):
        """
        Read the output from the RPLidar process.
        """
        pass

    @abstractmethod
    def _start(self) -> None:
        """
        Start the RPLidar process.

        Raises:
            RuntimeError: If the RPLidar process fails to start.
        """
        pass

    @abstractmethod
    def _stop(self) -> None:
        """
        Stop the RPLidar process.
        """
        pass

    @abstractmethod
    def run(self):
        """
        Run the RPLidar process.

        Raises:
            ValueError: If the ultra_simple file is not found.
            RuntimeError: If the RPLidar process fails to start.
        """
        pass

logger() abstractmethod

Get the logger instance for the RPLidar.

Returns:

Name Type Description
Logger Logger

The logger instance.

Source code in devices\raspberry_pi_5\src\rplidar\abstracts.py
12
13
14
15
16
17
18
19
20
@abstractmethod
def logger(self) -> Logger:
    """
    Get the logger instance for the RPLidar.

    Returns:
        Logger: The logger instance.
    """
    pass

run() abstractmethod

Run the RPLidar process.

Raises:

Type Description
ValueError

If the ultra_simple file is not found.

RuntimeError

If the RPLidar process fails to start.

Source code in devices\raspberry_pi_5\src\rplidar\abstracts.py
46
47
48
49
50
51
52
53
54
55
@abstractmethod
def run(self):
    """
    Run the RPLidar process.

    Raises:
        ValueError: If the ultra_simple file is not found.
        RuntimeError: If the RPLidar process fails to start.
    """
    pass

gui

App

Klevor RPLidar GUI Application.

Source code in devices\raspberry_pi_5\src\rplidar\gui.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
class App:
    """
    Klevor RPLidar GUI Application.
    """
    # IP and port for the WebSocket server
    IP = "0.0.0.0"
    PORT = 8765

    # Application size and scaling factors
    APP_SIZE = 800
    MAX_DISTANCE_RADIUS = MAX_DISTANCE_LIMIT
    MAX_DISTANCE_RADIUS_FACTOR = APP_SIZE / (2 * MAX_DISTANCE_RADIUS)
    RADIUS = APP_SIZE // 2
    CENTER_X = APP_SIZE // 2
    CENTER_Y = APP_SIZE // 2
    BACKGROUND_COLOR = (255, 255, 255)

    # Point properties
    POINT_RADIUS = 3
    POINT_COLOR = (255, 0, 0)  # Red
    POINT_BORDER_COLOR = (0, 0, 0)  # Black
    POINT_BORDER_WIDTH = 1

    # Central point properties
    CENTRAL_POINT_RADIUS = 5
    CENTRAL_POINT_COLOR = (0, 0, 255)  # Blue

    # Static circle properties
    STATIC_CIRCLE_RADIUS = 1000
    INTERNAL_STATIC_CIRCLE_COLOR = (0, 0, 0)  # Black
    INTERNAL_STATIC_CIRCLE_WIDTH = 2
    EXTERNAL_STATIC_CIRCLE_COLOR = (0, 0, 0)  # Black
    EXTERNAL_STATIC_CIRCLE_WIDTH = 3

    # Update properties
    UPDATE_DELAY = 50  # ms
    DISTANCE_MINIMUM_DIFFERENCE = 50

    def __init__(self, ip: str, port: int):
        """
        Initializes the application with the given IP and port.

        Args:
            ip (str): The IP address of the WebSocket server.
            port (int): The port of the WebSocket server.
        """
        # Initialize the GUI
        pygame.init()
        self.__screen = pygame.display.set_mode((self.APP_SIZE, self.APP_SIZE))
        pygame.display.set_caption("Klevor RPLidar GUI")
        self.__clock = pygame.time.Clock()

        # Initialize the measures and points-related objects
        self.__measures = [Measure(angle * 1.0, 0.0, 0) for angle in range(361)]
        self.__previous_measures = {angle: Measure(angle * 1.0, 0.0, 0) for
                                    angle in range(361)}
        self.__point_positions = {angle: (self.CENTER_X, self.CENTER_Y) for
                                  angle in range(361)}

        # Initialize the WebSocket server connection parameters
        self.__ip = ip
        self.__port = port
        self.__url = f'ws://{self.__ip}:{self.__port}'

    def draw_static(self):
        """
        Draws the static elements of the GUI, including circles and central point.
        """
        # Draw static circles
        n = int(self.MAX_DISTANCE_RADIUS / self.STATIC_CIRCLE_RADIUS)
        is_exact = self.MAX_DISTANCE_RADIUS % self.STATIC_CIRCLE_RADIUS == 0
        for i in range(1, n + 1):
            radius = int(
                i * self.STATIC_CIRCLE_RADIUS * self.MAX_DISTANCE_RADIUS_FACTOR
                )
            color = self.INTERNAL_STATIC_CIRCLE_COLOR if not is_exact or i < n else self.EXTERNAL_STATIC_CIRCLE_COLOR
            width = self.INTERNAL_STATIC_CIRCLE_WIDTH if not is_exact or i < n else self.EXTERNAL_STATIC_CIRCLE_WIDTH
            pygame.draw.circle(
                self.__screen, color,
                (self.CENTER_X, self.CENTER_Y), radius, width
            )

        # Draw external static circle
        if not is_exact:
            pygame.draw.circle(
                self.__screen, self.EXTERNAL_STATIC_CIRCLE_COLOR,
                (self.CENTER_X, self.CENTER_Y), self.RADIUS,
                self.EXTERNAL_STATIC_CIRCLE_WIDTH
            )

        # Draw central point
        pygame.draw.circle(
            self.__screen, self.CENTRAL_POINT_COLOR,
            (self.CENTER_X, self.CENTER_Y),
            self.CENTRAL_POINT_RADIUS
        )

    def update_points(self):
        """
        Updates the positions of the points based on the current measures.
        This method calculates the positions of the points based on the angle and distance
        of each measure, and stores them in the __point_positions dictionary.
        """
        for measure in self.__measures:
            prev = self.__previous_measures.get(int(measure.angle))
            if prev and abs(
                    prev.distance - measure.distance
            ) < self.DISTANCE_MINIMUM_DIFFERENCE:
                continue
            self.__previous_measures[int(measure.angle)] = measure

            # Adjust angle to match the coordinate system
            radian_angle = math.radians(int(measure.angle + 270) % 360)
            x = int(
                self.CENTER_X + measure.distance * math.cos(
                    radian_angle
                ) * self.MAX_DISTANCE_RADIUS_FACTOR
            )
            y = int(
                self.CENTER_Y + measure.distance * math.sin(
                    radian_angle
                ) * self.MAX_DISTANCE_RADIUS_FACTOR
            )
            self.__point_positions[int(measure.angle)] = (x, y)

    def draw_points(self):
        """
        Draws the points on the __screen based on the current measures.
        """
        for pos in self.__point_positions.values():
            # Draw the point
            pygame.draw.circle(
                self.__screen, self.POINT_COLOR, pos,
                self.POINT_RADIUS
            )

            # Draw border around the point
            pygame.draw.circle(
                self.__screen, self.POINT_BORDER_COLOR, pos,
                self.POINT_RADIUS, self.POINT_BORDER_WIDTH
            )

    def run(self):
        """
        Main loop of the application that handles events, updates the display, and draws the GUI.
        This method runs until the application is closed.
        """
        while True:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    break

            self.__screen.fill(self.BACKGROUND_COLOR)
            self.draw_static()
            self.update_points()
            self.draw_points()
            pygame.display.flip()
            self.__clock.tick(1000 // self.UPDATE_DELAY)

        pygame.quit()

    async def ws_listener(self):
        """
        Asynchronously listens for messages from the WebSocket server and updates the measures.
        This method connects to the WebSocket server and processes incoming messages,
        updating the measures accordingly.
        """

        print(f"Connecting to WebSocket server at {self.__url}...")
        async with connect(self.__url) as ws:
            while True:
                try:
                    # Get the message from the WebSocket server
                    msg_str = await ws.recv()
                    msg = Message.from_string(msg_str)

                    # Check if the message is a valid RPLidar message
                    if not msg.tag == Tag.RPLIDAR_MEASURES:
                        continue

                    # Parse the measures from the message content
                    measure = Measure.from_string(msg.content)
                    if not measure:
                        print(f"Invalid measure received: {msg.content}")
                        continue

                    angle_idx = int(measure.angle)
                    self.__measures[angle_idx] = measure

                except Exception as e:
                    print(f"Error receiving message: {e}")
                    continue

__init__(ip, port)

Initializes the application with the given IP and port.

Parameters:

Name Type Description Default
ip str

The IP address of the WebSocket server.

required
port int

The port of the WebSocket server.

required
Source code in devices\raspberry_pi_5\src\rplidar\gui.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def __init__(self, ip: str, port: int):
    """
    Initializes the application with the given IP and port.

    Args:
        ip (str): The IP address of the WebSocket server.
        port (int): The port of the WebSocket server.
    """
    # Initialize the GUI
    pygame.init()
    self.__screen = pygame.display.set_mode((self.APP_SIZE, self.APP_SIZE))
    pygame.display.set_caption("Klevor RPLidar GUI")
    self.__clock = pygame.time.Clock()

    # Initialize the measures and points-related objects
    self.__measures = [Measure(angle * 1.0, 0.0, 0) for angle in range(361)]
    self.__previous_measures = {angle: Measure(angle * 1.0, 0.0, 0) for
                                angle in range(361)}
    self.__point_positions = {angle: (self.CENTER_X, self.CENTER_Y) for
                              angle in range(361)}

    # Initialize the WebSocket server connection parameters
    self.__ip = ip
    self.__port = port
    self.__url = f'ws://{self.__ip}:{self.__port}'

draw_points()

Draws the points on the __screen based on the current measures.

Source code in devices\raspberry_pi_5\src\rplidar\gui.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def draw_points(self):
    """
    Draws the points on the __screen based on the current measures.
    """
    for pos in self.__point_positions.values():
        # Draw the point
        pygame.draw.circle(
            self.__screen, self.POINT_COLOR, pos,
            self.POINT_RADIUS
        )

        # Draw border around the point
        pygame.draw.circle(
            self.__screen, self.POINT_BORDER_COLOR, pos,
            self.POINT_RADIUS, self.POINT_BORDER_WIDTH
        )

draw_static()

Draws the static elements of the GUI, including circles and central point.

Source code in devices\raspberry_pi_5\src\rplidar\gui.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def draw_static(self):
    """
    Draws the static elements of the GUI, including circles and central point.
    """
    # Draw static circles
    n = int(self.MAX_DISTANCE_RADIUS / self.STATIC_CIRCLE_RADIUS)
    is_exact = self.MAX_DISTANCE_RADIUS % self.STATIC_CIRCLE_RADIUS == 0
    for i in range(1, n + 1):
        radius = int(
            i * self.STATIC_CIRCLE_RADIUS * self.MAX_DISTANCE_RADIUS_FACTOR
            )
        color = self.INTERNAL_STATIC_CIRCLE_COLOR if not is_exact or i < n else self.EXTERNAL_STATIC_CIRCLE_COLOR
        width = self.INTERNAL_STATIC_CIRCLE_WIDTH if not is_exact or i < n else self.EXTERNAL_STATIC_CIRCLE_WIDTH
        pygame.draw.circle(
            self.__screen, color,
            (self.CENTER_X, self.CENTER_Y), radius, width
        )

    # Draw external static circle
    if not is_exact:
        pygame.draw.circle(
            self.__screen, self.EXTERNAL_STATIC_CIRCLE_COLOR,
            (self.CENTER_X, self.CENTER_Y), self.RADIUS,
            self.EXTERNAL_STATIC_CIRCLE_WIDTH
        )

    # Draw central point
    pygame.draw.circle(
        self.__screen, self.CENTRAL_POINT_COLOR,
        (self.CENTER_X, self.CENTER_Y),
        self.CENTRAL_POINT_RADIUS
    )

run()

Main loop of the application that handles events, updates the display, and draws the GUI.
This method runs until the application is closed.

Source code in devices\raspberry_pi_5\src\rplidar\gui.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
def run(self):
    """
    Main loop of the application that handles events, updates the display, and draws the GUI.
    This method runs until the application is closed.
    """
    while True:
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                break

        self.__screen.fill(self.BACKGROUND_COLOR)
        self.draw_static()
        self.update_points()
        self.draw_points()
        pygame.display.flip()
        self.__clock.tick(1000 // self.UPDATE_DELAY)

    pygame.quit()

update_points()

Updates the positions of the points based on the current measures.
This method calculates the positions of the points based on the angle and distance
of each measure, and stores them in the __point_positions dictionary.

Source code in devices\raspberry_pi_5\src\rplidar\gui.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def update_points(self):
    """
    Updates the positions of the points based on the current measures.
    This method calculates the positions of the points based on the angle and distance
    of each measure, and stores them in the __point_positions dictionary.
    """
    for measure in self.__measures:
        prev = self.__previous_measures.get(int(measure.angle))
        if prev and abs(
                prev.distance - measure.distance
        ) < self.DISTANCE_MINIMUM_DIFFERENCE:
            continue
        self.__previous_measures[int(measure.angle)] = measure

        # Adjust angle to match the coordinate system
        radian_angle = math.radians(int(measure.angle + 270) % 360)
        x = int(
            self.CENTER_X + measure.distance * math.cos(
                radian_angle
            ) * self.MAX_DISTANCE_RADIUS_FACTOR
        )
        y = int(
            self.CENTER_Y + measure.distance * math.sin(
                radian_angle
            ) * self.MAX_DISTANCE_RADIUS_FACTOR
        )
        self.__point_positions[int(measure.angle)] = (x, y)

ws_listener() async

Asynchronously listens for messages from the WebSocket server and updates the measures.
This method connects to the WebSocket server and processes incoming messages,
updating the measures accordingly.

Source code in devices\raspberry_pi_5\src\rplidar\gui.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
async def ws_listener(self):
    """
    Asynchronously listens for messages from the WebSocket server and updates the measures.
    This method connects to the WebSocket server and processes incoming messages,
    updating the measures accordingly.
    """

    print(f"Connecting to WebSocket server at {self.__url}...")
    async with connect(self.__url) as ws:
        while True:
            try:
                # Get the message from the WebSocket server
                msg_str = await ws.recv()
                msg = Message.from_string(msg_str)

                # Check if the message is a valid RPLidar message
                if not msg.tag == Tag.RPLIDAR_MEASURES:
                    continue

                # Parse the measures from the message content
                measure = Measure.from_string(msg.content)
                if not measure:
                    print(f"Invalid measure received: {msg.content}")
                    continue

                angle_idx = int(measure.angle)
                self.__measures[angle_idx] = measure

            except Exception as e:
                print(f"Error receiving message: {e}")
                continue

multiprocessing

rplidar_target(debug, update_measures_event, measures_queue, start_event, stop_event, writer_messages_queue, server_messages_queue=None, baudrate=RPLIDAR_C1_BAUDRATE, port=RPLIDAR_C1_PORT, is_upside_down=True, angle_rotation=0.0)

Target function for a multiprocessing process that handles the RPLidar.

Parameters:

Name Type Description Default
debug bool

Flag to indicate if the RPLidar is in debug mode.

required
update_measures_event Event

Event to signal when the RPLidar should update measures.

required
measures_queue Queue

Queue to hold the measures from the RPLidar.

required
start_event Event

Event to signal when the RPLidar should start.

required
stop_event Event

Event to signal when the RPLidar should stop.

required
writer_messages_queue Queue

Queue to hold log messages.

required
server_messages_queue Optional[Queue]

Queue to broadcast messages through the websockets server.

None
baudrate int

Baud rate for the serial communication.

RPLIDAR_C1_BAUDRATE
port str

SerialCommunication port for the RPLidar.

RPLIDAR_C1_PORT
is_upside_down bool

If True, the RPLidar is upside down, and angles will be adjusted accordingly.

True
angle_rotation Optional[float]

Angle rotation to apply to the measures.

0.0
Source code in devices\raspberry_pi_5\src\rplidar\multiprocessing.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def rplidar_target(
    debug: bool,
    update_measures_event: EventCls,
    measures_queue: Queue,
    start_event: EventCls,
    stop_event: EventCls,
    writer_messages_queue: Queue,
    server_messages_queue: Optional[Queue] = None,
    baudrate: int = RPLIDAR_C1_BAUDRATE,
    port: str = RPLIDAR_C1_PORT,
    is_upside_down: bool = True,
    angle_rotation: Optional[float] = 0.0,
) -> None:
    """
    Target function for a multiprocessing process that handles the RPLidar.

    Args:
        debug (bool): Flag to indicate if the RPLidar is in debug mode.
        update_measures_event (EventCls): Event to signal when the RPLidar should update measures.
        measures_queue (Queue): Queue to hold the measures from the RPLidar.
        start_event (EventCls): Event to signal when the RPLidar should start.
        stop_event (EventCls): Event to signal when the RPLidar should stop.
        writer_messages_queue (Queue): Queue to hold log messages.
        server_messages_queue (Optional[Queue]): Queue to broadcast messages through the websockets server.
        baudrate (int): Baud rate for the serial communication.
        port (str): SerialCommunication port for the RPLidar.
        is_upside_down (bool): If True, the RPLidar is upside down, and angles will be adjusted accordingly.
        angle_rotation (Optional[float]): Angle rotation to apply to the measures.
    """
    print(
        "Initializing RPLidar in multiprocessing mode. Process ID: ",
        os.getpid()
    )

    # Initialize the RPLidar
    rplidar = RPLidar(
        debug=debug,
        update_measures_event=update_measures_event,
        measures_queue=measures_queue,
        start_event=start_event,
        stop_event=stop_event,
        writer_messages_queue=writer_messages_queue,
        server_messages_queue=server_messages_queue,
        baudrate=baudrate,
        port=port,
        is_upside_down=is_upside_down,
        angle_rotation=angle_rotation,
    )

    # Run the RPLidar
    rplidar.run()