Experience Dapr - Pub/Sub


上一篇整理了 Dapr Secret Store,本文繼續整理分散式系統很重要的通訊模式: Pub/Sub


摘要

Message Systems 種類非常多,功能也非常多樣化,底下整理一些基本資訊與概念。

常見的功能

  • FIFO
  • Pub/Sub
  • Replay

Dapr 主要是以 Pub/Sub 為主。

常見的產品

底下整理常見的產品:

  • AWS
    • AWS SQS Standard
    • AWS SQS FIFO (First-In First-Out)
    • AWS MQ (Apache Active MQ)
  • RabbitMQ
  • Apache Kafka
  • NATS.io
  • GCP Pub/Sub

底下是部分產品的比較表:



QoS

MQTT 通訊協議裡 定義 了訊息傳遞品質 (QoS) 的三個等級:

  1. Level 0: At-most-once delivery 最多一次,也就是只能一次 ( =1 )
  2. Level 1: At-least-once delivery 至少一次 ( >=1 )
  3. Level 2: Exactly-once delivery 一定要一次 ( >= 1 )

PubSub in Dapr

摘要

下圖是官方提供的概念圖:

整理官方文件重要的概念:

  • Dapr 支援很多中 Message Borker (Pub/Sub Components),詳細清單看 這裡,本文使用最簡單的 Redis 當說明。
  • Dapr 提供的 QoS 是 at-least-once
  • Dapr 提供兩種 Message Subscription:
    • 宣告式 (Declarative): 透過 K8s CRD (YAML) 外部定義的方式,透過外部注入,不需要修改程式。
    • 程序式 (Programatic): 由在程式碼裡面自己定義,可以動態注入
  • Dapr 針對 Message delivery 提供 API 讓 subscriber 知道訊息是否成功的傳遞,詳細參閱 API
  • Dapr 會自動處理 app-id 在多個 instance 的時候,透過 single consumer group 處理訊息。
    • 舉例:兩個 app-id (email, 簡訊) 訂閱同一個 topic,那麼 Dapr 會送訊息給 email / 簡訊兩個 app 的每個 instance
  • Dapr 的 pub/sub components 裡的設定範圍,預設是一樣的,對應用程式來說都是可以使用的。Dapr 支援 pub/sub 的適用範圍,讓不同 app 可以使用不同的 topics,詳細參閱 Scope Pub/Sub topic access
  • Dapr 可以為每個 Message 指定 TTL,詳細參閱 Message Time-to-Live (TTL)
  • Dapr 使用 CloudEvents 1.0 specification 作為訊息的主要格式。

範例

為了方便說明,底下範例使用 Programatic 的 Subscription 方式。

  1. PubSub: 使用 Dapr 本機環境預設的 Redis,可以在 docker ps 確認是否有跑起來。
  2. Subscribe 1: Python APP
  3. Subscribe 2: Node APP
  4. Publish: Dapr CLI

Subscribe 1 - Python APP

準備 Subscribe 1,Subscribe Topic 在 Code 裡直接指定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import flask
from flask import request, jsonify
from flask_cors import CORS
import json
import sys

app = flask.Flask(__name__)
CORS(app)

@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
subscriptions = [{'pubsubname': 'pubsub',
'topic': 'deathStarStatus',
'route': 'dsstatus'}]
print("subscribe")
return jsonify(subscriptions)

@app.route('/dsstatus', methods=['POST'])
def ds_subscriber():
# print("ds_subscriber")
print(request.json, flush=True)

return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
app.run()

上述的程式碼存成 app-programmatic.py,然後依序執行以下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
APP_ID="Subscriber1"

# 安裝 Flask
pip install flask
pip install flask_cors

# 確認是否能夠正常執行
python3 app-programmatic.py

# 在本機起動 subscribe 1
## 注意環境的 Python 版本,我的環境要指定 Interpreter python3 才可以
dapr --app-id ${APP_ID} \
--app-port 5000 \
run python3 app-programmatic.py

Subscribe 2 - Node APP

準備 Subscribe 2,同樣的 Subscribe Topic 在 Code 裡直接指定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
const express = require('express')
const bodyParser = require('body-parser')
const app = express()
app.use(bodyParser.json({ type: 'application/*+json' }));

const port = 3000

app.get('/dapr/subscribe', (req, res) => {
console.log(`subscribe`);

res.json([
{
pubsubname: "pubsub",
topic: "deathStarStatus",
route: "dsstatus"
}
]);
})

app.post('/dsstatus', (req, res) => {
console.log(`dsstatus`);
console.log(req.body);
res.sendStatus(200);
});

app.listen(port, () => console.log(`consumer app listening on port ${port}!`))

啟動 Subscribe 2

1
2
3
4
5
6
7
8
9
APP_ID="Subscriber2"

# 安裝必要的 node module
npm i express

# 啟動 Dapr APP
dapr --app-id ${APP_ID} \
--app-port 3000 \
run node app-programmatic.js

Publish - Dapr CLI

直接使用 Dapr CLI 送出 Message

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
APP_ID="publisher"

# 起一個 Dapr App,名稱為 publisher
dapr run --app-id ${APP_ID} \
--dapr-http-port 3500

# 透過 Dapr APP: Publisher 送出一個 Message
dapr publish \
--publish-app-id ${APP_ID} \
--pubsub pubsub \
--topic deathStarStatus \
--data '{"status": "completed"}'

# 透過 Dapr APP: Publisher 送出一個 Message
dapr publish --publish-app-id ${APP_ID} \
--pubsub pubsub \
--topic deathStarStatus \
--data '{"status": "fail"}'

結果:

其中訊息的格式為 CloudEvents 1.0 specification 的標準:

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"pubsubname": "pubsub",
"data": {
"status": "pending"
},
"source": "publisher",
"type": "com.dapr.event.sent",
"topic": "deathStarStatus",
"traceid": "00-de4b4a7e1cbdec658862945bdd23c28b-63473c20cda081c9-01",
"id": "c76d68fd-9885-4f8f-824e-57a35696bd0d",
"specversion": "1.0",
"datacontenttype": "application/json"
}

結語

簡單整理透過 Dapr CLI 如何在本機環境快速體驗 Pub/Sub Component 的基本概念,同樣的方法,依照之前整理的 Experience Dapr - Run on K8s ,只要再把 K8s 部分完成就可以很快速地部署到 K8s 上執行。

Dapr 的 PubSub Component 支援的實作種類很多,過程也看到漸漸的在使用一些標準化,像是 CloudEvents (CNCF) 這樣的概念。

其實類似標準化在 Dapr 各個 Component 都有看到,像是:

  1. PubSub 的 CloudEvent
  2. Service invocation / Sidecar-to-sidecar communication]d6: Sentry
  3. mTLS
  4. Observability: W3Tracing

這些標準化,背後就是要讓應用程式開發過程,能夠有更好的相容性、彈性,但是也會提高開發的複雜度,提高抽象層次。不過我還是樂見這樣的發展。


延伸閱讀

Dapr 系列文章

分散式系統系列文章

Dapr 官方文件

參考資料




Comments