Created
August 8, 2024 07:36
-
-
Save YoshiRi/0287c951b18ab3d552aefafbd32bde3c to your computer and use it in GitHub Desktop.
PointcloudStatsAnalizer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import rclpy | |
from rclpy.node import Node | |
from sensor_msgs.msg import PointCloud2 | |
import sensor_msgs_py.point_cloud2 as pcl2 | |
import pandas as pd | |
import matplotlib.pyplot as plt | |
from collections import Counter | |
from rclpy.qos import QoSProfile, QoSReliabilityPolicy | |
class PointCloud2ToCSV(Node): | |
def __init__(self): | |
super().__init__('pointcloud2_to_csv') | |
best_effort_qos_profile = QoSProfile(depth=10, reliability=QoSReliabilityPolicy.BEST_EFFORT) | |
self.subscription = self.create_subscription( | |
PointCloud2, | |
# '/perception/obstacle_segmentation/single_frame/pointcloud', | |
# '/sensing/lidar/concatenated/pointcloud', | |
# '/sensing/lidar/top/pointcloud_before_sync', | |
'/sensing/lidar/top/rectified/pointcloud_ex', | |
self.listener_callback, | |
best_effort_qos_profile) | |
self.subscription # prevent unused variable warning | |
self.visualize = False | |
if self.visualize: | |
self.fig, self.ax = plt.subplots() | |
plt.ion() | |
plt.show() | |
self.duplicates_count = 0 | |
self.total_points = 0 | |
self.histogram_data = [] | |
def listener_callback(self, msg): | |
# Extract points from PointCloud2 message | |
points = pcl2.read_points(msg, field_names=("x", "y", "z"), skip_nans=True) | |
# Convert points to a list of tuples | |
points_list = [(point[0], point[1], point[2]) for point in points] | |
# Create a DataFrame from the list of points | |
df = pd.DataFrame(points_list, columns=['x', 'y', 'z']) | |
# Count duplicates | |
duplicates = df[df.duplicated(keep=False)] | |
duplicates_count = len(duplicates) | |
total_points = len(df) | |
self.duplicates_count = duplicates_count | |
self.total_points = total_points | |
self.histogram_data = duplicates['x'].values | |
save_flag = False | |
if duplicates_count > 1000 or save_flag: # Save to CSV if there are more than 1000 duplicates | |
df.to_csv('pointcloud.csv', index=False) | |
self.get_logger().info(f'PointCloud data saved to pointcloud.csv with {duplicates_count} duplicates.') | |
self.update_histogram() | |
print(f'{duplicates_count} duplicates in total {total_points} points') | |
def update_histogram(self): | |
if not self.visualize: | |
return | |
self.ax.hist(self.histogram_data, bins=100) | |
self.ax.set_title(f'Histogram of x values in duplicates\n{self.duplicates_count} duplicates in total {self.total_points} points') | |
self.ax.set_xlabel('x') | |
self.ax.set_ylabel('Frequency') | |
plt.draw() | |
plt.pause(0.001) | |
def main(args=None): | |
rclpy.init(args=args) | |
node = PointCloud2ToCSV() | |
rclpy.spin(node) | |
node.destroy_node() | |
rclpy.shutdown() | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment