Skip to content

iceoryx2 事件驱动通信笔记

核心概念区分

数据流 vs 控制流

  • 数据流:函数接收什么输入 → 发布-订阅请求-响应模式
  • 控制流:函数何时执行 → 事件模式
  • 分离原因:避免混合导致过早唤醒/空消息等"hack"

设计优势

  • 显式控制系统响应时机
  • 支持无数据唤醒
  • 支持多输入等待(所有输入到达前不中断)

应用案例:机器人紧急刹车系统

场景需求

  • 选择性响应:只在障碍物接近时刹车
  • 趋势分析:需最近3个样本计算位置、速度、加速度
  • 容错处理
    • 传感器死亡 → 安全状态(停车)
    • 刹车后启动 → 仍需历史数据

C++实现代码示例

发布者设置

cpp
#include "iox2/iceoryx2.hpp"
using namespace iox2;

// 创建节点
auto node = NodeBuilder().create<ServiceType::Ipc>().value();

// 1. 发布-订阅服务(距离数据)
auto pubsub_service = node.service_builder(
                        ServiceName::create("distance_to_obstacle").value())
                   .publish_subscribe<Distance>()
                   .subscriber_max_buffer_size(3)
                   .history_size(3)
                   .subscriber_max_borrowed_samples(3)
                   .open_or_create()
                   .value();

// 2. 事件服务(控制信号)
auto ultra_sonic_service_dead = EventId::new(10);
auto event_service = node.service_builder(
                        ServiceName::create("distance_to_obstacle").value())
                   .event()
                   .notifier_dead_event(ultra_sonic_service_dead)
                   .open_or_create()
                   .value();

// 创建端口
auto publisher = pubsub_service.publisher_builder().create().value();
auto notifier = event_service.notifier_builder().create().value();
auto obstacle_too_close = EventId(5);

// 发布循环,每 100 ms 发送一次距离样本,如果低于阈值则触发事件。
while (node.wait(iox2::bb:Duration::from_millis(100)).has_value()) {
    auto sample = publisher.loan_uninit().value();
    
    auto distance = get_ultra_sonic_sensor_distance();
    auto initialized_sample = sample.write_payload(Distance { distance, 42.0 });
    
    if (distance < distance_threshold) {
        notifier.notify_with_custom_event_id(obstacle_too_close).value();
    }
}

订阅者设置

cpp
#include "iox2/iceoryx2.hpp"
using namespace iox2;

// 创建节点和服务(同上)
auto node = NodeBuilder().create<ServiceType::Ipc>().value();
auto pubsub_service = node.service_builder(...)...;
auto event_service = node.service_builder(...)...;

// 创建端口
auto subscriber = pubsub_service.subscriber_builder().create().value();
auto listener = event_service.listener_builder().create().value();

// 事件监听循环,订阅者不每 100 ms 轮询一次,而是只等待事件。当被唤醒时,它处理它们然后继续休眠。
while (listener.blocking_wait_all(
    [](auto event_id) {
        // 传感器死亡事件
        if ( event_id == ultra_sonic_service_dead ) {
            go_into_parking_position();
        }
        
        // 障碍物过近事件
        if ( event_id == obstacle_too_close ) {
            std::vector<Sample<ServiceType::Ipc, Distance, void>> last_samples;
            
            auto sample = subscriber.receive().value();
            while (sample.has_value()) {
                sample = subscriber.receive().value();
                last_samples.push_back(std::move(sample.value()));
                if (last_samples.size() == 3) break;
            }
            
            perform_break(last_samples);
        }
    })) {}

关键配置说明

发布-订阅服务配置

  • subscriber_max_buffer_size(3):订阅者最大缓冲区大小
  • history_size(3):历史样本保留数量
  • subscriber_max_borrowed_samples(3):最大借用样本数

事件服务配置

  • notifier_dead_event(ultra_sonic_service_dead):参与者死亡检测

健康监控

  • iceoryx2 内置参与者死亡检测
  • 通过 notifier_dead_event 配置自动触发
  • 详细机制在单独教程中介绍

支持的语言

  • Rust:有完整示例
  • C++:有完整示例(如上)
  • Python/C:功能可用,但缺少示例

设计亮点

  1. 解耦合:数据流与控制流完全分离
  2. 容错性:内置死亡检测和安全回退
  3. 历史支持:订阅者可获取历史数据
  4. 条件触发:基于特定条件的事件通知

基于 VitePress 构建