Fix multithreaded lazy pub sub

This commit is contained in:
Paul Bovbel 2015-01-22 09:21:45 -05:00 committed by Paul Bovbel
parent e739e0f297
commit 040f68c31f
2 changed files with 77 additions and 59 deletions

View File

@ -46,7 +46,6 @@
#include "nodelet/nodelet.h"
#include "tf2_ros/buffer.h"
#include "tf2_ros/transform_listener.h"
#include "tf2_ros/message_filter.h"
#include "message_filters/subscriber.h"
#include "sensor_msgs/PointCloud2.h"
@ -54,7 +53,6 @@
namespace pointcloud_to_laserscan
{
typedef message_filters::Subscriber<sensor_msgs::PointCloud2> FilteredSub;
typedef tf2_ros::MessageFilter<sensor_msgs::PointCloud2> MessageFilter;
/**
* Class to process incoming pointclouds into laserscans. Some initial code was pulled from the defunct turtlebot
@ -82,8 +80,7 @@ namespace pointcloud_to_laserscan
boost::mutex connect_mutex_;
tf2_ros::Buffer tf2_;
tf2_ros::TransformListener tf2_listener_;
boost::shared_ptr<FilteredSub> sub_;
message_filters::Subscriber<sensor_msgs::PointCloud2> sub_;
boost::shared_ptr<MessageFilter> message_filter_;
// ROS Parameters

View File

@ -47,25 +47,22 @@
namespace pointcloud_to_laserscan
{
PointCloudToLaserScanNodelet::PointCloudToLaserScanNodelet()
: tf2_(), tf2_listener_(tf2_)
{ }
PointCloudToLaserScanNodelet::PointCloudToLaserScanNodelet() {}
void PointCloudToLaserScanNodelet::onInit()
{
nh_ = getMTNodeHandle();
private_nh_ = getMTPrivateNodeHandle();
boost::mutex::scoped_lock lock(connect_mutex_);
private_nh_ = getPrivateNodeHandle();
private_nh_.param<std::string>("target_frame", target_frame_, "");
private_nh_.param<double>("tolerance", tolerance_, 0.01);
private_nh_.param<double>("min_height", min_height_, 0.0);
private_nh_.param<double>("max_height", max_height_, 1.0);
private_nh_.param<double>("angle_min", angle_min_, -M_PI/2.0);
private_nh_.param<double>("angle_max", angle_max_, M_PI/2.0);
private_nh_.param<double>("angle_increment", angle_increment_, M_PI/360.0);
private_nh_.param<double>("scan_time", scan_time_, 1.0/30.0);
private_nh_.param<double>("angle_min", angle_min_, -M_PI / 2.0);
private_nh_.param<double>("angle_max", angle_max_, M_PI / 2.0);
private_nh_.param<double>("angle_increment", angle_increment_, M_PI / 360.0);
private_nh_.param<double>("scan_time", scan_time_, 1.0 / 30.0);
private_nh_.param<double>("range_min", range_min_, 0.45);
private_nh_.param<double>("range_max", range_max_, 4.0);
@ -73,65 +70,79 @@ namespace pointcloud_to_laserscan
private_nh_.param<int>("concurrency_level", concurrency_level, true);
private_nh_.param<bool>("use_inf", use_inf_, true);
boost::mutex::scoped_lock lock(connect_mutex_);
//Check if explicitly single threaded, otherwise, let nodelet manager dictate thread pool size
if (concurrency_level == 1)
{
nh_ = getNodeHandle();
}
else
{
nh_ = getMTNodeHandle();
}
// Only queue one pointcloud per running thread
if(concurrency_level > 0)
if (concurrency_level > 0)
{
input_queue_size_ = concurrency_level;
}else{
}
else
{
input_queue_size_ = boost::thread::hardware_concurrency();
}
// if pointcloud target frame specified, we need to filter by transform availability
if (!target_frame_.empty())
{
message_filter_.reset(new MessageFilter(sub_, tf2_, target_frame_, input_queue_size_, nh_));
message_filter_->setTolerance(ros::Duration(tolerance_));
message_filter_->registerCallback(boost::bind(&PointCloudToLaserScanNodelet::cloudCb, this, _1));
message_filter_->registerFailureCallback(boost::bind(&PointCloudToLaserScanNodelet::failureCb, this, _1, _2));
}
else // otherwise setup direct subscription
{
sub_.registerCallback(boost::bind(&PointCloudToLaserScanNodelet::cloudCb, this, _1));
}
pub_ = nh_.advertise<sensor_msgs::LaserScan>("scan", 10,
boost::bind(&PointCloudToLaserScanNodelet::connectCb, this),
boost::bind(&PointCloudToLaserScanNodelet::disconnectCb, this));
boost::bind(&PointCloudToLaserScanNodelet::connectCb, this),
boost::bind(&PointCloudToLaserScanNodelet::disconnectCb, this));
}
void PointCloudToLaserScanNodelet::connectCb()
{
boost::mutex::scoped_lock lock(connect_mutex_);
if (!sub_ && pub_.getNumSubscribers() > 0) {
if (pub_.getNumSubscribers() > 0 && sub_.getSubscriber().getNumPublishers() == 0)
{
NODELET_INFO("Got a subscriber to scan, starting subscriber to pointcloud");
sub_.reset(new FilteredSub(nh_, "cloud_in", input_queue_size_));
if(!target_frame_.empty())
{
message_filter_.reset(new MessageFilter(*sub_, tf2_, target_frame_, input_queue_size_, nh_));
message_filter_->setTolerance(ros::Duration(tolerance_));
message_filter_->registerCallback(boost::bind(&PointCloudToLaserScanNodelet::cloudCb, this, _1));
message_filter_->registerFailureCallback(boost::bind(&PointCloudToLaserScanNodelet::failureCb, this, _1, _2));
}else{
sub_->registerCallback(boost::bind(&PointCloudToLaserScanNodelet::cloudCb, this, _1));
}
sub_.subscribe(nh_, "cloud_in", input_queue_size_);
}
}
void PointCloudToLaserScanNodelet::failureCb(const sensor_msgs::PointCloud2ConstPtr &cloud_msg,
tf2_ros::filter_failure_reasons::FilterFailureReason reason){
NODELET_WARN_STREAM_THROTTLE(3.0, "Can't transform pointcloud from frame " << cloud_msg->header.frame_id << " to "
<< message_filter_->getTargetFramesString() << " with tolerance " << tolerance_);
}
void PointCloudToLaserScanNodelet::disconnectCb()
{
boost::mutex::scoped_lock lock(connect_mutex_);
if (pub_.getNumSubscribers() == 0) {
if (pub_.getNumSubscribers() == 0)
{
NODELET_INFO("No subscibers to scan, shutting down subscriber to pointcloud");
if(!target_frame_.empty())
{
message_filter_.reset();
}
sub_.reset();
sub_.unsubscribe();
}
}
void PointCloudToLaserScanNodelet::failureCb(const sensor_msgs::PointCloud2ConstPtr &cloud_msg,
tf2_ros::filter_failure_reasons::FilterFailureReason reason)
{
NODELET_WARN_STREAM_THROTTLE(1.0, "Can't transform pointcloud from frame " << cloud_msg->header.frame_id << " to "
<< message_filter_->getTargetFramesString() << " with tolerance " << tolerance_);
}
void PointCloudToLaserScanNodelet::cloudCb(const sensor_msgs::PointCloud2ConstPtr &cloud_msg)
{
//build laserscan output
sensor_msgs::LaserScan output;
output.header = cloud_msg->header;
if(!target_frame_.empty()){
if (!target_frame_.empty())
{
output.header.frame_id = target_frame_;
}
@ -147,9 +158,12 @@ namespace pointcloud_to_laserscan
uint32_t ranges_size = std::ceil((output.angle_max - output.angle_min) / output.angle_increment);
//determine if laserscan rays with no obstacle data will evaluate to infinity or max_range
if(use_inf_){
if (use_inf_)
{
output.ranges.assign(ranges_size, std::numeric_limits<double>::infinity());
}else{
}
else
{
output.ranges.assign(ranges_size, output.range_max + 1.0);
}
@ -157,7 +171,7 @@ namespace pointcloud_to_laserscan
sensor_msgs::PointCloud2Ptr cloud;
// Transform cloud if necessary
if(!(output.header.frame_id == cloud_msg->header.frame_id))
if (!(output.header.frame_id == cloud_msg->header.frame_id))
{
try
{
@ -170,43 +184,50 @@ namespace pointcloud_to_laserscan
NODELET_ERROR_STREAM("Transform failure: " << ex.what());
return;
}
}else{
}
else
{
cloud_out = cloud_msg;
}
// Iterate through pointcloud
sensor_msgs::PointCloud2ConstIterator<float> iter_x(*cloud_out, "x");
sensor_msgs::PointCloud2ConstIterator<float> iter_y(*cloud_out, "y");
sensor_msgs::PointCloud2ConstIterator<float> iter_z(*cloud_out, "z");
for (sensor_msgs::PointCloud2ConstIterator<float>
iter_x(*cloud_out, "x"), iter_y(*cloud_out, "y"), iter_z(*cloud_out, "z");
iter_x != iter_x.end();
++iter_x, ++iter_y, ++iter_z)
{
for(; iter_x != iter_x.end(); ++iter_x, ++iter_y, ++iter_z){
if ( std::isnan(*iter_x) || std::isnan(*iter_y) || std::isnan(*iter_z) ){
if (std::isnan(*iter_x) || std::isnan(*iter_y) || std::isnan(*iter_z))
{
NODELET_DEBUG("rejected for nan in point(%f, %f, %f)\n", *iter_x, *iter_y, *iter_z);
continue;
}
if (*iter_z > max_height_ || *iter_z < min_height_){
if (*iter_z > max_height_ || *iter_z < min_height_)
{
NODELET_DEBUG("rejected for height %f not in range (%f, %f)\n", *iter_z, min_height_, max_height_);
continue;
}
double range = hypot(*iter_x,*iter_y);
if (range < range_min_){
double range = hypot(*iter_x, *iter_y);
if (range < range_min_)
{
NODELET_DEBUG("rejected for range %f below minimum value %f. Point: (%f, %f, %f)", range, range_min_, *iter_x, *iter_y,
*iter_z);
*iter_z);
continue;
}
double angle = atan2(*iter_y, *iter_x);
if (angle < output.angle_min || angle > output.angle_max){
if (angle < output.angle_min || angle > output.angle_max)
{
NODELET_DEBUG("rejected for angle %f not in range (%f, %f)\n", angle, output.angle_min, output.angle_max);
continue;
}
//overwrite range at laserscan ray if new range is smaller
int index = (angle - output.angle_min) / output.angle_increment;
if (range < output.ranges[index]){
if (range < output.ranges[index])
{
output.ranges[index] = range;
}