DDS Wrapper 设计评审文稿
版本:v 1.0
日期:2026-03-27
文档编号:对应设计文档DESIGN_PARTION.mdv 2.1
一、开场白与议程(约 3 分钟)
大家好。今天的评审主题是 dds_wrapper —— 一个基于 CycloneDDS-CXX 的 C++ 通信中间件封装层。
这次设计的背景很直接:我们的机器人系统目前使用 datacenter-api(底层是 ZMQ/iceoryx)做进程间通信,但随着我们需要接入 ROS 2 生态、并对通信层做更精细的类型控制,我们需要一个独立的、基于 DDS 的通信层。
今天的评审议程分六个部分:
- 背景与动机 — 为什么做、目标是什么
- 架构与接口设计 — 核心类层次、工厂机制
- 线程模型 — 接收线程、独立分发线程
- ROS 2 兼容设计 — 兼容策略与实现细节
- IDL 工具链 — 用户如何编写消息类型
- 迁移成本 — 与现有 datacenter 代码的对照
整个设计文档已完成,实现代码(impl/ 目录)也已完成核心部分。评审的目的是检查设计决策的合理性,以及识别潜在风险点。
二、背景与动机(约 5 分钟)
2.1 现有 datacenter-api 的局限
我们现在的 datacenter-api 存在几个问题,在 DDS 场景下较为突出:
| 问题 | 具体表现 |
|---|---|
| 消息是字符串序列化 | 所有消息走 STRING 类型,运行时才知道数据结构,类型错误在编译期无法发现 |
| 回调必须继承 SPI 基类 | 每个模块都要写 class MySpi : public ISubSpi 这样的样板代码 |
| 与 ZMQ/iceoryx 强绑定 | 有大量 SetZMQContext、EnableLockFree 这类和具体底层强关联的接口 |
| 不具备 ROS 2 互通能力 | datacenter-api 没有 DDS 发现机制,无法直接与 ROS 2 节点通信 |
2.2 设计目标
我们要做的不是重写 datacenter-api,而是在旁边建一个独立的通道:
- 完全独立的命名空间
dds_wrapper,不与datacenter产生编译或链接依赖 - 使用 C++ 模板提供编译期类型安全,消息类型在模板参数里就确定了
- 使用
std::function回调,用户不需要继承任何基类 - API 命名和使用流程(
Create → SetQos → Connect → 操作 → Stop)故意向 datacenter 靠拢,降低迁移成本 - 运行时开关控制 ROS 2 兼容模式,不用编译宏,两种模式代码始终编译
一句话概括:功能对齐 datacenter-api,但底层换成 DDS,类型安全性和 ROS 2 互通是核心增益。
三、架构与接口设计(约 15 分钟)
3.1 整体类层次
整个设计的核心层次只有三层:
IDdsApiBase(独立基类)
├── IDdsPubApi<T> 发布
├── IDdsSubApi<T> 订阅
├── IDdsServiceServerApi<Req,Resp> 服务端
└── IDdsServiceClientApi<Req,Resp> 客户端
DDSApiFactory(单例工厂)
└── 管理所有 API 实例的生命周期 IDdsApiBase 是基类,只保留 DDS 场景必要的接口:Connect、Stop、IsConnected、Name、SetNodeName、EnableRos2Compat。和 datacenter 的 IMsgBusApiBase 相比,删掉了所有 ZMQ/iceoryx 专有方法,比如 SetHost、SetHighWaterMark、EnableLockFree 等。
重要的设计决策:所有接口类都是纯虚的,具体实现在 impl/ 目录下。用户代码只依赖 api/ 目录,不需要看到实现细节。
3.2 PubSub 接口
// 发布端:类型 T 在模板参数里固定
auto pub = DDSApiFactory::Create<IDdsPubApi<ImuMsg>>("imu_pub");
pub->SetDurability();
pub->SetDepth(20);
pub->Connect("/robot/sensor/imu");
pub->PubMsg(msg); // 类型安全,不再传字符串
// 订阅端:回调是 lambda,不需要继承任何基类
auto sub = DDSApiFactory::Create<IDdsSubApi<ImuMsg>>(
"imu_sub",
[](const ImuMsg& msg) { /* 处理 */ });
sub->Connect("/robot/sensor/imu"); 这与 datacenter 的使用模式是一致的,区别只是:消息类型从字符串变成了模板类型。
3.3 Service 接口
Service 分服务端和客户端:
服务端 — Handler 是一个 std::function<StatusCode(const Req&, Resp&)>,在 Create 时传入,框架自动把请求路由过来,Handler 填好 resp 返回就行:
auto server = DDSApiFactory::Create<IDdsServiceServerApi<MyReq, MyResp>>(
"calc_server",
[](const MyReq& req, MyResp& resp) -> StatusCode {
resp.result(req.a() + req.b());
return StatusCode::SUCCESS;
});
server->Connect("calc_service"); 客户端 — 支持同步和异步两种请求方式:
// 同步(阻塞 5 秒)
auto sc = client->Request(req, resp, 5000);
// 异步(立即返回,结果通过回调通知)
client->AsyncRequest(req, [](StatusCode sc, const MyResp& resp) { ... }); header_guid / header_seq 请求关联 是实现 Service 的核心机制。客户端为每个请求生成唯一的 (writer_guid_hash, seq) 组合,服务端把这两个字段原样透传到回复里,客户端用 header_seq 匹配等待的 waiter。这个机制与 ROS 2 的 rmw_cyclonedds_cpp 完全一致,是实现互通的基础。
3.4 工厂机制
DDSApiFactory 是单例工厂,负责:
- 管理
DomainParticipant的生命周期(懒加载,第一次使用时创建) - 幂等创建 API 实例(同名的
Create调用返回同一个shared_ptr) - 统一传递
NodeName和Ros2Compat默认值到新创建的实例
模板路由机制值得提一下。工厂的 Create<T_API>(name) 只有一个模板参数,内部通过 T_API::CreateApi(name) 这个静态方法路由到具体 Impl。这样工厂不需要感知各个子类,扩展新类型只需要在接口类里实现 CreateApi 静态方法。
四、线程模型(约 8 分钟)
4.1 默认模式:Listener 线程直接回调
CycloneDDS 收到数据后,在其内部的 Listener 线程上触发 on_data_available。默认情况下,我们在这个线程里直接调用用户回调/Handler。
规则:用户回调不得长时间阻塞,否则会堵塞整个 DDS 接收通道。
4.2 可选模式:独立分发线程
Sub、ServiceServer、ServiceClient 三类实例都支持 EnableDispatchThread(true):
DDS Listener 线程
└── on_data_available
├── [dispatch=false] → 直接调用 用户回调/Handler
└── [dispatch=true] → 消息入队 → 独立 dispatch 线程 → 调用 用户回调/Handler 开启后,DDS 内部线程只负责把消息/回复入队(std::deque),然后立刻返回;独立的 dispatch 线程负责串行消费队列、调用用户代码。这样无论用户 Handler 执行多久,都不影响 DDS 接收通道。
这个设计的关键约束:
- 三类实例各自有独立的 dispatch 线程,互不干扰
- 同一实例的回调是串行的,不存在并发
- 跨实例的共享数据需要用户自行加锁
何时使用:Handler 需要执行耗时操作(如访问数据库、复杂计算)时开启。纯状态读取或计数器更新等轻量回调无需开启。
五、ROS 2 兼容设计(约 12 分钟)
这部分是整个设计里技术细节最多的,也是我们区别于一般 DDS 封装的关键能力。
5.1 需要解决什么问题
要让我们的代码和 ROS 2 节点互通,有三个要对齐的地方:
| 对齐项 | ROS 2 约定 | 我们的处理方式 |
|---|---|---|
| Topic 命名 | PubSub 加 rt/ 前缀;Service 请求加 rq/,回复加 rr/ | 运行时开关控制,不影响 IDL |
| wire 格式 | header_guid + header_seq 字段做请求关联 | 统一 IDL,两种模式共用 |
| QoS | Service 请求 RELIABLE+KEEP_LAST(10),回复 RELIABLE+KEEP_ALL | 统一策略基类实现 |
| user_data | serviceid=<GUID hex>; 格式 | Ros2CompatHelper 工具类生成 |
关键决策:wire 格式统一。我们没有为两种模式维护两套 IDL,而是让所有 Service 消息都携带 header_guid / header_seq,不管是否开启 ROS 2 兼容模式。这样 IDL 只有一份,差异仅在 Topic 名前缀。
5.2 开关设计
ROS 2 兼容通过运行时开关控制,不使用编译宏:
// 全局开关:所有后续创建的实例自动继承
DDSApiFactory::SetRos2Compat(true);
// 单实例开关(须在 Connect() 前调用)
client->EnableRos2Compat(true); 两级控制允许同一进程里混用两种模式:一部分服务通过 ROS 2 兼容模式与 ROS 2 节点通信,另一部分用默认模式做内部通信。
5.3 策略模式实现 — 这是核心设计亮点
实现 ROS 2 兼容最直观的方式是在 Connect、FillRequestId、HandleReply 里加 if (ros2_compat_) 判断。但我们选择了策略模式:
IServiceProtocol<Req,Resp>(接口)
└── ServiceProtocolBase<Req,Resp>(基类,实现 wire 协议共用部分)
├── DefaultServiceProtocol MakeTopicNames() → {name}Request / {name}Reply
└── Ros2ServiceProtocol MakeTopicNames() → rq/{name}Request / rr/{name}Reply DdsServiceClientApiImpl 持有一个 unique_ptr<IServiceProtocol>。EnableRos2Compat(true) 只做一件事:替换策略对象。
void EnableRos2Compat(bool enable) override {
if (enable)
m_protocol = std::make_unique<Ros2ServiceProtocol<Req, Resp>>();
else
m_protocol = std::make_unique<DefaultServiceProtocol<Req, Resp>>();
} Impl 主干的 Connect、Request、HandleReply 里没有任何 if/else,全部委托给策略对象:
| 步骤 | 委托方法 | 两种策略的差异 |
|---|---|---|
| 生成 Topic 名 | m_protocol->MakeTopicNames() | 唯一差异:有无 rq/rr 前缀 |
| 配置 QoS | m_protocol->ApplyWriterQos/ReaderQos() | 完全相同 |
| 生成 serviceid | m_protocol->Init() | 完全相同 |
| 填充请求 ID | m_protocol->FillRequestId() | 完全相同 |
| 提取回复 ID | m_protocol->ExtractRequestId() | 完全相同 |
这意味着两个策略子类各自只有约 5 行代码,其余全在基类。未来如果需要对接其他协议(比如 Zenoh Bridge),只需继承 ServiceProtocolBase 并按需覆写,不动 Impl 主干。
5.4 Service 兼容时序要点
当客户端开启 ROS 2 兼容时,与 ROS 2 服务端的交互流程是:
Connect("add_two_ints")→ 创建 Topicrq/add_two_intsRequest和rr/add_two_intsReply- DataWriter QoS 的
user_data设置为serviceid=<GUID>;(ROS 2 识别客户端的方式) - 发送请求时,
header_guid= DataWriter GUID 低 8 字节,header_seq= 单调递增序号 - ROS 2 服务端把
header_guid/header_seq原样回传 - 客户端按
header_seq匹配等待的 waiter,唤醒阻塞的Request()调用
整个过程与 rmw_cyclonedds_cpp 的实现完全对齐。
六、IDL 工具链(约 5 分钟)
6.1 标准 PubSub 流程
PubSub 消息使用标准 CycloneDDS IDL 流程:
idlcxx_generate(TARGET my_idl FILES my_msg.idl)
target_link_libraries(my_app PRIVATE my_idl dds_wrapper::dds_wrapper) 用户只需要编写标准 IDL,工具链生成对应的 .hpp 类,直接作为模板参数传给 IDdsPubApi<MyMsg> 即可。
6.2 Service 的 IDL 注入
Service 消息需要包含 header_guid / header_seq 字段,但让用户手写这两个字段既容易出错,又使 IDL 和业务意图耦合。
我们的做法是:用户只写业务字段,构建时由 service_idl_inject.py 自动注入 wire header:
用户 IDL(只含业务字段)
└─→ service_idl_inject.py 注入 header_guid / header_seq
└─→ wire IDL(自动生成,不可手动编辑)
└─→ idlcxx_generate() 生成 .hpp CMake 接口封装为一个宏:
include(service_idl_macros.cmake)
service_idlcxx_generate(
TARGET my_svc_idl
FILES my_service.idl
) 生成的头文件名与原 IDL 同名,#include 路径无需修改,用户完全感知不到中间步骤。
七、与 datacenter-api 的迁移对照(约 5 分钟)
我们刻意让 API 风格向 datacenter 靠拢,迁移的主要变化只有三点:
第一:消息类型从字符串变成模板参数
// 旧代码
auto pub = MBApiFactory::Create<IPubApi>("imu_pub");
pub->PubMsg("/sensor/imu", serialized_string);
// 新代码
auto pub = DDSApiFactory::Create<IDdsPubApi<ImuMsg>>("imu_pub");
pub->PubMsg(msg); 第二:回调从 SPI 继承变成 lambda
// 旧代码
class MySpi : public ISubSpi {
void OnRecvSubMsg(const std::string& topic, const STRING& msg) override { ... }
};
auto sub = MBApiFactory::Create<ISubApi>("imu_sub", &spi);
// 新代码
auto sub = DDSApiFactory::Create<IDdsSubApi<ImuMsg>>(
"imu_sub",
[](const ImuMsg& msg) { ... }); 第三:include 路径换一下
| 旧 include | 新 include |
|---|---|
datacenter/ps_pub_api.h | dds_pub_api.h + dds_pub_api_impl.h |
datacenter/ps_sub_api.h | dds_sub_api.h + dds_sub_api_impl.h |
datacenter/service_client_api.h | dds_service_client_api.h |
datacenter/api_factory.h | dds_factory.h |
使用流程(Create → SetQos → Connect → 操作 → Stop)不变,连方法名都基本一致。
八、已知风险与待决问题(约 5 分钟)
在评审前,我们自己梳理了以下几个需要讨论的点:
风险 1:模板头文件膨胀
所有 Impl 都是模板类,实现全在 .h 里。如果业务代码有大量不同的消息类型,每种类型都会实例化一份 Impl,编译时间和二进制大小可能增加。
缓解措施:显式实例化常用类型(在单独的 .cpp 里),非常见类型按需实例化。
风险 2:CycloneDDS Listener 线程数
每个 DataReader/DataWriter 都会有 Listener。大量实例时 CycloneDDS 内部的线程池压力如何,需要做压力测试。
缓解措施:EnableDispatchThread 设计已隔离业务回调,Listener 线程只做入队,压力主要在 Listener 线程本身的 on_data_available 调用,通常很快。
风险 3:同步 Request 超时后 waiter 泄漏
Request() 超时返回 StatusCode::TIMEOUT 时,对应的 SyncWaiter 已经从 m_sync_waiters 里移除。但如果服务端在超时后才回复,HandleReply 找不到 waiter 会直接丢弃,不会崩溃,但已经没有人消费这个回复,属于正常设计行为。
确认:这个行为是否符合业务预期?还是需要增加超时后的重试机制?
待决问题
- Domain ID 策略:目前
GetParticipant(domain_id=0)使用 Domain 0。是否需要支持多 Domain,或者从配置文件读取 Domain ID? - PubSub 的 ROS 2 兼容是否需要用
rt/前缀以外的行为?目前 PubSub 兼容只处理了 Topic 命名,还没有对齐 ROS 2 的matched_endpoints等机制。 dds_sub.cpp的定位:example/dds_sub.cpp目前看起来是调试用途,是否要保留在 example 目录里?
九、总结(约 2 分钟)
本次设计的核心成果:
| 维度 | 成果 |
|---|---|
| 类型安全 | 全模板接口,消息类型编译期确定,消除运行时字符串序列化 |
| 零继承回调 | std::function 替代 SPI 继承体系,用户代码更简洁 |
| 线程安全 | RAII + shared_ptr,Listener 线程与业务代码隔离,dispatch thread 可选 |
| ROS 2 互通 | wire 格式统一(header_guid/seq),Topic 命名由策略模式控制,运行时切换 |
| 低迁移成本 | API 命名和使用流程向 datacenter 对齐,迁移三步走:换 include、换模板参数、去掉 SPI 继承 |
| 可扩展性 | 策略模式预留扩展接口,新增 wire 协议不改主干代码 |
需要评审委员重点关注和决策的是:
- Domain ID 策略是否明确
- 模板膨胀是否在项目编译资源内可接受
- 超时后丢弃回复的行为是否符合业务预期
感谢各位,进入 Q&A 环节。
附录:常见问题预问答
Q:为什么不直接用 rclcpp(ROS 2 C++ 客户端库)?
A:我们的系统不一定运行完整的 ROS 2 环境,rclcpp 依赖 ROS 2 节点运行时(节点名、参数服务器等)。dds_wrapper 只依赖 CycloneDDS-CXX,在任何嵌入式 Linux 环境都可以运行,也可以在有 ROS 2 的环境里和 ROS 2 节点互通。
Q:EnableDispatchThread 和 SetHandler 的顺序是否有限制?
A:两者都必须在 Connect() 前调用。Connect() 后调用会被忽略,这在接口注释中有明确说明。
Q:PubSub 支持一对多吗?
A:完全支持。DDS 天然是发布-订阅模型,一个 Publisher 可以对应多个 Subscriber,Topic 匹配由 CycloneDDS 自动完成。
Q:DDSApiFactory::Create 的幂等性在多线程下安全吗?
A:安全。工厂内部有 m_mtx 保护 m_api_map,CreateImpl 是原子查-建-注册操作。
Q:IDL 的 module 嵌套结构(module pkg { module srv { module dds_ { ... } } })是 ROS 2 约定吗?
A:是的,这是 rmw_cyclonedds_cpp 要求的命名空间结构,用于类型发现时的匹配。如果只用于内部通信(不对接 ROS 2),可以使用任意 IDL 结构。