Skip to content

笔记重点:理解 rclcpp 中底层等待机制(WaitSet)与高层调度器(Executor)的关系与协作方式


一、WaitSet:底层等待原语

1.1 核心概念

WaitSet 是一个多路复用等待机制,能够同时监听多个 ROS 2 实体,直到其中某个实体"就绪"。

可监听的实体类型

  • Subscription(订阅者):消息到达时触发
  • GuardCondition(守卫条件):手动触发或内部事件触发
  • Timer(定时器):到期触发
  • Client(客户端):服务响应到达
  • Service(服务端):服务请求到达
  • Waitable(可等待对象):自定义可等待实体

1.2 WaitSet 的三种配置

定义位置:wait_set.hpp

cpp
// 1. 标准配置:动态存储 + 非线程安全(最常用)
using WaitSet = rclcpp::WaitSetTemplate<
  rclcpp::wait_set_policies::SequentialSynchronization,
  rclcpp::wait_set_policies::DynamicStorage
>;

// 2. 静态配置:固定容量 + 不可变(性能最优)
template<size_t NSub, size_t NGuard, size_t NTimer, ...>
using StaticWaitSet = rclcpp::WaitSetTemplate<
  rclcpp::wait_set_policies::SequentialSynchronization,
  rclcpp::wait_set_policies::StaticStorage<NSub, NGuard, NTimer, ...>
>;

// 3. 线程安全配置:动态存储 + 线程安全
using ThreadSafeWaitSet = rclcpp::WaitSetTemplate<
  rclcpp::wait_set_policies::ThreadSafeSynchronization,
  rclcpp::wait_set_policies::DynamicStorage
>;

1.3 WaitSet 工作流程

创建 WaitSet → 添加实体 → wait() 阻塞等待 → 返回就绪实体 → 执行处理 → 重复

关键方法wait_set_template.hpp):

  • add_*() 系列:添加不同类型的实体
  • remove_*() 系列:移除实体
  • wait(timeout):等待任意实体就绪
  • prune_destroyed_entities():清理已销毁的实体

二、Executor:高层调度器

2.1 核心定位

Executor 是 ROS 2 的事件循环调度器,负责:

  1. 管理节点和回调组
  2. 使用 WaitSet 等待事件
  3. 决策并执行回调
  4. 控制执行顺序和并发

定义位置:executor.hpp

cpp
/// Coordinate the order and timing of available communication tasks.
class Executor {
  // 核心方法
  virtual void spin() = 0;  // 阻塞式旋转
  void add_callback_group(...);
  void add_node(...);
  void spin_once_impl(...);
};

2.2 Executor 的两种实现

SingleThreadedExecutor(单线程执行器)

定义位置

cpp
class SingleThreadedExecutor : public rclcpp::Executor {
public:
  void spin() override;  // 单线程顺序执行所有回调
};

特点

  • ✅ 执行顺序可预测,无并发问题
  • ✅ 资源占用少,适合轻量级应用
  • ❌ 无法利用多核并行处理

适用场景

  • 回调执行顺序至关重要的场景
  • 资源受限的嵌入式系统
  • 需要确定性行为的实时系统

MultiThreadedExecutor(多线程执行器)

定义位置

cpp
class MultiThreadedExecutor : public rclcpp::Executor {
public:
  explicit MultiThreadedExecutor(
    const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions(),
    size_t number_of_threads = 0,  // 0 表示使用 CPU 核心数(最少2)
    bool yield_before_execute = false,
    std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)
  );
  
  void spin() override;  // 多线程并发执行
private:
  void run(size_t this_thread_number);  // 每个工作线程的执行逻辑
};

特点

  • 充分利用多核 CPU,提高吞吐量
  • 支持 Reentrant 回调组的并发执行
  • 需要处理线程同步问题
  • 执行顺序不可预测

适用场景

  • CPU 密集型回调处理
  • 需要高吞吐量的应用
  • 可并发处理的独立任务

三、Executor 内部工作原理

3.1 核心成员变量

定义位置

cpp
class Executor {
  std::atomic_bool spinning;  // 旋转状态标志
  std::shared_ptr<GuardCondition> interrupt_guard_condition_;  // 中断守卫条件
  std::shared_ptr<GuardCondition> shutdown_guard_condition_;  // 关闭守卫条件
  std::mutex mutex_;  // 保护共享状态
  std::shared_ptr<Context> context_;  // ROS 2 上下文
  
  // 实体收集和管理
  rclcpp::executors::ExecutorEntitiesCollector collector_;
  rclcpp::executors::ExecutorEntitiesCollection current_collection_;
  
  // 核心 WaitSet
  rclcpp::WaitSet wait_set_;  // 等待集
  std::optional<rclcpp::WaitResult<rclcpp::WaitSet>> wait_result_;
  
  // 通知机制
  std::shared_ptr<executors::ExecutorNotifyWaitable> notify_waitable_;
  std::atomic_bool entities_need_rebuild_;  // 是否需要重建实体集
};

3.2 Executor 工作流程

cpp
void Executor::spin() {
  spinning = true;
  try {
    while (spinning) {
      // 步骤 1:收集所有需要监听的实体
      collect_entities();
      
      // 步骤 2:使用 WaitSet 等待实体就绪
      wait_for_work(timeout);
      
      // 步骤 3:获取并执行准备好的回调
      while (get_next_ready_executable(any_executable)) {
        execute_any_executable(any_executable);
      }
    }
  } catch (...) {
    spinning = false;
    throw;
  }
}

关键方法

  • collect_entities():从节点和回调组收集实体 定义
  • wait_for_work(timeout):阻塞等待事件 定义
  • get_next_ready_executable(any_executable):获取就绪的可执行对象 定义
  • execute_any_executable(any_executable):执行回调

3.3 实体重建机制

Executor 使用 notify_waitable_ 监听实体变化:

cpp
// 当实体发生变化时
void handle_updated_entities(bool notify) {
  entities_need_rebuild_ = true;  // 标记需要重建
  if (notify) {
    // 触发 interrupt_guard_condition_,唤醒正在等待的 spin()
  }
}

// 在 spin() 中检查并重建
if (entities_need_rebuild_) {
  collect_entities();  // 重新收集实体
  entities_need_rebuild_ = false;
}

四、WaitSet 与 Executor 的关系

4.1 分层架构

┌─────────────────────────────────────────────────────────────┐
│                    应用层代码                                │
│  rclcpp::spin(node) 或 executor.spin()                     │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│                   Executor(高层调度器)                     │
│  • 管理节点和回调组                                          │
│  • 决策回调执行顺序                                          │
│  • 控制并发模型(单线程/多线程)                              │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│                  WaitSet(底层等待机制)                     │
│  • 监听多个实体的就绪状态                                     │
│  • 基于系统调用实现多路复用                                   │
│  • 封装 rcl_wait_set_t                                      │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│                     rcl / rmw 层                            │
│  • rcl_wait_set_t                                            │
│  • DDS 等中间件通信                                          │
└─────────────────────────────────────────────────────────────┘

4.2 职责划分

维度 WaitSet Executor
抽象层次 底层等待原语 高层调度器
关注点 何时有工作 如何执行工作
功能 多路复用等待、实体管理 回调执行、顺序控制、并发管理
使用场景 高级自定义执行逻辑 绝大多数 ROS 2 应用
线程安全 可选(ThreadSafeWaitSet) MultiThreadedExecutor

五、代码示例

5.1 标准使用:通过 Executor(推荐)

cpp
#include <rclcpp/rclcpp.hpp>

int main(int argc, char ** argv) {
  rclcpp::init(argc, argv);
  
  // 创建节点
  auto node = rclcpp::Node::make_shared("my_node");
  
  // 创建订阅者
  auto subscription = node->create_subscription<std_msgs::msg::String>(
    "topic", 10,
    [](const std_msgs::msg::String::SharedPtr msg) {
      RCLCPP_INFO(rclcpp::get_logger("callback"), "Received: '%s'", msg->data.c_str());
    });
  
  // 方式 1:使用全局默认 executor(单线程)
  rclcpp::spin(node);
  
  // 方式 2:显式创建 SingleThreadedExecutor
  rclcpp::executors::SingleThreadedExecutor executor;
  executor.add_node(node);
  executor.spin();
  
  // 方式 3:使用 MultiThreadedExecutor
  rclcpp::executors::MultiThreadedExecutor executor(2);  // 2 个线程
  executor.add_node(node);
  executor.spin();
  
  rclcpp::shutdown();
  return 0;
}

5.2 高级使用:直接使用 WaitSet

cpp
#include <rclcpp/rclcpp.hpp>

int main(int argc, char ** argv) {
  rclcpp::init(argc, argv);
  
  auto node = rclcpp::Node::make_shared("custom_waitset_node");
  
  // 创建实体
  auto subscription = node->create_subscription<std_msgs::msg::String>(
    "topic", 10,
    [](const std_msgs::msg::String::SharedPtr msg) {
      RCLCPP_INFO(rclcpp::get_logger("callback"), "Received: '%s'", msg->data.c_str());
    });
  
  auto timer = node->create_wall_timer(
    std::chrono::seconds(1),
    []() {
      RCLCPP_INFO(rclcpp::get_logger("timer"), "Timer triggered");
    });
  
  // 创建自定义 WaitSet
  rclcpp::WaitSet wait_set;
  wait_set.add_subscription(subscription);
  wait_set.add_timer(timer);
  
  // 自定义等待循环
  while (rclcpp::ok()) {
    // 等待任意实体就绪(超时 1 秒)
    auto result = wait_set.wait(std::chrono::seconds(1));
    
    if (result.kind() == rclcpp::WaitResultKind::Ready) {
      // 检查并执行就绪的实体
      if (result.is_ready(subscription)) {
        // 手动获取并执行订阅回调(需要更复杂的逻辑)
        RCLCPP_INFO(rclcpp::get_logger("waitset"), "Subscription is ready");
      }
      if (result.is_ready(timer)) {
        // 执行定时器回调
        RCLCPP_INFO(rclcpp::get_logger("waitset"), "Timer is ready");
        timer->execute_callback();
      }
    } else if (result.kind() == rclcpp::WaitResultKind::Timeout) {
      RCLCPP_INFO(rclcpp::get_logger("waitset"), "Timeout, no work ready");
    } else if (result.kind() == rclcpp::WaitResultKind::Empty) {
      RCLCPP_INFO(rclcpp::get_logger("waitset"), "WaitSet is empty");
    }
  }
  
  rclcpp::shutdown();
  return 0;
}

六、最佳实践

6.1 Executor 选择建议

场景 推荐执行器 理由
简单 ROS 2 节点 rclcpp::spin(node) 默认单线程,简单可靠
需要控制执行顺序 SingleThreadedExecutor 顺序执行,可预测
CPU 密集型回调 MultiThreadedExecutor 并发处理,提高吞吐量
多个独立任务 MultiThreadedExecutor + Reentrant 回调组 充分利用多核
嵌入式/资源受限 SingleThreadedExecutor 资源占用少

6.2 WaitSet 使用建议

场景 推荐配置 理由
绝大多数应用 不直接使用 使用 Executor 即可
自定义执行逻辑 WaitSet 灵活控制等待和执行
性能关键场景 StaticWaitSet 固定容量,避免动态分配
多线程环境 ThreadSafeWaitSet 线程安全,防止竞争

6.3 注意事项

  1. 避免回调阻塞:无论是单线程还是多线程 executor,阻塞回调都会影响整体性能
  2. Reentrant 回调组:在 MultiThreadedExecutor 中,只有 Reentrant 类型的回调组才能并发执行
  3. 回调组管理:一个回调组只能关联一个 executor,否则会抛出异常
  4. 资源清理:正确调用 rclcpp::shutdown() 释放资源
  5. 异常处理:在回调中捕获异常,避免传播到 executor 主循环

七、常见问题与解决方案

Q1: 为什么 MultiThreadedExecutor 没有加速我的应用?

原因

  • 回调组类型为 MutuallyExclusive(默认),无法并发执行
  • 回调中存在互斥锁或其他同步机制
  • 任务受 I/O 限制,而非 CPU 限制

解决方案

cpp
// 创建 Reentrant 回调组
auto callback_group = node->create_callback_group(
  rclcpp::CallbackGroupType::MutuallyExclusive  // 改为 Reentrant
);

Q2: 如何优雅地停止 executor?

解决方案

cpp
// 在另一个线程中
executor.cancel();  // 设置 spinning = false,停止 spin()

// 或者使用 Context 关闭
rclcpp::shutdown();  // 关闭整个 ROS 2 上下文

Q3: Executor 如何处理新添加的节点?

机制

  • 使用 notify_waitable_ 监听实体变化
  • 当调用 add_node()add_callback_group() 时,触发 interrupt_guard_condition_
  • 等待中的 wait_for_work() 被唤醒,重新收集实体并重建 WaitSet

八、相关资源

核心文件

文档页面

外部参考


九、总结

概念 关键点
WaitSet 底层等待原语,多路复用监听多个实体
Executor 高层调度器,管理回调执行顺序和并发
关系 Executor 内部使用 WaitSet 实现等待机制
选择 优先使用 Executor,高级场景才直接用 WaitSet
性能 StaticWaitSet > ThreadSafeWaitSet > WaitSet
并发 MultiThreadedExecutor + Reentrant 回调组

核心思想:WaitSet 处理"何时有工作",Executor 处理"如何执行工作"。两者分层设计,各司其职,共同构建了 ROS 2 高效的回调调度系统。

基于 VitePress 构建