Skip to content

Instantly share code, notes, and snippets.

@YoshiRi
Created August 8, 2024 07:36
Show Gist options
  • Save YoshiRi/0287c951b18ab3d552aefafbd32bde3c to your computer and use it in GitHub Desktop.
Save YoshiRi/0287c951b18ab3d552aefafbd32bde3c to your computer and use it in GitHub Desktop.
PointcloudStatsAnalizer
Display the source blob
Display the rendered blob
Raw
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import PointCloud2
import sensor_msgs_py.point_cloud2 as pcl2
import pandas as pd
import matplotlib.pyplot as plt
from collections import Counter
from rclpy.qos import QoSProfile, QoSReliabilityPolicy
class PointCloud2ToCSV(Node):
def __init__(self):
super().__init__('pointcloud2_to_csv')
best_effort_qos_profile = QoSProfile(depth=10, reliability=QoSReliabilityPolicy.BEST_EFFORT)
self.subscription = self.create_subscription(
PointCloud2,
# '/perception/obstacle_segmentation/single_frame/pointcloud',
# '/sensing/lidar/concatenated/pointcloud',
# '/sensing/lidar/top/pointcloud_before_sync',
'/sensing/lidar/top/rectified/pointcloud_ex',
self.listener_callback,
best_effort_qos_profile)
self.subscription # prevent unused variable warning
self.visualize = False
if self.visualize:
self.fig, self.ax = plt.subplots()
plt.ion()
plt.show()
self.duplicates_count = 0
self.total_points = 0
self.histogram_data = []
def listener_callback(self, msg):
# Extract points from PointCloud2 message
points = pcl2.read_points(msg, field_names=("x", "y", "z"), skip_nans=True)
# Convert points to a list of tuples
points_list = [(point[0], point[1], point[2]) for point in points]
# Create a DataFrame from the list of points
df = pd.DataFrame(points_list, columns=['x', 'y', 'z'])
# Count duplicates
duplicates = df[df.duplicated(keep=False)]
duplicates_count = len(duplicates)
total_points = len(df)
self.duplicates_count = duplicates_count
self.total_points = total_points
self.histogram_data = duplicates['x'].values
save_flag = False
if duplicates_count > 1000 or save_flag: # Save to CSV if there are more than 1000 duplicates
df.to_csv('pointcloud.csv', index=False)
self.get_logger().info(f'PointCloud data saved to pointcloud.csv with {duplicates_count} duplicates.')
self.update_histogram()
print(f'{duplicates_count} duplicates in total {total_points} points')
def update_histogram(self):
if not self.visualize:
return
self.ax.hist(self.histogram_data, bins=100)
self.ax.set_title(f'Histogram of x values in duplicates\n{self.duplicates_count} duplicates in total {self.total_points} points')
self.ax.set_xlabel('x')
self.ax.set_ylabel('Frequency')
plt.draw()
plt.pause(0.001)
def main(args=None):
rclpy.init(args=args)
node = PointCloud2ToCSV()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment