ASP.NET Core 3.0 gRPC 雙向流
目錄
- ASP.NET Core 3.0 使用gRPC
- ASP.NET Core 3.0 gRPC 雙向流
- ASP.NET Core 3.0 gRPC 認證授權
一.前言
在前一文 《ASP.NET Core 3.0 使用gRPC》中有提到 gRPC 支援雙向流呼叫,支援實時推送訊息,這也是 gRPC的一大特點,且 gRPC 在對雙向流的控制支援上也是非常強大的。
二. 什麼是 gRPC 流
gRPC 有四種服務型別,分別是:簡單 RPC(Unary RPC)、服務端流式 RPC (Server streaming RPC)、客戶端流式 RPC (Client streaming RPC)、雙向流式 RPC(Bi-directional streaming RPC)。它們主要有以下特點:
服務型別 | 特點 |
---|---|
簡單 RPC | 一般的rpc呼叫,傳入一個請求物件,返回一個返回物件 |
服務端流式 RPC | 傳入一個請求物件,服務端可以返回多個結果物件 |
客戶端流式 RPC | 客戶端傳入多個請求物件,服務端返回一個結果物件 |
雙向流式 RPC | 結合客戶端流式RPC和服務端流式RPC,可以傳入多個請求物件,返回多個結果物件 |
三.為什麼 gRPC 支援流
gRPC 通訊是基於 HTTP/2 實現的,它的雙向流對映到 HTTP/2 流。HTTP/2 具有流的概念,流是為了實現HTTP/2的多路複用。流是伺服器和客戶端在HTTP/2連線內用於交換幀資料的獨立雙向序列,邏輯上可看做一個較為完整的互動處理單元,即表達一次完整的資源請求、響應資料交換流程;一個業務處理單元,在一個流內進行處理完畢,這個流生命週期完結。
特點如下:
- 一個HTTP/2連線可同時保持多個開啟的流,任一端點交換幀
- 流可被客戶端或伺服器單獨或共享建立和使用
- 流可被任一端關閉
- 在流內傳送和接收資料都要按照順序
- 流的識別符號自然數表示,1~2^31-1區間,有建立流的終端分配
- 流與流之間邏輯上是並行、獨立存在
摘自 HTTP/2筆記之流和多路複用 by 聶永
四.gRPC中使用雙向流呼叫
我們在前文中編寫的RPC屬於簡單RPC,沒有包含流呼叫,下面直接講一下雙向流,根據第二小節列舉的四種服務型別,如果我們掌握了簡單RPC和雙向流RPC,那麼服務端流式 RPC和客戶端流式 RPC自然也就沒有問題了。
這裡我們繼續使用前文的程式碼,要實現的目標是一次給多個貓洗澡。
① 首先在 LuCat.proto
定義兩個rpc,一個 Count 用於統計貓的數量,一個 雙向流 RPC BathTheCat 用於給貓洗澡
syntax = "proto3";
option csharp_namespace = "AspNetCoregRpcService";
import "google/protobuf/empty.proto";
package LuCat; //定義包名
//定義服務
service LuCat{
//定義給貓洗澡雙向流rpc
rpc BathTheCat(stream BathTheCatReq) returns ( stream BathTheCatResp);
//定義統計貓數量簡單rpc
rpc Count(google.protobuf.Empty) returns (CountCatResult);
}
message SuckingCatResult{
string message=1;
}
message BathTheCatReq{
int32 id=1;
}
message BathTheCatResp{
string message=1;
}
message CountCatResult{
int32 Count=1;
}
② 新增服務的實現
這裡安利下Resharper,非常方便
private readonly ILogger<LuCatService> _logger;
private static readonly List<string> Cats=new List<string>(){"英短銀漸層","英短金漸層","美短","藍貓","狸花貓","橘貓"};
private static readonly Random Rand=new Random(DateTime.Now.Millisecond);
public LuCatService(ILogger<LuCatService> logger)
{
_logger = logger;
}
public override async Task BathTheCat(IAsyncStreamReader<BathTheCatReq> requestStream, IServerStreamWriter<BathTheCatResp> responseStream, ServerCallContext context)
{
var bathQueue=new Queue<int>();
while (await requestStream.MoveNext())
{
//將要洗澡的貓加入佇列
bathQueue.Enqueue(requestStream.Current.Id);
_logger.LogInformation($"Cat {requestStream.Current.Id} Enqueue.");
}
//遍歷佇列開始洗澡
while (bathQueue.TryDequeue(out var catId))
{
await responseStream.WriteAsync(new BathTheCatResp() { Message = $"鏟屎的成功給一隻{Cats[catId]}洗了澡!" });
await Task.Delay(500);//此處主要是為了方便客戶端能看出流呼叫的效果
}
}
public override Task<CountCatResult> Count(Empty request, ServerCallContext context)
{
return Task.FromResult(new CountCatResult()
{
Count = Cats.Count
});
}
BathTheCat 方法會接收多個客戶端發來的CatId,然後將他們加入佇列中,等客戶端傳送完成後開始依次洗澡並返回給客戶端。
③ 客戶端實現
隨機發送10個貓Id給服務端,然後接收10個洗澡結果。
var channel = GrpcChannel.ForAddress("https://localhost:5001");
var catClient = new LuCat.LuCatClient(channel);
//獲取貓總數
var catCount = await catClient.CountAsync(new Empty());
Console.WriteLine($"一共{catCount.Count}只貓。");
var rand = new Random(DateTime.Now.Millisecond);
var bathCat = catClient.BathTheCat();
//定義接收吸貓響應邏輯
var bathCatRespTask = Task.Run(async() =>
{
await foreach (var resp in bathCat.ResponseStream.ReadAllAsync())
{
Console.WriteLine(resp.Message);
}
});
//隨機給10個貓洗澡
for (int i = 0; i < 10; i++)
{
await bathCat.RequestStream.WriteAsync(new BathTheCatReq() {Id = rand.Next(0, catCount.Count)});
}
//傳送完畢
await bathCat.RequestStream.CompleteAsync();
Console.WriteLine("客戶端已傳送完10個需要洗澡的貓id");
Console.WriteLine("接收洗澡結果:");
//開始接收響應
await bathCatRespTask;
Console.WriteLine("洗澡完畢");
④ 執行
可以看到雙向流呼叫成功,客戶端傳送了10個貓洗澡請求物件,服務端返回了10個貓洗澡結果物件。且是實時推送的,這就是 gRPC 的雙向流呼叫。
這裡大家可以自行改進來演變成客戶端流式或者服務端流式呼叫。客戶端傳送一個貓Id列表,然後服務端返回每個貓洗澡結果,這就是服務端流式呼叫。客戶端依次傳送貓Id,然後服務端一次性返回所有貓的洗澡結果(給所有貓洗澡看做是一個業務,返回這個業務的結果),就是客戶端流式呼叫。這裡我就不再演示了。
五.流控制
gRPC 的流式呼叫支援對流進行主動取消的控制,進而可以衍生出流超時限制等控制。
在流式呼叫是,可以傳一個 CancellationToken 引數,它就是我們用來對流進行取消控制的:
改造一下我們在第四小節的程式碼:
① 客戶端
var cts = new CancellationTokenSource();
//指定在2.5s後進行取消操作
cts.CancelAfter(TimeSpan.FromSeconds(2.5));
var bathCat = catClient.BathTheCat(cancellationToken: cts.Token);
//定義接收吸貓響應邏輯
var bathCatRespTask = Task.Run(async() =>
{
try
{
await foreach (var resp in bathCat.ResponseStream.ReadAllAsync())
{
Console.WriteLine(resp.Message);
}
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
{
Console.WriteLine("Stream cancelled.");
}
});
② 服務端
//遍歷佇列開始洗澡
while (!context.CancellationToken.IsCancellationRequested && bathQueue.TryDequeue(out var catId))
{
_logger.LogInformation($"Cat {catId} Dequeue.");
await responseStream.WriteAsync(new BathTheCatResp() { Message = $"鏟屎的成功給一隻{Cats[catId]}洗了澡!" });
await Task.Delay(500);//此處主要是為了方便客戶端能看出流呼叫的效果
}
③ 執行
設定的是雙向流式呼叫2.5s後取消流,從客戶端呼叫結果看到,並沒有收到全部10個貓的洗澡返回結果,流就已經被取消了,這就是 gRPC 的流控制。
六.結束
這裡流式呼叫可以實現實時推送,服務端到客戶端或者客戶端到服務端短實時推送訊息,但是這個和傳統意義上的長連線主動推送、廣播訊息不一樣,不管你是服務端流式、客戶端流式還是雙向流式,必須要由客戶端進行發起,通過客戶端請求來建立流通訊。
七.參考資料
GRPC的四種服務型別 by twtydgo
HTTP/2筆記之流和多路複用 by 聶永
本文所用程式碼