Skip to content

DDS Wrapper 设计评审文稿

版本:v 1.0
日期:2026-03-27
文档编号:对应设计文档 DESIGN_PARTION.md v 2.1


一、开场白与议程(约 3 分钟)

大家好。今天的评审主题是 dds_wrapper —— 一个基于 CycloneDDS-CXX 的 C++ 通信中间件封装层。

这次设计的背景很直接:我们的机器人系统目前使用 datacenter-api(底层是 ZMQ/iceoryx)做进程间通信,但随着我们需要接入 ROS 2 生态、并对通信层做更精细的类型控制,我们需要一个独立的、基于 DDS 的通信层

今天的评审议程分六个部分:

  1. 背景与动机 — 为什么做、目标是什么
  2. 架构与接口设计 — 核心类层次、工厂机制
  3. 线程模型 — 接收线程、独立分发线程
  4. ROS 2 兼容设计 — 兼容策略与实现细节
  5. IDL 工具链 — 用户如何编写消息类型
  6. 迁移成本 — 与现有 datacenter 代码的对照

整个设计文档已完成,实现代码(impl/ 目录)也已完成核心部分。评审的目的是检查设计决策的合理性,以及识别潜在风险点。


二、背景与动机(约 5 分钟)

2.1 现有 datacenter-api 的局限

我们现在的 datacenter-api 存在几个问题,在 DDS 场景下较为突出:

问题 具体表现
消息是字符串序列化 所有消息走 STRING 类型,运行时才知道数据结构,类型错误在编译期无法发现
回调必须继承 SPI 基类 每个模块都要写 class MySpi : public ISubSpi 这样的样板代码
与 ZMQ/iceoryx 强绑定 有大量 SetZMQContextEnableLockFree 这类和具体底层强关联的接口
不具备 ROS 2 互通能力 datacenter-api 没有 DDS 发现机制,无法直接与 ROS 2 节点通信

2.2 设计目标

我们要做的不是重写 datacenter-api,而是在旁边建一个独立的通道

  • 完全独立的命名空间 dds_wrapper,不与 datacenter 产生编译或链接依赖
  • 使用 C++ 模板提供编译期类型安全,消息类型在模板参数里就确定了
  • 使用 std::function 回调,用户不需要继承任何基类
  • API 命名和使用流程(Create → SetQos → Connect → 操作 → Stop故意向 datacenter 靠拢,降低迁移成本
  • 运行时开关控制 ROS 2 兼容模式,不用编译宏,两种模式代码始终编译

一句话概括:功能对齐 datacenter-api,但底层换成 DDS,类型安全性和 ROS 2 互通是核心增益


三、架构与接口设计(约 15 分钟)

3.1 整体类层次

整个设计的核心层次只有三层:

IDdsApiBase(独立基类)
 ├── IDdsPubApi<T>                      发布
 ├── IDdsSubApi<T>                      订阅
 ├── IDdsServiceServerApi<Req,Resp>     服务端
 └── IDdsServiceClientApi<Req,Resp>     客户端

DDSApiFactory(单例工厂)
 └── 管理所有 API 实例的生命周期

IDdsApiBase 是基类,只保留 DDS 场景必要的接口:ConnectStopIsConnectedNameSetNodeNameEnableRos2Compat。和 datacenter 的 IMsgBusApiBase 相比,删掉了所有 ZMQ/iceoryx 专有方法,比如 SetHostSetHighWaterMarkEnableLockFree 等。

重要的设计决策:所有接口类都是纯虚的,具体实现在 impl/ 目录下。用户代码只依赖 api/ 目录,不需要看到实现细节。

3.2 PubSub 接口

cpp
// 发布端:类型 T 在模板参数里固定
auto pub = DDSApiFactory::Create<IDdsPubApi<ImuMsg>>("imu_pub");
pub->SetDurability();
pub->SetDepth(20);
pub->Connect("/robot/sensor/imu");
pub->PubMsg(msg);   // 类型安全,不再传字符串

// 订阅端:回调是 lambda,不需要继承任何基类
auto sub = DDSApiFactory::Create<IDdsSubApi<ImuMsg>>(
    "imu_sub",
    [](const ImuMsg& msg) { /* 处理 */ });
sub->Connect("/robot/sensor/imu");

这与 datacenter 的使用模式是一致的,区别只是:消息类型从字符串变成了模板类型。

3.3 Service 接口

Service 分服务端和客户端:

服务端 — Handler 是一个 std::function<StatusCode(const Req&, Resp&)>,在 Create 时传入,框架自动把请求路由过来,Handler 填好 resp 返回就行:

cpp
auto server = DDSApiFactory::Create<IDdsServiceServerApi<MyReq, MyResp>>(
    "calc_server",
    [](const MyReq& req, MyResp& resp) -> StatusCode {
        resp.result(req.a() + req.b());
        return StatusCode::SUCCESS;
    });
server->Connect("calc_service");

客户端 — 支持同步和异步两种请求方式:

cpp
// 同步(阻塞 5 秒)
auto sc = client->Request(req, resp, 5000);

// 异步(立即返回,结果通过回调通知)
client->AsyncRequest(req, [](StatusCode sc, const MyResp& resp) { ... });

header_guid / header_seq 请求关联 是实现 Service 的核心机制。客户端为每个请求生成唯一的 (writer_guid_hash, seq) 组合,服务端把这两个字段原样透传到回复里,客户端用 header_seq 匹配等待的 waiter。这个机制与 ROS 2 的 rmw_cyclonedds_cpp 完全一致,是实现互通的基础。

3.4 工厂机制

DDSApiFactory 是单例工厂,负责:

  1. 管理 DomainParticipant 的生命周期(懒加载,第一次使用时创建)
  2. 幂等创建 API 实例(同名的 Create 调用返回同一个 shared_ptr
  3. 统一传递 NodeNameRos2Compat 默认值到新创建的实例

模板路由机制值得提一下。工厂的 Create<T_API>(name) 只有一个模板参数,内部通过 T_API::CreateApi(name) 这个静态方法路由到具体 Impl。这样工厂不需要感知各个子类,扩展新类型只需要在接口类里实现 CreateApi 静态方法。


四、线程模型(约 8 分钟)

4.1 默认模式:Listener 线程直接回调

CycloneDDS 收到数据后,在其内部的 Listener 线程上触发 on_data_available。默认情况下,我们在这个线程里直接调用用户回调/Handler。

规则:用户回调不得长时间阻塞,否则会堵塞整个 DDS 接收通道。

4.2 可选模式:独立分发线程

SubServiceServerServiceClient 三类实例都支持 EnableDispatchThread(true)

DDS Listener 线程
 └── on_data_available
      ├── [dispatch=false] → 直接调用 用户回调/Handler
      └── [dispatch=true]  → 消息入队 → 独立 dispatch 线程 → 调用 用户回调/Handler

开启后,DDS 内部线程只负责把消息/回复入队std::deque),然后立刻返回;独立的 dispatch 线程负责串行消费队列、调用用户代码。这样无论用户 Handler 执行多久,都不影响 DDS 接收通道。

这个设计的关键约束

  • 三类实例各自有独立的 dispatch 线程,互不干扰
  • 同一实例的回调是串行的,不存在并发
  • 跨实例的共享数据需要用户自行加锁

何时使用:Handler 需要执行耗时操作(如访问数据库、复杂计算)时开启。纯状态读取或计数器更新等轻量回调无需开启。


五、ROS 2 兼容设计(约 12 分钟)

这部分是整个设计里技术细节最多的,也是我们区别于一般 DDS 封装的关键能力。

5.1 需要解决什么问题

要让我们的代码和 ROS 2 节点互通,有三个要对齐的地方:

对齐项 ROS 2 约定 我们的处理方式
Topic 命名 PubSub 加 rt/ 前缀;Service 请求加 rq/,回复加 rr/ 运行时开关控制,不影响 IDL
wire 格式 header_guid + header_seq 字段做请求关联 统一 IDL,两种模式共用
QoS Service 请求 RELIABLE+KEEP_LAST(10),回复 RELIABLE+KEEP_ALL 统一策略基类实现
user_data serviceid=<GUID hex>; 格式 Ros2CompatHelper 工具类生成

关键决策:wire 格式统一。我们没有为两种模式维护两套 IDL,而是让所有 Service 消息都携带 header_guid / header_seq,不管是否开启 ROS 2 兼容模式。这样 IDL 只有一份,差异仅在 Topic 名前缀。

5.2 开关设计

ROS 2 兼容通过运行时开关控制,不使用编译宏

cpp
// 全局开关:所有后续创建的实例自动继承
DDSApiFactory::SetRos2Compat(true);

// 单实例开关(须在 Connect() 前调用)
client->EnableRos2Compat(true);

两级控制允许同一进程里混用两种模式:一部分服务通过 ROS 2 兼容模式与 ROS 2 节点通信,另一部分用默认模式做内部通信。

5.3 策略模式实现 — 这是核心设计亮点

实现 ROS 2 兼容最直观的方式是在 Connect、FillRequestId、HandleReply 里加 if (ros2_compat_) 判断。但我们选择了策略模式

IServiceProtocol<Req,Resp>(接口)
  └── ServiceProtocolBase<Req,Resp>(基类,实现 wire 协议共用部分)
        ├── DefaultServiceProtocol   MakeTopicNames() → {name}Request / {name}Reply
        └── Ros2ServiceProtocol      MakeTopicNames() → rq/{name}Request / rr/{name}Reply

DdsServiceClientApiImpl 持有一个 unique_ptr<IServiceProtocol>EnableRos2Compat(true) 只做一件事:替换策略对象

cpp
void EnableRos2Compat(bool enable) override {
    if (enable)
        m_protocol = std::make_unique<Ros2ServiceProtocol<Req, Resp>>();
    else
        m_protocol = std::make_unique<DefaultServiceProtocol<Req, Resp>>();
}

Impl 主干的 ConnectRequestHandleReply没有任何 if/else,全部委托给策略对象:

步骤 委托方法 两种策略的差异
生成 Topic 名 m_protocol->MakeTopicNames() 唯一差异:有无 rq/rr 前缀
配置 QoS m_protocol->ApplyWriterQos/ReaderQos() 完全相同
生成 serviceid m_protocol->Init() 完全相同
填充请求 ID m_protocol->FillRequestId() 完全相同
提取回复 ID m_protocol->ExtractRequestId() 完全相同

这意味着两个策略子类各自只有约 5 行代码,其余全在基类。未来如果需要对接其他协议(比如 Zenoh Bridge),只需继承 ServiceProtocolBase 并按需覆写,不动 Impl 主干。

5.4 Service 兼容时序要点

当客户端开启 ROS 2 兼容时,与 ROS 2 服务端的交互流程是:

  1. Connect("add_two_ints") → 创建 Topic rq/add_two_intsRequestrr/add_two_intsReply
  2. DataWriter QoS 的 user_data 设置为 serviceid=<GUID>;(ROS 2 识别客户端的方式)
  3. 发送请求时,header_guid = DataWriter GUID 低 8 字节,header_seq = 单调递增序号
  4. ROS 2 服务端把 header_guid / header_seq 原样回传
  5. 客户端按 header_seq 匹配等待的 waiter,唤醒阻塞的 Request() 调用

整个过程与 rmw_cyclonedds_cpp 的实现完全对齐。


六、IDL 工具链(约 5 分钟)

6.1 标准 PubSub 流程

PubSub 消息使用标准 CycloneDDS IDL 流程:

cmake
idlcxx_generate(TARGET my_idl FILES my_msg.idl)
target_link_libraries(my_app PRIVATE my_idl dds_wrapper::dds_wrapper)

用户只需要编写标准 IDL,工具链生成对应的 .hpp 类,直接作为模板参数传给 IDdsPubApi<MyMsg> 即可。

6.2 Service 的 IDL 注入

Service 消息需要包含 header_guid / header_seq 字段,但让用户手写这两个字段既容易出错,又使 IDL 和业务意图耦合。

我们的做法是:用户只写业务字段,构建时由 service_idl_inject.py 自动注入 wire header:

用户 IDL(只含业务字段)
  └─→ service_idl_inject.py 注入 header_guid / header_seq
       └─→ wire IDL(自动生成,不可手动编辑)
            └─→ idlcxx_generate() 生成 .hpp

CMake 接口封装为一个宏:

cmake
include(service_idl_macros.cmake)
service_idlcxx_generate(
    TARGET my_svc_idl
    FILES  my_service.idl
)

生成的头文件名与原 IDL 同名,#include 路径无需修改,用户完全感知不到中间步骤。


七、与 datacenter-api 的迁移对照(约 5 分钟)

我们刻意让 API 风格向 datacenter 靠拢,迁移的主要变化只有三点:

第一:消息类型从字符串变成模板参数

cpp
// 旧代码
auto pub = MBApiFactory::Create<IPubApi>("imu_pub");
pub->PubMsg("/sensor/imu", serialized_string);

// 新代码
auto pub = DDSApiFactory::Create<IDdsPubApi<ImuMsg>>("imu_pub");
pub->PubMsg(msg);

第二:回调从 SPI 继承变成 lambda

cpp
// 旧代码
class MySpi : public ISubSpi {
    void OnRecvSubMsg(const std::string& topic, const STRING& msg) override { ... }
};
auto sub = MBApiFactory::Create<ISubApi>("imu_sub", &spi);

// 新代码
auto sub = DDSApiFactory::Create<IDdsSubApi<ImuMsg>>(
    "imu_sub",
    [](const ImuMsg& msg) { ... });

第三:include 路径换一下

旧 include 新 include
datacenter/ps_pub_api.h dds_pub_api.h + dds_pub_api_impl.h
datacenter/ps_sub_api.h dds_sub_api.h + dds_sub_api_impl.h
datacenter/service_client_api.h dds_service_client_api.h
datacenter/api_factory.h dds_factory.h

使用流程(Create → SetQos → Connect → 操作 → Stop)不变,连方法名都基本一致。


八、已知风险与待决问题(约 5 分钟)

在评审前,我们自己梳理了以下几个需要讨论的点:

风险 1:模板头文件膨胀

所有 Impl 都是模板类,实现全在 .h 里。如果业务代码有大量不同的消息类型,每种类型都会实例化一份 Impl,编译时间和二进制大小可能增加

缓解措施:显式实例化常用类型(在单独的 .cpp 里),非常见类型按需实例化。

风险 2:CycloneDDS Listener 线程数

每个 DataReader/DataWriter 都会有 Listener。大量实例时 CycloneDDS 内部的线程池压力如何,需要做压力测试。

缓解措施EnableDispatchThread 设计已隔离业务回调,Listener 线程只做入队,压力主要在 Listener 线程本身的 on_data_available 调用,通常很快。

风险 3:同步 Request 超时后 waiter 泄漏

Request() 超时返回 StatusCode::TIMEOUT 时,对应的 SyncWaiter 已经从 m_sync_waiters 里移除。但如果服务端在超时后才回复HandleReply 找不到 waiter 会直接丢弃,不会崩溃,但已经没有人消费这个回复,属于正常设计行为。

确认:这个行为是否符合业务预期?还是需要增加超时后的重试机制?

待决问题

  1. Domain ID 策略:目前 GetParticipant(domain_id=0) 使用 Domain 0。是否需要支持多 Domain,或者从配置文件读取 Domain ID?
  2. PubSub 的 ROS 2 兼容是否需要用 rt/ 前缀以外的行为?目前 PubSub 兼容只处理了 Topic 命名,还没有对齐 ROS 2 的 matched_endpoints 等机制。
  3. dds_sub.cpp 的定位example/dds_sub.cpp 目前看起来是调试用途,是否要保留在 example 目录里?

九、总结(约 2 分钟)

本次设计的核心成果:

维度 成果
类型安全 全模板接口,消息类型编译期确定,消除运行时字符串序列化
零继承回调 std::function 替代 SPI 继承体系,用户代码更简洁
线程安全 RAII + shared_ptr,Listener 线程与业务代码隔离,dispatch thread 可选
ROS 2 互通 wire 格式统一(header_guid/seq),Topic 命名由策略模式控制,运行时切换
低迁移成本 API 命名和使用流程向 datacenter 对齐,迁移三步走:换 include、换模板参数、去掉 SPI 继承
可扩展性 策略模式预留扩展接口,新增 wire 协议不改主干代码

需要评审委员重点关注和决策的是:

  • Domain ID 策略是否明确
  • 模板膨胀是否在项目编译资源内可接受
  • 超时后丢弃回复的行为是否符合业务预期

感谢各位,进入 Q&A 环节。


附录:常见问题预问答

Q:为什么不直接用 rclcpp(ROS 2 C++ 客户端库)?
A:我们的系统不一定运行完整的 ROS 2 环境,rclcpp 依赖 ROS 2 节点运行时(节点名、参数服务器等)。dds_wrapper 只依赖 CycloneDDS-CXX,在任何嵌入式 Linux 环境都可以运行,也可以在有 ROS 2 的环境里和 ROS 2 节点互通。

Q:EnableDispatchThreadSetHandler 的顺序是否有限制?
A:两者都必须在 Connect() 前调用。Connect() 后调用会被忽略,这在接口注释中有明确说明。

Q:PubSub 支持一对多吗?
A:完全支持。DDS 天然是发布-订阅模型,一个 Publisher 可以对应多个 Subscriber,Topic 匹配由 CycloneDDS 自动完成。

Q:DDSApiFactory::Create 的幂等性在多线程下安全吗?
A:安全。工厂内部有 m_mtx 保护 m_api_mapCreateImpl 是原子查-建-注册操作。

Q:IDL 的 module 嵌套结构(module pkg { module srv { module dds_ { ... } } })是 ROS 2 约定吗?
A:是的,这是 rmw_cyclonedds_cpp 要求的命名空间结构,用于类型发现时的匹配。如果只用于内部通信(不对接 ROS 2),可以使用任意 IDL 结构。

基于 VitePress 构建