本實驗頁面提供一條可操作性高的 DDS 實作路線:由 Raspberry Pi 5 執行 RTI Connext DDS 與 Python 橋接程式,Pico W2 負責感測與送資料,最後由 DDS Subscriber 接收並顯示資料。
內容重點是先建立「Pico W2 → Pi 5 → DDS」資料流,再延伸到 QoS、控制命令與原生 MCU DDS。
Pico W2 (MicroPython)
└─ Wi-Fi + HTTP POST JSON
│
▼
Raspberry Pi 5 (Python Bridge)
├─ Flask 接收感測資料
├─ 轉成 SensorReading Topic
└─ RTI Connext DDS Publisher
│
▼
Raspberry Pi 5 / PC (Python DDS Subscriber)
└─ 顯示即時感測資料
| 項目 | 用途 |
|---|---|
| Raspberry Pi 5 | DDS 主節點、Bridge Server、Subscriber |
| Raspberry Pi Pico W2 | 感測資料來源 |
| Wi-Fi 網路 | Pico W2 與 Pi 5 通訊 |
| Python 3 | Bridge 與 DDS 應用 |
| RTI Connext DDS | DDS middleware |
| Flask | 建立 HTTP 接收端 |
sudo curl -sSL -o /usr/share/keyrings/rti-official-archive.gpg \
https://packages.rti.com/deb/official/repo.key
printf -- "deb [arch=%s, signed-by=%s] %s %s main\n" \
$(dpkg --print-architecture) \
/usr/share/keyrings/rti-official-archive.gpg \
https://packages.rti.com/deb/official \
$(. /etc/os-release && echo ${VERSION_CODENAME}) | \
sudo tee /etc/apt/sources.list.d/rti-official.list >/dev/null
sudo apt update
sudo apt install rti-connext-dds-7.7.0
eval $(rtienv -l ~/rti/rti_license.dat)
echo $NDDSHOME
connextdds-py 前要先確認 NDDSHOME 可用。python3 -m venv ~/venvs/ddsbridge
source ~/venvs/ddsbridge/bin/activate
pip3 install --upgrade pip
pip3 install setuptools wheel cmake patchelf-wrapper flask
cd ~/dds-lab
git clone --recurse-submodules https://github.com/rticommunity/connextdds-py.git
cd connextdds-py
python3 configure.py --nddshome "$NDDSHOME" <platform>
pip3 install .
<platform> 需替換成目前 RTI Connext DDS 的平台字串;若後續想預先打包 wheel,可在同目錄執行 pip3 wheel .。在 Pi 5 上建立實驗資料夾:
mkdir -p ~/dds-lab/pico-bridge
cd ~/dds-lab/pico-bridge
建立 SensorReading.idl:
struct SensorReading
{
string device_id;
float temperature;
float humidity;
int64 timestamp_ms;
};
產生 Python 型別與範例骨架:
rtiddsgen -language python -example universal SensorReading.idl
建立 bridge_server.py:
from flask import Flask, request, jsonify
import time
import rti.connextdds as dds
# 匯入 rtiddsgen 產生的型別模組名稱時,請依實際輸出檔名調整
from SensorReading import SensorReading
app = Flask(__name__)
participant = dds.DomainParticipant(domain_id=0)
topic = dds.Topic(participant, "SensorReading", SensorReading)
publisher = dds.Publisher(participant)
writer = dds.DataWriter(publisher, topic)
@app.route('/sensor', methods=['POST'])
def sensor_ingest():
payload = request.get_json(force=True)
sample = SensorReading()
sample.device_id = payload.get('device_id', 'pico-w2-01')
sample.temperature = float(payload.get('temperature', 0.0))
sample.humidity = float(payload.get('humidity', 0.0))
sample.timestamp_ms = int(payload.get('timestamp_ms', int(time.time() * 1000)))
writer.write(sample)
return jsonify({
'status': 'ok',
'published_topic': 'SensorReading',
'device_id': sample.device_id
})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)
source ~/venvs/ddsbridge/bin/activate
cd ~/dds-lab/pico-bridge
eval $(rtienv -l ~/rti/rti_license.dat)
python bridge_server.py
建立 subscriber.py:
import rti.connextdds as dds
from SensorReading import SensorReading
participant = dds.DomainParticipant(domain_id=0)
topic = dds.Topic(participant, "SensorReading", SensorReading)
subscriber = dds.Subscriber(participant)
reader = dds.DataReader(subscriber, topic)
print('Waiting for DDS samples...')
for data in reader.take_data_iter(timeout=dds.Duration(3600)):
print(
f"device={data.device_id}, "
f"temperature={data.temperature:.2f}, "
f"humidity={data.humidity:.2f}, "
f"timestamp_ms={data.timestamp_ms}"
)
source ~/venvs/ddsbridge/bin/activate
cd ~/dds-lab/pico-bridge
eval $(rtienv -l ~/rti/rti_license.dat)
python subscriber.py
下列範例以 Pico W2 連上 Wi-Fi,定期送出 JSON 到 Pi 5 的 /sensor API:
import network
import time
import ujson
import urequests
import machine
WIFI_SSID = '你的WiFi名稱'
WIFI_PASSWORD = '你的WiFi密碼'
PI5_URL = 'http://192.168.1.50:5000/sensor'
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
while not wlan.isconnected():
print('Connecting Wi-Fi...')
time.sleep(1)
print('Wi-Fi connected:', wlan.ifconfig())
adc = machine.ADC(4) # 示範用:可替換成真實感測器
while True:
raw = adc.read_u16()
temperature = 20.0 + (raw / 65535.0) * 10.0
humidity = 50.0 + (raw / 65535.0) * 5.0
payload = {
'device_id': 'pico-w2-01',
'temperature': temperature,
'humidity': humidity,
'timestamp_ms': time.ticks_ms()
}
try:
response = urequests.post(
PI5_URL,
data=ujson.dumps(payload),
headers={'Content-Type': 'application/json'}
)
print('status=', response.status_code)
print(response.text)
response.close()
except Exception as e:
print('send error:', e)
time.sleep(3)
time.ticks_ms() 只提供開機後經過時間,代表它不是 Unix timestamp,但對即時資料流示範已足夠;若需要真實時間,可在 Pi 5 端補上時間戳。subscriber.pybridge_server.py| 檢查項目 | 成功條件 |
|---|---|
| Wi-Fi 連線 | Pico W2 顯示 IP 位址 |
| HTTP Bridge | POST 後回傳 {"status":"ok"} |
| DDS Publisher | Bridge 啟動後無例外錯誤 |
| DDS Subscriber | 終端機持續顯示溫度與濕度資料 |
| 端對端資料流 | Pico W2 數值變化可反映到 Subscriber |
若 Pico W2 與 Raspberry Pi 5 不在同一個區域網路,或需要從外部網路暫時連回 Pi 5 的 HTTP Bridge,可搭配外網轉址與 tunnel 工具建立公開入口。
ngrok、Cloudflare Tunnel、Pinggy 與 LocalTunnel。using_qos_profiles/py 與 partitions/py 範例。content_filtered_topic/py 示範只接收指定範圍資料。waitset_query_cond/py 示範告警條件與事件等待。request_reply/py 理解 DDS 也能做 request / reply。