Skip to content

Commit

Permalink
fixed race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
jian-dong committed May 9, 2024
1 parent 27be98d commit b205cfd
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 48 deletions.
6 changes: 3 additions & 3 deletions orbbec_camera/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ project(orbbec_camera)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_C_STANDARD 11)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fPIC -O3")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fPIC -g")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fPIC -g3")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC -O3")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fPIC -g")
set(CMAKE_BUILD_TYPE "ReleaseWithDebInfo")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fPIC -g3")
set(CMAKE_BUILD_TYPE "Release")
option(USE_RK_HW_DECODER "Use Rockchip hardware decoder" OFF)
option(USE_NV_HW_DECODER "Use Nvidia hardware decoder" OFF)

Expand Down
10 changes: 7 additions & 3 deletions orbbec_camera/include/orbbec_camera/ob_camera_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ class OBCameraNode {

void setupProfiles();

void updateImageConfig(const stream_index_pair& stream_index);

void printSensorProfiles(const std::shared_ptr<ob::Sensor>& sensor);

void selectBaseStream();
Expand Down Expand Up @@ -414,6 +416,7 @@ class OBCameraNode {
bool enable_point_cloud_ = true;
bool enable_colored_point_cloud_ = false;
sensor_msgs::msg::PointCloud2 point_cloud_msg_;
std::recursive_mutex point_cloud_mutex_;

orbbec_camera_msgs::msg::DeviceInfo device_info_;
std::string point_cloud_qos_;
Expand Down Expand Up @@ -476,10 +479,10 @@ class OBCameraNode {
bool is_color_frame_decoded_ = false;
std::mutex device_lock_;
// For color
std::queue<std::shared_ptr<ob::FrameSet>> colorFrameQueue_;
std::queue<std::shared_ptr<ob::FrameSet>> color_frame_queue_;
std::shared_ptr<std::thread> colorFrameThread_ = nullptr;
std::mutex colorFrameMtx_;
std::condition_variable colorFrameCV_;
std::mutex color_frame_queue_lock_;
std::condition_variable color_frame_queue_cv_;

bool ordered_pc_ = false;
bool use_hardware_time_ = true;
Expand Down Expand Up @@ -520,5 +523,6 @@ class OBCameraNode {
std::unique_ptr<ob::Align> align_filter_ = nullptr;
OBStreamType align_target_stream_ = OB_STREAM_COLOR;
bool retry_on_usb3_detection_failure_ = false;
std::atomic_bool is_camera_node_initialized_{false};
};
} // namespace orbbec_camera
2 changes: 1 addition & 1 deletion orbbec_camera/src/list_camera_profile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void printPreset(const std::shared_ptr<ob::Device>& device) {
std::cout << "Preset list:" << std::endl;
for (uint32_t i = 0; i < preset_list->count(); i++) {
auto name = preset_list->getName(i);
std::cout << "Preset list[ " << i << "]: " << name << std::endl;
std::cout << "Preset list[" << i << "]: " << name << std::endl;
}
}
int main() {
Expand Down
116 changes: 75 additions & 41 deletions orbbec_camera/src/ob_camera_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ OBCameraNode::OBCameraNode(rclcpp::Node *node, std::shared_ptr<ob::Device> devic
if (enable_stream_[COLOR]) {
rgb_buffer_ = new uint8_t[width_[COLOR] * height_[COLOR] * 3];
}
is_camera_node_initialized_ = true;
}

template <class T>
Expand Down Expand Up @@ -95,7 +96,7 @@ void OBCameraNode::clean() {
}

if (colorFrameThread_ && colorFrameThread_->joinable()) {
colorFrameCV_.notify_all();
color_frame_queue_cv_.notify_all();
colorFrameThread_->join();
}

Expand Down Expand Up @@ -444,6 +445,11 @@ void OBCameraNode::setupProfiles() {
}
CHECK_NOTNULL(selected_profile);
stream_profile_[elem] = selected_profile;
height_[elem] = static_cast<int>(selected_profile->height());
width_[elem] = static_cast<int>(selected_profile->width());
fps_[elem] = static_cast<int>(selected_profile->fps());
format_[elem] = selected_profile->format();
updateImageConfig(elem);
images_[elem] =
cv::Mat(height_[elem], width_[elem], image_format_[elem], cv::Scalar(0, 0, 0));
RCLCPP_INFO_STREAM(
Expand Down Expand Up @@ -483,6 +489,28 @@ void OBCameraNode::setupProfiles() {
}
}
}
void OBCameraNode::updateImageConfig(const stream_index_pair &stream_index) {
if (format_[stream_index] == OB_FORMAT_Y8) {
image_format_[stream_index] = CV_8UC1;
encoding_[stream_index] = stream_index.first == OB_STREAM_DEPTH
? sensor_msgs::image_encodings::TYPE_8UC1
: sensor_msgs::image_encodings::MONO8;
unit_step_size_[stream_index] = sizeof(uint8_t);
}
if (format_[stream_index] == OB_FORMAT_MJPG) {
if (stream_index.first == OB_STREAM_IR || stream_index.first == OB_STREAM_IR_LEFT ||
stream_index.first == OB_STREAM_IR_RIGHT) {
image_format_[stream_index] = CV_8UC1;
encoding_[stream_index] = sensor_msgs::image_encodings::MONO8;
unit_step_size_[stream_index] = sizeof(uint8_t);
}
}
if (format_[stream_index] == OB_FORMAT_Y16 && stream_index == COLOR) {
image_format_[stream_index] = CV_16UC1;
encoding_[stream_index] = sensor_msgs::image_encodings::MONO16;
unit_step_size_[stream_index] = sizeof(uint16_t);
}
}

void OBCameraNode::startStreams() {
if (pipeline_ != nullptr) {
Expand Down Expand Up @@ -670,31 +698,20 @@ void OBCameraNode::getParameters() {
param_name = stream_name_[stream_index] + "_format";
setAndGetNodeParameter(format_str_[stream_index], param_name, format_str_[stream_index]);
format_[stream_index] = OBFormatFromString(format_str_[stream_index]);
if (format_[stream_index] == OB_FORMAT_Y8) {
image_format_[stream_index] = CV_8UC1;
encoding_[stream_index] = stream_index.first == OB_STREAM_DEPTH
? sensor_msgs::image_encodings::TYPE_8UC1
: sensor_msgs::image_encodings::MONO8;
unit_step_size_[stream_index] = sizeof(uint8_t);
}
if (format_[stream_index] == OB_FORMAT_MJPG) {
if (stream_index.first == OB_STREAM_IR || stream_index.first == OB_STREAM_IR_LEFT ||
stream_index.first == OB_STREAM_IR_RIGHT) {
image_format_[stream_index] = CV_8UC1;
encoding_[stream_index] = sensor_msgs::image_encodings::MONO8;
unit_step_size_[stream_index] = sizeof(uint8_t);
}
}
if (format_[stream_index] == OB_FORMAT_Y16 && stream_index == COLOR) {
image_format_[stream_index] = CV_16UC1;
encoding_[stream_index] = sensor_msgs::image_encodings::MONO16;
unit_step_size_[stream_index] = sizeof(uint16_t);
}

updateImageConfig(stream_index);
param_name = stream_name_[stream_index] + "_qos";
setAndGetNodeParameter<std::string>(image_qos_[stream_index], param_name, "default");
param_name = stream_name_[stream_index] + "_camera_info_qos";
setAndGetNodeParameter<std::string>(camera_info_qos_[stream_index], param_name, "default");
auto device_info = device_->getDeviceInfo();
CHECK_NOTNULL(device_info.get());
auto pid = device_info->pid();
if (isOpenNIDevice(pid)) {
use_hardware_time_ = false;
}
if (isGemini335PID(pid)) {
use_hardware_time_ = true;
}
}

for (auto stream_index : IMAGE_STREAMS) {
Expand Down Expand Up @@ -1009,6 +1026,7 @@ void OBCameraNode::publishDepthPointCloud(const std::shared_ptr<ob::FrameSet> &f
if (!depth_frame_) {
return;
}
std::lock_guard<decltype(point_cloud_mutex_)> point_cloud_msg_lock(point_cloud_mutex_);
auto depth_frame = depth_frame_->as<ob::DepthFrame>();
if (!depth_frame) {
RCLCPP_ERROR_STREAM(logger_, "depth frame is null");
Expand Down Expand Up @@ -1061,13 +1079,18 @@ void OBCameraNode::publishDepthPointCloud(const std::shared_ptr<ob::FrameSet> &f
}
}
}
if (valid_count == 0) {
RCLCPP_WARN(logger_, "No valid point in point cloud");
return;
}
if (!ordered_pc_) {
point_cloud_msg_.is_dense = true;
point_cloud_msg_.width = valid_count;
point_cloud_msg_.height = 1;
modifier.resize(valid_count);
}
auto timestamp = fromMsToROSTime(depth_frame->timeStamp());
auto timestamp = use_hardware_time_ ? fromUsToROSTime(depth_frame->timeStampUs())
: fromMsToROSTime(depth_frame->systemTimeStamp());
std::string frame_id = depth_registration_ ? optical_frame_id_[COLOR] : optical_frame_id_[DEPTH];
point_cloud_msg_.header.stamp = timestamp;
point_cloud_msg_.header.frame_id = frame_id;
Expand Down Expand Up @@ -1101,6 +1124,7 @@ void OBCameraNode::publishColoredPointCloud(const std::shared_ptr<ob::FrameSet>
return;
}
CHECK_NOTNULL(depth_frame_.get());
std::lock_guard<decltype(point_cloud_mutex_)> point_cloud_msg_lock(point_cloud_mutex_);
auto depth_frame = depth_frame_->as<ob::DepthFrame>();
auto color_frame = frame_set->colorFrame();
if (!depth_frame || !color_frame) {
Expand Down Expand Up @@ -1184,13 +1208,18 @@ void OBCameraNode::publishColoredPointCloud(const std::shared_ptr<ob::FrameSet>
}
}
}
if (valid_count == 0) {
RCLCPP_WARN(logger_, "No valid points in point cloud");
return;
}
if (!ordered_pc_) {
point_cloud_msg_.is_dense = true;
point_cloud_msg_.width = valid_count;
point_cloud_msg_.height = 1;
modifier.resize(valid_count);
}
auto timestamp = fromUsToROSTime(depth_frame->timeStampUs());
auto timestamp = use_hardware_time_ ? fromUsToROSTime(depth_frame->timeStampUs())
: fromMsToROSTime(depth_frame->systemTimeStamp());
point_cloud_msg_.header.stamp = timestamp;
point_cloud_msg_.header.frame_id = optical_frame_id_[COLOR];
depth_registration_cloud_pub_->publish(point_cloud_msg_);
Expand Down Expand Up @@ -1241,7 +1270,9 @@ void OBCameraNode::onNewFrameSetCallback(std::shared_ptr<ob::FrameSet> frame_set
if (!is_running_.load()) {
return;
}
std::lock_guard<decltype(device_lock_)> lock(device_lock_);
if (!is_camera_node_initialized_.load()) {
return;
}
if (frame_set == nullptr) {
return;
}
Expand All @@ -1250,13 +1281,12 @@ void OBCameraNode::onNewFrameSetCallback(std::shared_ptr<ob::FrameSet> frame_set
publishStaticTransforms();
tf_published_ = true;
}

// is_color_frame_decoded_ = decodeColorFrameToBuffer(frame_set->colorFrame(), rgb_buffer_);
std::shared_ptr<ob::ColorFrame> colorFrame = frame_set->colorFrame();
depth_frame_ = frame_set->getFrame(OB_FRAME_DEPTH);

auto device_info = device_->getDeviceInfo();
CHECK_NOTNULL(device_info.get());
auto pid = device_info->pid();

if (isGemini335PID(pid)) {
if (depth_registration_ && align_filter_ && depth_frame_) {
auto new_frame = align_filter_->process(frame_set);
Expand All @@ -1268,11 +1298,11 @@ void OBCameraNode::onNewFrameSetCallback(std::shared_ptr<ob::FrameSet> frame_set
}
depth_frame_ = processDepthFrameFilter(depth_frame_);
}

if (enable_stream_[COLOR] && colorFrame) {
std::lock_guard<std::mutex> colorLock(colorFrameMtx_);
colorFrameQueue_.push(frame_set);
colorFrameCV_.notify_all();
auto color_frame = frame_set->getFrame(OB_FRAME_COLOR);
if (enable_stream_[COLOR] && color_frame) {
std::unique_lock<std::mutex> lock(color_frame_queue_lock_);
color_frame_queue_.push(frame_set);
color_frame_queue_cv_.notify_all();
} else {
publishPointCloud(frame_set);
}
Expand All @@ -1291,7 +1321,6 @@ void OBCameraNode::onNewFrameSetCallback(std::shared_ptr<ob::FrameSet> frame_set
if (stream_index == DEPTH) {
frame = depth_frame_;
}

std::shared_ptr<ob::Frame> irFrame = decodeIRMJPGFrame(frame);
if (irFrame) {
onNewFrameCallback(irFrame, stream_index);
Expand All @@ -1312,19 +1341,19 @@ void OBCameraNode::onNewFrameSetCallback(std::shared_ptr<ob::FrameSet> frame_set

void OBCameraNode::onNewColorFrameCallback() {
while (enable_stream_[COLOR] && rclcpp::ok() && is_running_.load()) {
std::unique_lock<std::mutex> lock(colorFrameMtx_);
colorFrameCV_.wait(lock,
[this]() { return !colorFrameQueue_.empty() || !(is_running_.load()); });
std::unique_lock<std::mutex> lock(color_frame_queue_lock_);
color_frame_queue_cv_.wait(
lock, [this]() { return !color_frame_queue_.empty() || !(is_running_.load()); });

if (!rclcpp::ok() || !is_running_.load()) {
break;
}

std::shared_ptr<ob::FrameSet> frameSet = colorFrameQueue_.front();
std::shared_ptr<ob::FrameSet> frameSet = color_frame_queue_.front();
is_color_frame_decoded_ = decodeColorFrameToBuffer(frameSet->colorFrame(), rgb_buffer_);
publishPointCloud(frameSet);
onNewFrameCallback(frameSet->colorFrame(), IMAGE_STREAMS.at(2));
colorFrameQueue_.pop();
onNewFrameCallback(frameSet->colorFrame(), COLOR);
color_frame_queue_.pop();
}

RCLCPP_INFO_STREAM(logger_, "Color frame thread exit!");
Expand Down Expand Up @@ -1477,7 +1506,6 @@ void OBCameraNode::onNewFrameCallback(const std::shared_ptr<ob::Frame> &frame,
}
int width = static_cast<int>(video_frame->width());
int height = static_cast<int>(video_frame->height());
use_hardware_time_ = true;
auto timestamp = use_hardware_time_ ? fromUsToROSTime(video_frame->timeStampUs())
: fromMsToROSTime(video_frame->systemTimeStamp());
auto device_info = device_->getDeviceInfo();
Expand Down Expand Up @@ -1633,6 +1661,9 @@ void OBCameraNode::saveImageToFile(const stream_index_pair &stream_index, const

void OBCameraNode::onNewIMUFrameSyncOutputCallback(const std::shared_ptr<ob::Frame> &accelframe,
const std::shared_ptr<ob::Frame> &gryoframe) {
if (!is_camera_node_initialized_) {
return;
}
if (!imu_gyro_accel_publisher_) {
RCLCPP_ERROR_STREAM(logger_, "stream Accel Gryo publisher not initialized");
return;
Expand Down Expand Up @@ -1670,6 +1701,9 @@ void OBCameraNode::onNewIMUFrameSyncOutputCallback(const std::shared_ptr<ob::Fra

void OBCameraNode::onNewIMUFrameCallback(const std::shared_ptr<ob::Frame> &frame,
const stream_index_pair &stream_index) {
if (!is_camera_node_initialized_) {
return;
}
if (!imu_publishers_.count(stream_index)) {
RCLCPP_ERROR_STREAM(logger_,
"stream " << stream_name_[stream_index] << " publisher not initialized");
Expand Down

0 comments on commit b205cfd

Please sign in to comment.