DDS 消息丢失问题:先建订阅者仍丢首条消息
问题
即使采用"先建立订阅者,后建立发布者"的顺序,第一条消息仍可能丢失。
核心原因
1. QoS 配置限制
历史数据传输需要同时满足两个 QoS 条件(见 src/core/ddsi/src/ddsi_endpoint_match.c#L855-L896):
c
if (rd->xqos->reliability.kind > DDS_RELIABILITY_BEST_EFFORT &&
rd->xqos->durability.kind > DDS_DURABILITY_VOLATILE)
ddsi_deliver_historical_data(wr, rd); 默认配置问题:
- 可靠性:默认为 RELIABLE ✓
- 持久性:默认为 VOLATILE ✗
由于默认持久性是 VOLATILE,不满足历史数据传输条件,新连接的订阅者不会收到之前发送的消息。
2. 发现过程的时序竞争
3. 新连接的序列号处理
从 src/core/ddsi/src/ddsi_endpoint_match.c#L820-L823 可以看到:
c
if (pretend_everything_acked)
m->seq = DDSI_MAX_SEQ_NUMBER; // 最佳努力
else
m->seq = wr->seq; // 可靠连接从当前序列号开始 对于可靠连接,新读者从发布者当前的序列号开始接收,不会追溯历史数据。
解决方案
方案1:使用 TRANSIENT_LOCAL QoS(推荐)
c
// 发布者端配置
dds_qos_t *writer_qos = dds_create_qos();
dds_qset_reliability(writer_qos, DDS_RELIABILITY_RELIABLE, DDS_SECS(10));
dds_qset_durability(writer_qos, DDS_DURABILITY_TRANSIENT_LOCAL);
dds_qset_history(writer_qos, DDS_HISTORY_KEEP_ALL, 0); // 保留所有历史
dds_datawriter_set_qos(writer, writer_qos);
// 订阅者端配置
dds_qos_t *reader_qos = dds_create_qos();
dds_qset_reliability(reader_qos, DDS_RELIABILITY_RELIABLE, DDS_SECS(10));
dds_qset_durability(reader_qos, DDS_DURABILITY_TRANSIENT_LOCAL);
dds_reader_set_qos(reader, reader_qos); 优点:完全避免消息丢失,支持迟到的订阅者,符合 DDS 规范的持久化机制
缺点:增加内存使用(缓存历史数据),稍微增加发现延迟
方案2:添加启动延迟
c
dds_publisher *pub = dds_create_publisher(participant, NULL, NULL);
dds_datawriter *writer = dds_create_writer(pub, topic, NULL, NULL);
dds_sleepfor(DDS_MSECS(100)); // 给发现过程留出时间
dds_write(writer, &sample); 优点:简单直接,不改变 QoS 配置
缺点:延迟时间是经验值,不可靠;在慢速网络中仍可能失败
方案3:监听匹配状态(优雅)
c
dds_waitset_t *waitset = dds_create_waitset(participant);
dds_entity_t cond = dds_create_guardcondition(participant);
dds_waitset_attach(waitset, cond, NULL);
dds_set_status_mask(writer, DDS_PUBLICATION_MATCHED_STATUS);
// 等待匹配完成
dds_return_t rc;
while (true) {
dds_PublicationMatchedStatus status;
rc = dds_get_publication_matched_status(writer, &status);
if (status.current_count >= expected_readers) {
break; // 已匹配到足够的读者
}
dds_waitset_wait(waitset, NULL, 0, DDS_MSECS(50));
}
// 现在可以安全发送第一条消息
dds_write(writer, &sample); 优点:精确等待,不浪费时间;适应网络条件变化
缺点:代码复杂度较高,仍不解决持久化问题
最佳实践
| 场景 | 推荐方案 |
|---|---|
| 快速启动 | 方案3(匹配状态监听)+ 固定延迟 |
| 数据完整性关键 | 方案1(TRANSIENT_LOCAL) |
| 开发测试 | 方案2(固定延迟) |
快速启动场景示例:
c
dds_set_status_mask(writer, DDS_PUBLICATION_MATCHED_STATUS);
dds_sleepfor(DDS_MSECS(50)); // 短暂等待作为兜底
dds_write(writer, &sample); 相关代码位置
- 端点匹配逻辑:
src/core/ddsi/src/ddsi_endpoint_match.c - 历史数据传输:
src/core/ddsi/src/ddsi_endpoint_match.c#L855-L896 - 连接建立:
src/core/ddsi/src/ddsi_endpoint_match.c#L748-L846 - 写入者历史缓存:
src/core/ddsi/src/ddsi_whc.c