什麼是釋出-訂閱

釋出訂閱是一種眾所周知並被廣泛使用的訊息傳送模式,常用在微服務架構的服務間通訊,高併發削峰等情況。但是不同的訊息中介軟體之間存在細微的差異,專案使用不同的產品需要實現不同的實現類,雖然是明智的決策,但必須編寫和維護抽象及其基礎實現。 此方法需要複雜、重複且容易出錯的自定義程式碼。

Dapr為了解決這種問題,提供開箱即用的訊息傳送抽象和實現,封裝在 Dapr 構建基塊中。業務系統只需呼叫跟據Dapr的要求實現訂閱釋出即可。

工作原理

Dapr 釋出&訂閱構建基塊提供了一個與平臺無關的 API 框架來發送和接收訊息。

服務將訊息釋出到指定主題, 業務服務訂閱主題以使用訊息。

服務在 Dapr sidecar 上呼叫 pub/sub API。 然後,sidecar 呼叫預定義 Dapr pub/sub 元件。

任何程式設計平臺都可以使用 Dapr 本機 API 通過 HTTP 或 gRPC 呼叫構建基塊。 若要釋出訊息,請進行以下 API 呼叫:

http://localhost:<dapr-port>/v1.0/publish/<pub-sub-name>/<topic>

上述呼叫中有幾個特定於 Dapr 的 URL 段:

  • <dapr-port> 提供 Dapr sidecar 偵聽的埠號。
  • <pub-sub-name> 提供所選 Dapr pub/sub 元件的名稱。
  • <topic> 提供訊息釋出到的主題的名稱。

設定釋出訂閱元件

Dapr 為 Pub/Sub 提供很多支援的元件,例如 Redis 和 Kafka 等。支援元件詳見 連結

在win10上的自承載的Dapr中,預設在 %UserProfile%\.dapr\components\pubsub.yaml 中使用redis作為了pub/sub元件,dapr run一個app時,使用預設元件作為pub/sub元件

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""

訂閱主題

Dapr 允許兩種方法訂閱主題:

  • 宣告式,其中定義在外部檔案中。
  • 程式設計方式,訂閱在使用者程式碼中定義

1.宣告式訂閱

在預設元件目錄 %UserProfile%\.dapr\components\pubsub.yaml 中新建subscription.yaml檔案,並寫入以下內容

apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
name: myevent-subscription
spec:
topic: test_topic
route: /TestPubSub
pubsubname: pubsub
scopes:
- frontend

上面的示例顯示了 test_topic主題的事件訂閱,使用元件 pubsub

  • route 告訴 Dapr 將所有主題訊息傳送到應用程式中的 /TestPubSub 端點。
  • scopes 為 FrontEnd啟用訂閱

現在需要在FrontEnd專案中定義介面TestSub,在FrontEnd中新建TestPubSubController

using Dapr.Client;

using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging; using System.IO;
using System.Text;
using System.Threading.Tasks; namespace FrontEnd.Controllers
{
[Route("[controller]")]
[ApiController]
public class TestPubSubController : ControllerBase
{
private readonly ILogger<TestPubSubController> _logger;
private readonly DaprClient _daprClient;
public TestPubSubController(ILogger<TestPubSubController> logger, DaprClient daprClient)
{
_logger = logger;
_daprClient = daprClient;
} [HttpPost]
public ActionResult Post()
{
Stream stream = Request.Body;
byte[] buffer = new byte[Request.ContentLength.Value];
stream.Position = 0L;
stream.ReadAsync(buffer, 0, buffer.Length);
string content = Encoding.UTF8.GetString(buffer);
return Ok(content);
} [HttpGet("pub")]
public async Task<ActionResult> PubAsync()
{
var data = new WeatherForecast();
await _daprClient.PublishEventAsync<WeatherForecast>("pubsub", "test_topic", data);
return Ok();
}
}
}

需要在Startup的Configure中開啟重複讀取Body才能讀取到資料

            app.Use((context, next) =>
{
context.Request.EnableBuffering();
return next();
});

啟動FrontEnd

dapr run --dapr-http-port 3501 --app-port 5001  --app-id frontend dotnet  .\FrontEnd\bin\Debug\net5.0\FrontEnd.dll

呼叫 pub API釋出訊息

檢視訂閱情況,訂閱訊息消費成功

2.程式設計式訂閱

為了防止宣告式訂閱的影響,需要先把目錄<%UserProfile%\.dapr\components\pubsub.yaml>中subscription.yaml檔案刪除

TestPubSubController新增Api Sub

        [Topic("pubsub", "test_topic")]
[HttpPost("sub")]
public async Task<ActionResult> Sub()
{
Stream stream = Request.Body;
byte[] buffer = new byte[Request.ContentLength.Value];
stream.Position = 0L;
stream.ReadAsync(buffer, 0, buffer.Length);
string content = Encoding.UTF8.GetString(buffer);
_logger.LogInformation("testsub" + content);
return Ok(content);
}

在Startup的Configure方法中新增中介軟體

public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
// ...
app.UseCloudEvents(); app.UseEndpoints(endpoints =>
{
endpoints.MapSubscribeHandler();
// ...
});
}

啟動FrontEnd

dapr run --dapr-http-port 3501 --app-port 5001  --app-id frontend dotnet  .\FrontEnd\bin\Debug\net5.0\FrontEnd.dll

呼叫API釋出訊息

檢視訂閱情況,訂閱訊息消費成功

通過DapreCLI同樣可以釋出訊息

dapr publish --publish-app-id frontend --pubsub pubsub --topic test_topic --data '{"date":"0001-01-01T00:00:00","temperatureC":0,"temperatureF":32,"summary":null}'

檢視訂閱情況,訂閱訊息消費成功