DDS Wrapper 设计文档
版本:v 2.1 日期:2026-03-27 依据:
PRD.md— Cyclone DDS C++ 封装层需求规格说明书
目录
- 1. 概述
- 2. 目录结构
- 3. 类图
- 4. 接口说明
- 5. 流程图
- 6. ROS 2 兼容设计
- 7. IDL 工具链与 CMake 设计
- 8. QoS 支持矩阵
- 9. Topic 命名规则
- 10. CMake 构建设计
- 11. 与 datacenter-api 迁移对照
- 12. 典型使用示例
1. 概述
1.1 设计目标
构建一个完全独立于 datacenter api、但 API 使用风格兼容 datacenter api 使用习惯的纯 C++ 通信中间件封装层,底层基于 CycloneDDS-CXX 实现。
| 条目 | 决策 |
|---|---|
| 语言 | C++17(std::optional、std::variant、structured bindings、if constexpr) |
| 命名空间 | dds_wrapper(完全独立于 datacenter) |
| 消息格式 | 纯模板 + 标准 IDL(.idl 文件),支持任意 IDL 生成类型 |
| 回调模型 | std::function,不使用 SPI 继承体系 |
| 错误处理 | 普通操作返回 bool;关键操作返回 StatusCode 枚举 |
| 构建系统 | CMake,支持 find_package(dds_wrapper REQUIRED) 导出 |
1.2 设计约束
| 约束 | 说明 |
|---|---|
| 基类依赖 | 不继承任何 datacenter 基类;自定义 IDdsApiBase |
| QoS 配置 | 按角色区分为 DdsWriterQosProfile / DdsReaderQosProfile,提供 SetQos + 便捷方法 |
| 线程模型 | 使用 Cyclone DDS Listener 接收;Sub / ServiceServer / ServiceClient 均可按实例启用独立分发线程(默认关闭) |
| ROS 2 兼容 | 运行时 API 开关(EnableRos2Compat),开启后 Topic 命名与 IDL 类型结构均兼容 ROS 2 |
| RAII | 严格使用 std::shared_ptr / std::unique_ptr,禁止裸 new / delete |
1.3 与 datacenter-api 的关系
datacenter-api(ZMQ/iceoryx) dds-wrapper(CycloneDDS)
┌──────────────────────────┐ ┌───────────────────────────┐
│ namespace datacenter │ │ namespace dds_wrapper │
│ IMsgBusApiBase (基类) │ ≈ │ IDdsApiBase (独立基类) │
│ SPI 继承体系 (回调) │ → │ std::function (回调) │
│ STRING 序列化消息 │ → │ 模板 T (类型化消息) │
│ MBApiFactory (工厂) │ ≈ │ DDSApiFactory (工厂) │
│ IPubApi / ISubApi │ ≈ │ IDdsPubApi<T> / IDdsSubApi<T> │
│ IServiceClientApi │ ≈ │ IDdsServiceClientApi<Req,Resp>│
└──────────────────────────┘ └───────────────────────────┘ 核心差异:
dds_wrapper使用 C++ 模板提供编译期类型安全,不再依赖运行时字符串序列化- 回调使用
std::function值语义,无需用户继承 SPI 基类 - 完全独立的命名空间,不与 datacenter 产生编译或链接依赖
- API 命名和使用流程(
Create → SetQos → Connect → 操作 → Stop)与 datacenter 保持一致,降低迁移成本
2. 目录结构
dds-api/
├── CMakeLists.txt # 库构建 + 安装导出
├── cmake/
│ ├── service_idl_inject.py # Service IDL 字段注入脚本
│ └── service_idl_macros.cmake # service_idlcxx_generate() 宏
├── docs/
│ ├── PRD.md # 需求规格说明书
│ ├── DESIGN_PARTION.md # 设计文档(当前)
│ ├── DESIGN_V2.md # 历史版本设计文档
│ └── TODO.md # 待办跟踪
├── api/ # 对外公开接口头文件(零 datacenter 依赖)
│ ├── dds_status_code.h # StatusCode 枚举
│ ├── dds_qos.h # Writer/Reader QoS 结构体
│ ├── dds_api_base.h # IDdsApiBase — 独立基类
│ ├── dds_pub_api.h # IDdsPubApi<T>
│ ├── dds_sub_api.h # IDdsSubApi<T>(std::function 回调)
│ ├── dds_factory.h # DDSApiFactory — 工厂 + 模板实现
│ ├── dds_service_server_api.h # IDdsServiceServerApi<Req,Resp>
│ └── dds_service_client_api.h # IDdsServiceClientApi<Req,Resp>
├── impl/ # 实现(模板全量 .h + 工厂 .cpp)
│ ├── dds_pub_api_impl.h # DdsPubApiImpl<T> + DdsPubListener<T>
│ ├── dds_sub_api_impl.h # DdsSubApiImpl<T> + DdsSubListener<T>
│ ├── dds_service_server_api_impl.h # DdsServiceServerApiImpl<Req,Resp>
│ ├── dds_service_client_api_impl.h # DdsServiceClientApiImpl<Req,Resp>
│ ├── dds_service_protocol.h # IServiceProtocol<Req,Resp> 策略接口
│ ├── dds_service_protocol_default.h # DefaultServiceProtocol — 原生模式策略
│ ├── dds_service_protocol_ros2.h # Ros2ServiceProtocol — ROS2 兼容策略
│ ├── dds_ros2_compat.h # Ros2CompatHelper 工具(serviceid/GUID 等)
│ └── dds_factory.cpp # DDSApiFactory 非模板实现
└── example/
├── CMakeLists.txt # 示例工程构建
├── idl/
│ ├── test_msg.idl # PubSub 示例消息
│ ├── service_test_msg.idl # Service 示例消息
│ ├── ros_topic.idl # ROS2 Topic 示例消息
│ └── ros2_add_ints.idl # ROS2 Service 示例消息
├── example_topic_pub.cpp
├── example_topic_sub.cpp
├── example_service_server.cpp
├── example_service_client.cpp
├── example_ros2_topic_pub.cpp
├── example_ros2_topic_sub.cpp
├── example_ros2_service_server.cpp
└── example_ros2_service_client.cpp 3. 类图
3.1 整体类层次
classDiagram
direction TB
class IDdsApiBase {
<<abstract>>
+Connect(topic_name: string) bool
+Stop() void
+IsConnected() bool
+Name() string
+SetNodeName(node_name: string) void
+EnableRos2Compat(enable: bool) void
}
class IDdsPubApi~T~ {
<<abstract>>
+PubMsg(msg: T) bool
+SubNotify(cb: function) void
+UnSubNotify(cb: function) void
+SetDurability() void
+SetReliability(reliable: bool) void
+SetDepth(depth: uint32) bool
+SetKeepAll() void
+SetQos(qos: DdsWriterQosProfile) bool
+GetQos() DdsWriterQosProfile
}
class IDdsSubApi~T~ {
<<abstract>>
+MsgCallback: function~void(T)~
+EnableDispatchThread(enable: bool) void
+SetDurability() void
+SetReliability(reliable: bool) void
+SetDepth(depth: uint32) bool
+SetKeepAll() void
+SetQos(qos: DdsReaderQosProfile) bool
+GetQos() DdsReaderQosProfile
}
class IDdsServiceServerApi~Req_Resp~ {
<<abstract>>
+Handler: function~StatusCode(Req, Resp)~
+EnableDispatchThread(enable: bool) void
+SetHandler(handler: Handler) void
}
class IDdsServiceClientApi~Req_Resp~ {
<<abstract>>
+RespCallback: function~void(StatusCode, Resp)~
+EnableDispatchThread(enable: bool) void
+Request(req: Req, resp: Resp, timeout_ms: int) StatusCode
+AsyncRequest(req: Req, cb: RespCallback) StatusCode
}
class DDSApiFactory {
<<singleton>>
+GetInstance() DDSApiFactory
+GetParticipant(domain_id: uint32) shared_ptr
+ReleaseParticipant() void
+Create~T_API~(name: string) shared_ptr~T_API~
+Create~T_API~(name: string, cb: MsgCallback) shared_ptr~T_API~
+Get(name: string) DdsApiBasePtr
+Release(name: string) void
+Clear() void
+SetNodeName(node_name: string) void
+SetRos2Compat(enable: bool) void
}
IDdsApiBase <|-- IDdsPubApi
IDdsApiBase <|-- IDdsSubApi
IDdsApiBase <|-- IDdsServiceServerApi
IDdsApiBase <|-- IDdsServiceClientApi
DDSApiFactory ..> IDdsApiBase : creates 3.2 PubSub 类图
classDiagram
direction TB
class `NoOpDataWriterListener~T~` {
<<abstract>>
+on_publication_matched(writer, status) void
}
class `NoOpDataReaderListener~T~` {
<<abstract>>
+on_data_available(reader) void
}
class IDdsPubApi~T~ {
<<interface>>
+PubMsg(msg: T) bool
+SubNotify(cb) void
+UnSubNotify(cb) void
+SetDurability() void
+SetReliability(reliable: bool) void
+SetDepth(depth: uint32) bool
+SetKeepAll() void
+SetQos(qos: DdsWriterQosProfile) bool
+GetQos() DdsWriterQosProfile
#CreateApi(name: string)$ shared_ptr
}
class DdsPubApiImpl~T~ {
-m_name: string
-m_node_name: string
-m_connected: atomic~bool~
-m_pub_mtx: mutex
-m_qos: DdsWriterQosProfile
-m_sub_cb: function
-m_unsub_cb: function
-m_topic: Topic~T~
-m_publisher: Publisher
-m_writer: DataWriter~T~
-m_listener: unique_ptr~DdsPubListener~
-ApplyQos(wqos: DataWriterQos) void
+CreateApiImpl(name: string)$ shared_ptr
}
class DdsPubListener~T~ {
-m_sub_cb: function
-m_unsub_cb: function
-m_topic_name: string
+on_publication_matched(writer, status) void
}
class IDdsSubApi~T~ {
<<interface>>
+MsgCallback: function~void(T)~
+EnableDispatchThread(enable: bool) void
+SetDurability() void
+SetReliability(reliable: bool) void
+SetDepth(depth: uint32) bool
+SetKeepAll() void
+SetQos(qos: DdsReaderQosProfile) bool
+GetQos() DdsReaderQosProfile
#CreateApi(name: string, cb: MsgCallback)$ shared_ptr
}
class DdsSubApiImpl~T~ {
-m_name: string
-m_node_name: string
-m_connected: atomic~bool~
-m_sub_mtx: mutex
-m_cb_mtx: mutex
-m_callback: MsgCallback
-m_enable_dispatch_thread: bool
-m_dispatch_thread: thread
-m_msg_queue: deque~T~
-m_queue_mtx: mutex
-m_queue_cv: condition_variable
-m_qos: DdsReaderQosProfile
-m_reader: DataReader~T~
-m_listener: unique_ptr~DdsSubListener~
-DispatchLoop() void
-EnqueueMessage(msg: T) void
-ApplyQos(rqos: DataReaderQos) void
+CreateApiImpl(name: string, cb: MsgCallback)$ shared_ptr
}
class DdsSubListener~T~ {
-m_callback: MsgCallback
-m_use_dispatch_thread: bool
-m_dispatch_fn: DispatchFn
+on_data_available(reader) void
}
IDdsPubApi <|.. DdsPubApiImpl : implements
DdsPubApiImpl --> DdsPubListener : owns
DdsPubListener --|> `NoOpDataWriterListener~T~` : extends
IDdsSubApi <|.. DdsSubApiImpl : implements
DdsSubApiImpl --> DdsSubListener : owns
DdsSubListener --|> `NoOpDataReaderListener~T~` : extends 3.3 Service 类图
classDiagram
direction TB
class `NoOpDataReaderListener~T~` {
<<abstract>>
+on_data_available(reader) void
}
class DdsServiceClientListener~Resp~ {
-m_dispatch_fn: function
-m_async_waiters: map~string, RespCallback~
+on_data_available(reader) void
}
class DdsServiceClientWriterListener {
-m_match_cv: condition_variable
-m_match_ready: atomic~bool~
+on_publication_matched(writer, status) void
}
class SyncWaiter~Resp~ {
+mtx: mutex
+cv: condition_variable
+ready: bool
+resp: Resp
}
class IDdsServiceServerApi~Req_Resp~ {
<<interface>>
+Handler: function~StatusCode(Req, Resp)~
+MsgCallback: Handler
+EnableDispatchThread(enable: bool) void
+SetHandler(handler: Handler) void
#CreateApi(name, handler)$ shared_ptr
}
class DdsServiceServerApiImpl~Req_Resp~ {
-m_name: string
-m_connected: atomic~bool~
-m_mtx: mutex
-m_handler: Handler
-m_enable_dispatch_thread: bool
-m_req_queue: deque~Req~
-m_queue_mtx: mutex
-m_dispatch_running: bool
-m_dispatch_thread: thread
-m_req_reader: DataReader~Req~
-m_rep_writer: DataWriter~Resp~
-m_listener: unique_ptr~`DdsServiceServerListener~Req~`~
-HandleRequest(req: Req) void
-EnqueueRequest(req: Req) void
-DispatchLoop() void
}
class DdsServiceServerListener~Req~ {
-m_dispatch_fn: function
+on_data_available(reader) void
}
class IDdsServiceClientApi~Req_Resp~ {
<<interface>>
+RespCallback: function~void(StatusCode, Resp)~
+EnableDispatchThread(enable: bool) void
+Request(req, resp, timeout_ms) StatusCode
+AsyncRequest(req, cb) StatusCode
#CreateApi(name)$ shared_ptr
}
class DdsServiceClientApiImpl~Req_Resp~ {
-m_name: string
-m_connected: atomic~bool~
-m_mtx: mutex
-m_sync_waiters: map~string, shared_ptr~`SyncWaiter~Resp~`~~
-m_async_waiters: map~string, RespCallback~
-m_enable_dispatch_thread: bool
-m_reply_queue: deque~Resp~
-m_reply_queue_mtx: mutex
-m_dispatch_running: bool
-m_dispatch_thread: thread
-m_req_writer: DataWriter~Req~
-m_rep_reader: DataReader~Resp~
-m_match_mtx: mutex
-m_match_cv: condition_variable
-m_match_ready: bool
-HandleReply(resp: Resp) void
-EnqueueReply(resp: Resp) void
-DispatchLoop() void
-WaitForMatchReady(timeout_ms: int) bool
}
`NoOpDataReaderListener~T~` <|.. DdsServiceServerListener : 对于Req类型
IDdsServiceServerApi <|.. DdsServiceServerApiImpl : implements
DdsServiceServerApiImpl --> DdsServiceServerListener : owns
DdsServiceServerListener --|> `NoOpDataReaderListener~Req~` : extends
IDdsServiceClientApi <|.. DdsServiceClientApiImpl : implements
DdsServiceClientApiImpl --> `SyncWaiter~Resp~` : manages
DdsServiceClientApiImpl --> DdsServiceClientListener : owns
DdsServiceClientApiImpl --> DdsServiceClientWriterListener : owns 3.5 Factory 类图
classDiagram
direction TB
class DDSApiFactory {
<<singleton>>
-m_mtx: mutex
-m_api_map: unordered_map~string, DdsApiBasePtr~
-m_participant: shared_ptr~DomainParticipant~
-m_participant_mtx: mutex
-m_domain_id: uint32
-m_node_name: string
-m_ros2_compat: bool
-DDSApiFactory()
-~DDSApiFactory()
-CreateImpl(name, creator)$ DdsApiBasePtr
+GetInstance()$ DDSApiFactory
+GetParticipant(domain_id)$ shared_ptr
+ReleaseParticipant()$ void
+Create~T_API~(name)$ shared_ptr~T_API~
+Create~T_API~(name, cb)$ shared_ptr~T_API~
+Get(name)$ DdsApiBasePtr
+Get~T~(name)$ shared_ptr~T~
+Release(name)$ void
+Clear()$ void
+SetNodeName(node_name)$ void
+SetRos2Compat(enable: bool)$ void
}
class IDdsApiBase {
<<abstract>>
}
class IDdsPubApi~T~ {
#CreateApi(name)$ shared_ptr
}
class IDdsSubApi~T~ {
#CreateApi(name, cb)$ shared_ptr
}
class IDdsServiceServerApi~Req_Resp~ {
#CreateApi(name, handler)$ shared_ptr
}
class IDdsServiceClientApi~Req_Resp~ {
#CreateApi(name)$ shared_ptr
}
DDSApiFactory ..> IDdsPubApi : Create
DDSApiFactory ..> IDdsSubApi : Create
DDSApiFactory ..> IDdsServiceServerApi : Create
DDSApiFactory ..> IDdsServiceClientApi : Create
DDSApiFactory o-- IDdsApiBase : manages 4. 接口说明
4.1 StatusCode — 错误码
namespace dds_wrapper {
enum class StatusCode : int {
SUCCESS = 0, // 操作成功
INVALID_ARGUMENT = -1, // 参数非法(如 depth > 100)
NOT_CONNECTED = -2, // 未 Connect() 就进行操作
DDS_ERROR = -3, // Cyclone DDS 底层异常
TIMEOUT = -4, // 操作超时
DUPLICATE_NAME = -5, // 工厂中同名 API 已存在
};
inline bool ok(StatusCode c) noexcept { return c == StatusCode::SUCCESS; }
} // namespace dds_wrapper 4.2 QoS 配置结构体
namespace dds_wrapper {
enum class DdsHistoryKind : uint8_t { KEEP_LAST = 0, KEEP_ALL = 1 };
enum class DdsReliabilityKind : uint8_t { RELIABLE = 0, BEST_EFFORT = 1 };
enum class DdsDurabilityKind : uint8_t { VOLATILE = 0, TRANSIENT_LOCAL = 1 };
struct DdsWriterQosProfile {
DdsHistoryKind history {DdsHistoryKind::KEEP_LAST};
uint32_t depth {10};
DdsReliabilityKind reliability{DdsReliabilityKind::RELIABLE};
DdsDurabilityKind durability {DdsDurabilityKind::VOLATILE};
};
struct DdsReaderQosProfile {
DdsHistoryKind history {DdsHistoryKind::KEEP_LAST};
uint32_t depth {10};
DdsReliabilityKind reliability{DdsReliabilityKind::RELIABLE};
DdsDurabilityKind durability {DdsDurabilityKind::VOLATILE};
};
} // namespace dds_wrapper 4.3 IDdsApiBase — 基类
替代 IMsgBusApiBase,只保留 DDS 场景需要的接口。
namespace dds_wrapper {
class IDdsApiBase {
public:
virtual ~IDdsApiBase() = default;
virtual bool Connect(const std::string& topic_name) = 0;
virtual void Stop() = 0;
virtual bool IsConnected() const = 0;
virtual std::string Name() const = 0;
virtual void SetNodeName(const std::string& node_name) = 0;
/**
* @brief 启用/关闭 Service Topic 自动加 rq/rr 前缀(须在 Connect() 前调用)
*
* 仅影响 Topic 命名规则:
* 开启:rq/{name}Request / rr/{name}Reply
* 关闭:{name}Request / {name}Reply
* IDL 关联字段(header_guid/header_seq)、QoS、user_data 两种模式完全一致。
* PubSub 不受影响。默认关闭。
*/
virtual void EnableRos2Compat(bool enable) = 0;
};
} // namespace dds_wrapper 与 datacenter IMsgBusApiBase 对比 — 删除的方法:
| 旧方法 | 删除原因 |
|---|---|
SetHost(ip, port) | ZMQ/网络地址概念,DDS 通过 DomainParticipant 发现 |
SetHighWaterMark(hwm) | ZMQ 专有,DDS 用 QoS depth 代替 |
SetMsgExpires(ms) | ZMQ 专有 |
EnableLockFree() | 共享内存专有 |
SetLoopTimeout(ms) | epoll 轮询概念,DDS 使用 Listener 模型 |
SetZMQContext(ctx) | ZMQ 专有 |
Release() | 与 Stop() 语义合并 |
Join() | DDS Listener 无需 join |
Type() / MBApiType | datacenter 内部分类,不暴露 |
GetMonitoringInfo() | 监控序列化,后续扩展 |
4.4 IDdsPubApi<T> — 发布接口
namespace dds_wrapper {
template <typename T>
class IDdsPubApi : public IDdsApiBase {
public:
virtual ~IDdsPubApi() override = default;
// ── 核心操作 ──
virtual bool PubMsg(const T& msg) = 0;
// ── 订阅者上/下线通知(Connect 前注册)──
virtual void SubNotify(std::function<void(const std::string& topic)> cb) = 0;
virtual void UnSubNotify(std::function<void(const std::string& topic)> cb) = 0;
// ── QoS 配置(Connect 前调用)──
virtual void SetDurability() = 0;
virtual void SetReliability(bool reliable) = 0;
virtual bool SetDepth(uint32_t depth) = 0; // depth > 100 返回 false
virtual void SetKeepAll() = 0;
virtual bool SetQos(const DdsWriterQosProfile& qos) = 0;
virtual DdsWriterQosProfile GetQos() const = 0;
protected:
static std::shared_ptr<IDdsPubApi<T>> CreateApi(const std::string& name);
};
} // namespace dds_wrapper 对应 datacenter IPubApi 接口映射:
datacenter IPubApi | dds_wrapper IDdsPubApi<T> | 说明 |
|---|---|---|
PubMsg(topic, STRING) | PubMsg(T) | Topic 在 Connect 时绑定;消息类型化 |
SubNotify(cb) | SubNotify(cb) | 1:1 |
UnSubNotify(cb) | UnSubNotify(cb) | 1:1 |
SetDurability() | SetDurability() | 1:1 |
SetDepth(n) | SetDepth(n) | 1:1 |
SetKeepAll() | SetKeepAll() | 1:1 |
RegisterSpi(spi*) | 删除 | 使用 std::function |
| — | SetReliability(bool) | 新增 |
| — | SetQos(profile) / GetQos() | 新增,统一 QoS 配置 |
4.5 IDdsSubApi<T> — 订阅接口
namespace dds_wrapper {
template <typename T>
class IDdsSubApi : public IDdsApiBase {
public:
using MsgCallback = std::function<void(const T& msg)>;
virtual ~IDdsSubApi() override = default;
// ── 线程模型(Connect 前调用)──
virtual void EnableDispatchThread(bool enable = true) = 0;
// ── QoS 配置(Connect 前调用)──
virtual void SetDurability() = 0;
virtual void SetReliability(bool reliable) = 0;
virtual bool SetDepth(uint32_t depth) = 0;
virtual void SetKeepAll() = 0;
virtual bool SetQos(const DdsReaderQosProfile& qos) = 0;
virtual DdsReaderQosProfile GetQos() const = 0;
protected:
static std::shared_ptr<IDdsSubApi<T>> CreateApi(
const std::string& name, MsgCallback cb = nullptr);
};
} // namespace dds_wrapper 对应 datacenter ISubApi + ISubSpi 接口映射:
| datacenter | dds_wrapper | 说明 |
|---|---|---|
继承 ISubSpi,重写 OnRecvSubMsg(topic, msg) | Create<SubApi>(name, [](const T& msg){...}) | lambda 回调替代 SPI 继承 |
SubMsg(topic) | 在 Connect(topic) 时隐式订阅 | 简化:一个 Sub 对应一个 Topic |
UnSubMsg(topic) | Stop() | DDS 模型无动态订阅/退订 |
RegisterSpi(spi*) | 删除 | 回调在 Create 时传入,不可运行时替换 |
| — | EnableDispatchThread(true) | 新增,可选独立分发线程 |
4.6 IDdsServiceServerApi<Req,Resp> — 服务端
namespace dds_wrapper {
template <typename Req, typename Resp>
class IDdsServiceServerApi : public IDdsApiBase {
public:
using Handler = std::function<StatusCode(const Req& req, Resp& resp)>;
using MsgCallback = Handler; // 复用工厂 Create(name, cb) 签名
virtual ~IDdsServiceServerApi() override = default;
// 开启后 DDS 监听线程只做入队,独立 dispatch 线程串行调用 Handler
virtual void EnableDispatchThread(bool enable) = 0;
virtual void SetHandler(Handler handler) = 0; // Connect 前有效
protected:
static std::shared_ptr<IDdsServiceServerApi<Req, Resp>>
CreateApi(const std::string& name, Handler handler);
};
} // namespace dds_wrapper Req/Resp IDL 约束: Req 和 Resp 类型须包含 uint64 header_guid + int64 header_seq 字段(统一 wire 格式,与 ROS 2 一致)。框架自动填充/透传这两个字段,用户业务代码无需关心。
对应 datacenter IServiceServerApi 映射:
| datacenter | dds_wrapper | 说明 |
|---|---|---|
继承 IServiceServerSpi,重写 OnRequest(msg, ctx) | Create<ServerApi>(name, handler) | handler 返回 StatusCode |
RepMsg(rsp, ctx) | Handler 中直接填写 Resp& resp | 回复由框架自动发送 |
EnableLockFree() | 删除 | DDS 无此概念 |
4.7 IDdsServiceClientApi<Req,Resp> — 客户端
namespace dds_wrapper {
template <typename Req, typename Resp>
class IDdsServiceClientApi : public IDdsApiBase {
public:
using RespCallback = std::function<void(StatusCode, const Resp&)>;
virtual ~IDdsServiceClientApi() override = default;
// 开启后 DDS 监听线程只做入队,独立 dispatch 线程处理回复并调用异步回调
virtual void EnableDispatchThread(bool enable) = 0;
// 同步请求(阻塞,header_guid/header_seq 由 API 内部自动生成)
virtual StatusCode Request(const Req& req, Resp& resp, int timeout_ms = 5000) = 0;
// 异步请求(非阻塞,结果回调)
virtual StatusCode AsyncRequest(const Req& req, RespCallback cb) = 0;
protected:
static std::shared_ptr<IDdsServiceClientApi<Req, Resp>>
CreateApi(const std::string& name);
};
} // namespace dds_wrapper 客户端匹配策略:
Connect()后短窗口预热匹配(1 s)Request()/AsyncRequest()发送前再次等待匹配就绪- 请求关联 ID 由
header_guid(DataWriter GUID hash)+header_seq(单调递增)自动生成
对应 datacenter IServiceClientApi 映射:
| datacenter | dds_wrapper | 说明 |
|---|---|---|
ReqMsgSync(req, rsp, timeout) | Request(req, resp, timeout_ms) | 返回 StatusCode 而非 bool |
ReqMsgAsync(req, request_id) + SPI OnRspMsg | AsyncRequest(req, cb) | lambda 回调替代 SPI |
EnableAsync(bool) | 删除 | 同步/异步由调用方法决定 |
| — | EnableDispatchThread(true) | 新增,可选独立分发线程(Client/Server 均支持) |
4.10 DDSApiFactory — 工厂
namespace dds_wrapper {
using DdsApiBasePtr = std::shared_ptr<IDdsApiBase>;
class DDSApiFactory {
public:
static DDSApiFactory& GetInstance();
// ── DomainParticipant 管理 ──
static std::shared_ptr<dds::domain::DomainParticipant>
GetParticipant(uint32_t domain_id = 0);
static void ReleaseParticipant();
// ── API 创建(幂等,同名返回已有实例)──
template <typename T_API>
static std::shared_ptr<T_API> Create(const std::string& name);
template <typename T_API>
static std::shared_ptr<T_API> Create(const std::string& name,
typename T_API::MsgCallback cb);
// ── API 查找 ──
static DdsApiBasePtr Get(const std::string& name);
template <typename T>
static std::shared_ptr<T> Get(const std::string& name);
// ── API 释放 ──
static void Release(const std::string& name);
static void Clear();
static void SetNodeName(const std::string& node_name);
/**
* @brief 设置全局 ROS2 兼容默认值
*
* 后续通过 Create() 创建的 API 实例将自动继承此设置。
* 已创建的实例不受影响。
* 单个 API 实例仍可通过 EnableRos2Compat() 覆盖。
*/
static void SetRos2Compat(bool enable);
};
} // namespace dds_wrapper Create 模板路由机制:
| API 类型 | Create 调用 | 内部路由 |
|---|---|---|
IDdsPubApi<T> | Create<IDdsPubApi<T>>("name") | → IDdsPubApi<T>::CreateApi(name) |
IDdsSubApi<T> | Create<IDdsSubApi<T>>("name", cb) | → IDdsSubApi<T>::CreateApi(name, cb) |
IDdsServiceServerApi<Req,Resp> | Create<...>("name", handler) | → CreateApi(name, handler) |
IDdsServiceClientApi<Req,Resp> | Create<...>("name") | → CreateApi(name) |
对应 datacenter MBApiFactory 映射:
datacenter MBApiFactory | dds_wrapper DDSApiFactory | 说明 |
|---|---|---|
GetZMQContext() | GetParticipant(domain_id) | 通信基础设施管理 |
Create<T>(name, spi*) | Create<T>(name) / Create<T>(name, cb) | 模板参数即接口类型 |
Get<T>(name) | Get<T>(name) | 1:1 |
Release(name) | Release(name) | 1:1 |
Clear(ignore_list) | Clear() | 删除 ignore_list |
SetNodeName(name) | SetNodeName(name) | 1:1 |
| — | SetRos2Compat(bool) | 新增:全局 ROS 2 兼容默认值 |
InitRuntime(app_name) | 删除 | iceoryx 专有 |
5. 流程图
5.1 PubSub 通信时序
sequenceDiagram
participant UserPub as 用户代码(Pub)
participant Factory as DDSApiFactory
participant PubImpl as DdsPubApiImpl
participant DDS as CycloneDDS
participant SubImpl as DdsSubApiImpl
participant UserSub as 用户代码(Sub)
Note over UserPub,UserSub: ── 创建阶段 ──
UserPub->>Factory: Create<IDdsPubApi<T>>("pub1")
Factory->>PubImpl: new DdsPubApiImpl("pub1")
Factory-->>UserPub: shared_ptr<IDdsPubApi<T>>
UserSub->>Factory: Create<IDdsSubApi<T>>("sub1", callback)
Factory->>SubImpl: new DdsSubApiImpl("sub1", cb)
Factory-->>UserSub: shared_ptr<IDdsSubApi<T>>
Note over UserPub,UserSub: ── 配置阶段(Connect 前)──
UserPub->>PubImpl: SetDurability()
UserPub->>PubImpl: SetDepth(20)
UserSub->>SubImpl: SetDurability()
UserSub->>SubImpl: EnableDispatchThread(true) [可选]
Note over UserPub,UserSub: ── 连接阶段 ──
UserPub->>PubImpl: Connect("/sensor/imu")
PubImpl->>DDS: 创建 Topic + Publisher + DataWriter
PubImpl-->>UserPub: true
UserSub->>SubImpl: Connect("/sensor/imu")
SubImpl->>DDS: 创建 Topic + Subscriber + DataReader + Listener
SubImpl-->>UserSub: true
Note over UserPub,UserSub: ── 通信阶段 ──
UserPub->>PubImpl: PubMsg(msg)
PubImpl->>DDS: DataWriter.write(msg)
DDS->>SubImpl: Listener::on_data_available()
SubImpl->>UserSub: callback(msg)
Note over UserPub,UserSub: ── 释放阶段 ──
UserPub->>PubImpl: Stop()
UserSub->>SubImpl: Stop() 5.2 Service 通信时序
sequenceDiagram
participant Client as 客户端用户
participant CliImpl as ServiceClientApiImpl
participant DDS as CycloneDDS
participant SvrImpl as ServiceServerApiImpl
participant Server as 服务端 Handler
Note over Client,Server: ── 创建与连接 ──
Client->>CliImpl: Create + Connect("my_service")
CliImpl->>DDS: 创建 request Writer + reply Reader
CliImpl->>CliImpl: WaitForMatchReady(1s) [预热]
Server->>SvrImpl: Create(name, handler) + Connect("my_service")
SvrImpl->>DDS: 创建 request Reader + reply Writer
Note over Client,Server: ── 同步请求 ──
Client->>CliImpl: Request(req, resp, 5000)
CliImpl->>CliImpl: WaitForMatchReady(timeout)
CliImpl->>CliImpl: FillRequestId → header_guid=writerGuidHash, header_seq=seq++
CliImpl->>DDS: reqWriter.write(req)
CliImpl->>CliImpl: SyncWaiter 等待...
DDS->>SvrImpl: Listener::on_data_available(req)
SvrImpl->>Server: handler(req, resp)
Server-->>SvrImpl: StatusCode::SUCCESS
SvrImpl->>SvrImpl: 兜底透传 header_guid/header_seq
SvrImpl->>DDS: repWriter.write(resp)
DDS->>CliImpl: Listener::on_data_available(resp)
CliImpl->>CliImpl: HandleReply → 匹配 SyncWaiter
CliImpl->>CliImpl: SyncWaiter.notify()
CliImpl-->>Client: StatusCode::SUCCESS, resp
Note over Client,Server: ── 异步请求 ──
Client->>CliImpl: AsyncRequest(req, callback)
CliImpl->>DDS: reqWriter.write(req)
DDS->>SvrImpl: (同上处理)
SvrImpl->>DDS: repWriter.write(resp)
DDS->>CliImpl: HandleReply → 匹配 AsyncWaiter
CliImpl->>Client: callback(SUCCESS, resp) 5.4 Factory 创建流程
flowchart TD
A[用户调用 DDSApiFactory::Create] --> B{name 已存在?}
B -->|是| C[返回已有 shared_ptr]
B -->|否| D[调用 T_API::CreateApi]
D --> E[构造 Impl 实例]
E --> F[设置 NodeName]
F --> G[注册到 m_api_map]
G --> H[返回 shared_ptr]
subgraph GetParticipant
P1[首次调用] --> P2{m_participant 存在?}
P2 -->|否| P3[创建 DomainParticipant]
P3 --> P4[缓存 shared_ptr]
P2 -->|是| P5[返回已有]
end
D -.-> P1 5.5 线程模型
flowchart LR
subgraph 用户线程
U1[PubMsg / Request]
end
subgraph CycloneDDS Listener 线程
L1[on_data_available]
L2[on_publication_matched]
L3[on_subscription_matched]
end
subgraph 可选分发线程["可选: 独立分发线程(Sub / ServiceServer / ServiceClient 均支持)"]
D1[DispatchLoop]
D2[deque 消息/请求/回复队列]
end
U1 -->|write| L1
L1 -->|dispatch_thread=false| CB1[用户回调/Handler 直接调用]
L1 -->|dispatch_thread=true| D2
D2 --> D1
D1 --> CB2[用户回调/Handler 线程隔离] 线程规则:
- 默认回调运行于 CycloneDDS Listener 线程,不得长时间阻塞
EnableDispatchThread(true)后,该实例有独立分发线程,回调/Handler 不阻塞 Listener- 三类实例均支持:
IDdsSubApi(消息回调)、IDdsServiceServerApi(Handler)、IDdsServiceClientApi(异步回调) - 跨回调共享数据由用户自行加锁
6. ROS 2 兼容设计
6.1 接口开关
通过运行时 API 控制 ROS 2 兼容模式,不使用编译宏,两种模式的代码始终编译。
两级控制:
| 层级 | API | 作用域 | 说明 |
|---|---|---|---|
| 工厂全局 | DDSApiFactory::SetRos2Compat(true) | 后续所有新建 API 实例 | 设置默认值,已创建实例不受影响 |
| 单实例 | api->EnableRos2Compat(true) | 仅当前实例 | 覆盖工厂默认值,须在 Connect() 前调用 |
典型用法:
// 方式 1:全局启用(所有后续 Create 的 Service 自动加 rq/rr 前缀)
DDSApiFactory::SetRos2Compat(true);
auto cli = DDSApiFactory::Create<IDdsServiceClientApi<Req, Resp>>("cli");
cli->Connect("add_two_ints"); // → rq/add_two_intsRequest, rr/add_two_intsReply
// 方式 2:单实例启用(不影响其他实例)
auto cli = DDSApiFactory::Create<IDdsServiceClientApi<Req, Resp>>("cli");
cli->EnableRos2Compat(true); // 覆盖工厂默认值
cli->Connect("add_two_ints"); // → rq/add_two_intsRequest, rr/add_two_intsReply
// 方式 3:全局启用后单实例关闭(不加前缀,适合纯内部服务)
DDSApiFactory::SetRos2Compat(true);
auto cli1 = DDSApiFactory::Create<IDdsServiceClientApi<Req, Resp>>("cli1");
cli1->Connect("svc_a"); // → rq/svc_aRequest, rr/svc_aReply
auto cli2 = DDSApiFactory::Create<IDdsServiceClientApi<Req, Resp>>("cli2");
cli2->EnableRos2Compat(false); // 覆盖:不加前缀
cli2->Connect("svc_b"); // → svc_bRequest, svc_bReply
// 方式 4:开关关闭时与 ROS2 互通 — 手动在 service_name 中加前缀
// wire 协议两种模式完全一致,开关仅影响 topic 名前缀
// ROS2 约定 request=rq/ reply=rr/(两者前缀不同),手动方式无法同时满足
// 正确做法:对需要和 ROS2 通信的实例使用开关即可(方式 1~3) 开关影响范围:
| 项目 | 关闭(默认) | 开启 |
|---|---|---|
| PubSub Topic 命名 | {topic_name} | rt/{topic_name} |
| Service Topic 命名 | {name}Request / {name}Reply | rq/{name}Request / rr/{name}Reply |
| Service IDL 结构 | 用户字段 + uint64 header_guid + int64 header_seq | 同左 |
| Service QoS | ROS 2 约定(请求 RELIABLE+KEEP_LAST(10),回复 RELIABLE+KEEP_ALL) | 同左 |
| Service user_data | serviceid=<GUID hex>; | 同左 |
区别仅在 Topic 名前缀:PubSub 加
rt/,Service 加rq//rr/,wire 格式统一,确保两种模式均可与 ROS 2 节点互通。
6.2 Topic 命名规则映射
flowchart LR
subgraph 默认模式["默认模式(开关关闭)"]
P1["Connect('imu')"] --> PT1["imu"]
S1["Connect('add_svc')"] --> T1["add_svcRequest"]
S1 --> T2["add_svcReply"]
end
subgraph ROS2模式["ROS2 兼容模式(开关开启)"]
P2["Connect('imu')"] --> PT2["rt/imu"]
S2["Connect('add_svc')"] --> T3["rq/add_svcRequest"]
S2 --> T4["rr/add_svcReply"]
end 命名规则转换逻辑(impl 层内部):
// PubSub:Connect 中
const std::string effective_topic = m_ros2_compat ? ("rt/" + topic_name) : topic_name;
// Service:策略对象 MakeTopicNames()
namespace dds_wrapper::detail {
struct ServiceTopicNames {
std::string request;
std::string reply;
};
/// @brief 仅前缀不同,wire 协议两种模式完全一致
inline ServiceTopicNames MakeServiceTopicNames(const std::string& service_name,
bool ros2_compat) {
if (ros2_compat) {
return { "rq/" + service_name + "Request",
"rr/" + service_name + "Reply" };
}
return { service_name + "Request",
service_name + "Reply" };
}
} // namespace dds_wrapper::detail 6.3 IDL 类型结构(统一 wire 格式,两种模式共用)
开关状态不影响 IDL。Service 的 wire 格式始终使用
header_guid/header_seq关联字段,与 ROS 2 rmw_cyclonedds_cpp 完全一致。
用户编写的 IDL(只含业务字段):
// my_service_user.idl
module my_pkg {
module srv {
module dds_ {
struct MySrv_Request_ {
int32 param_a;
int32 param_b;
};
struct MySrv_Response_ {
int32 result;
};
};
};
}; 经 service_idl_inject.py 注入后的 wire IDL(两种模式共用):
// 自动生成,不可手动编辑
module my_pkg {
module srv {
module dds_ {
struct MySrv_Request_ {
uint64 header_guid; // ← 自动注入
int64 header_seq; // ← 自动注入
int32 param_a;
int32 param_b;
};
struct MySrv_Response_ {
uint64 header_guid; // ← 自动注入
int64 header_seq; // ← 自动注入
int32 result;
};
};
};
}; wire 关联字段:
| 字段 | 类型 | 说明 |
|---|---|---|
header_guid | uint64 | DataWriter GUID 低 8 字节,标识客户端身份 |
header_seq | int64 | 单调递增序号,用于请求-应答关联 |
不再使用
string request_id。统一header_guid/header_seq既消除了两种模式的 IDL 差异,又避免了字符串分配的开销。
6.4 ROS 2 Service 兼容时序
sequenceDiagram
participant User as 用户代码
participant Client as DdsServiceClientApiImpl
participant DDS as CycloneDDS
participant ROS2 as ROS2 Service Server
User->>Client: EnableRos2Compat(true)
User->>Client: Connect("add_two_ints")
Client->>Client: 生成 serviceid GUID(两种模式均执行)
Client->>Client: m_ros2_compat=true → Topic 名加 rq/rr 前缀
Client->>Client: 创建 Topics:<br/>rq/add_two_intsRequest<br/>rr/add_two_intsReply
Client->>DDS: DataWriter QoS:<br/>RELIABLE + KEEP_LAST(10)<br/>user_data = serviceid
Client->>DDS: DataReader QoS:<br/>RELIABLE + KEEP_ALL<br/>user_data = serviceid
Client->>Client: WaitForServer(匹配等待)
ROS2-->>DDS: 服务端上线
DDS-->>Client: on_publication_matched + on_subscription_matched
Note over Client,ROS2: 发送请求
Client->>Client: header_guid = writerGuidHash(writer)
Client->>Client: header_seq = seq++
Client->>DDS: rq/add_two_intsRequest<br/>{header_guid, header_seq, a=1, b=2}
DDS->>ROS2: 服务端收到请求
ROS2->>ROS2: 处理 → sum=3
ROS2->>DDS: rr/add_two_intsReply<br/>{header_guid, header_seq, sum=3}
DDS->>Client: on_data_available(resp)
Client->>Client: 按 header_seq 匹配 waiter
Client-->>Client: 返回 resp 6.5 策略模式(Strategy Pattern)实现
wire 协议(IDL 字段、QoS、user_data)两种模式完全一致,唯一变化点是 Topic 名前缀。 策略模式封装这一变化点,同时为未来扩展不同 wire 协议(如 Zenoh Bridge)保留接口。
设计模式选型分析:
| 模式 | 优势 | 劣势 | 适用性 |
|---|---|---|---|
标志位 if/else | 极简 | 散落在 Connect/FillId/ExtractId 多处,扩展新协议时退化为 switch | 当前够用但不可扩展 |
| 策略 | 变化点精确封装,主干不变 | 需定义接口 | ✓ 最佳 |
7.5.1 策略接口
namespace dds_wrapper {
/// @brief Topic 命名策略接口(当前唯一变化点)
/// 接口保留完整扩展方法,未来新增 wire 协议时按需覆写。
template <typename Req, typename Resp>
class IServiceProtocol {
public:
virtual ~IServiceProtocol() = default;
/// @brief 根据 service_name 生成 request/reply Topic 名(唯一需覆写的方法)
virtual detail::ServiceTopicNames MakeTopicNames(
const std::string& service_name) const = 0;
/// @brief 配置 DataWriter/DataReader QoS
virtual void ApplyWriterQos(dds::pub::qos::DataWriterQos& wqos) const = 0;
virtual void ApplyReaderQos(dds::sub::qos::DataReaderQos& rqos) const = 0;
/// @brief Connect 后初始化钩子(GUID 提取、serviceid 生成)
virtual void OnConnected(dds::pub::DataWriter<Req>& writer,
dds::sub::DataReader<Resp>& reader) = 0;
/// @brief 填充请求关联 ID
virtual void FillRequestId(Req& req) = 0;
/// @brief 从响应中提取关联 ID,用于匹配 waiter
virtual std::string ExtractRequestId(const Resp& resp) const = 0;
/// @brief Connect 前初始化(serviceid 生成)
virtual void SetupServiceId(dds_entity_t participant_entity) = 0;
};
} // namespace dds_wrapper 7.5.2 共享 wire 协议基类
namespace dds_wrapper {
/// @brief 统一实现所有 wire 协议行为,两种命名策略均继承此基类
template <typename Req, typename Resp>
class ServiceProtocolBase : public IServiceProtocol<Req, Resp> {
public:
void ApplyWriterQos(dds::pub::qos::DataWriterQos& wqos) const override {
wqos << dds::core::policy::Reliability::Reliable(
dds::core::Duration(0, 100'000'000))
<< dds::core::policy::History::KeepLast(10)
<< dds::core::policy::UserData(m_service_ud);
}
void ApplyReaderQos(dds::sub::qos::DataReaderQos& rqos) const override {
rqos << dds::core::policy::Reliability::Reliable(
dds::core::Duration::from_secs(1))
<< dds::core::policy::History::KeepAll()
<< dds::core::policy::UserData(m_service_ud);
}
void OnConnected(dds::pub::DataWriter<Req>& writer,
dds::sub::DataReader<Resp>&) override {
m_writer_guid_hash = detail::Ros2CompatHelper::writerGuidHash(
writer.delegate()->get_ddsc_entity());
}
void FillRequestId(Req& req) override {
req.header_guid(m_writer_guid_hash);
req.header_seq(m_seq.fetch_add(1, std::memory_order_relaxed));
}
std::string ExtractRequestId(const Resp& resp) const override {
return std::to_string(resp.header_seq());
}
void SetupServiceId(dds_entity_t participant_entity) override {
m_client_guid = detail::Ros2CompatHelper::makeClientGuid(participant_entity);
auto svc_id = detail::Ros2CompatHelper::formatServiceId(m_client_guid);
m_service_ud.assign(svc_id.begin(), svc_id.end());
}
protected:
dds_guid_t m_client_guid{};
dds::core::ByteSeq m_service_ud;
uint64_t m_writer_guid_hash{0};
std::atomic<int64_t> m_seq{1};
};
} // namespace dds_wrapper 7.5.3 默认策略(无前缀)
namespace dds_wrapper {
template <typename Req, typename Resp>
class DefaultServiceProtocol : public ServiceProtocolBase<Req, Resp> {
public:
detail::ServiceTopicNames MakeTopicNames(
const std::string& service_name) const override
{
return { service_name + "Request",
service_name + "Reply" };
}
};
} // namespace dds_wrapper 7.5.4 ROS 2 策略(rq/rr 前缀)
namespace dds_wrapper {
template <typename Req, typename Resp>
class Ros2ServiceProtocol : public ServiceProtocolBase<Req, Resp> {
public:
detail::ServiceTopicNames MakeTopicNames(
const std::string& service_name) const override
{
return { "rq/" + service_name + "Request",
"rr/" + service_name + "Reply" };
}
};
} // namespace dds_wrapper 7.5.5 类图
classDiagram
direction TB
class IServiceProtocol {
<<interface>>
+MakeTopicNames(service_name) ServiceTopicNames
+ApplyWriterQos(wqos) void
+ApplyReaderQos(rqos) void
+OnConnected(writer, reader) void
+FillRequestId(req) void
+ExtractRequestId(resp) string
+SetupServiceId(pp_entity) void
}
class ServiceProtocolBase {
<<abstract>>
#m_client_guid: dds_guid_t
#m_service_ud: ByteSeq
#m_writer_guid_hash: uint64
#m_seq: atomic~int64~
+ApplyWriterQos() void
+ApplyReaderQos() void
+OnConnected() void
+FillRequestId() void
+ExtractRequestId() string
+SetupServiceId() void
}
class DefaultServiceProtocol {
+MakeTopicNames(service_name) ServiceTopicNames
}
class Ros2ServiceProtocol {
+MakeTopicNames(service_name) ServiceTopicNames
}
class DdsServiceClientApiImpl {
-m_protocol: unique_ptr~IServiceProtocol~
-m_sync_waiters: map
-m_async_waiters: map
-m_match_ready: bool
+EnableRos2Compat(enable: bool) void
+Connect(service_name) bool
+Request(req, resp, timeout) StatusCode
+AsyncRequest(req, cb) StatusCode
}
class Ros2CompatHelper {
<<utility>>
+formatServiceId(guid) string
+makeClientGuid(pp_entity) dds_guid_t
+writerGuidHash(writer_entity) uint64
}
%% 继承关系
IServiceProtocol <|-- ServiceProtocolBase
ServiceProtocolBase <|-- DefaultServiceProtocol
ServiceProtocolBase <|-- Ros2ServiceProtocol
%% 依赖与组合
DdsServiceClientApiImpl --> IServiceProtocol : uses
ServiceProtocolBase --> Ros2CompatHelper : uses Service Server 侧同理,持有
IServiceProtocol<Req,Resp>策略对象。
7.5.6 Impl 主干逻辑(策略委托)
template <typename Req, typename Resp>
class DdsServiceClientApiImpl : public IDdsServiceClientApi<Req, Resp> {
public:
explicit DdsServiceClientApiImpl(std::string name)
: m_name(std::move(name)),
m_protocol(std::make_unique<DefaultServiceProtocol<Req, Resp>>()) {}
void EnableRos2Compat(bool enable) override {
std::lock_guard<std::mutex> lk(m_mtx);
if (m_connected) return;
if (enable)
m_protocol = std::make_unique<Ros2ServiceProtocol<Req, Resp>>();
else
m_protocol = std::make_unique<DefaultServiceProtocol<Req, Resp>>();
}
bool Connect(const std::string& service_name) override {
std::lock_guard<std::mutex> lk(m_mtx);
if (m_connected) return true;
auto participant = DDSApiFactory::GetParticipant();
if (!participant) return false;
// ① serviceid 初始化(两种策略均通过基类统一执行)
m_protocol->SetupServiceId(participant->delegate()->get_ddsc_entity());
// ② 策略仅决定 Topic 名前缀
auto names = m_protocol->MakeTopicNames(service_name);
dds::topic::Topic<Req> req_topic(*participant, names.request);
dds::topic::Topic<Resp> rep_topic(*participant, names.reply);
dds::pub::Publisher publisher(*participant);
dds::sub::Subscriber subscriber(*participant);
auto wqos = publisher.default_datawriter_qos();
auto rqos = subscriber.default_datareader_qos();
// ③ 统一 QoS(两种策略相同)
m_protocol->ApplyWriterQos(wqos);
m_protocol->ApplyReaderQos(rqos);
// ... 创建 DataWriter / DataReader / Listener ...
// ④ GUID 提取(两种策略相同)
m_protocol->OnConnected(m_req_writer, m_rep_reader);
m_connected = true;
return true;
}
StatusCode Request(const Req& req, Resp& resp, int timeout_ms) override {
if (!WaitForMatchReady(timeout_ms)) return StatusCode::TIMEOUT;
Req req_to_send = req;
// ⑤ 统一填充 header_guid/header_seq
m_protocol->FillRequestId(req_to_send);
// ... 发送、等待回复(主干逻辑不变)...
}
private:
void HandleReply(const Resp& resp) {
std::string req_id = m_protocol->ExtractRequestId(resp);
if (req_id.empty()) return;
// ... 匹配 SyncWaiter / AsyncWaiter(主干逻辑不变)...
}
std::unique_ptr<IServiceProtocol<Req, Resp>> m_protocol;
// ... SyncWaiter、匹配等待等成员不变 ...
}; 核心优势:
DefaultServiceProtocol和Ros2ServiceProtocol各自只有MakeTopicNames()这一个方法- wire 协议(IDL、QoS、user_data)统一在
ServiceProtocolBase,两种模式零代码重复 EnableRos2Compat(true)只替换策略对象,Impl 主干无任何if/else- 未来新增 wire 协议只需继承
ServiceProtocolBase并覆写所需方法
7.5.7 Factory 传递 ROS 2 兼容默认值
DdsApiBasePtr DDSApiFactory::CreateImpl(const std::string& name, DdsApiCreator creator) {
auto& inst = GetInstance();
std::lock_guard<std::mutex> lock(inst.m_mtx);
auto it = inst.m_api_map.find(name);
if (it != inst.m_api_map.end()) return it->second;
auto api = creator();
if (api) {
api->SetNodeName(inst.m_node_name);
api->EnableRos2Compat(inst.m_ros2_compat); // 继承工厂默认值 → 切换 Topic 命名策略
inst.m_api_map.emplace(name, api);
}
return api;
} 7. IDL 工具链与 CMake 设计
7.1 标准 IDL 流程
flowchart LR
A["用户 .idl"] --> B["idlcxx_generate()"]
B --> C["生成 .hpp/.cpp"]
C --> D["编译链接"] 标准模式下,使用 CycloneDDS 自带 idlcxx_generate() CMake 宏:
find_package(CycloneDDS-CXX REQUIRED)
idlcxx_generate(TARGET my_idl FILES my_msg.idl WARNINGS no-implicit-extensibility)
target_link_libraries(my_app PRIVATE my_idl dds_wrapper::dds_wrapper) 7.2 ROS 2 兼容 IDL 流程
flowchart LR
A["用户 .idl<br/>(仅业务字段)"] --> B["service_idl_inject.py"]
B --> C["wire .idl<br/>(含 header_guid/seq)"]
C --> D["idlcxx_generate()"]
D --> E["生成 .hpp/.cpp"]
E --> F["编译链接"] ROS 2 兼容模式下,用户只编写业务字段 IDL,service_idlcxx_generate() 自动注入 wire header:
include(service_idl_macros.cmake)
service_idlcxx_generate(
TARGET my_svc_idl
FILES my_service_user.idl
WARNINGS no-implicit-extensibility
)
target_link_libraries(my_app PRIVATE my_svc_idl dds_wrapper::dds_wrapper) 7.3 CMake 宏接口
service_idlcxx_generate():
include(service_idl_macros.cmake)
service_idlcxx_generate(
TARGET my_svc_idl
FILES my_service.idl
WARNINGS no-implicit-extensibility
)
target_link_libraries(my_app PRIVATE my_svc_idl dds_wrapper::dds_wrapper) | 参数 | 类型 | 必填 | 说明 |
|---|---|---|---|
TARGET | string | 是 | 生成的 CMake 库目标名称 |
FILES | list | 是 | 用户 IDL 文件列表(仅含业务字段) |
WARNINGS | string | 否 | 传递给 idlcxx_generate 的 WARNINGS 参数 |
工作原理:
- 遍历每个用户 IDL 文件
- 调用
service_idl_inject.py生成同名 wire IDL(输出到${CMAKE_CURRENT_BINARY_DIR}/idl_wire/,文件名与原 IDL 相同) - 将 wire IDL 传入标准
idlcxx_generate() - 生成的
.hpp头文件与原 IDL 同名,#include路径无需修改 - 对用户完全透明,无需手动管理中间文件
7.4 IDL 预处理器
** service_idl_inject.py ** — 自动向每个 struct 开头注入 ROS 2 wire header 字段:
| 输入 | 输出 |
|---|---|
struct Foo { int32 x; }; | struct Foo { uint64 header_guid; int64 header_seq; int32 x; }; |
注入字段(cdds_request_header_t 对应):
| 字段 | IDL 类型 | 说明 |
|---|---|---|
header_guid | uint64 | rmw_cyclonedds_cpp 客户端身份标识 |
header_seq | int64 | 请求序号,用于请求-应答关联 |
8. QoS 支持矩阵
| 策略 | 可选值 | 接口 | 默认值 |
|---|---|---|---|
| History | KeepLast | SetDepth(n) | KeepLast(10) |
| History | KeepAll | SetKeepAll() | — |
| Durability | TransientLocal | SetDurability() | Volatile |
| Durability | Volatile | 默认 | ✓ |
| Reliability | Reliable | SetReliability(true) | ✓ |
| Reliability | BestEffort | SetReliability(false) | — |
常用组合说明:
| 组合 | 场景 |
|---|---|
| KeepLast(10) + Reliable | 默认组合,可靠传输且限定缓存 |
| KeepLast(n) + BestEffort | 传感器数据,低延迟优先 |
| KeepAll + Reliable | 日志/录制场景,全量不丢 |
| TransientLocal + 任意组合 | 晚加入者需获取历史数据 |
Deadline / Lifespan / Liveliness 留作后续扩展。
9. Topic 命名规则
| 场景 | 逻辑名称示例 | DDS Topic 名(默认,开关关闭) | DDS Topic 名(ROS 2 兼容,开关开启) |
|---|---|---|---|
| 普通 PubSub | robot/sensor/imu | robot/sensor/imu | rt/robot/sensor/imu |
| Service Request | my_service | my_serviceRequest | rq/my_serviceRequest |
| Service Reply | my_service | my_serviceReply | rr/my_serviceReply |
10. CMake 构建设计
project(dds_wrapper VERSION 1.0.0 LANGUAGES CXX)
add_library(dds_wrapper STATIC impl/dds_factory.cpp)
add_library(dds_wrapper::dds_wrapper ALIAS dds_wrapper)
target_compile_features(dds_wrapper PUBLIC cxx_std_17)
target_include_directories(dds_wrapper
PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/api>
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/impl>
$<INSTALL_INTERFACE:include/dds_wrapper>
)
target_link_libraries(dds_wrapper PUBLIC CycloneDDS-CXX::ddscxx)
# ROS2 兼容通过运行时 API 控制(EnableRos2Compat / SetRos2Compat),
# 无需编译宏,两种模式代码始终编译。
# 安装
install(TARGETS dds_wrapper
EXPORT dds_wrapperTargets
ARCHIVE DESTINATION lib
)
install(DIRECTORY api/ DESTINATION include/dds_wrapper)
install(EXPORT dds_wrapperTargets
FILE dds_wrapperTargets.cmake
NAMESPACE dds_wrapper::
DESTINATION lib/cmake/dds_wrapper
) 下游使用:
find_package(dds_wrapper REQUIRED)
target_link_libraries(my_app PRIVATE dds_wrapper::dds_wrapper) 11. 与 datacenter-api 迁移对照
11.1 头文件迁移
| 旧 include | 新 include | 说明 |
|---|---|---|
datacenter/msg_bus_api_base.h | dds_api_base.h | 独立基类 |
datacenter/ps_pub_api.h | dds_pub_api.h + dds_pub_api_impl.h | 模板化 |
datacenter/ps_sub_api.h | dds_sub_api.h + dds_sub_api_impl.h | 模板化 + std::function |
datacenter/service_client_api.h | dds_service_client_api.h | 模板化 |
datacenter/service_server_api.h | dds_service_server_api.h | 模板化 |
datacenter/api_factory.h | dds_factory.h | 独立工厂 |
datacenter/string_limited.hpp | 删除 | 使用 std::string |
11.2 代码模式迁移
PubSub 发布者:
// ── datacenter 旧代码 ──
auto pub = MBApiFactory::Create<IPubApi>("my_pub");
pub->SetDurability();
pub->SetDepth(20);
pub->Connect("/sensor/imu");
pub->PubMsg("/sensor/imu", serialized_string);
// ── dds_wrapper 新代码 ──
auto pub = DDSApiFactory::Create<IDdsPubApi<ImuMsg>>("my_pub");
pub->SetDurability();
pub->SetDepth(20);
pub->Connect("/sensor/imu");
ImuMsg msg; msg.id(1);
pub->PubMsg(msg); // 类型安全,无需序列化 PubSub 订阅者:
// ── datacenter 旧代码 ──
class MySpi : public ISubSpi {
void OnRecvSubMsg(const std::string& topic, const std::string& msg) override {
// 反序列化 msg...
}
};
MySpi spi;
auto sub = MBApiFactory::Create<ISubApi>("my_sub", &spi);
sub->Connect("/sensor");
sub->SubMsg("/sensor/imu");
// ── dds_wrapper 新代码 ──
auto sub = DDSApiFactory::Create<IDdsSubApi<ImuMsg>>(
"my_sub",
[](const ImuMsg& msg) {
std::cout << msg.id() << "\n"; // 直接使用类型化消息
});
sub->Connect("/sensor/imu"); Service 客户端:
// ── datacenter 旧代码 ──
class MyCliSpi : public IServiceClientSpi {
void OnRspMsg(const STRING& msg, uint64_t rid, bool) override { ... }
};
MyCliSpi spi;
auto cli = MBApiFactory::Create<IServiceClientApi>("my_cli", &spi);
cli->Connect("my_svc");
STRING rsp;
cli->ReqMsgSync(req_str, rsp, 5000);
// ── dds_wrapper 新代码 ──
auto cli = DDSApiFactory::Create<IDdsServiceClientApi<MyReq, MyResp>>("my_cli");
cli->Connect("my_svc");
MyReq req; req.command("hello");
MyResp resp;
auto sc = cli->Request(req, resp, 5000); // 类型安全
if (ok(sc)) std::cout << resp.message() << "\n"; 11.3 完整对照表
| datacenter 概念 | dds_wrapper 对应 | 变化说明 |
|---|---|---|
namespace datacenter | namespace dds_wrapper | 完全隔离 |
: public IMsgBusApiBase | : public IDdsApiBase | 精简接口 |
: public IXxxSpi + OnRecvXxx | std::function lambda | 零继承 |
Create<T>(name, &spi) | Create<T>(name, cb) | 值语义回调 |
RegisterSpi(&spi) | 删除 | 回调 Create 时固定 |
STRING 序列化消息 | 模板 T 类型化消息 | 编译期类型安全 |
MBApiType / Type() | 删除 | 内部分类不暴露 |
ServerName() | Name() | 更通用 |
SetHighWaterMark / SetMsgExpires | 删除 | ZMQ 专有 |
Release() / Join() | 合并为 Stop() | 语义统一 |
Clear(ignore_list) | Clear() | 简化 |
12. 典型使用示例
12.1 发布者
#include "dds_pub_api_impl.h"
#include "dds_factory.h"
#include "MyMsg.hpp"
using namespace dds_wrapper;
auto pub = DDSApiFactory::Create<IDdsPubApi<MyMsg>>("imu_pub");
pub->SubNotify([](const std::string& t) { LOG(...) << "订阅者上线: " << t; });
pub->SetDurability();
pub->SetDepth(20);
pub->Connect("/robot/sensor/imu");
MyMsg msg;
msg.id(1);
msg.value(3.14);
pub->PubMsg(msg); 12.2 订阅者(lambda)
#include "dds_sub_api_impl.h"
#include "dds_factory.h"
#include "MyMsg.hpp"
using namespace dds_wrapper;
auto sub = DDSApiFactory::Create<IDdsSubApi<MyMsg>>(
"imu_sub",
[](const MyMsg& msg) { LOG(...) << "id=" << msg.id(); });
sub->SetDurability();
sub->Connect("/robot/sensor/imu"); 12.3 订阅者(独立分发线程 + 类成员函数)
class SensorHandler {
public:
void Init() {
_sub = DDSApiFactory::Create<IDdsSubApi<ImuMsg>>(
"imu_sub",
[this](const ImuMsg& msg) { OnImu(msg); });
_sub->EnableDispatchThread(true);
_sub->Connect("/robot/sensor/imu");
}
private:
void OnImu(const ImuMsg& msg) { /* 可安全执行耗时操作 */ }
std::shared_ptr<IDdsSubApi<ImuMsg>> _sub;
}; 12.4 Service 服务端
#include "dds_service_server_api_impl.h"
using namespace dds_wrapper;
auto server = DDSApiFactory::Create<IDdsServiceServerApi<MyReq, MyResp>>(
"calc_server",
[](const MyReq& req, MyResp& resp) -> StatusCode {
resp.result(req.a() + req.b());
// header_guid/header_seq 由框架自动透传,handler 无需处理
return StatusCode::SUCCESS;
});
// 可选:开启独立分发线程,Handler 不阻塞 DDS 监听线程
server->EnableDispatchThread(true);
server->Connect("calc_service"); 12.5 Service 客户端(同步)
#include "dds_service_client_api_impl.h"
using namespace dds_wrapper;
auto client = DDSApiFactory::Create<IDdsServiceClientApi<MyReq, MyResp>>("calc_client");
client->Connect("calc_service");
MyReq req; req.a(1); req.b(2);
MyResp resp;
if (ok(client->Request(req, resp, 5000))) {
LOG(...) << "result=" << resp.result();
} 12.6 Service 客户端(异步 + 独立分发线程)
// 开启 dispatch thread,异步回调不阻塞 DDS 监听线程
auto client = DDSApiFactory::Create<IDdsServiceClientApi<MyReq, MyResp>>("calc_client");
client->EnableDispatchThread(true);
client->Connect("calc_service");
MyReq req; req.a(1); req.b(2);
client->AsyncRequest(req, [](StatusCode sc, const MyResp& resp) {
// 此回调运行于独立 dispatch 线程,可安全执行耗时操作
if (ok(sc)) LOG(...) << "async result=" << resp.result();
}); 12.7 PubSub 与 ROS 2 节点通信
// 开启 ROS2 兼容后 topic 自动加 rt/ 前缀
DDSApiFactory::SetRos2Compat(true);
auto pub = DDSApiFactory::Create<IDdsPubApi<SensorMsg>>("sensor_pub");
pub->Connect("robot/sensor/imu"); // 实际 DDS Topic: rt/robot/sensor/imu
auto sub = DDSApiFactory::Create<IDdsSubApi<SensorMsg>>(
"sensor_sub",
[](const SensorMsg& msg) { /* 处理 */ });
sub->Connect("robot/sensor/imu"); // 实际 DDS Topic: rt/robot/sensor/imu 12.8 Service 与 ROS 2 节点通信
#include "dds_service_client_api_impl.h"
using namespace dds_wrapper;
// 方式 A:全局启用 ROS2 兼容
DDSApiFactory::SetRos2Compat(true);
auto client = DDSApiFactory::Create<IDdsServiceClientApi<Ros2Req, Ros2Resp>>("ros2_cli");
client->Connect("add_two_ints"); // → rq/add_two_intsRequest, rr/add_two_intsReply
// 方式 B:单实例启用
auto client = DDSApiFactory::Create<IDdsServiceClientApi<Ros2Req, Ros2Resp>>("ros2_cli");
client->EnableRos2Compat(true);
client->Connect("add_two_ints");
Ros2Req req; req.a(1); req.b(2);
Ros2Resp resp;
if (ok(client->Request(req, resp, 5000))) {
LOG(...) << "sum=" << resp.sum();
} 文档结束 — 本文档覆盖
dds_wrapper通信模式(PubSub / Service)的接口设计、实现架构、线程模型(含 Sub / ServiceServer / ServiceClient 独立分发线程)、ROS 2 兼容机制(PubSubrt/前缀、Servicerq/rr/前缀、wire GUID 协议)以及 IDL 工具链设计(service_idlcxx_generate/service_idl_inject.py)。