Skip to content

DDS 消息丢失问题:先建订阅者仍丢首条消息

问题

即使采用"先建立订阅者,后建立发布者"的顺序,第一条消息仍可能丢失。

核心原因

1. QoS 配置限制

历史数据传输需要同时满足两个 QoS 条件(见 src/core/ddsi/src/ddsi_endpoint_match.c#L855-L896):

c
if (rd->xqos->reliability.kind > DDS_RELIABILITY_BEST_EFFORT &&
    rd->xqos->durability.kind > DDS_DURABILITY_VOLATILE)
    ddsi_deliver_historical_data(wr, rd);

默认配置问题

  • 可靠性:默认为 RELIABLE ✓
  • 持久性:默认为 VOLATILE ✗

由于默认持久性是 VOLATILE,不满足历史数据传输条件,新连接的订阅者不会收到之前发送的消息。

2. 发现过程的时序竞争

3. 新连接的序列号处理

src/core/ddsi/src/ddsi_endpoint_match.c#L820-L823 可以看到:

c
if (pretend_everything_acked)
    m->seq = DDSI_MAX_SEQ_NUMBER;  // 最佳努力
else
    m->seq = wr->seq;  // 可靠连接从当前序列号开始

对于可靠连接,新读者从发布者当前的序列号开始接收,不会追溯历史数据。

解决方案

方案1:使用 TRANSIENT_LOCAL QoS(推荐)

c
// 发布者端配置
dds_qos_t *writer_qos = dds_create_qos();
dds_qset_reliability(writer_qos, DDS_RELIABILITY_RELIABLE, DDS_SECS(10));
dds_qset_durability(writer_qos, DDS_DURABILITY_TRANSIENT_LOCAL);
dds_qset_history(writer_qos, DDS_HISTORY_KEEP_ALL, 0);  // 保留所有历史
dds_datawriter_set_qos(writer, writer_qos);

// 订阅者端配置
dds_qos_t *reader_qos = dds_create_qos();
dds_qset_reliability(reader_qos, DDS_RELIABILITY_RELIABLE, DDS_SECS(10));
dds_qset_durability(reader_qos, DDS_DURABILITY_TRANSIENT_LOCAL);
dds_reader_set_qos(reader, reader_qos);

优点:完全避免消息丢失,支持迟到的订阅者,符合 DDS 规范的持久化机制
缺点:增加内存使用(缓存历史数据),稍微增加发现延迟

方案2:添加启动延迟

c
dds_publisher *pub = dds_create_publisher(participant, NULL, NULL);
dds_datawriter *writer = dds_create_writer(pub, topic, NULL, NULL);
dds_sleepfor(DDS_MSECS(100));  // 给发现过程留出时间
dds_write(writer, &sample);

优点:简单直接,不改变 QoS 配置
缺点:延迟时间是经验值,不可靠;在慢速网络中仍可能失败

方案3:监听匹配状态(优雅)

c
dds_waitset_t *waitset = dds_create_waitset(participant);
dds_entity_t cond = dds_create_guardcondition(participant);
dds_waitset_attach(waitset, cond, NULL);
dds_set_status_mask(writer, DDS_PUBLICATION_MATCHED_STATUS);

// 等待匹配完成
dds_return_t rc;
while (true) {
    dds_PublicationMatchedStatus status;
    rc = dds_get_publication_matched_status(writer, &status);
    if (status.current_count >= expected_readers) {
        break;  // 已匹配到足够的读者
    }
    dds_waitset_wait(waitset, NULL, 0, DDS_MSECS(50));
}

// 现在可以安全发送第一条消息
dds_write(writer, &sample);

优点:精确等待,不浪费时间;适应网络条件变化
缺点:代码复杂度较高,仍不解决持久化问题

最佳实践

场景 推荐方案
快速启动 方案3(匹配状态监听)+ 固定延迟
数据完整性关键 方案1(TRANSIENT_LOCAL)
开发测试 方案2(固定延迟)

快速启动场景示例:

c
dds_set_status_mask(writer, DDS_PUBLICATION_MATCHED_STATUS);
dds_sleepfor(DDS_MSECS(50));  // 短暂等待作为兜底
dds_write(writer, &sample);

相关代码位置

  • 端点匹配逻辑:src/core/ddsi/src/ddsi_endpoint_match.c
  • 历史数据传输:src/core/ddsi/src/ddsi_endpoint_match.c#L855-L896
  • 连接建立:src/core/ddsi/src/ddsi_endpoint_match.c#L748-L846
  • 写入者历史缓存:src/core/ddsi/src/ddsi_whc.c

基于 VitePress 构建