DDS Bridge Lab:Raspberry Pi 5 與 Pico W2 感測資料整合實作

本實驗頁面提供一條可操作性高的 DDS 實作路線:由 Raspberry Pi 5 執行 RTI Connext DDS 與 Python 橋接程式,Pico W2 負責感測與送資料,最後由 DDS Subscriber 接收並顯示資料。

內容重點是先建立「Pico W2 → Pi 5 → DDS」資料流,再延伸到 QoS、控制命令與原生 MCU DDS。

Python 為主 Pi 5 Pico W2 RTI Connext DDS Bridge Lab

一、實驗目標

二、系統架構

Pico W2 (MicroPython)
  └─ Wi-Fi + HTTP POST JSON
         │
         ▼
Raspberry Pi 5 (Python Bridge)
  ├─ Flask 接收感測資料
  ├─ 轉成 SensorReading Topic
  └─ RTI Connext DDS Publisher
         │
         ▼
Raspberry Pi 5 / PC (Python DDS Subscriber)
  └─ 顯示即時感測資料
這個架構的好處是:Pico W2 不需要直接跑 DDS middleware,因此開發與除錯都簡單很多。

三、設備與軟體需求

項目用途
Raspberry Pi 5DDS 主節點、Bridge Server、Subscriber
Raspberry Pi Pico W2感測資料來源
Wi-Fi 網路Pico W2 與 Pi 5 通訊
Python 3Bridge 與 DDS 應用
RTI Connext DDSDDS middleware
Flask建立 HTTP 接收端

四、Pi 5 端安裝步驟

1. 安裝 RTI Connext DDS

sudo curl -sSL -o /usr/share/keyrings/rti-official-archive.gpg \
  https://packages.rti.com/deb/official/repo.key

printf -- "deb [arch=%s, signed-by=%s] %s %s main\n" \
  $(dpkg --print-architecture) \
  /usr/share/keyrings/rti-official-archive.gpg \
  https://packages.rti.com/deb/official \
  $(. /etc/os-release && echo ${VERSION_CODENAME}) | \
  sudo tee /etc/apt/sources.list.d/rti-official.list >/dev/null

sudo apt update
sudo apt install rti-connext-dds-7.7.0

2. 設定授權並載入 RTI 環境

eval $(rtienv -l ~/rti/rti_license.dat)
echo $NDDSHOME
依 RTI Connext DDS Python API 6.0.1 的 Building and Installing 文件,建置 connextdds-py 前要先確認 NDDSHOME 可用。

3. 建立 Python 環境與 Linux 建置工具

python3 -m venv ~/venvs/ddsbridge
source ~/venvs/ddsbridge/bin/activate
pip3 install --upgrade pip
pip3 install setuptools wheel cmake patchelf-wrapper flask

4. 依官方文件建置並安裝 Python API

cd ~/dds-lab
git clone --recurse-submodules https://github.com/rticommunity/connextdds-py.git
cd connextdds-py
python3 configure.py --nddshome "$NDDSHOME" <platform>
pip3 install .
<platform> 需替換成目前 RTI Connext DDS 的平台字串;若後續想預先打包 wheel,可在同目錄執行 pip3 wheel .

五、建立 DDS 資料型別

在 Pi 5 上建立實驗資料夾:

mkdir -p ~/dds-lab/pico-bridge
cd ~/dds-lab/pico-bridge

建立 SensorReading.idl

struct SensorReading
{
    string device_id;
    float temperature;
    float humidity;
    int64 timestamp_ms;
};

產生 Python 型別與範例骨架:

rtiddsgen -language python -example universal SensorReading.idl
產生後通常會出現對應的 Python 型別支援檔,可直接以這個 Topic 作為感測資料交換格式。

六、Pi 5:撰寫 HTTP → DDS Bridge 程式

建立 bridge_server.py

from flask import Flask, request, jsonify
import time
import rti.connextdds as dds

# 匯入 rtiddsgen 產生的型別模組名稱時,請依實際輸出檔名調整
from SensorReading import SensorReading

app = Flask(__name__)

participant = dds.DomainParticipant(domain_id=0)
topic = dds.Topic(participant, "SensorReading", SensorReading)
publisher = dds.Publisher(participant)
writer = dds.DataWriter(publisher, topic)

@app.route('/sensor', methods=['POST'])
def sensor_ingest():
    payload = request.get_json(force=True)

    sample = SensorReading()
    sample.device_id = payload.get('device_id', 'pico-w2-01')
    sample.temperature = float(payload.get('temperature', 0.0))
    sample.humidity = float(payload.get('humidity', 0.0))
    sample.timestamp_ms = int(payload.get('timestamp_ms', int(time.time() * 1000)))

    writer.write(sample)

    return jsonify({
        'status': 'ok',
        'published_topic': 'SensorReading',
        'device_id': sample.device_id
    })

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)

執行 Bridge

source ~/venvs/ddsbridge/bin/activate
cd ~/dds-lab/pico-bridge
eval $(rtienv -l ~/rti/rti_license.dat)
python bridge_server.py

七、Pi 5:撰寫 DDS Subscriber

建立 subscriber.py

import rti.connextdds as dds
from SensorReading import SensorReading

participant = dds.DomainParticipant(domain_id=0)
topic = dds.Topic(participant, "SensorReading", SensorReading)
subscriber = dds.Subscriber(participant)
reader = dds.DataReader(subscriber, topic)

print('Waiting for DDS samples...')

for data in reader.take_data_iter(timeout=dds.Duration(3600)):
    print(
        f"device={data.device_id}, "
        f"temperature={data.temperature:.2f}, "
        f"humidity={data.humidity:.2f}, "
        f"timestamp_ms={data.timestamp_ms}"
    )

執行 Subscriber

source ~/venvs/ddsbridge/bin/activate
cd ~/dds-lab/pico-bridge
eval $(rtienv -l ~/rti/rti_license.dat)
python subscriber.py

八、Pico W2:MicroPython 送資料到 Pi 5

下列範例以 Pico W2 連上 Wi-Fi,定期送出 JSON 到 Pi 5 的 /sensor API:

import network
import time
import ujson
import urequests
import machine

WIFI_SSID = '你的WiFi名稱'
WIFI_PASSWORD = '你的WiFi密碼'
PI5_URL = 'http://192.168.1.50:5000/sensor'

wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(WIFI_SSID, WIFI_PASSWORD)

while not wlan.isconnected():
    print('Connecting Wi-Fi...')
    time.sleep(1)

print('Wi-Fi connected:', wlan.ifconfig())

adc = machine.ADC(4)  # 示範用:可替換成真實感測器

while True:
    raw = adc.read_u16()
    temperature = 20.0 + (raw / 65535.0) * 10.0
    humidity = 50.0 + (raw / 65535.0) * 5.0

    payload = {
        'device_id': 'pico-w2-01',
        'temperature': temperature,
        'humidity': humidity,
        'timestamp_ms': time.ticks_ms()
    }

    try:
        response = urequests.post(
            PI5_URL,
            data=ujson.dumps(payload),
            headers={'Content-Type': 'application/json'}
        )
        print('status=', response.status_code)
        print(response.text)
        response.close()
    except Exception as e:
        print('send error:', e)

    time.sleep(3)
time.ticks_ms() 只提供開機後經過時間,代表它不是 Unix timestamp,但對即時資料流示範已足夠;若需要真實時間,可在 Pi 5 端補上時間戳。

九、操作流程

  1. Pi 5 啟動 subscriber.py
  2. Pi 5 啟動 bridge_server.py
  3. Pico W2 上傳並執行 MicroPython 程式
  4. 觀察 Pi 5 subscriber 是否持續收到資料
  5. 修改 Pico W2 傳送週期,觀察 DDS 資料變化
  6. 再加入第二個 Subscriber,模擬多節點資料共享

十、驗證方法

檢查項目成功條件
Wi-Fi 連線Pico W2 顯示 IP 位址
HTTP BridgePOST 後回傳 {"status":"ok"}
DDS PublisherBridge 啟動後無例外錯誤
DDS Subscriber終端機持續顯示溫度與濕度資料
端對端資料流Pico W2 數值變化可反映到 Subscriber

十一、外網轉址與 Tunnel 服務延伸

若 Pico W2 與 Raspberry Pi 5 不在同一個區域網路,或需要從外部網路暫時連回 Pi 5 的 HTTP Bridge,可搭配外網轉址與 tunnel 工具建立公開入口。

開啟外網轉址與 Tunnel 服務教材

十二、延伸方向(參考 rticonnextdds-examples 的 Python 教學路線)

A. 先做 QoS 與 Partitions

  • 在 Pi 5 先跑 using_qos_profiles/pypartitions/py 範例。
  • 讓學生理解資料可靠度、歷史保留、late joiner 與邏輯分群。
  • 之後再把 Bridge publisher 套用相同的 QoS 思維。

B. 再做條件式接收與事件驅動

  • 利用 content_filtered_topic/py 示範只接收指定範圍資料。
  • 利用 waitset_query_cond/py 示範告警條件與事件等待。
  • 這兩組範例都很適合改造成感測器門檻監控實驗。

C. 最後再加服務互動與儀表板

  • request_reply/py 理解 DDS 也能做 request / reply。
  • 把 Subscriber 接到 Flask / Node-RED / Web 儀表板。
  • 形成「Pico W2 感測 → Pi 5 bridge → DDS → Web / 控制回應」的完整教學案例。
建議不要一開始就把 Pico W2、QoS、條件過濾、告警與 Web 全部混在一起。先讓學生在 Pi 5 上跑 RTI 官方 Python 範例,再逐步把 bridge 取代其中的 publisher,成功率會高很多。

十三、若要進一步做 Pico W2 原生 DDS

這樣的編排方式可先用 Python 完成概念與整合,再逐步延伸到嵌入式 DDS 實作。

十四、補充提示