笔记重点:理解 rclcpp 中底层等待机制(WaitSet)与高层调度器(Executor)的关系与协作方式
一、WaitSet:底层等待原语
1.1 核心概念
WaitSet 是一个多路复用等待机制,能够同时监听多个 ROS 2 实体,直到其中某个实体"就绪"。
可监听的实体类型:
- Subscription(订阅者):消息到达时触发
- GuardCondition(守卫条件):手动触发或内部事件触发
- Timer(定时器):到期触发
- Client(客户端):服务响应到达
- Service(服务端):服务请求到达
- Waitable(可等待对象):自定义可等待实体
1.2 WaitSet 的三种配置
定义位置:wait_set.hpp
cpp
// 1. 标准配置:动态存储 + 非线程安全(最常用)
using WaitSet = rclcpp::WaitSetTemplate<
rclcpp::wait_set_policies::SequentialSynchronization,
rclcpp::wait_set_policies::DynamicStorage
>;
// 2. 静态配置:固定容量 + 不可变(性能最优)
template<size_t NSub, size_t NGuard, size_t NTimer, ...>
using StaticWaitSet = rclcpp::WaitSetTemplate<
rclcpp::wait_set_policies::SequentialSynchronization,
rclcpp::wait_set_policies::StaticStorage<NSub, NGuard, NTimer, ...>
>;
// 3. 线程安全配置:动态存储 + 线程安全
using ThreadSafeWaitSet = rclcpp::WaitSetTemplate<
rclcpp::wait_set_policies::ThreadSafeSynchronization,
rclcpp::wait_set_policies::DynamicStorage
>; 1.3 WaitSet 工作流程
创建 WaitSet → 添加实体 → wait() 阻塞等待 → 返回就绪实体 → 执行处理 → 重复 关键方法(wait_set_template.hpp):
add_*()系列:添加不同类型的实体remove_*()系列:移除实体wait(timeout):等待任意实体就绪prune_destroyed_entities():清理已销毁的实体
二、Executor:高层调度器
2.1 核心定位
Executor 是 ROS 2 的事件循环调度器,负责:
- 管理节点和回调组
- 使用 WaitSet 等待事件
- 决策并执行回调
- 控制执行顺序和并发
定义位置:executor.hpp
cpp
/// Coordinate the order and timing of available communication tasks.
class Executor {
// 核心方法
virtual void spin() = 0; // 阻塞式旋转
void add_callback_group(...);
void add_node(...);
void spin_once_impl(...);
}; 2.2 Executor 的两种实现
SingleThreadedExecutor(单线程执行器)
cpp
class SingleThreadedExecutor : public rclcpp::Executor {
public:
void spin() override; // 单线程顺序执行所有回调
}; 特点:
- ✅ 执行顺序可预测,无并发问题
- ✅ 资源占用少,适合轻量级应用
- ❌ 无法利用多核并行处理
适用场景:
- 回调执行顺序至关重要的场景
- 资源受限的嵌入式系统
- 需要确定性行为的实时系统
MultiThreadedExecutor(多线程执行器)
cpp
class MultiThreadedExecutor : public rclcpp::Executor {
public:
explicit MultiThreadedExecutor(
const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions(),
size_t number_of_threads = 0, // 0 表示使用 CPU 核心数(最少2)
bool yield_before_execute = false,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)
);
void spin() override; // 多线程并发执行
private:
void run(size_t this_thread_number); // 每个工作线程的执行逻辑
}; 特点:
- 充分利用多核 CPU,提高吞吐量
- 支持 Reentrant 回调组的并发执行
- 需要处理线程同步问题
- 执行顺序不可预测
适用场景:
- CPU 密集型回调处理
- 需要高吞吐量的应用
- 可并发处理的独立任务
三、Executor 内部工作原理
3.1 核心成员变量
cpp
class Executor {
std::atomic_bool spinning; // 旋转状态标志
std::shared_ptr<GuardCondition> interrupt_guard_condition_; // 中断守卫条件
std::shared_ptr<GuardCondition> shutdown_guard_condition_; // 关闭守卫条件
std::mutex mutex_; // 保护共享状态
std::shared_ptr<Context> context_; // ROS 2 上下文
// 实体收集和管理
rclcpp::executors::ExecutorEntitiesCollector collector_;
rclcpp::executors::ExecutorEntitiesCollection current_collection_;
// 核心 WaitSet
rclcpp::WaitSet wait_set_; // 等待集
std::optional<rclcpp::WaitResult<rclcpp::WaitSet>> wait_result_;
// 通知机制
std::shared_ptr<executors::ExecutorNotifyWaitable> notify_waitable_;
std::atomic_bool entities_need_rebuild_; // 是否需要重建实体集
}; 3.2 Executor 工作流程
cpp
void Executor::spin() {
spinning = true;
try {
while (spinning) {
// 步骤 1:收集所有需要监听的实体
collect_entities();
// 步骤 2:使用 WaitSet 等待实体就绪
wait_for_work(timeout);
// 步骤 3:获取并执行准备好的回调
while (get_next_ready_executable(any_executable)) {
execute_any_executable(any_executable);
}
}
} catch (...) {
spinning = false;
throw;
}
} 关键方法:
collect_entities():从节点和回调组收集实体 定义wait_for_work(timeout):阻塞等待事件 定义get_next_ready_executable(any_executable):获取就绪的可执行对象 定义execute_any_executable(any_executable):执行回调
3.3 实体重建机制
Executor 使用 notify_waitable_ 监听实体变化:
cpp
// 当实体发生变化时
void handle_updated_entities(bool notify) {
entities_need_rebuild_ = true; // 标记需要重建
if (notify) {
// 触发 interrupt_guard_condition_,唤醒正在等待的 spin()
}
}
// 在 spin() 中检查并重建
if (entities_need_rebuild_) {
collect_entities(); // 重新收集实体
entities_need_rebuild_ = false;
} 四、WaitSet 与 Executor 的关系
4.1 分层架构
┌─────────────────────────────────────────────────────────────┐
│ 应用层代码 │
│ rclcpp::spin(node) 或 executor.spin() │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ Executor(高层调度器) │
│ • 管理节点和回调组 │
│ • 决策回调执行顺序 │
│ • 控制并发模型(单线程/多线程) │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ WaitSet(底层等待机制) │
│ • 监听多个实体的就绪状态 │
│ • 基于系统调用实现多路复用 │
│ • 封装 rcl_wait_set_t │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ rcl / rmw 层 │
│ • rcl_wait_set_t │
│ • DDS 等中间件通信 │
└─────────────────────────────────────────────────────────────┘ 4.2 职责划分
| 维度 | WaitSet | Executor |
|---|---|---|
| 抽象层次 | 底层等待原语 | 高层调度器 |
| 关注点 | 何时有工作 | 如何执行工作 |
| 功能 | 多路复用等待、实体管理 | 回调执行、顺序控制、并发管理 |
| 使用场景 | 高级自定义执行逻辑 | 绝大多数 ROS 2 应用 |
| 线程安全 | 可选(ThreadSafeWaitSet) | MultiThreadedExecutor |
五、代码示例
5.1 标准使用:通过 Executor(推荐)
cpp
#include <rclcpp/rclcpp.hpp>
int main(int argc, char ** argv) {
rclcpp::init(argc, argv);
// 创建节点
auto node = rclcpp::Node::make_shared("my_node");
// 创建订阅者
auto subscription = node->create_subscription<std_msgs::msg::String>(
"topic", 10,
[](const std_msgs::msg::String::SharedPtr msg) {
RCLCPP_INFO(rclcpp::get_logger("callback"), "Received: '%s'", msg->data.c_str());
});
// 方式 1:使用全局默认 executor(单线程)
rclcpp::spin(node);
// 方式 2:显式创建 SingleThreadedExecutor
rclcpp::executors::SingleThreadedExecutor executor;
executor.add_node(node);
executor.spin();
// 方式 3:使用 MultiThreadedExecutor
rclcpp::executors::MultiThreadedExecutor executor(2); // 2 个线程
executor.add_node(node);
executor.spin();
rclcpp::shutdown();
return 0;
} 5.2 高级使用:直接使用 WaitSet
cpp
#include <rclcpp/rclcpp.hpp>
int main(int argc, char ** argv) {
rclcpp::init(argc, argv);
auto node = rclcpp::Node::make_shared("custom_waitset_node");
// 创建实体
auto subscription = node->create_subscription<std_msgs::msg::String>(
"topic", 10,
[](const std_msgs::msg::String::SharedPtr msg) {
RCLCPP_INFO(rclcpp::get_logger("callback"), "Received: '%s'", msg->data.c_str());
});
auto timer = node->create_wall_timer(
std::chrono::seconds(1),
[]() {
RCLCPP_INFO(rclcpp::get_logger("timer"), "Timer triggered");
});
// 创建自定义 WaitSet
rclcpp::WaitSet wait_set;
wait_set.add_subscription(subscription);
wait_set.add_timer(timer);
// 自定义等待循环
while (rclcpp::ok()) {
// 等待任意实体就绪(超时 1 秒)
auto result = wait_set.wait(std::chrono::seconds(1));
if (result.kind() == rclcpp::WaitResultKind::Ready) {
// 检查并执行就绪的实体
if (result.is_ready(subscription)) {
// 手动获取并执行订阅回调(需要更复杂的逻辑)
RCLCPP_INFO(rclcpp::get_logger("waitset"), "Subscription is ready");
}
if (result.is_ready(timer)) {
// 执行定时器回调
RCLCPP_INFO(rclcpp::get_logger("waitset"), "Timer is ready");
timer->execute_callback();
}
} else if (result.kind() == rclcpp::WaitResultKind::Timeout) {
RCLCPP_INFO(rclcpp::get_logger("waitset"), "Timeout, no work ready");
} else if (result.kind() == rclcpp::WaitResultKind::Empty) {
RCLCPP_INFO(rclcpp::get_logger("waitset"), "WaitSet is empty");
}
}
rclcpp::shutdown();
return 0;
} 六、最佳实践
6.1 Executor 选择建议
| 场景 | 推荐执行器 | 理由 |
|---|---|---|
| 简单 ROS 2 节点 | rclcpp::spin(node) | 默认单线程,简单可靠 |
| 需要控制执行顺序 | SingleThreadedExecutor | 顺序执行,可预测 |
| CPU 密集型回调 | MultiThreadedExecutor | 并发处理,提高吞吐量 |
| 多个独立任务 | MultiThreadedExecutor + Reentrant 回调组 | 充分利用多核 |
| 嵌入式/资源受限 | SingleThreadedExecutor | 资源占用少 |
6.2 WaitSet 使用建议
| 场景 | 推荐配置 | 理由 |
|---|---|---|
| 绝大多数应用 | 不直接使用 | 使用 Executor 即可 |
| 自定义执行逻辑 | WaitSet | 灵活控制等待和执行 |
| 性能关键场景 | StaticWaitSet | 固定容量,避免动态分配 |
| 多线程环境 | ThreadSafeWaitSet | 线程安全,防止竞争 |
6.3 注意事项
- 避免回调阻塞:无论是单线程还是多线程 executor,阻塞回调都会影响整体性能
- Reentrant 回调组:在 MultiThreadedExecutor 中,只有 Reentrant 类型的回调组才能并发执行
- 回调组管理:一个回调组只能关联一个 executor,否则会抛出异常
- 资源清理:正确调用
rclcpp::shutdown()释放资源 - 异常处理:在回调中捕获异常,避免传播到 executor 主循环
七、常见问题与解决方案
Q1: 为什么 MultiThreadedExecutor 没有加速我的应用?
原因:
- 回调组类型为 MutuallyExclusive(默认),无法并发执行
- 回调中存在互斥锁或其他同步机制
- 任务受 I/O 限制,而非 CPU 限制
解决方案:
cpp
// 创建 Reentrant 回调组
auto callback_group = node->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive // 改为 Reentrant
); Q2: 如何优雅地停止 executor?
解决方案:
cpp
// 在另一个线程中
executor.cancel(); // 设置 spinning = false,停止 spin()
// 或者使用 Context 关闭
rclcpp::shutdown(); // 关闭整个 ROS 2 上下文 Q3: Executor 如何处理新添加的节点?
机制:
- 使用
notify_waitable_监听实体变化 - 当调用
add_node()或add_callback_group()时,触发interrupt_guard_condition_ - 等待中的
wait_for_work()被唤醒,重新收集实体并重建 WaitSet
八、相关资源
核心文件
- wait_set.hpp - WaitSet 类型定义
- wait_set_template.hpp - WaitSet 核心实现
- executor.hpp - Executor 基类定义
- single_threaded_executor.hpp - 单线程执行器
- multi_threaded_executor.hpp - 多线程执行器
文档页面
外部参考
- ROS 2 Executor Design - 官方设计文档
九、总结
| 概念 | 关键点 |
|---|---|
| WaitSet | 底层等待原语,多路复用监听多个实体 |
| Executor | 高层调度器,管理回调执行顺序和并发 |
| 关系 | Executor 内部使用 WaitSet 实现等待机制 |
| 选择 | 优先使用 Executor,高级场景才直接用 WaitSet |
| 性能 | StaticWaitSet > ThreadSafeWaitSet > WaitSet |
| 并发 | MultiThreadedExecutor + Reentrant 回调组 |
核心思想:WaitSet 处理"何时有工作",Executor 处理"如何执行工作"。两者分层设计,各司其职,共同构建了 ROS 2 高效的回调调度系统。