
DDS Wrapper Design Document

Version: v2.1 Date: 2026-03-27 Based on PRD.md (Cyclone DDS C++ Wrapper Layer Requirements Specification)



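1. Overview

1.1 Design Goals

Build a pure C++ communication-middleware wrapper layer on top of CycloneDDS-CXX. The layer is fully independent of datacenter-api, but its API style stays compatible with datacenter-api usage habits.

| Item | Decision |
| --- | --- |
| Language | C++17 (std::optional, std::variant, structured bindings, if constexpr) |
| Namespace | dds_wrapper (fully independent of datacenter) |
| Message format | Pure templates + standard IDL (.idl files); any IDL-generated type is supported |
| Callback model | std::function; no SPI inheritance hierarchy |
| Error handling | Ordinary operations return bool; critical operations return the StatusCode enum |
| Build system | CMake, with an exported package for find_package(dds_wrapper REQUIRED) |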

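1.2 Design Constraints

| Constraint | Description |
| --- | --- |
| Base-class dependency | Inherits from no datacenter base class; defines its own IDdsApiBase |
| QoS configuration | Split by role into DdsWriterQosProfile / DdsReaderQosProfile; provides SetQos plus convenience methods |
| Threading model | Receives through Cyclone DDS Listeners; Sub / ServiceServer / ServiceClient can each enable a dedicated dispatch thread per instance (off by default) |
| ROS 2 compatibility | Runtime API switch (EnableRos2Compat); when enabled, topic naming and IDL type structure are both ROS 2 compatible |
| RAII | Strictly std::shared_ptr / std::unique_ptr; raw new / delete are forbidden |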

1.3 Relationship to datacenter-api

datacenter-api (ZMQ/iceoryx)           dds-wrapper (CycloneDDS)
┌─────────────────────────────┐        ┌────────────────────────────────┐
│ namespace datacenter        │        │ namespace dds_wrapper          │
│ IMsgBusApiBase (base class) │   ≈    │ IDdsApiBase (independent base) │
│ SPI inheritance (callbacks) │   →    │ std::function (callbacks)      │
│ STRING-serialized messages  │   →    │ template T (typed messages)    │
│ MBApiFactory (factory)      │   ≈    │ DDSApiFactory (factory)        │
│ IPubApi / ISubApi           │   ≈    │ IDdsPubApi<T> / IDdsSubApi<T>  │
│ IServiceClientApi           │   ≈    │ IDdsServiceClientApi<Req,Resp> │
└─────────────────────────────┘        └────────────────────────────────┘

Key differences:

  • dds_wrapper uses C++ templates for compile-time type safety and no longer relies on runtime string serialization
  • Callbacks are std::function values, so users do not need to inherit from an SPI base class
  • The namespace is fully independent, with no compile-time or link-time dependency on datacenter
  • API naming and the usage flow (Create → SetQos → Connect → operate → Stop) match datacenter, which lowers migration cost
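
As a rough illustration of the callback difference listed above, the sketch below contrasts an SPI-style base class with a std::function callback. Every name in it (ISubSpiSketch, TypedSubscriberSketch, ImuSample, LastYawAfterDelivery) is a hypothetical stand-in rather than a dds_wrapper or datacenter type, and Deliver() only simulates a listener handing over a sample.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>

// Hypothetical SPI-style receiver: the user must derive and override,
// and the payload arrives as an untyped string.
struct ISubSpiSketch {
    virtual ~ISubSpiSketch() = default;
    virtual void OnRecvSubMsg(const std::string& topic, const std::string& msg) = 0;
};

// Hypothetical typed message standing in for an IDL-generated type.
struct ImuSample {
    double yaw{0.0};
};

// Callback-style receiver: any callable with the right signature works,
// and the message type is checked at compile time.
template <typename T>
class TypedSubscriberSketch {
public:
    using MsgCallback = std::function<void(const T&)>;

    explicit TypedSubscriberSketch(MsgCallback cb) : m_cb(std::move(cb)) {}

    // Stand-in for the listener delivering one sample.
    void Deliver(const T& msg) const {
        if (m_cb) m_cb(msg);
    }

private:
    MsgCallback m_cb;
};

// Registers a lambda, delivers one sample, and returns what the lambda saw.
inline double LastYawAfterDelivery(double yaw) {
    double seen = -1.0;
    TypedSubscriberSketch<ImuSample> sub(
        [&seen](const ImuSample& s) { seen = s.yaw; });
    sub.Deliver(ImuSample{yaw});
    return seen;
}
```

The lambda captures local state directly, which is the convenience the value-semantics callback model is after; with the SPI style the same state would have to live in a derived class.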

2. Directory Structure

text
dds-api/
├── CMakeLists.txt                     # library build + install/export
├── cmake/
│   ├── service_idl_inject.py         # Service IDL field-injection script
│   └── service_idl_macros.cmake      # service_idlcxx_generate() macro
├── docs/
│   ├── PRD.md                         # requirements specification
│   ├── DESIGN_PARTION.md              # design document (this file)
│   ├── DESIGN_V2.md                   # previous version of the design document
│   └── TODO.md                        # to-do tracking
├── api/                               # public interface headers (zero datacenter dependencies)
│   ├── dds_status_code.h              # StatusCode enum
│   ├── dds_qos.h                      # Writer/Reader QoS structs
│   ├── dds_api_base.h                 # IDdsApiBase: independent base class
│   ├── dds_pub_api.h                  # IDdsPubApi<T>
│   ├── dds_sub_api.h                  # IDdsSubApi<T> (std::function callback)
│   ├── dds_factory.h                  # DDSApiFactory: factory + template implementation
│   ├── dds_service_server_api.h       # IDdsServiceServerApi<Req,Resp>
│   └── dds_service_client_api.h       # IDdsServiceClientApi<Req,Resp>
├── impl/                              # implementation (templates fully in .h + factory .cpp)
│   ├── dds_pub_api_impl.h             # DdsPubApiImpl<T> + DdsPubListener<T>
│   ├── dds_sub_api_impl.h             # DdsSubApiImpl<T> + DdsSubListener<T>
│   ├── dds_service_server_api_impl.h  # DdsServiceServerApiImpl<Req,Resp>
│   ├── dds_service_client_api_impl.h  # DdsServiceClientApiImpl<Req,Resp>
│   ├── dds_service_protocol.h         # IServiceProtocol<Req,Resp> strategy interface
│   ├── dds_service_protocol_default.h # DefaultServiceProtocol: native-mode strategy
│   ├── dds_service_protocol_ros2.h    # Ros2ServiceProtocol: ROS 2 compatibility strategy
│   ├── dds_ros2_compat.h              # Ros2CompatHelper utilities (serviceid/GUID, etc.)
│   └── dds_factory.cpp                # DDSApiFactory non-template implementation
└── example/
    ├── CMakeLists.txt                 # example project build
    ├── idl/
    │   ├── test_msg.idl               # PubSub example message
    │   ├── service_test_msg.idl       # Service example message
    │   ├── ros_topic.idl              # ROS 2 topic example message
    │   └── ros2_add_ints.idl          # ROS 2 service example message
    ├── example_topic_pub.cpp
    ├── example_topic_sub.cpp
    ├── example_service_server.cpp
    ├── example_service_client.cpp
    ├── example_ros2_topic_pub.cpp
    ├── example_ros2_topic_sub.cpp
    ├── example_ros2_service_server.cpp
    └── example_ros2_service_client.cpp

3. Class Diagrams

3.1 Overall Class Hierarchy

mermaid
classDiagram
    direction TB

    class IDdsApiBase {
        <<abstract>>
        +Connect(topic_name: string) bool
        +Stop() void
        +IsConnected() bool
        +Name() string
        +SetNodeName(node_name: string) void
        +EnableRos2Compat(enable: bool) void
    }

    class IDdsPubApi~T~ {
        <<abstract>>
        +PubMsg(msg: T) bool
        +SubNotify(cb: function) void
        +UnSubNotify(cb: function) void
        +SetDurability() void
        +SetReliability(reliable: bool) void
        +SetDepth(depth: uint32) bool
        +SetKeepAll() void
        +SetQos(qos: DdsWriterQosProfile) bool
        +GetQos() DdsWriterQosProfile
    }

    class IDdsSubApi~T~ {
        <<abstract>>
        +MsgCallback: function~void(T)~
        +EnableDispatchThread(enable: bool) void
        +SetDurability() void
        +SetReliability(reliable: bool) void
        +SetDepth(depth: uint32) bool
        +SetKeepAll() void
        +SetQos(qos: DdsReaderQosProfile) bool
        +GetQos() DdsReaderQosProfile
    }

    class IDdsServiceServerApi~Req_Resp~ {
        <<abstract>>
        +Handler: function~StatusCode(Req, Resp)~
        +EnableDispatchThread(enable: bool) void
        +SetHandler(handler: Handler) void
    }

    class IDdsServiceClientApi~Req_Resp~ {
        <<abstract>>
        +RespCallback: function~void(StatusCode, Resp)~
        +EnableDispatchThread(enable: bool) void
        +Request(req: Req, resp: Resp, timeout_ms: int) StatusCode
        +AsyncRequest(req: Req, cb: RespCallback) StatusCode
    }


    class DDSApiFactory {
        <<singleton>>
        +GetInstance() DDSApiFactory
        +GetParticipant(domain_id: uint32) shared_ptr
        +ReleaseParticipant() void
        +Create~T_API~(name: string) shared_ptr~T_API~
        +Create~T_API~(name: string, cb: MsgCallback) shared_ptr~T_API~
        +Get(name: string) DdsApiBasePtr
        +Release(name: string) void
        +Clear() void
        +SetNodeName(node_name: string) void
        +SetRos2Compat(enable: bool) void
    }

    IDdsApiBase <|-- IDdsPubApi
    IDdsApiBase <|-- IDdsSubApi
    IDdsApiBase <|-- IDdsServiceServerApi
    IDdsApiBase <|-- IDdsServiceClientApi

    DDSApiFactory ..> IDdsApiBase : creates

3.2 PubSub Class Diagram

mermaid
classDiagram
    direction TB

    class `NoOpDataWriterListener~T~` {
        <<abstract>>
        +on_publication_matched(writer, status) void
    }

    class `NoOpDataReaderListener~T~` {
        <<abstract>>
        +on_data_available(reader) void
    }

    class IDdsPubApi~T~ {
        <<interface>>
        +PubMsg(msg: T) bool
        +SubNotify(cb) void
        +UnSubNotify(cb) void
        +SetDurability() void
        +SetReliability(reliable: bool) void
        +SetDepth(depth: uint32) bool
        +SetKeepAll() void
        +SetQos(qos: DdsWriterQosProfile) bool
        +GetQos() DdsWriterQosProfile
        #CreateApi(name: string)$ shared_ptr
    }

    class DdsPubApiImpl~T~ {
        -m_name: string
        -m_node_name: string
        -m_connected: atomic~bool~
        -m_pub_mtx: mutex
        -m_qos: DdsWriterQosProfile
        -m_sub_cb: function
        -m_unsub_cb: function
        -m_topic: Topic~T~
        -m_publisher: Publisher
        -m_writer: DataWriter~T~
        -m_listener: unique_ptr~DdsPubListener~
        -ApplyQos(wqos: DataWriterQos) void
        +CreateApiImpl(name: string)$ shared_ptr
    }

    class DdsPubListener~T~ {
        -m_sub_cb: function
        -m_unsub_cb: function
        -m_topic_name: string
        +on_publication_matched(writer, status) void
    }

    class IDdsSubApi~T~ {
        <<interface>>
        +MsgCallback: function~void(T)~
        +EnableDispatchThread(enable: bool) void
        +SetDurability() void
        +SetReliability(reliable: bool) void
        +SetDepth(depth: uint32) bool
        +SetKeepAll() void
        +SetQos(qos: DdsReaderQosProfile) bool
        +GetQos() DdsReaderQosProfile
        #CreateApi(name: string, cb: MsgCallback)$ shared_ptr
    }

    class DdsSubApiImpl~T~ {
        -m_name: string
        -m_node_name: string
        -m_connected: atomic~bool~
        -m_sub_mtx: mutex
        -m_cb_mtx: mutex
        -m_callback: MsgCallback
        -m_enable_dispatch_thread: bool
        -m_dispatch_thread: thread
        -m_msg_queue: deque~T~
        -m_queue_mtx: mutex
        -m_queue_cv: condition_variable
        -m_qos: DdsReaderQosProfile
        -m_reader: DataReader~T~
        -m_listener: unique_ptr~DdsSubListener~
        -DispatchLoop() void
        -EnqueueMessage(msg: T) void
        -ApplyQos(rqos: DataReaderQos) void
        +CreateApiImpl(name: string, cb: MsgCallback)$ shared_ptr
    }

    class DdsSubListener~T~ {
        -m_callback: MsgCallback
        -m_use_dispatch_thread: bool
        -m_dispatch_fn: DispatchFn
        +on_data_available(reader) void
    }

    IDdsPubApi <|.. DdsPubApiImpl : implements
    DdsPubApiImpl --> DdsPubListener : owns
    DdsPubListener --|> `NoOpDataWriterListener~T~` : extends

    IDdsSubApi <|.. DdsSubApiImpl : implements
    DdsSubApiImpl --> DdsSubListener : owns
    DdsSubListener --|> `NoOpDataReaderListener~T~` : extends

3.3 Service Class Diagram

mermaid
classDiagram
    direction TB

    class `NoOpDataReaderListener~T~` {
        <<abstract>>
        +on_data_available(reader) void
    }

    class DdsServiceClientListener~Resp~ {
        -m_dispatch_fn: function
        -m_async_waiters: map~string, RespCallback~
        +on_data_available(reader) void
    }

    class DdsServiceClientWriterListener {
        -m_match_cv: condition_variable
        -m_match_ready: atomic~bool~
        +on_publication_matched(writer, status) void
    }

    class SyncWaiter~Resp~ {
        +mtx: mutex
        +cv: condition_variable
        +ready: bool
        +resp: Resp
    }

    class IDdsServiceServerApi~Req_Resp~ {
        <<interface>>
        +Handler: function~StatusCode(Req, Resp)~
        +MsgCallback: Handler
        +EnableDispatchThread(enable: bool) void
        +SetHandler(handler: Handler) void
        #CreateApi(name, handler)$ shared_ptr
    }

    class DdsServiceServerApiImpl~Req_Resp~ {
        -m_name: string
        -m_connected: atomic~bool~
        -m_mtx: mutex
        -m_handler: Handler
        -m_enable_dispatch_thread: bool
        -m_req_queue: deque~Req~
        -m_queue_mtx: mutex
        -m_dispatch_running: bool
        -m_dispatch_thread: thread
        -m_req_reader: DataReader~Req~
        -m_rep_writer: DataWriter~Resp~
        -m_listener: unique_ptr~`DdsServiceServerListener~Req~`~
        -HandleRequest(req: Req) void
        -EnqueueRequest(req: Req) void
        -DispatchLoop() void
    }

    class DdsServiceServerListener~Req~ {
        -m_dispatch_fn: function
        +on_data_available(reader) void
    }

    class IDdsServiceClientApi~Req_Resp~ {
        <<interface>>
        +RespCallback: function~void(StatusCode, Resp)~
        +EnableDispatchThread(enable: bool) void
        +Request(req, resp, timeout_ms) StatusCode
        +AsyncRequest(req, cb) StatusCode
        #CreateApi(name)$ shared_ptr
    }

    class DdsServiceClientApiImpl~Req_Resp~ {
        -m_name: string
        -m_connected: atomic~bool~
        -m_mtx: mutex
        -m_sync_waiters: map~string, shared_ptr~`SyncWaiter~Resp~`~~
        -m_async_waiters: map~string, RespCallback~
        -m_enable_dispatch_thread: bool
        -m_reply_queue: deque~Resp~
        -m_reply_queue_mtx: mutex
        -m_dispatch_running: bool
        -m_dispatch_thread: thread
        -m_req_writer: DataWriter~Req~
        -m_rep_reader: DataReader~Resp~
        -m_match_mtx: mutex
        -m_match_cv: condition_variable
        -m_match_ready: bool
        -HandleReply(resp: Resp) void
        -EnqueueReply(resp: Resp) void
        -DispatchLoop() void
        -WaitForMatchReady(timeout_ms: int) bool
    }

    `NoOpDataReaderListener~T~` <|.. DdsServiceServerListener : for the Req type
    IDdsServiceServerApi <|.. DdsServiceServerApiImpl : implements
    DdsServiceServerApiImpl --> DdsServiceServerListener : owns
    DdsServiceServerListener --|> `NoOpDataReaderListener~Req~` : extends

    IDdsServiceClientApi <|.. DdsServiceClientApiImpl : implements
    DdsServiceClientApiImpl --> `SyncWaiter~Resp~` : manages
    DdsServiceClientApiImpl --> DdsServiceClientListener : owns
    DdsServiceClientApiImpl --> DdsServiceClientWriterListener : owns

3.5 Factory Class Diagram

mermaid
classDiagram
    direction TB

    class DDSApiFactory {
        <<singleton>>
        -m_mtx: mutex
        -m_api_map: unordered_map~string, DdsApiBasePtr~
        -m_participant: shared_ptr~DomainParticipant~
        -m_participant_mtx: mutex
        -m_domain_id: uint32
        -m_node_name: string
        -m_ros2_compat: bool
        -DDSApiFactory()
        -~DDSApiFactory()
        -CreateImpl(name, creator)$ DdsApiBasePtr
        +GetInstance()$ DDSApiFactory
        +GetParticipant(domain_id)$ shared_ptr
        +ReleaseParticipant()$ void
        +Create~T_API~(name)$ shared_ptr~T_API~
        +Create~T_API~(name, cb)$ shared_ptr~T_API~
        +Get(name)$ DdsApiBasePtr
        +Get~T~(name)$ shared_ptr~T~
        +Release(name)$ void
        +Clear()$ void
        +SetNodeName(node_name)$ void
        +SetRos2Compat(enable: bool)$ void
    }

    class IDdsApiBase {
        <<abstract>>
    }

    class IDdsPubApi~T~ {
        #CreateApi(name)$ shared_ptr
    }
    class IDdsSubApi~T~ {
        #CreateApi(name, cb)$ shared_ptr
    }
    class IDdsServiceServerApi~Req_Resp~ {
        #CreateApi(name, handler)$ shared_ptr
    }
    class IDdsServiceClientApi~Req_Resp~ {
        #CreateApi(name)$ shared_ptr
    }

    DDSApiFactory ..> IDdsPubApi : Create
    DDSApiFactory ..> IDdsSubApi : Create
    DDSApiFactory ..> IDdsServiceServerApi : Create
    DDSApiFactory ..> IDdsServiceClientApi : Create
    DDSApiFactory o-- IDdsApiBase : manages

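4. Interface Reference

4.1 StatusCode: Error Codes

cpp
namespace dds_wrapper {

enum class StatusCode : int {
    SUCCESS          =  0,   // operation succeeded
    INVALID_ARGUMENT = -1,   // invalid argument (e.g. depth > 100)
    NOT_CONNECTED    = -2,   // operation attempted before Connect()
    DDS_ERROR        = -3,   // exception from the underlying Cyclone DDS layer
    TIMEOUT          = -4,   // operation timed out
    DUPLICATE_NAME   = -5,   // an API with the same name already exists in the factory
};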

inline bool ok(StatusCode c) noexcept { return c == StatusCode::SUCCESS; }

} // namespace dds_wrapper
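
A minimal sketch of how calling code might consume these codes. The enum and ok() are copied from the listing above so the snippet compiles on its own; ToString() is a hypothetical helper added here for logging and is not part of the documented API.

```cpp
#include <cassert>
#include <string_view>

namespace dds_wrapper {

enum class StatusCode : int {
    SUCCESS          =  0,
    INVALID_ARGUMENT = -1,
    NOT_CONNECTED    = -2,
    DDS_ERROR        = -3,
    TIMEOUT          = -4,
    DUPLICATE_NAME   = -5,
};

inline bool ok(StatusCode c) noexcept { return c == StatusCode::SUCCESS; }

// Hypothetical helper (an assumption of this sketch): maps a code to its
// enumerator name so that failures can be logged in readable form.
constexpr std::string_view ToString(StatusCode c) noexcept {
    switch (c) {
        case StatusCode::SUCCESS:          return "SUCCESS";
        case StatusCode::INVALID_ARGUMENT: return "INVALID_ARGUMENT";
        case StatusCode::NOT_CONNECTED:    return "NOT_CONNECTED";
        case StatusCode::DDS_ERROR:        return "DDS_ERROR";
        case StatusCode::TIMEOUT:          return "TIMEOUT";
        case StatusCode::DUPLICATE_NAME:   return "DUPLICATE_NAME";
    }
    return "UNKNOWN";   // value outside the enumerators
}

} // namespace dds_wrapper
```

A caller would typically branch on ok(rc) first and only fall back to the specific code (or its name) on the failure path.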

4.2 QoS Configuration Structs

cpp
#include <cstdint>

namespace dds_wrapper {

enum class DdsHistoryKind : uint8_t      { KEEP_LAST = 0, KEEP_ALL = 1 };
enum class DdsReliabilityKind : uint8_t  { RELIABLE = 0, BEST_EFFORT = 1 };
enum class DdsDurabilityKind : uint8_t   { VOLATILE = 0, TRANSIENT_LOCAL = 1 };

struct DdsWriterQosProfile {
    DdsHistoryKind     history    {DdsHistoryKind::KEEP_LAST};
    uint32_t           depth      {10};
    DdsReliabilityKind reliability{DdsReliabilityKind::RELIABLE};
    DdsDurabilityKind  durability {DdsDurabilityKind::VOLATILE};
};

struct DdsReaderQosProfile {
    DdsHistoryKind     history    {DdsHistoryKind::KEEP_LAST};
    uint32_t           depth      {10};
    DdsReliabilityKind reliability{DdsReliabilityKind::RELIABLE};
    DdsDurabilityKind  durability {DdsDurabilityKind::VOLATILE};
};

} // namespace dds_wrapper
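
The convenience setters described elsewhere in this document (SetDepth, SetKeepAll, SetReliability, SetDurability) can be read as small edits to one of these profiles. The sketch below shows one possible mapping. WriterQosSketch and kMaxDepth are hypothetical names; the only rule taken from the document is that a depth above 100 is rejected. That SetDurability() selects TRANSIENT_LOCAL and that SetDepth() switches history back to KEEP_LAST are assumptions of this sketch.

```cpp
#include <cassert>
#include <cstdint>

enum class DdsHistoryKind : std::uint8_t     { KEEP_LAST = 0, KEEP_ALL = 1 };
enum class DdsReliabilityKind : std::uint8_t { RELIABLE = 0, BEST_EFFORT = 1 };
enum class DdsDurabilityKind : std::uint8_t  { VOLATILE = 0, TRANSIENT_LOCAL = 1 };

struct DdsWriterQosProfile {
    DdsHistoryKind     history    {DdsHistoryKind::KEEP_LAST};
    std::uint32_t      depth      {10};
    DdsReliabilityKind reliability{DdsReliabilityKind::RELIABLE};
    DdsDurabilityKind  durability {DdsDurabilityKind::VOLATILE};
};

constexpr std::uint32_t kMaxDepth = 100;   // limit quoted in the document

// Hypothetical holder showing how the setters could edit a profile.
class WriterQosSketch {
public:
    // Assumption: the argument-less setter opts in to TRANSIENT_LOCAL.
    void SetDurability() { m_qos.durability = DdsDurabilityKind::TRANSIENT_LOCAL; }

    void SetReliability(bool reliable) {
        m_qos.reliability = reliable ? DdsReliabilityKind::RELIABLE
                                     : DdsReliabilityKind::BEST_EFFORT;
    }

    // Rejects depth > 100 and leaves the stored profile unchanged.
    bool SetDepth(std::uint32_t depth) {
        if (depth > kMaxDepth) return false;
        m_qos.history = DdsHistoryKind::KEEP_LAST;   // assumption
        m_qos.depth   = depth;
        return true;
    }

    void SetKeepAll() { m_qos.history = DdsHistoryKind::KEEP_ALL; }

    // Whole-profile update with the same depth validation.
    bool SetQos(const DdsWriterQosProfile& qos) {
        if (qos.history == DdsHistoryKind::KEEP_LAST && qos.depth > kMaxDepth) {
            return false;
        }
        m_qos = qos;
        return true;
    }

    DdsWriterQosProfile GetQos() const { return m_qos; }

private:
    DdsWriterQosProfile m_qos;
};
```

Keeping validation in one place means SetQos() and the individual setters cannot disagree about which profiles are legal.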

4.3 IDdsApiBase: Base Class

Replaces IMsgBusApiBase and keeps only the interfaces that the DDS scenario needs.

cpp
namespace dds_wrapper {

class IDdsApiBase {
public:
    virtual ~IDdsApiBase() = default;
    virtual bool        Connect(const std::string& topic_name) = 0;
    virtual void        Stop() = 0;
    virtual bool        IsConnected() const = 0;
    virtual std::string Name() const = 0;
    virtual void        SetNodeName(const std::string& node_name) = 0;

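    /**
     * @brief Enable/disable the ROS 2 topic-name prefixes (must be called before Connect()).
     *
     * Affects topic naming only:
     *   Service, enabled:  rq/{name}Request / rr/{name}Reply
     *   Service, disabled: {name}Request / {name}Reply
     *   PubSub,  enabled:  rt/{topic_name}; disabled: {topic_name} (see section 6.1)
     * The IDL correlation fields (header_guid/header_seq), QoS, and user_data are
     * identical in both modes. Off by default.
     */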
    virtual void EnableRos2Compat(bool enable) = 0;
};

} // namespace dds_wrapper

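Compared with datacenter IMsgBusApiBase, the following methods were removed:

| Old method | Reason for removal |
| --- | --- |
| SetHost(ip, port) | ZMQ / network-address concept; DDS discovers peers through the DomainParticipant |
| SetHighWaterMark(hwm) | ZMQ-specific; DDS uses QoS depth instead |
| SetMsgExpires(ms) | ZMQ-specific |
| EnableLockFree() | Shared-memory-specific |
| SetLoopTimeout(ms) | epoll polling concept; DDS uses the Listener model |
| SetZMQContext(ctx) | ZMQ-specific |
| Release() | Merged into the semantics of Stop() |
| Join() | DDS Listeners need no join |
| Type() / MBApiType | Internal datacenter classification; not exposed |
| GetMonitoringInfo() | Monitoring serialization; left for a later extension |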

4.4 IDdsPubApi<T>: Publisher Interface

cpp
namespace dds_wrapper {

template <typename T>
class IDdsPubApi : public IDdsApiBase {
public:
    virtual ~IDdsPubApi() override = default;

    // ── Core operation ──
    virtual bool PubMsg(const T& msg) = 0;

    // ── Subscriber online/offline notifications (register before Connect) ──
    virtual void SubNotify(std::function<void(const std::string& topic)> cb) = 0;
    virtual void UnSubNotify(std::function<void(const std::string& topic)> cb) = 0;

    // ── QoS configuration (call before Connect) ──
    virtual void SetDurability() = 0;
    virtual void SetReliability(bool reliable) = 0;
    virtual bool SetDepth(uint32_t depth) = 0;          // returns false when depth > 100
    virtual void SetKeepAll() = 0;
    virtual bool SetQos(const DdsWriterQosProfile& qos) = 0;
    virtual DdsWriterQosProfile GetQos() const = 0;

protected:
    static std::shared_ptr<IDdsPubApi<T>> CreateApi(const std::string& name);
};

} // namespace dds_wrapper

Mapping from the datacenter IPubApi interface:

| datacenter IPubApi | dds_wrapper IDdsPubApi<T> | Notes |
| --- | --- | --- |
| PubMsg(topic, STRING) | PubMsg(T) | The topic is bound at Connect; messages are typed |
| SubNotify(cb) | SubNotify(cb) | 1:1 |
| UnSubNotify(cb) | UnSubNotify(cb) | 1:1 |
| SetDurability() | SetDurability() | 1:1 |
| SetDepth(n) | SetDepth(n) | 1:1 |
| SetKeepAll() | SetKeepAll() | 1:1 |
| RegisterSpi(spi*) | removed | Use std::function instead |
| (none) | SetReliability(bool) | New |
| (none) | SetQos(profile) / GetQos() | New; unified QoS configuration |

4.5 IDdsSubApi<T>: Subscriber Interface

cpp
namespace dds_wrapper {

template <typename T>
class IDdsSubApi : public IDdsApiBase {
public:
    using MsgCallback = std::function<void(const T& msg)>;
    virtual ~IDdsSubApi() override = default;

    // ── Threading model (call before Connect) ──
    virtual void EnableDispatchThread(bool enable = true) = 0;

    // ── QoS configuration (call before Connect) ──
    virtual void SetDurability() = 0;
    virtual void SetReliability(bool reliable) = 0;
    virtual bool SetDepth(uint32_t depth) = 0;
    virtual void SetKeepAll() = 0;
    virtual bool SetQos(const DdsReaderQosProfile& qos) = 0;
    virtual DdsReaderQosProfile GetQos() const = 0;

protected:
    static std::shared_ptr<IDdsSubApi<T>> CreateApi(
        const std::string& name, MsgCallback cb = nullptr);
};

} // namespace dds_wrapper

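Mapping from the datacenter ISubApi + ISubSpi interfaces:

| datacenter | dds_wrapper | Notes |
| --- | --- | --- |
| Inherit ISubSpi and override OnRecvSubMsg(topic, msg) | Create<SubApi>(name, [](const T& msg){...}) | A lambda callback replaces SPI inheritance |
| SubMsg(topic) | Subscribed implicitly at Connect(topic) | Simplified: one Sub maps to one topic |
| UnSubMsg(topic) | Stop() | The DDS model has no dynamic subscribe/unsubscribe |
| RegisterSpi(spi*) | removed | The callback is passed at Create time and cannot be replaced at runtime |
| (none) | EnableDispatchThread(true) | New; optional dedicated dispatch thread |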

4.6 IDdsServiceServerApi<Req,Resp>: Service Server

cpp
namespace dds_wrapper {

template <typename Req, typename Resp>
class IDdsServiceServerApi : public IDdsApiBase {
public:
    using Handler = std::function<StatusCode(const Req& req, Resp& resp)>;
    using MsgCallback = Handler;   // reuses the factory's Create(name, cb) signature

    virtual ~IDdsServiceServerApi() override = default;

    // When enabled, the DDS listener thread only enqueues; a dedicated dispatch thread calls the Handler serially
    virtual void EnableDispatchThread(bool enable) = 0;
    virtual void SetHandler(Handler handler) = 0;   // only effective before Connect

protected:
    static std::shared_ptr<IDdsServiceServerApi<Req, Resp>>
        CreateApi(const std::string& name, Handler handler);
};

} // namespace dds_wrapper

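Req/Resp IDL constraint: the Req and Resp types must contain the fields uint64 header_guid and int64 header_seq (a unified wire format that matches ROS 2). The framework fills in and passes these two fields through automatically, so user business code does not need to handle them.

Mapping from datacenter IServiceServerApi:

| datacenter | dds_wrapper | Notes |
| --- | --- | --- |
| Inherit IServiceServerSpi and override OnRequest(msg, ctx) | Create<ServerApi>(name, handler) | The handler returns a StatusCode |
| RepMsg(rsp, ctx) | Fill in Resp& resp directly inside the Handler | The framework sends the reply automatically |
| EnableLockFree() | removed | DDS has no such concept |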

4.7 IDdsServiceClientApi<Req,Resp>: Service Client

cpp
namespace dds_wrapper {

template <typename Req, typename Resp>
class IDdsServiceClientApi : public IDdsApiBase {
public:
    using RespCallback = std::function<void(StatusCode, const Resp&)>;

    virtual ~IDdsServiceClientApi() override = default;

    // When enabled, the DDS listener thread only enqueues; a dedicated dispatch thread handles replies and invokes async callbacks
    virtual void EnableDispatchThread(bool enable) = 0;

    // Synchronous request (blocking; header_guid/header_seq are generated inside the API)
    virtual StatusCode Request(const Req& req, Resp& resp, int timeout_ms = 5000) = 0;

    // Asynchronous request (non-blocking; the result is delivered to the callback)
    virtual StatusCode AsyncRequest(const Req& req, RespCallback cb) = 0;

protected:
    static std::shared_ptr<IDdsServiceClientApi<Req, Resp>>
        CreateApi(const std::string& name);
};

} // namespace dds_wrapper

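Client matching strategy:

  • After Connect(), a short warm-up window (1 s) waits for endpoint matching
  • Request() / AsyncRequest() wait again for matching to be ready before sending
  • The request correlation ID is generated automatically from header_guid (DataWriter GUID hash) + header_seq (monotonically increasing)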
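
The correlation step above can be sketched as a small router that hands out (header_guid, header_seq) keys and wakes the matching waiter when a reply arrives. This is an illustrative sketch only: ReplySketch, SyncWaiterSketch, ReplyRouterSketch and RoundTripSketch are hypothetical names, the key is a std::pair here (the class diagram keys its waiter maps by string), and a plain std::thread stands in for the DDS listener thread.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical reply type standing in for an IDL-generated Resp.
struct ReplySketch {
    std::uint64_t header_guid{0};
    std::int64_t  header_seq{0};
    int           result{0};
};

struct SyncWaiterSketch {
    std::mutex              mtx;
    std::condition_variable cv;
    bool                    ready{false};
    ReplySketch             resp;
};

class ReplyRouterSketch {
public:
    using Key = std::pair<std::uint64_t, std::int64_t>;

    explicit ReplyRouterSketch(std::uint64_t writer_guid) : m_guid(writer_guid) {}

    // Allocates the next correlation key and registers a waiter for it.
    Key Register() {
        std::lock_guard<std::mutex> lock(m_mtx);
        Key key{m_guid, m_next_seq++};
        m_waiters[key] = std::make_shared<SyncWaiterSketch>();
        return key;
    }

    // Called from the (simulated) listener thread when a reply arrives.
    bool HandleReply(const ReplySketch& resp) {
        std::shared_ptr<SyncWaiterSketch> w;
        {
            std::lock_guard<std::mutex> lock(m_mtx);
            auto it = m_waiters.find(Key{resp.header_guid, resp.header_seq});
            if (it == m_waiters.end()) return false;   // unknown or expired request
            w = it->second;
        }
        {
            std::lock_guard<std::mutex> lock(w->mtx);
            w->resp  = resp;
            w->ready = true;
        }
        w->cv.notify_one();
        return true;
    }

    // Blocks until the reply for `key` arrives or the timeout expires.
    bool Wait(const Key& key, std::chrono::milliseconds timeout, ReplySketch& out) {
        std::shared_ptr<SyncWaiterSketch> w;
        {
            std::lock_guard<std::mutex> lock(m_mtx);
            auto it = m_waiters.find(key);
            if (it == m_waiters.end()) return false;
            w = it->second;
        }
        bool got = false;
        {
            std::unique_lock<std::mutex> lock(w->mtx);
            got = w->cv.wait_for(lock, timeout, [&w] { return w->ready; });
            if (got) out = w->resp;
        }
        std::lock_guard<std::mutex> lock(m_mtx);
        m_waiters.erase(key);   // a late reply is now dropped by HandleReply
        return got;
    }

private:
    std::uint64_t m_guid;
    std::int64_t  m_next_seq{0};
    std::mutex    m_mtx;
    std::map<Key, std::shared_ptr<SyncWaiterSketch>> m_waiters;
};

// Simulated round trip: a second thread plays the server side.
inline int RoundTripSketch() {
    ReplyRouterSketch router(0xABCDu);
    const auto key = router.Register();
    std::thread server([&router, &key] {
        router.HandleReply(ReplySketch{key.first, key.second, 3});
    });
    ReplySketch resp;
    const bool got = router.Wait(key, std::chrono::milliseconds(1000), resp);
    server.join();
    return got ? resp.result : -1;
}
```

Because the predicate checks `ready`, a reply that lands before Wait() is entered is still picked up, and erasing the waiter on timeout is what lets a late reply be discarded cleanly.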

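Mapping from datacenter IServiceClientApi:

| datacenter | dds_wrapper | Notes |
| --- | --- | --- |
| ReqMsgSync(req, rsp, timeout) | Request(req, resp, timeout_ms) | Returns StatusCode instead of bool |
| ReqMsgAsync(req, request_id) + SPI OnRspMsg | AsyncRequest(req, cb) | A lambda callback replaces the SPI |
| EnableAsync(bool) | removed | Sync vs. async is decided by which method is called |
| (none) | EnableDispatchThread(true) | New; optional dedicated dispatch thread (supported by both Client and Server) |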

4.10 DDSApiFactory: Factory

cpp
namespace dds_wrapper {

using DdsApiBasePtr = std::shared_ptr<IDdsApiBase>;

class DDSApiFactory {
public:
    static DDSApiFactory& GetInstance();

    // ── DomainParticipant management ──
    static std::shared_ptr<dds::domain::DomainParticipant>
        GetParticipant(uint32_t domain_id = 0);
    static void ReleaseParticipant();

    // ── API creation (idempotent; the same name returns the existing instance) ──
    template <typename T_API>
    static std::shared_ptr<T_API> Create(const std::string& name);

    template <typename T_API>
    static std::shared_ptr<T_API> Create(const std::string& name,
                                          typename T_API::MsgCallback cb);

    // ── API lookup ──
    static DdsApiBasePtr Get(const std::string& name);
    template <typename T>
    static std::shared_ptr<T> Get(const std::string& name);

    // ── API release ──
    static void Release(const std::string& name);
    static void Clear();
    static void SetNodeName(const std::string& node_name);

    /**
     * @brief Set the global ROS 2 compatibility default.
     *
     * API instances created afterwards through Create() inherit this setting.
     * Instances that already exist are not affected.
     * An individual API instance can still override it with EnableRos2Compat().
     */
    static void SetRos2Compat(bool enable);
};

} // namespace dds_wrapper

Create template routing:

| API type | Create call | Internal routing |
| --- | --- | --- |
| IDdsPubApi<T> | Create<IDdsPubApi<T>>("name") | IDdsPubApi<T>::CreateApi(name) |
| IDdsSubApi<T> | Create<IDdsSubApi<T>>("name", cb) | IDdsSubApi<T>::CreateApi(name, cb) |
| IDdsServiceServerApi<Req,Resp> | Create<...>("name", handler) | CreateApi(name, handler) |
| IDdsServiceClientApi<Req,Resp> | Create<...>("name") | CreateApi(name) |
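
The routing above, combined with the idempotent "same name returns the existing instance" rule, could look roughly like the following name-keyed registry. This is a sketch under stated assumptions: ApiBaseSketch, PubSketch and RegistrySketch are hypothetical stand-ins, the registry is an ordinary object rather than a singleton with static methods, and returning nullptr when an existing name holds a different type is a choice made here, not documented behavior.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical polymorphic base, standing in for IDdsApiBase.
struct ApiBaseSketch {
    virtual ~ApiBaseSketch() = default;
};

// Hypothetical API type exposing the static CreateApi hook the factory routes to.
struct PubSketch : ApiBaseSketch {
    explicit PubSketch(std::string n) : name(std::move(n)) {}
    std::string name;

    static std::shared_ptr<PubSketch> CreateApi(const std::string& name) {
        return std::make_shared<PubSketch>(name);
    }
};

class RegistrySketch {
public:
    // Idempotent: a second Create with the same name returns the first instance.
    template <typename T_API>
    std::shared_ptr<T_API> Create(const std::string& name) {
        std::lock_guard<std::mutex> lock(m_mtx);
        auto it = m_api_map.find(name);
        if (it != m_api_map.end()) {
            // Assumption: yields nullptr if the stored instance has another type.
            return std::dynamic_pointer_cast<T_API>(it->second);
        }
        auto api = T_API::CreateApi(name);   // route to the type's own creator
        m_api_map.emplace(name, api);
        return api;
    }

    void Release(const std::string& name) {
        std::lock_guard<std::mutex> lock(m_mtx);
        m_api_map.erase(name);
    }

    std::size_t Size() const {
        std::lock_guard<std::mutex> lock(m_mtx);
        return m_api_map.size();
    }

private:
    mutable std::mutex m_mtx;
    std::unordered_map<std::string, std::shared_ptr<ApiBaseSketch>> m_api_map;
};
```

Holding the lock across lookup and insertion is what keeps two concurrent Create calls with the same name from both constructing an instance.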

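Mapping from datacenter MBApiFactory:

| datacenter MBApiFactory | dds_wrapper DDSApiFactory | Notes |
| --- | --- | --- |
| GetZMQContext() | GetParticipant(domain_id) | Manages the communication infrastructure |
| Create<T>(name, spi*) | Create<T>(name) / Create<T>(name, cb) | The template argument is the interface type |
| Get<T>(name) | Get<T>(name) | 1:1 |
| Release(name) | Release(name) | 1:1 |
| Clear(ignore_list) | Clear() | ignore_list removed |
| SetNodeName(name) | SetNodeName(name) | 1:1 |
| (none) | SetRos2Compat(bool) | New: global ROS 2 compatibility default |
| InitRuntime(app_name) | removed | iceoryx-specific |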

5. Flow Diagrams

5.1 PubSub Communication Sequence

mermaid
sequenceDiagram
    participant UserPub as User code (Pub)
    participant Factory as DDSApiFactory
    participant PubImpl as DdsPubApiImpl
    participant DDS as CycloneDDS
    participant SubImpl as DdsSubApiImpl
    participant UserSub as User code (Sub)

    Note over UserPub,UserSub: ── Creation phase ──
    UserPub->>Factory: Create<IDdsPubApi<T>>("pub1")
    Factory->>PubImpl: new DdsPubApiImpl("pub1")
    Factory-->>UserPub: shared_ptr<IDdsPubApi<T>>

    UserSub->>Factory: Create<IDdsSubApi<T>>("sub1", callback)
    Factory->>SubImpl: new DdsSubApiImpl("sub1", cb)
    Factory-->>UserSub: shared_ptr<IDdsSubApi<T>>

    Note over UserPub,UserSub: ── Configuration phase (before Connect) ──
    UserPub->>PubImpl: SetDurability()
    UserPub->>PubImpl: SetDepth(20)
    UserSub->>SubImpl: SetDurability()
    UserSub->>SubImpl: EnableDispatchThread(true) [optional]

    Note over UserPub,UserSub: ── Connection phase ──
    UserPub->>PubImpl: Connect("/sensor/imu")
    PubImpl->>DDS: create Topic + Publisher + DataWriter
    PubImpl-->>UserPub: true

    UserSub->>SubImpl: Connect("/sensor/imu")
    SubImpl->>DDS: create Topic + Subscriber + DataReader + Listener
    SubImpl-->>UserSub: true

    Note over UserPub,UserSub: ── Communication phase ──
    UserPub->>PubImpl: PubMsg(msg)
    PubImpl->>DDS: DataWriter.write(msg)
    DDS->>SubImpl: Listener::on_data_available()
    SubImpl->>UserSub: callback(msg)

    Note over UserPub,UserSub: ── Release phase ──
    UserPub->>PubImpl: Stop()
    UserSub->>SubImpl: Stop()

5.2 Service Communication Sequence

mermaid
sequenceDiagram
    participant Client as Client user
    participant CliImpl as ServiceClientApiImpl
    participant DDS as CycloneDDS
    participant SvrImpl as ServiceServerApiImpl
    participant Server as Server Handler

    Note over Client,Server: ── Create and connect ──
    Client->>CliImpl: Create + Connect("my_service")
    CliImpl->>DDS: create request Writer + reply Reader
    CliImpl->>CliImpl: WaitForMatchReady(1s) [warm-up]

    Server->>SvrImpl: Create(name, handler) + Connect("my_service")
    SvrImpl->>DDS: create request Reader + reply Writer

    Note over Client,Server: ── Synchronous request ──
    Client->>CliImpl: Request(req, resp, 5000)
    CliImpl->>CliImpl: WaitForMatchReady(timeout)
    CliImpl->>CliImpl: FillRequestId → header_guid=writerGuidHash, header_seq=seq++
    CliImpl->>DDS: reqWriter.write(req)
    CliImpl->>CliImpl: SyncWaiter waits...

    DDS->>SvrImpl: Listener::on_data_available(req)
    SvrImpl->>Server: handler(req, resp)
    Server-->>SvrImpl: StatusCode::SUCCESS
    SvrImpl->>SvrImpl: fallback pass-through of header_guid/header_seq
    SvrImpl->>DDS: repWriter.write(resp)

    DDS->>CliImpl: Listener::on_data_available(resp)
    CliImpl->>CliImpl: HandleReply → match SyncWaiter
    CliImpl->>CliImpl: SyncWaiter.notify()
    CliImpl-->>Client: StatusCode::SUCCESS, resp

    Note over Client,Server: ── Asynchronous request ──
    Client->>CliImpl: AsyncRequest(req, callback)
    CliImpl->>DDS: reqWriter.write(req)
    DDS->>SvrImpl: (handled as above)
    SvrImpl->>DDS: repWriter.write(resp)
    DDS->>CliImpl: HandleReply → match AsyncWaiter
    CliImpl->>Client: callback(SUCCESS, resp)

5.4 Factory Creation Flow

mermaid
flowchart TD
    A[User calls DDSApiFactory::Create] --> B{name already exists?}
    B -->|yes| C[Return existing shared_ptr]
    B -->|no| D[Call T_API::CreateApi]
    D --> E[Construct Impl instance]
    E --> F[Set NodeName]
    F --> G[Register in m_api_map]
    G --> H[Return shared_ptr]

    subgraph GetParticipant
        P1[First call] --> P2{m_participant exists?}
        P2 -->|no| P3[Create DomainParticipant]
        P3 --> P4[Cache shared_ptr]
        P2 -->|yes| P5[Return existing]
    end

    D -.-> P1

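5.5 Threading Model

mermaid
flowchart LR
    subgraph UserThread["User thread"]
        U1[PubMsg / Request]
    end

    subgraph ListenerThread["CycloneDDS Listener thread"]
        L1[on_data_available]
        L2[on_publication_matched]
        L3[on_subscription_matched]
    end

    subgraph DispatchThread["Optional: dedicated dispatch thread (supported by Sub / ServiceServer / ServiceClient)"]
        D1[DispatchLoop]
        D2[deque message/request/reply queue]
    end

    U1 -->|write| L1
    L1 -->|dispatch_thread=false| CB1[User callback/Handler called directly]
    L1 -->|dispatch_thread=true| D2
    D2 --> D1
    D1 --> CB2[User callback/Handler, thread-isolated]

Threading rules:

  • By default, callbacks run on the CycloneDDS Listener thread and must not block for long
  • After EnableDispatchThread(true), the instance has its own dispatch thread, so the callback/Handler no longer blocks the Listener
  • All three instance kinds support this: IDdsSubApi (message callback), IDdsServiceServerApi (Handler), IDdsServiceClientApi (async callback)
  • Data shared across callbacks must be protected by locks in user code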
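
The optional dispatch thread described above amounts to a producer/consumer queue: the listener thread only enqueues, and a worker thread runs the user callback. The sketch below is one way to write that with the deque, mutex and condition_variable members named in the class diagram. DispatchQueueSketch and DeliverInOrderSketch are hypothetical names, and draining the queue on shutdown is an assumption of this sketch rather than documented behavior.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

template <typename T>
class DispatchQueueSketch {
public:
    using Callback = std::function<void(const T&)>;

    explicit DispatchQueueSketch(Callback cb)
        : m_cb(std::move(cb)), m_thread([this] { DispatchLoop(); }) {}

    ~DispatchQueueSketch() {
        {
            std::lock_guard<std::mutex> lock(m_mtx);
            m_running = false;
        }
        m_cv.notify_one();
        m_thread.join();   // the loop drains queued messages before exiting
    }

    DispatchQueueSketch(const DispatchQueueSketch&) = delete;
    DispatchQueueSketch& operator=(const DispatchQueueSketch&) = delete;

    // Called from the listener thread: enqueue only, never run user code here.
    void Enqueue(T msg) {
        {
            std::lock_guard<std::mutex> lock(m_mtx);
            m_queue.push_back(std::move(msg));
        }
        m_cv.notify_one();
    }

private:
    void DispatchLoop() {
        std::unique_lock<std::mutex> lock(m_mtx);
        for (;;) {
            m_cv.wait(lock, [this] { return !m_queue.empty() || !m_running; });
            if (m_queue.empty()) return;   // only reachable once stopping
            T msg = std::move(m_queue.front());
            m_queue.pop_front();
            lock.unlock();
            m_cb(msg);                     // user callback runs without the queue lock
            lock.lock();
        }
    }

    Callback                m_cb;
    std::mutex              m_mtx;
    std::condition_variable m_cv;
    std::deque<T>           m_queue;
    bool                    m_running{true};
    std::thread             m_thread;   // declared last so it starts after the other members exist
};

// Enqueues 0..4 and returns the order in which the callback saw them.
inline std::vector<int> DeliverInOrderSketch() {
    std::vector<int> seen;
    {
        DispatchQueueSketch<int> q([&seen](const int& v) { seen.push_back(v); });
        for (int i = 0; i < 5; ++i) q.Enqueue(i);
    }   // destructor joins, so `seen` is complete and safe to read
    return seen;
}
```

Releasing the lock around the callback is the detail that keeps a slow Handler from stalling Enqueue(), which is the whole point of isolating it from the Listener thread.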

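6. ROS 2 Compatibility Design

6.1 API Switch

ROS 2 compatibility mode is controlled through a runtime API rather than a compile-time macro, so the code for both modes is always compiled.

Two levels of control:

| Level | API | Scope | Notes |
| --- | --- | --- | --- |
| Factory-wide | DDSApiFactory::SetRos2Compat(true) | All API instances created afterwards | Sets the default; existing instances are not affected |
| Per instance | api->EnableRos2Compat(true) | The current instance only | Overrides the factory default; must be called before Connect() |

Typical usage: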

cpp
// Each option below is an independent snippet.

// Option 1: enable globally (every Service created afterwards gets the rq/rr prefixes)
DDSApiFactory::SetRos2Compat(true);
auto cli = DDSApiFactory::Create<IDdsServiceClientApi<Req, Resp>>("cli");
cli->Connect("add_two_ints");   // → rq/add_two_intsRequest, rr/add_two_intsReply

// Option 2: enable on a single instance (other instances are unaffected)
auto cli = DDSApiFactory::Create<IDdsServiceClientApi<Req, Resp>>("cli");
cli->EnableRos2Compat(true);    // overrides the factory default
cli->Connect("add_two_ints");   // → rq/add_two_intsRequest, rr/add_two_intsReply

// Option 3: enable globally, then disable on one instance (no prefix; suits purely internal services)
DDSApiFactory::SetRos2Compat(true);
auto cli1 = DDSApiFactory::Create<IDdsServiceClientApi<Req, Resp>>("cli1");
cli1->Connect("svc_a");         // → rq/svc_aRequest, rr/svc_aReply

auto cli2 = DDSApiFactory::Create<IDdsServiceClientApi<Req, Resp>>("cli2");
cli2->EnableRos2Compat(false);  // override: no prefix
cli2->Connect("svc_b");         // → svc_bRequest, svc_bReply

// Option 4 (not viable): keep the switch off and add the prefix to service_name by hand
// The wire protocol is identical in both modes; the switch only affects the topic-name prefix.
// ROS 2 uses request=rq/ and reply=rr/ (two different prefixes), so one hand-prefixed name cannot satisfy both.
// Correct approach: enable the switch on the instances that must talk to ROS 2 (options 1 to 3).

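Scope of the switch:

| Item | Off (default) | On |
| --- | --- | --- |
| PubSub topic naming | {topic_name} | rt/{topic_name} |
| Service topic naming | {name}Request / {name}Reply | rq/{name}Request / rr/{name}Reply |
| Service IDL structure | User fields + uint64 header_guid + int64 header_seq | Same |
| Service QoS | ROS 2 convention (request RELIABLE + KEEP_LAST(10), reply RELIABLE + KEEP_ALL) | Same |
| Service user_data | serviceid=<GUID hex>; | Same |

The only difference is the topic-name prefix: PubSub adds rt/, and Service adds rq/ and rr/. The wire format is unified, so both modes remain wire-compatible with ROS 2 nodes.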
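
One way to picture the two-level control described in this section is a snapshot of the factory default plus an optional per-instance override. The sketch below assumes exactly that; FactoryDefaultsSketch, ApiInstanceSketch and EffectiveRos2Compat() are hypothetical names and do not appear in the dds_wrapper headers.

```cpp
#include <cassert>
#include <optional>

// Hypothetical stand-in for the factory-wide default set by SetRos2Compat().
struct FactoryDefaultsSketch {
    bool ros2_compat{false};
};

class ApiInstanceSketch {
public:
    // Snapshots the factory default at creation time, so later changes to the
    // factory setting do not affect instances that already exist.
    explicit ApiInstanceSketch(const FactoryDefaultsSketch& defaults)
        : m_inherited(defaults.ros2_compat) {}

    // Per-instance override, mirroring EnableRos2Compat(bool).
    void EnableRos2Compat(bool enable) { m_override = enable; }

    // The override wins when present; otherwise the inherited default applies.
    bool EffectiveRos2Compat() const { return m_override.value_or(m_inherited); }

private:
    bool                m_inherited;
    std::optional<bool> m_override;
};
```

Storing the override as std::optional keeps "never set" distinct from "explicitly set to false", which is what lets an instance opt out after the factory default has been switched on.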

6.2 Topic Naming Rule Mapping

mermaid
flowchart LR
    subgraph DefaultMode["Default mode (switch off)"]
        P1["Connect('imu')"] --> PT1["imu"]
        S1["Connect('add_svc')"] --> T1["add_svcRequest"]
        S1 --> T2["add_svcReply"]
    end

    subgraph Ros2Mode["ROS 2 compatibility mode (switch on)"]
        P2["Connect('imu')"] --> PT2["rt/imu"]
        S2["Connect('add_svc')"] --> T3["rq/add_svcRequest"]
        S2 --> T4["rr/add_svcReply"]
    end

Naming conversion logic (internal to the impl layer):

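cpp
#include <string>

// PubSub: fragment from inside Connect(), where m_ros2_compat and topic_name are in scope
const std::string effective_topic = m_ros2_compat ? ("rt/" + topic_name) : topic_name;

// Service: the strategy object's MakeTopicNames()
namespace dds_wrapper::detail {

struct ServiceTopicNames {
    std::string request;
    std::string reply;
};

/// @brief Only the prefix differs; the wire protocol is identical in both modes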
inline ServiceTopicNames MakeServiceTopicNames(const std::string& service_name,
                                                bool ros2_compat) {
    if (ros2_compat) {
        return { "rq/" + service_name + "Request",
                 "rr/" + service_name + "Reply" };
    }
    return { service_name + "Request",
             service_name + "Reply" };
}

} // namespace dds_wrapper::detail

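6.3 IDL Type Structure (unified wire format shared by both modes)

The switch state does not affect the IDL. The Service wire format always uses the header_guid/header_seq correlation fields, exactly matching ROS 2 rmw_cyclonedds_cpp.

IDL written by the user (business fields only):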

idl
// my_service_user.idl
module my_pkg {
    module srv {
        module dds_ {
            struct MySrv_Request_ {
                int32 param_a;
                int32 param_b;
            };
            struct MySrv_Response_ {
                int32 result;
            };
        };
    };
};

Wire IDL after injection by service_idl_inject.py (shared by both modes):

idl
// Auto-generated; do not edit by hand
module my_pkg {
    module srv {
        module dds_ {
            struct MySrv_Request_ {
                uint64 header_guid;    // ← auto-injected
                int64  header_seq;     // ← auto-injected
                int32 param_a;
                int32 param_b;
            };
            struct MySrv_Response_ {
                uint64 header_guid;    // ← auto-injected
                int64  header_seq;     // ← auto-injected
                int32 result;
            };
        };
    };
};

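Wire correlation fields:

| Field | Type | Description |
| --- | --- | --- |
| header_guid | uint64 | Low 8 bytes of the DataWriter GUID; identifies the client |
| header_seq | int64 | Monotonically increasing sequence number, used for request-reply correlation |

string request_id is no longer used. Unifying on header_guid/header_seq removes the IDL difference between the two modes and also avoids the cost of string allocation.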
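
To make the header_guid field concrete, the sketch below packs eight bytes of a 16-byte GUID into a uint64. It rests on two assumptions that the document does not settle: that "low 8 bytes" means bytes 8 to 15 of the GUID, and that they are combined in big-endian order. GuidLow8Sketch is a hypothetical helper, and the real one in dds_ros2_compat.h may differ on both points.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Packs bytes 8..15 of a 16-byte GUID into a uint64, most significant byte first.
// Assumption: this byte range and byte order are illustrative, not confirmed.
inline std::uint64_t GuidLow8Sketch(const std::array<std::uint8_t, 16>& guid) {
    std::uint64_t value = 0;
    for (std::size_t i = 8; i < 16; ++i) {
        value = (value << 8) | guid[i];
    }
    return value;
}
```

Whatever the exact byte order, both sides of a request-reply pair only need to echo the value unchanged, so the choice matters mainly when matching against GUIDs printed by other tools.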

6.4 ROS 2 Service Compatibility Sequence

mermaid
sequenceDiagram
    participant User as User code
    participant Client as DdsServiceClientApiImpl
    participant DDS as CycloneDDS
    participant ROS2 as ROS2 Service Server

    User->>Client: EnableRos2Compat(true)
    User->>Client: Connect("add_two_ints")
    Client->>Client: generate serviceid GUID (done in both modes)
    Client->>Client: m_ros2_compat=true → add rq/rr prefixes to topic names
    Client->>Client: create Topics:<br/>rq/add_two_intsRequest<br/>rr/add_two_intsReply
    Client->>DDS: DataWriter QoS:<br/>RELIABLE + KEEP_LAST(10)<br/>user_data = serviceid
    Client->>DDS: DataReader QoS:<br/>RELIABLE + KEEP_ALL<br/>user_data = serviceid

    Client->>Client: WaitForServer (wait for match)
    ROS2-->>DDS: server comes online
    DDS-->>Client: on_publication_matched + on_subscription_matched

    Note over Client,ROS2: Send request
    Client->>Client: header_guid = writerGuidHash(writer)
    Client->>Client: header_seq = seq++
    Client->>DDS: rq/add_two_intsRequest<br/>{header_guid, header_seq, a=1, b=2}

    DDS->>ROS2: server receives request
    ROS2->>ROS2: process → sum=3
    ROS2->>DDS: rr/add_two_intsReply<br/>{header_guid, header_seq, sum=3}

    DDS->>Client: on_data_available(resp)
    Client->>Client: match waiter by header_seq
    Client-->>User: return resp

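6.5 Strategy Pattern Implementation

The wire protocol (IDL fields, QoS, user_data) is identical in both modes; the only point of variation is the Topic name prefix. The strategy pattern encapsulates this variation and keeps an interface open for future wire protocols (such as a Zenoh Bridge).

Design pattern selection:

| Pattern | Strength | Weakness | Fit |
|---|---|---|---|
| Flag with if/else | Minimal | Scattered across Connect/FillId/ExtractId; degrades into a switch when new protocols are added | Sufficient today but not extensible |
| Strategy | Variation is precisely encapsulated; main path stays unchanged | Requires defining an interface | ✓ Best |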

6.5.1 Strategy Interface

cpp
namespace dds_wrapper {

/// @brief Service protocol strategy interface (Topic naming is currently the only point of variation).
/// The interface keeps the full set of extension methods so a future wire protocol can override them as needed.
template <typename Req, typename Resp>
class IServiceProtocol {
public:
    virtual ~IServiceProtocol() = default;

    /// @brief Builds the request/reply Topic names from service_name (the only method a concrete strategy must override today)
    virtual detail::ServiceTopicNames MakeTopicNames(
        const std::string& service_name) const = 0;

    /// @brief Configures DataWriter/DataReader QoS
    virtual void ApplyWriterQos(dds::pub::qos::DataWriterQos& wqos) const = 0;
    virtual void ApplyReaderQos(dds::sub::qos::DataReaderQos& rqos) const = 0;

    /// @brief Post-Connect hook (extracts the writer GUID hash)
    virtual void OnConnected(dds::pub::DataWriter<Req>& writer,
                             dds::sub::DataReader<Resp>& reader) = 0;

    /// @brief Fills the request correlation ID
    virtual void FillRequestId(Req& req) = 0;

    /// @brief Extracts the correlation ID from a response, used to match a waiter
    virtual std::string ExtractRequestId(const Resp& resp) const = 0;

    /// @brief Pre-Connect initialization (generates the serviceid)
    virtual void SetupServiceId(dds_entity_t participant_entity) = 0;
};

} // namespace dds_wrapper

6.5.2 Shared Wire-Protocol Base Class

cpp
namespace dds_wrapper {

/// @brief Implements all wire-protocol behavior in one place; both naming strategies inherit from this base class
template <typename Req, typename Resp>
class ServiceProtocolBase : public IServiceProtocol<Req, Resp> {
public:
    void ApplyWriterQos(dds::pub::qos::DataWriterQos& wqos) const override {
        wqos << dds::core::policy::Reliability::Reliable(
                    dds::core::Duration(0, 100'000'000))
             << dds::core::policy::History::KeepLast(10)
             << dds::core::policy::UserData(m_service_ud);
    }

    void ApplyReaderQos(dds::sub::qos::DataReaderQos& rqos) const override {
        rqos << dds::core::policy::Reliability::Reliable(
                    dds::core::Duration::from_secs(1))
             << dds::core::policy::History::KeepAll()
             << dds::core::policy::UserData(m_service_ud);
    }

    void OnConnected(dds::pub::DataWriter<Req>& writer,
                     dds::sub::DataReader<Resp>&) override {
        m_writer_guid_hash = detail::Ros2CompatHelper::writerGuidHash(
            writer.delegate()->get_ddsc_entity());
    }

    void FillRequestId(Req& req) override {
        req.header_guid(m_writer_guid_hash);
        req.header_seq(m_seq.fetch_add(1, std::memory_order_relaxed));
    }

    std::string ExtractRequestId(const Resp& resp) const override {
        return std::to_string(resp.header_seq());
    }

    void SetupServiceId(dds_entity_t participant_entity) override {
        m_client_guid = detail::Ros2CompatHelper::makeClientGuid(participant_entity);
        auto svc_id = detail::Ros2CompatHelper::formatServiceId(m_client_guid);
        m_service_ud.assign(svc_id.begin(), svc_id.end());
    }

protected:
    dds_guid_t         m_client_guid{};
    dds::core::ByteSeq m_service_ud;
    uint64_t           m_writer_guid_hash{0};
    std::atomic<int64_t> m_seq{1};
};

} // namespace dds_wrapper

6.5.3 Default Strategy (No Prefix)

cpp
namespace dds_wrapper {

template <typename Req, typename Resp>
class DefaultServiceProtocol : public ServiceProtocolBase<Req, Resp> {
public:
    detail::ServiceTopicNames MakeTopicNames(
        const std::string& service_name) const override
    {
        return { service_name + "Request",
                 service_name + "Reply" };
    }
};

} // namespace dds_wrapper

6.5.4 ROS 2 Strategy (rq/rr Prefix)

cpp
namespace dds_wrapper {

template <typename Req, typename Resp>
class Ros2ServiceProtocol : public ServiceProtocolBase<Req, Resp> {
public:
    detail::ServiceTopicNames MakeTopicNames(
        const std::string& service_name) const override
    {
        return { "rq/" + service_name + "Request",
                 "rr/" + service_name + "Reply" };
    }
};

} // namespace dds_wrapper

6.5.5 Class Diagram

mermaid
classDiagram
    direction TB

    class IServiceProtocol {
        <<interface>>
        +MakeTopicNames(service_name) ServiceTopicNames
        +ApplyWriterQos(wqos) void
        +ApplyReaderQos(rqos) void
        +OnConnected(writer, reader) void
        +FillRequestId(req) void
        +ExtractRequestId(resp) string
        +SetupServiceId(pp_entity) void
    }

    class ServiceProtocolBase {
        <<abstract>>
        #m_client_guid: dds_guid_t
        #m_service_ud: ByteSeq
        #m_writer_guid_hash: uint64
        #m_seq: atomic~int64~
        +ApplyWriterQos() void
        +ApplyReaderQos() void
        +OnConnected() void
        +FillRequestId() void
        +ExtractRequestId() string
        +SetupServiceId() void
    }

    class DefaultServiceProtocol {
        +MakeTopicNames(service_name) ServiceTopicNames
    }

    class Ros2ServiceProtocol {
        +MakeTopicNames(service_name) ServiceTopicNames
    }

    class DdsServiceClientApiImpl {
        -m_protocol: unique_ptr~IServiceProtocol~
        -m_sync_waiters: map
        -m_async_waiters: map
        -m_match_ready: bool
        +EnableRos2Compat(enable: bool) void
        +Connect(service_name) bool
        +Request(req, resp, timeout) StatusCode
        +AsyncRequest(req, cb) StatusCode
    }

    class Ros2CompatHelper {
        <<utility>>
        +formatServiceId(guid) string
        +makeClientGuid(pp_entity) dds_guid_t
        +writerGuidHash(writer_entity) uint64
    }

    %% Inheritance
    IServiceProtocol <|-- ServiceProtocolBase
    ServiceProtocolBase <|-- DefaultServiceProtocol
    ServiceProtocolBase <|-- Ros2ServiceProtocol

    %% Dependency and composition
    DdsServiceClientApiImpl --> IServiceProtocol : uses
    ServiceProtocolBase --> Ros2CompatHelper : uses

The Service Server side works the same way and holds an IServiceProtocol<Req,Resp> strategy object.

6.5.6 Impl Main Logic (Strategy Delegation)

cpp
template <typename Req, typename Resp>
class DdsServiceClientApiImpl : public IDdsServiceClientApi<Req, Resp> {
public:
    explicit DdsServiceClientApiImpl(std::string name)
        : m_name(std::move(name)),
          m_protocol(std::make_unique<DefaultServiceProtocol<Req, Resp>>()) {}

    void EnableRos2Compat(bool enable) override {
        std::lock_guard<std::mutex> lk(m_mtx);
        if (m_connected) return;
        if (enable)
            m_protocol = std::make_unique<Ros2ServiceProtocol<Req, Resp>>();
        else
            m_protocol = std::make_unique<DefaultServiceProtocol<Req, Resp>>();
    }

    bool Connect(const std::string& service_name) override {
        std::lock_guard<std::mutex> lk(m_mtx);
        if (m_connected) return true;

        auto participant = DDSApiFactory::GetParticipant();
        if (!participant) return false;

        // ① Initialize serviceid (both strategies run this through the shared base class)
        m_protocol->SetupServiceId(participant->delegate()->get_ddsc_entity());

        // ② The strategy only decides the Topic name prefix
        auto names = m_protocol->MakeTopicNames(service_name);

        dds::topic::Topic<Req>  req_topic(*participant, names.request);
        dds::topic::Topic<Resp> rep_topic(*participant, names.reply);
        dds::pub::Publisher  publisher(*participant);
        dds::sub::Subscriber subscriber(*participant);

        auto wqos = publisher.default_datawriter_qos();
        auto rqos = subscriber.default_datareader_qos();

        // ③ Unified QoS (same for both strategies)
        m_protocol->ApplyWriterQos(wqos);
        m_protocol->ApplyReaderQos(rqos);

        // ... create DataWriter / DataReader / Listener ...

        // ④ Extract the writer GUID (same for both strategies)
        m_protocol->OnConnected(m_req_writer, m_rep_reader);

        m_connected = true;
        return true;
    }

    StatusCode Request(const Req& req, Resp& resp, int timeout_ms) override {
        if (!WaitForMatchReady(timeout_ms)) return StatusCode::TIMEOUT;
        Req req_to_send = req;

        // ⑤ Fill header_guid/header_seq uniformly
        m_protocol->FillRequestId(req_to_send);

        // ... send and wait for the reply (main path unchanged) ...
    }

private:
    void HandleReply(const Resp& resp) {
        std::string req_id = m_protocol->ExtractRequestId(resp);
        if (req_id.empty()) return;
        // ... match SyncWaiter / AsyncWaiter (main path unchanged) ...
    }

    std::unique_ptr<IServiceProtocol<Req, Resp>> m_protocol;
    // ... SyncWaiter, match-wait, and other members unchanged ...
};

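Key advantages:

  • DefaultServiceProtocol and Ros2ServiceProtocol each implement only one method, MakeTopicNames()
  • The wire protocol (IDL, QoS, user_data) lives entirely in ServiceProtocolBase, so the two modes share it with zero code duplication
  • EnableRos2Compat(true) only swaps the strategy object; the Impl main path contains no mode-dependent if/else
  • A future wire protocol only needs to inherit from ServiceProtocolBase and override the methods it requires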
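These points can be seen in a reduced, DDS-free sketch of the same structure. The names `INaming`, `DefaultNaming`, `Ros2Naming`, and `ClientSketch` are stand-ins invented for illustration; only the topic-name formats and the "ignore the switch once connected" behavior are taken from the code in this section.

```cpp
#include <memory>
#include <string>

// Reduced stand-ins for the classes in this section; no DDS dependency.
struct TopicNames {
    std::string request;
    std::string reply;
};

class INaming {
public:
    virtual ~INaming() = default;
    virtual TopicNames Make(const std::string& service) const = 0;
};

class DefaultNaming : public INaming {
public:
    TopicNames Make(const std::string& s) const override {
        return {s + "Request", s + "Reply"};
    }
};

class Ros2Naming : public INaming {
public:
    TopicNames Make(const std::string& s) const override {
        return {"rq/" + s + "Request", "rr/" + s + "Reply"};
    }
};

class ClientSketch {
public:
    ClientSketch() : naming_(std::make_unique<DefaultNaming>()) {}

    // Swaps the strategy; ignored once connected, as in EnableRos2Compat().
    void EnableRos2Compat(bool enable) {
        if (connected_) return;
        if (enable) naming_ = std::make_unique<Ros2Naming>();
        else        naming_ = std::make_unique<DefaultNaming>();
    }

    // The main path delegates to the strategy and has no mode branch.
    bool Connect(const std::string& service) {
        if (connected_) return true;
        names_ = naming_->Make(service);
        connected_ = true;
        return true;
    }

    const TopicNames& names() const { return names_; }

private:
    std::unique_ptr<INaming> naming_;
    TopicNames names_;
    bool connected_{false};
};
```

The only mode check sits in the setter that picks the strategy; `Connect` reads the same regardless of which strategy is installed.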

6.5.7 Factory Propagates the ROS 2 Compatibility Default

cpp
DdsApiBasePtr DDSApiFactory::CreateImpl(const std::string& name, DdsApiCreator creator) {
    auto& inst = GetInstance();
    std::lock_guard<std::mutex> lock(inst.m_mtx);
    auto it = inst.m_api_map.find(name);
    if (it != inst.m_api_map.end()) return it->second;

    auto api = creator();
    if (api) {
        api->SetNodeName(inst.m_node_name);
        api->EnableRos2Compat(inst.m_ros2_compat);  // inherit the factory default → selects the Topic naming strategy
        inst.m_api_map.emplace(name, api);
    }
    return api;
}

7. IDL Toolchain and CMake Design

7.1 Standard IDL Flow

mermaid
flowchart LR
    A["User .idl"] --> B["idlcxx_generate()"]
    B --> C["Generate .hpp/.cpp"]
    C --> D["Compile and link"]

In standard mode, use the idlcxx_generate() CMake macro that ships with CycloneDDS-CXX:

cmake
find_package(CycloneDDS-CXX REQUIRED)
idlcxx_generate(TARGET my_idl FILES my_msg.idl WARNINGS no-implicit-extensibility)
target_link_libraries(my_app PRIVATE my_idl dds_wrapper::dds_wrapper)

7.2 ROS 2-Compatible IDL Flow

mermaid
flowchart LR
    A["User .idl<br/>(business fields only)"] --> B["service_idl_inject.py"]
    B --> C["wire .idl<br/>(with header_guid/seq)"]
    C --> D["idlcxx_generate()"]
    D --> E["Generate .hpp/.cpp"]
    E --> F["Compile and link"]

In ROS 2 compatibility mode, the user writes IDL containing only business fields, and service_idlcxx_generate() injects the wire header automatically:

cmake
include(service_idl_macros.cmake)
service_idlcxx_generate(
    TARGET my_svc_idl
    FILES  my_service_user.idl
    WARNINGS no-implicit-extensibility
)
target_link_libraries(my_app PRIVATE my_svc_idl dds_wrapper::dds_wrapper)

7.3 CMake Macro Interface

service_idlcxx_generate()

cmake
include(service_idl_macros.cmake)
service_idlcxx_generate(
    TARGET my_svc_idl
    FILES  my_service.idl
    WARNINGS no-implicit-extensibility
)
target_link_libraries(my_app PRIVATE my_svc_idl dds_wrapper::dds_wrapper)
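
| Parameter | Type | Required | Description |
|---|---|---|---|
| TARGET | string | | Name of the generated CMake library target |
| FILES | list | | List of user IDL files (business fields only) |
| WARNINGS | string | | WARNINGS argument forwarded to idlcxx_generate |

How it works:

  1. Iterate over each user IDL file
  2. Call service_idl_inject.py to generate a wire IDL (written to ${CMAKE_CURRENT_BINARY_DIR}/idl_wire/) with the same file name as the original IDL
  3. Pass the wire IDL to the standard idlcxx_generate()
  4. The generated .hpp header has the same name as the original IDL, so #include paths need no change
  5. The process is transparent to the user; no intermediate files need to be managed by hand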

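7.4 IDL Preprocessor

service_idl_inject.py automatically injects the ROS 2 wire header fields at the start of every struct:

| Input | Output |
|---|---|
| `struct Foo { int32 x; };` | `struct Foo { uint64 header_guid; int64 header_seq; int32 x; };` |

Injected fields (corresponding to cdds_request_header_t):

| Field | IDL type | Description |
|---|---|---|
| header_guid | uint64 | rmw_cyclonedds_cpp client identity |
| header_seq | int64 | Request sequence number, used for request-reply correlation |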
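The transformation in the input/output table can be illustrated with a few lines of C++. This is only a sketch of the text rewrite: the real tool is the Python script service_idl_inject.py, whose internals are not shown in this document, and the function name `InjectWireHeader` is hypothetical. It assumes every struct opens with the literal form `struct <Name> {` and does not handle inheritance, comments, or annotations.

```cpp
#include <regex>
#include <string>

// Illustrative C++ rendering of the header injection; not the actual script.
// Assumption: each struct opens as "struct <Name> {" with no base type.
std::string InjectWireHeader(const std::string& idl) {
    static const std::regex struct_open(R"(struct\s+\w+\s*\{)");
    // "$&" re-emits the matched "struct Name {" and appends the two fields.
    return std::regex_replace(
        idl, struct_open, "$& uint64 header_guid; int64 header_seq;");
}
```

Applied to the input row of the table, the function yields the string shown in the output row; text that contains no struct opening is returned unchanged.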

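8. QoS Support Matrix

| Policy | Value | Interface | Default |
|---|---|---|---|
| History | KeepLast | SetDepth(n) | KeepLast(10) |
| History | KeepAll | SetKeepAll() | |
| Durability | TransientLocal | SetDurability() | Volatile |
| Durability | Volatile | (default) | |
| Reliability | Reliable | SetReliability(true) | |
| Reliability | BestEffort | SetReliability(false) | |

Common combinations:

| Combination | Scenario |
|---|---|
| KeepLast(10) + Reliable | Default combination: reliable delivery with a bounded cache |
| KeepLast(n) + BestEffort | Sensor data, where low latency takes priority |
| KeepAll + Reliable | Logging/recording, where no sample may be dropped |
| TransientLocal + any combination | Late joiners need to receive historical data |

Deadline / Lifespan / Liveliness are left for future extension.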
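As a way to read the matrix, the sketch below models the convenience setters as a plain value type. `QosProfileSketch` and its enums are hypothetical; the actual layout of DdsWriterQosProfile / DdsReaderQosProfile is not shown in this section. The chained return values are a choice made for this sketch, and the Reliable default is inferred from the "default combination" row above.

```cpp
#include <cstdint>

// Hypothetical value type mirroring the setters in the matrix above.
enum class HistoryKind { KeepLast, KeepAll };
enum class DurabilityKind { Volatile, TransientLocal };

struct QosProfileSketch {
    HistoryKind    history{HistoryKind::KeepLast};
    std::int32_t   depth{10};                           // KeepLast(10)
    DurabilityKind durability{DurabilityKind::Volatile};
    bool           reliable{true};                      // assumed default

    QosProfileSketch& SetDepth(std::int32_t n) {
        history = HistoryKind::KeepLast;
        depth = n;
        return *this;
    }
    QosProfileSketch& SetKeepAll() {
        history = HistoryKind::KeepAll;
        return *this;
    }
    QosProfileSketch& SetDurability() {
        durability = DurabilityKind::TransientLocal;
        return *this;
    }
    QosProfileSketch& SetReliability(bool r) {
        reliable = r;
        return *this;
    }
};
```

For example, the sensor-data row corresponds to `QosProfileSketch{}.SetDepth(5).SetReliability(false)`, and the late-joiner row adds `SetDurability()` to any of the other combinations.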


9. Topic Naming Rules

| Scenario | Logical name example | DDS Topic name (default, switch off) | DDS Topic name (ROS 2 compatible, switch on) |
|---|---|---|---|
| Plain PubSub | robot/sensor/imu | robot/sensor/imu | rt/robot/sensor/imu |
| Service Request | my_service | my_serviceRequest | rq/my_serviceRequest |
| Service Reply | my_service | my_serviceReply | rr/my_serviceReply |
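The table can be expressed as a single mapping function. The sketch below is illustrative only: `TopicKind` and `ToDdsTopicName` are names invented here, while the prefixes and suffixes come directly from the rows above.

```cpp
#include <string>

enum class TopicKind { PubSub, ServiceRequest, ServiceReply };

// Maps a logical name to the DDS topic name per the table above.
// Function and enum names are illustrative, not part of dds_wrapper.
std::string ToDdsTopicName(TopicKind kind, const std::string& logical,
                           bool ros2_compat) {
    switch (kind) {
        case TopicKind::PubSub:
            return ros2_compat ? "rt/" + logical : logical;
        case TopicKind::ServiceRequest:
            return (ros2_compat ? "rq/" : "") + logical + "Request";
        case TopicKind::ServiceReply:
            return (ros2_compat ? "rr/" : "") + logical + "Reply";
    }
    return logical;  // not reached for valid enum values
}
```

Note that the Request/Reply suffix is applied in both modes; only the prefix depends on the switch.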

10. CMake Build Design

cmake
project(dds_wrapper VERSION 1.0.0 LANGUAGES CXX)

add_library(dds_wrapper STATIC impl/dds_factory.cpp)
add_library(dds_wrapper::dds_wrapper ALIAS dds_wrapper)

target_compile_features(dds_wrapper PUBLIC cxx_std_17)

target_include_directories(dds_wrapper
    PUBLIC
        $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/api>
        $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/impl>
        $<INSTALL_INTERFACE:include/dds_wrapper>
)

find_package(CycloneDDS-CXX REQUIRED)
target_link_libraries(dds_wrapper PUBLIC CycloneDDS-CXX::ddscxx)

# ROS 2 compatibility is controlled through the runtime API (EnableRos2Compat / SetRos2Compat);
# no compile-time macro is needed, and both modes are always compiled.

# Install
install(TARGETS dds_wrapper
    EXPORT  dds_wrapperTargets
    ARCHIVE DESTINATION lib
)
install(DIRECTORY api/ DESTINATION include/dds_wrapper)
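install(EXPORT dds_wrapperTargets
    FILE      dds_wrapperTargets.cmake
    NAMESPACE dds_wrapper::
    DESTINATION lib/cmake/dds_wrapper
)
# Note: find_package(dds_wrapper) also requires a dds_wrapperConfig.cmake in the
# same directory that includes dds_wrapperTargets.cmake (not shown here).

Downstream usage: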

cmake
find_package(dds_wrapper REQUIRED)
target_link_libraries(my_app PRIVATE dds_wrapper::dds_wrapper)

11. Migration Mapping from datacenter-api

11.1 Header Migration

| Old include | New include | Notes |
|---|---|---|
| datacenter/msg_bus_api_base.h | dds_api_base.h | Independent base class |
| datacenter/ps_pub_api.h | dds_pub_api.h + dds_pub_api_impl.h | Templated |
| datacenter/ps_sub_api.h | dds_sub_api.h + dds_sub_api_impl.h | Templated + std::function |
| datacenter/service_client_api.h | dds_service_client_api.h | Templated |
| datacenter/service_server_api.h | dds_service_server_api.h | Templated |
| datacenter/api_factory.h | dds_factory.h | Independent factory |
| datacenter/string_limited.hpp | Removed | Use std::string |

11.2 Code Pattern Migration

PubSub publisher:

cpp
// ── Old datacenter code ──
auto pub = MBApiFactory::Create<IPubApi>("my_pub");
pub->SetDurability();
pub->SetDepth(20);
pub->Connect("/sensor/imu");
pub->PubMsg("/sensor/imu", serialized_string);

// ── New dds_wrapper code ──
auto pub = DDSApiFactory::Create<IDdsPubApi<ImuMsg>>("my_pub");
pub->SetDurability();
pub->SetDepth(20);
pub->Connect("/sensor/imu");
ImuMsg msg; msg.id(1);
pub->PubMsg(msg);   // type-safe, no manual serialization

PubSub subscriber:

cpp
// ── Old datacenter code ──
class MySpi : public ISubSpi {
    void OnRecvSubMsg(const std::string& topic, const std::string& msg) override {
        // deserialize msg...
    }
};
MySpi spi;
auto sub = MBApiFactory::Create<ISubApi>("my_sub", &spi);
sub->Connect("/sensor");
sub->SubMsg("/sensor/imu");

// ── New dds_wrapper code ──
auto sub = DDSApiFactory::Create<IDdsSubApi<ImuMsg>>(
    "my_sub",
    [](const ImuMsg& msg) {
        std::cout << msg.id() << "\n";   // use the typed message directly
    });
sub->Connect("/sensor/imu");

Service client:

cpp
// ── Old datacenter code ──
class MyCliSpi : public IServiceClientSpi {
    void OnRspMsg(const STRING& msg, uint64_t rid, bool) override { ... }
};
MyCliSpi spi;
auto cli = MBApiFactory::Create<IServiceClientApi>("my_cli", &spi);
cli->Connect("my_svc");
STRING rsp;
cli->ReqMsgSync(req_str, rsp, 5000);

// ── New dds_wrapper code ──
auto cli = DDSApiFactory::Create<IDdsServiceClientApi<MyReq, MyResp>>("my_cli");
cli->Connect("my_svc");
MyReq req; req.command("hello");
MyResp resp;
auto sc = cli->Request(req, resp, 5000);   // type-safe
if (ok(sc)) std::cout << resp.message() << "\n";

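11.3 Full Mapping Table

| datacenter concept | dds_wrapper equivalent | Change |
|---|---|---|
| `namespace datacenter` | `namespace dds_wrapper` | Fully isolated |
| `: public IMsgBusApiBase` | `: public IDdsApiBase` | Slimmed-down interface |
| `: public IXxxSpi` + `OnRecvXxx` | `std::function` lambda | No inheritance |
| `Create<T>(name, &spi)` | `Create<T>(name, cb)` | Value-semantic callback |
| `RegisterSpi(&spi)` | Removed | Callback is fixed at Create time |
| `STRING` serialized message | Template `T` typed message | Compile-time type safety |
| `MBApiType` / `Type()` | Removed | Internal classification is not exposed |
| `ServerName()` | `Name()` | More general |
| `SetHighWaterMark` / `SetMsgExpires` | Removed | ZMQ-specific |
| `Release()` / `Join()` | Merged into `Stop()` | Unified semantics |
| `Clear(ignore_list)` | `Clear()` | Simplified |

12. Typical Usage Examples

12.1 Publisher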

cpp
#include "dds_pub_api_impl.h"
#include "dds_factory.h"
#include "MyMsg.hpp"

using namespace dds_wrapper;

auto pub = DDSApiFactory::Create<IDdsPubApi<MyMsg>>("imu_pub");
pub->SubNotify([](const std::string& t) { LOG(...) << "subscriber online: " << t; });
pub->SetDurability();
pub->SetDepth(20);
pub->Connect("/robot/sensor/imu");

MyMsg msg;
msg.id(1);
msg.value(3.14);
pub->PubMsg(msg);

12.2 Subscriber (lambda)

cpp
#include "dds_sub_api_impl.h"
#include "dds_factory.h"
#include "MyMsg.hpp"

using namespace dds_wrapper;

auto sub = DDSApiFactory::Create<IDdsSubApi<MyMsg>>(
    "imu_sub",
    [](const MyMsg& msg) { LOG(...) << "id=" << msg.id(); });

sub->SetDurability();
sub->Connect("/robot/sensor/imu");

12.3 Subscriber (Dedicated Dispatch Thread + Class Member Function)

cpp
class SensorHandler {
public:
    void Init() {
        _sub = DDSApiFactory::Create<IDdsSubApi<ImuMsg>>(
            "imu_sub",
            [this](const ImuMsg& msg) { OnImu(msg); });
        _sub->EnableDispatchThread(true);
        _sub->Connect("/robot/sensor/imu");
    }

private:
    void OnImu(const ImuMsg& msg) { /* safe to run long operations here */ }
    std::shared_ptr<IDdsSubApi<ImuMsg>> _sub;
};
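The example above relies on EnableDispatchThread(true) to move callbacks off the DDS listener thread. One common way to build such a mechanism is a single worker thread draining a task queue, sketched below. `DispatchQueueSketch` is a hypothetical name, and this is not the dds_wrapper implementation, whose internals are not shown in this section.

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical single-worker dispatcher; illustration only.
class DispatchQueueSketch {
public:
    DispatchQueueSketch() : worker_([this] { Run(); }) {}
    ~DispatchQueueSketch() { Stop(); }

    // Called from the listener thread: enqueue and return immediately.
    void Post(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lk(mtx_);
            if (stopping_) return;
            tasks_.push_back(std::move(task));
        }
        cv_.notify_one();
    }

    // Lets the worker drain the remaining tasks, then joins it.
    void Stop() {
        {
            std::lock_guard<std::mutex> lk(mtx_);
            stopping_ = true;
        }
        cv_.notify_one();
        if (worker_.joinable()) worker_.join();
    }

private:
    void Run() {
        std::unique_lock<std::mutex> lk(mtx_);
        for (;;) {
            cv_.wait(lk, [this] { return stopping_ || !tasks_.empty(); });
            if (tasks_.empty()) return;  // only reached when stopping_
            auto task = std::move(tasks_.front());
            tasks_.pop_front();
            lk.unlock();
            task();  // user callback runs outside the lock
            lk.lock();
        }
    }

    std::mutex mtx_;
    std::condition_variable cv_;
    std::deque<std::function<void()>> tasks_;
    bool stopping_{false};
    std::thread worker_;  // declared last so the other members exist first
};
```

In this design the listener callback only pays for a queue push, so a slow handler such as `OnImu` delays later messages on the same queue but does not stall the DDS receive path.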

12.4 Service Server

cpp
#include "dds_service_server_api_impl.h"
using namespace dds_wrapper;

auto server = DDSApiFactory::Create<IDdsServiceServerApi<MyReq, MyResp>>(
    "calc_server",
    [](const MyReq& req, MyResp& resp) -> StatusCode {
        resp.result(req.a() + req.b());
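        // header_guid/header_seq are passed through by the framework; the handler need not touch them
        return StatusCode::SUCCESS;
    });
// Optional: enable a dedicated dispatch thread so the handler does not block the DDS listener thread
server->EnableDispatchThread(true);
server->Connect("calc_service");

12.5 Service Client (Synchronous)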

cpp
#include "dds_service_client_api_impl.h"
using namespace dds_wrapper;

auto client = DDSApiFactory::Create<IDdsServiceClientApi<MyReq, MyResp>>("calc_client");
client->Connect("calc_service");

MyReq req; req.a(1); req.b(2);
MyResp resp;
if (ok(client->Request(req, resp, 5000))) {
    LOG(...) << "result=" << resp.result();
}

12.6 Service Client (Asynchronous + Dedicated Dispatch Thread)

cpp
// Enable the dispatch thread so async callbacks do not block the DDS listener thread
auto client = DDSApiFactory::Create<IDdsServiceClientApi<MyReq, MyResp>>("calc_client");
client->EnableDispatchThread(true);
client->Connect("calc_service");

MyReq req; req.a(1); req.b(2);
client->AsyncRequest(req, [](StatusCode sc, const MyResp& resp) {
    // This callback runs on the dedicated dispatch thread, so long operations are safe
    if (ok(sc)) LOG(...) << "async result=" << resp.result();
});

12.7 PubSub Communication with ROS 2 Nodes

cpp
// With ROS 2 compatibility enabled, the rt/ prefix is added to topics automatically
DDSApiFactory::SetRos2Compat(true);

auto pub = DDSApiFactory::Create<IDdsPubApi<SensorMsg>>("sensor_pub");
pub->Connect("robot/sensor/imu");   // actual DDS Topic: rt/robot/sensor/imu

auto sub = DDSApiFactory::Create<IDdsSubApi<SensorMsg>>(
    "sensor_sub",
    [](const SensorMsg& msg) { /* handle the message */ });
sub->Connect("robot/sensor/imu");   // actual DDS Topic: rt/robot/sensor/imu

12.8 Service Communication with ROS 2 Nodes

cpp
#include "dds_service_client_api_impl.h"
using namespace dds_wrapper;

// Option A: enable ROS 2 compatibility globally
DDSApiFactory::SetRos2Compat(true);
auto client = DDSApiFactory::Create<IDdsServiceClientApi<Ros2Req, Ros2Resp>>("ros2_cli");
client->Connect("add_two_ints");   // → rq/add_two_intsRequest, rr/add_two_intsReply

// Option B: enable it for a single instance (alternative to A; use one or the other)
auto client = DDSApiFactory::Create<IDdsServiceClientApi<Ros2Req, Ros2Resp>>("ros2_cli");
client->EnableRos2Compat(true);
client->Connect("add_two_ints");

Ros2Req req; req.a(1); req.b(2);
Ros2Resp resp;
if (ok(client->Request(req, resp, 5000))) {
    LOG(...) << "sum=" << resp.sum();
}

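End of document. This document covers the interface design, implementation architecture, threading model (including dedicated dispatch threads for Sub / ServiceServer / ServiceClient), ROS 2 compatibility mechanism (PubSub rt/ prefix, Service rq/ and rr/ prefixes, wire GUID protocol), and IDL toolchain design (service_idlcxx_generate / service_idl_inject.py) for the dds_wrapper communication patterns (PubSub / Service).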
