【.NET Core專案實戰-統一認證平臺】第十六章 閘道器篇-Ocelot整合RPC服務
一、什麼是RPC
RPC是“遠端呼叫( Remote Procedure Call )”的一個名稱的縮寫,並不是任何規範化的協議,也不是大眾都認知的協議標準,我們更多時候使用時都是建立的自定義化(例如Socket,Netty)的訊息方式進行呼叫,相比http協議,我們省掉了不少http中無用的訊息內容。因此很多系統內部呼叫仍然採用自定義化的RPC呼叫模式進行通訊,畢竟速度和效能是內網的關鍵指標之一,而標準化和語義無關性在外網中舉足輕重。所以,為何API閘道器無法工作在RPC上,因為它沒有一個像HTTP/HTTPS那樣的通用標準。
二、CzarRpc簡介
CzarRpc是作者基於Dotnetty實現的RPC通訊框架,參考了 Surging
和 Tars.Net
優秀設計,目前正在內部使用中,下面就CzarRpc呼叫方式做一個簡單介紹,測試結構如下:

1、服務介面
新建一個 Czar.Rpc.Common
類庫,首先需要引用 Czar.Rpc
Nuget包。
Install-Package Czar.Rpc
然後定義測試介面 IHelloRpc.cs
,也是目前支援的呼叫方式。
using Czar.Rpc.Attributes; using Czar.Rpc.Exceptions; using Czar.Rpc.Metadata; using System; using System.Collections.Generic; using System.Threading.Tasks; namespace Czar.Rpc.Common { /// <summary> /// 測試Rpc實體 /// </summary> [BusinessExceptionInterceptor] [CzarRpc("Demo.Rpc.Hello")] public interface IHelloRpc: IRpcBaseService { string Hello(int no, string name); void HelloHolder(int no, out string name); Task<string> HelloTask(int no, string name); ValueTask<string> HelloValueTask(int no, string name); [CzarOneway] void HelloOneway(int no, string name); Task TestBusinessExceptionInterceptor(); DemoModel HelloModel(int D1, string D2, DateTime D3); Task<DemoModel> HelloModelAsync(int D1, string D2, DateTime D3); DemoModel HelloSendModel(DemoModel model); DemoModel HelloSendModelParm(string name,DemoModel model); List<DemoModel> HelloSendModelList(List<DemoModel> model); } public class DemoModel { /// <summary> /// 測試1 /// </summary> public int T1 { get; set; } /// <summary> /// 測試2 /// </summary> public string T2 { get; set; } /// <summary> /// 測試3 /// </summary> public DateTime T3 { get; set; } public ChildModel Child { get; set; } } public class ChildModel { public string C1 { get; set; } } }
2.服務端
新建一個控制檯程式 Czar.Rpc.Server
,然後實現服務介面,因為都是測試資料,所以就隨意實現了方法。
HelloRpcServer.cs
using Czar.Rpc.Exceptions; using System; using System.Collections.Generic; using System.Threading.Tasks; using System.Linq; using System.Net; using Czar.Rpc.Common; namespace Demo.Rpc.Server { public class HelloRpcServer: IHelloRpc { public EndPoint CzarEndPoint { get; set; } public string Hello(int no, string name) { string result = $"{no}: Hi, {name}"; Console.WriteLine(result); return result + " callback"; } public void HelloHolder(int no, out string name) { name = no.ToString() + " out"; } public void HelloOneway(int no, string name) { /* 耗時操作 */ Console.WriteLine($"From oneway - {no}: Hi, {name}"); } public Task<string> HelloTask(int no, string name) { return Task.FromResult(Hello(no, name)); } public ValueTask<string> HelloValueTask(int no, string name) { return new ValueTask<string>(Hello(no, name)); } public Task TestBusinessExceptionInterceptor() { throw new BusinessException() { CzarCode = "1", CzarMessage = "test" }; } public DemoModel HelloModel(int D1, string D2, DateTime D3) { return new DemoModel() { T1 = D1 + 1, T2 = D2 + "2", T3 = D3.AddDays(1) }; } public async Task<DemoModel> HelloModelAsync(int D1, string D2, DateTime D3) { return await Task.FromResult( new DemoModel() { T1 = D1 + 1, T2 = D2 + "77777", T3 = D3.AddDays(1) } ); } public DemoModel HelloSendModel(DemoModel model) { model.T1 = model.T1 + 10; model.T2 = model.T2 + "11"; model.T3 = model.T3.AddDays(12); return model; } public DemoModel HelloSendModelParm(string name, DemoModel model) { model.T1 = model.T1 + 10; model.T2 = model.T2 + "11"; model.T3 = model.T3.AddDays(12); if (model.Child != null) { model.Child.C1 = name+"說:"+ model.Child.C1; } return model; } public List<DemoModel> HelloSendModelList(List<DemoModel> model) { return model.Select(t => new DemoModel() { T1=t.T1+10,T2=t.T2+"13",T3=t.T3.AddYears(1),Child=t.Child }).ToList(); } } }
然後啟動服務端監聽。
class Program { static void Main(string[] args) { var host = new HostBuilder() .ConfigureHostConfiguration(i => i.AddJsonFile("CzarConfig.json")) .ConfigureLogging((hostContext, configLogging) => { configLogging.AddConsole(); }) .UseCodec<JsonCodec>() .UseLibuvTcpHost() .UseProxy() .UseConsoleLifetime() .Build(); host.RunAsync().Wait(); } }
啟用外部使用CzarConfig.json的配置檔案,注意需要設定成始終複製。
{ "CzarHost": { "Port": 7711, //監聽埠 "QuietPeriodSeconds": 2,//退出靜默時間DotNetty特性 "ShutdownTimeoutSeconds": 2, //關閉超時時間 DotNetty特性 "IsSsl": "false",//是否啟用 SSL, 客戶端需要保持一致 "PfxPath": "cert/datasync.pfx", //證書 "PfxPassword": "123456"//證書金鑰 } }
到此伺服器端搭載完成。
3、客戶端
新建客戶端控制檯程式 Czar.Rpc.Client
,然後配置Rpc呼叫資訊。
{ "CzarHost": { "ProxyEndPoint": true, //是否啟用動態服務地址,就是指定服務端IP "IsSsl": "false", //是否啟用SSL "PfxPath": "cert/datasync.pfx", //證書 "PfxPassword": "123456", //證書金鑰 "ClientConfig": {//客戶端配置 "Demo.Rpc.Hello": {//對應服務[CzarRpc("Demo.Rpc.Hello")] 值 "Host": "127.0.0.1", //服務端IP 如果ProxyEndPoint=false 時使用 "Port": 7711, //服務端埠 如果ProxyEndPoint=false 時使用 "Timeout": 10, //呼叫超時時間 "WriterIdleTimeSeconds";30//空閒超時時間,預設為30秒,非內網環境建議設定成5分鐘內。 } } } }
現在開始啟用客戶端資訊。
class Program { public static IServiceProvider service; public static IConfiguration config; static async Task Main(string[] args) { try { var builder = new ConfigurationBuilder(); config = builder.AddJsonFile("CzarConfig.json").Build(); service = new ServiceCollection() .AddSingleton(config) .AddLogging(j => j.AddConsole()) .AddLibuvTcpClient(config) .AddProxy() .BuildDynamicProxyServiceProvider(); var rpc = service.GetRequiredService<IHelloRpc>(); //使用的內部指定的伺服器地址 rpc.CzarEndPoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 7711); var result = string.Empty; string t = "基本呼叫"; result = rpc.Hello(18, t); Console.WriteLine(result); result = "無返回結果"; rpc.HelloHolder(1, out result); Console.WriteLine(result); result = await rpc.HelloTask(2, "非同步任務"); Console.WriteLine(result); result = "單向"; rpc.HelloOneway(3, "單向呼叫"); Console.WriteLine(result); result = await rpc.HelloValueTask(4, "ValueTask任務"); Console.WriteLine(result); var modelResult = rpc.HelloModel(5, "返回實體", DateTime.Now); Console.WriteLine($"{modelResult.T1} {modelResult.T2} {modelResult.T3.ToLongDateString()}"); var modelResult1 = await rpc.HelloModelAsync(6, "返回Task實體", DateTime.Now); Console.WriteLine($"{modelResult1.T1} {modelResult1.T2} {modelResult1.T3.ToLongDateString()}"); var mm = new DemoModel() { T1 = 7, T2 = "傳實體返回實體", T3 = DateTime.Now, Child = new ChildModel() { C1 = "子類1" } }; var model2 = rpc.HelloSendModel(mm); Console.WriteLine($"{model2.T1} {model2.T2} {model2.T3.ToLongDateString()}{model2.Child.C1}"); var list = new List<DemoModel>(); var mm1 = new DemoModel() { T1 = 8, T2 = "傳List返回List", T3 = DateTime.Now, Child = new ChildModel() { C1 = "子類2" } }; var mm3 = new DemoModel() { T1 = 9, T2 = "傳List返回List", T3 = DateTime.Now, Child = new ChildModel() { C1 = "子類3" } }; list.Add(mm1); list.Add(mm3); var list3 = rpc.HelloSendModelList(list); Console.WriteLine($"{list3[0].T1} {list3[0].T2} {list3[0].T3.ToLongDateString()} {list3[0].Child?.C1}"); var mm4 = new DemoModel() { T1 = 9, T2 = "HelloSendModelParm", T3 = DateTime.Now, Child = new ChildModel() { C1 = "子類4" } }; var dd = rpc.HelloSendModelParm("HelloSendModelParm", mm4); Console.WriteLine($"{dd.T1} {dd.T2} {dd.T3.ToLongDateString()}{dd.Child.C1}"); //異常呼叫 await rpc.TestBusinessExceptionInterceptor(); } catch (BusinessException e) { Console.WriteLine($"CzarCode:{e.CzarCode} CzarMessage:{e.CzarMessage}"); } catch (Exception ex) { Console.WriteLine(ex); } Console.ReadLine(); } }
現在整個RPC呼叫搭建完畢,然後分別啟動伺服器端和客戶端,就可以看到螢幕輸出內容如下。
客戶端輸出:

伺服器端輸出:

至此整個CzarRpc的基本使用已經介紹完畢,感興趣的朋友可以自行測試。
三、Ocelot增加RPC支援
有了 CzarRpc
的通訊框架後,現在在 Ocelot
上實現 Rpc
功能簡直易如反掌,現在開始新增我們的 Rpc
中介軟體,也讓我們擴充套件的閘道器靈活起來。
還記得我介紹閘道器篇時新增中介軟體的步驟嗎?如果不記得的可以先回去回顧下。
首先如何讓閘道器知道這個後端呼叫是 http
還是 Rpc
呢?這時應該會想到 Ocelot
路由配置裡的 DownstreamScheme
,可以在這裡判斷我們定義的是 http
還是 rpc
即可。同時我們希望之前定義的所有中介軟體都生效,最後一步請求時如果配置下端路由 rpc
,使用 rpc
呼叫,否則使用 http
呼叫,這樣可以重複利用之前所有的中介軟體功能,減少重複開發。
在之前的開發的自定義限流和自定義授權中介軟體開發中,我們知道開發完的中介軟體放到哪裡使用,這裡就不介紹原理了,直接新增到 BuildCzarOcelotPipeline
裡如下程式碼。
public static OcelotRequestDelegate BuildCzarOcelotPipeline(this IOcelotPipelineBuilder builder, OcelotPipelineConfiguration pipelineConfiguration) { // 註冊一個全域性異常 builder.UseExceptionHandlerMiddleware(); // 如果請求是websocket使用單獨的管道 builder.MapWhen(context => context.HttpContext.WebSockets.IsWebSocketRequest, app => { app.UseDownstreamRouteFinderMiddleware(); app.UseDownstreamRequestInitialiser(); app.UseLoadBalancingMiddleware(); app.UseDownstreamUrlCreatorMiddleware(); app.UseWebSocketsProxyMiddleware(); }); // 新增自定義的錯誤管道 builder.UseIfNotNull(pipelineConfiguration.PreErrorResponderMiddleware); //使用自定義的輸出管道 builder.UseCzarResponderMiddleware(); // 下游路由匹配管道 builder.UseDownstreamRouteFinderMiddleware(); //增加自定義擴充套件管道 if (pipelineConfiguration.MapWhenOcelotPipeline != null) { foreach (var pipeline in pipelineConfiguration.MapWhenOcelotPipeline) { builder.MapWhen(pipeline); } } // 使用Http頭部轉換管道 builder.UseHttpHeadersTransformationMiddleware(); // 初始化下游請求管道 builder.UseDownstreamRequestInitialiser(); // 使用自定義限流管道 builder.UseRateLimiting(); //使用請求ID生成管道 builder.UseRequestIdMiddleware(); //使用自定義授權前管道 builder.UseIfNotNull(pipelineConfiguration.PreAuthenticationMiddleware); //根據請求判斷是否啟用授權來使用管道 if (pipelineConfiguration.AuthenticationMiddleware == null) { builder.UseAuthenticationMiddleware(); } else { builder.Use(pipelineConfiguration.AuthenticationMiddleware); } //新增自定義限流中介軟體 2018-11-18 金焰的世界 builder.UseCzarClientRateLimitMiddleware(); //新增自定義授權中介軟體2018-11-15 金焰的世界 builder.UseAhphAuthenticationMiddleware(); //啟用自定義的認證之前中介軟體 builder.UseIfNotNull(pipelineConfiguration.PreAuthorisationMiddleware); //是否使用自定義的認證中介軟體 if (pipelineConfiguration.AuthorisationMiddleware == null) { builder.UseAuthorisationMiddleware(); } else { builder.Use(pipelineConfiguration.AuthorisationMiddleware); } // 使用自定義的引數構建中介軟體 builder.UseIfNotNull(pipelineConfiguration.PreQueryStringBuilderMiddleware); // 使用負載均衡中介軟體 builder.UseLoadBalancingMiddleware(); // 使用下游地址建立中介軟體 builder.UseDownstreamUrlCreatorMiddleware(); // 使用快取中介軟體 builder.UseOutputCacheMiddleware(); //判斷下游的是否啟用rpc通訊,切換到RPC處理 builder.MapWhen(context => context.DownstreamReRoute.DownstreamScheme.Equals("rpc", StringComparison.OrdinalIgnoreCase), app => { app.UseCzarRpcMiddleware(); }); //使用下游請求中介軟體 builder.UseCzaHttpRequesterMiddleware(); return builder.Build(); }
這裡是在最後請求前判斷使用的下游請求方式,如果 DownstreamScheme
使用的 rpc
,就使用 rpc
中介軟體處理。
Rpc處理的完整邏輯是,如何從http請求中獲取想要解析的引數,這裡需要設定匹配的優先順序,目前設計的優先順序為。
1、首先提取路由引數,如果匹配上就是用路由引數名稱為key,值為value,按順序組成第一批引數。 2、提取query引數,如有有值按順序組成第二批引數。 3、如果非Get請求,提取body內容,如果非空,組成第三批引數 4、從配置庫裡提取rpc路由呼叫的服務名稱和函式名稱,以及是否單向呼叫。 5、按照獲取的資料進行rpc呼叫並等待返回。
看了上面的設計是不是思路很清晰了呢?
1、rpc路由表設計
CREATE TABLE AhphReRouteRpcConfig ( RpcId int IDENTITY(1,1) NOT NULL, ReRouteId int,//路由表主鍵 ServantName varchar(100) NOT NULL,//呼叫的服務名稱 FuncName varchar(100) NOT NULL,//呼叫的方法名稱 IsOneway bit NOT NULL//是否單向呼叫 )
2、提取遠端呼叫方法
根據上游路由獲取遠端呼叫的配置專案
public interface IRpcRepository { /// <summary> /// 根據模板地址獲取RPC請求方法 /// </summary> /// <param name="UpUrl">上游模板</param> /// <returns></returns> Task<RemoteInvokeMessage> GetRemoteMethodAsync(string UpUrl); } public class SqlServerRpcRepository : IRpcRepository { private readonly CzarOcelotConfiguration _option; public SqlServerRpcRepository(CzarOcelotConfiguration option) { _option = option; } /// <summary> /// 獲取RPC呼叫方法 /// </summary> /// <param name="UpUrl"></param> /// <returns></returns> public async Task<RemoteInvokeMessage> GetRemoteMethodAsync(string UpUrl) { using (var connection = new SqlConnection(_option.DbConnectionStrings)) { string sql = @"select T4.* from AhphGlobalConfiguration t1 inner join AhphConfigReRoutes T2 on T1.AhphId=t2.AhphId inner join AhphReRoute T3 on T2.ReRouteId=T3.ReRouteId INNER JOIN AhphReRouteRpcConfig T4 ON T3.ReRouteId=T4.ReRouteId where IsDefault=1 and T1.InfoStatus=1 AND T3.InfoStatus=1 AND UpstreamPathTemplate=@URL"; var result = await connection.QueryFirstOrDefaultAsync<RemoteInvokeMessage>(sql, new { URL = UpUrl }); return result; } } }
3、重寫返回結果
由於rpc呼叫後是返回的Json封裝的資訊,需要解析成對應的HttpContent。
using System.IO; using System.Net; using System.Net.Http; using System.Threading.Tasks; namespace Czar.Gateway.Rpc { public class RpcHttpContent : HttpContent { private string result; public RpcHttpContent(string result) { this.result = result; } public RpcHttpContent(object result) { this.result = Newtonsoft.Json.JsonConvert.SerializeObject(result); } protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context) { var writer = new StreamWriter(stream); await writer.WriteAsync(result); await writer.FlushAsync(); } protected override bool TryComputeLength(out long length) { length = result.Length; return true; } } }
4、rpc中介軟體邏輯處理
有了前面的準備資訊,現在基本可以完成邏輯程式碼的開發了,詳細的中介軟體程式碼如下。
using Czar.Gateway.Errors; using Czar.Rpc.Clients; using Ocelot.Logging; using Ocelot.Middleware; using Ocelot.Responses; using System.Collections.Generic; using System.Net; using System.Threading.Tasks; namespace Czar.Gateway.Rpc.Middleware { public class CzarRpcMiddleware : OcelotMiddleware { private readonly OcelotRequestDelegate _next; private readonly IRpcClientFactory _clientFactory; private readonly ICzarRpcProcessor _czarRpcProcessor; public CzarRpcMiddleware(OcelotRequestDelegate next, IRpcClientFactory clientFactory, IOcelotLoggerFactory loggerFactory, ICzarRpcProcessor czarRpcProcessor) : base(loggerFactory.CreateLogger<CzarRpcMiddleware>()) { _next = next; _clientFactory = clientFactory; _czarRpcProcessor = czarRpcProcessor; } public async Task Invoke(DownstreamContext context) { var httpStatusCode = HttpStatusCode.OK; var _param = new List<object>(); //1、提取路由引數 var tmpInfo = context.TemplatePlaceholderNameAndValues; if (tmpInfo != null && tmpInfo.Count > 0) { foreach (var tmp in tmpInfo) { _param.Add(tmp.Value); } } //2、提取query引數 foreach (var _q in context.HttpContext.Request.Query) { _param.Add(_q.Value.ToString()); } //3、從body裡提取內容 if (context.HttpContext.Request.Method.ToUpper() != "GET") { context.DownstreamRequest.Scheme = "http"; var requert = context.DownstreamRequest.ToHttpRequestMessage(); if (requert.Content!=null) { var json = "{}"; json = await requert.Content.ReadAsStringAsync(); _param.Add(json); } } //從快取裡提取 var req = await _czarRpcProcessor.GetRemoteMethodAsync(context.DownstreamReRoute.UpstreamPathTemplate.OriginalValue); if (req != null) { req.Parameters = _param.ToArray(); var result = await _clientFactory.SendAsync(req, GetEndPoint(context.DownstreamRequest.Host, context.DownstreamRequest.Port)); OkResponse<RpcHttpContent> httpResponse; if (result.CzarCode == Czar.Rpc.Utilitys.RpcStatusCode.Success) { httpResponse = new OkResponse<RpcHttpContent>(new RpcHttpContent(result.CzarResult?.ToString())); } else { httpResponse = new OkResponse<RpcHttpContent>(new RpcHttpContent(result)); } context.HttpContext.Response.ContentType = "application/json"; context.DownstreamResponse = new DownstreamResponse(httpResponse.Data, httpStatusCode, httpResponse.Data.Headers, "OK"); } else {//輸出錯誤 var error = new InternalServerError($"請求路由 {context.HttpContext.Request.Path}未配置後端轉發"); Logger.LogWarning($"{error}"); SetPipelineError(context, error); } } private EndPoint GetEndPoint(string ipaddress, int port) { if (IPAddress.TryParse(ipaddress, out IPAddress ip)) { return new IPEndPoint(ip, port); } else { return new DnsEndPoint(ipaddress, port); } } } }
5、啟動Rpc客戶端配置
目前Rpc的客戶端配置我們還沒啟動,只需要在 AddCzarOcelot
中新增相關注入即可。
var service = builder.First(x => x.ServiceType == typeof(IConfiguration)); var configuration = (IConfiguration)service.ImplementationInstance; //Rpc應用 builder.AddSingleton<ICzarRpcProcessor, CzarRpcProcessor>(); builder.AddSingleton<IRpcRepository, SqlServerRpcRepository>(); builder.AddLibuvTcpClient(configuration);
6、配置客戶端
最後別忘了配置Rpc客戶端資訊是否啟用證書資訊,為了配置資訊的內容。
{ "CzarHost": { "ProxyEndPoint": true, "IsSsl": "false", "PfxPath": "cert/datasync.pfx", "PfxPassword": "bl123456", "ClientConfig": { "Demo.Rpc.Hello": { "Host": "127.0.0.1", "Port": 7711, "Timeout": 20 } } } }
現在讓閘道器整合Rpc功能全部配置完畢。
四、閘道器Rpc功能測試
本次測試我在原有的閘道器基礎上,增加不同型別的Rpc呼叫,就按照不同維度測試Rpc呼叫功能,本次測試案例是建立在Czar.Rpc 服務端基礎上,正好可以測試。

1、測試路由引數
請求路徑 /hello/{no}/{name}
,呼叫的服務端方法 Hello
,傳入的兩個引數分別是 no ,name
。
可以在伺服器端新增斷點除錯,發現確實接收到請求資訊,並正常返回,下面是 PostMan
測試結果。

2、使用Query方式傳遞引數
請求路徑 /rpc/query
,呼叫的服務端方法還是 Hello
,引數分別是 no ,name
。

3、使用Post方式傳遞Json
請求路徑 /rpc/body
,呼叫的伺服器方法是 HelloSendModel
。

4、混合引數使用
請求的路徑 /rpc/bodyparm/{name}
,呼叫的伺服器端方法是 HelloSendModelParm
。

所有的返回結果可自行除錯測試,發現都能達到預期結果。
同時此閘道器還是支援預設的http請求的,這裡就不一一測試了。
五、總結
本篇我介紹了什麼是Rpc,以及Czar.Rpc的基本使用,然後使用Czar.Rpc框架整合到我們基於Ocelot擴充套件閘道器中,並實現了不能方式的Rpc呼叫,可以在幾乎不改變現有流程的情況下很快速的整合進去,這也是Ocelot開發框架的魅力所在。
如果在使用過程中有什麼問題或建議,可以在 .NET Core專案實戰交流群(637326624)
中聯絡作者。
最後本文涉及的所有的原始碼可在 https://github.com/jinyancao/czar.gateway 中下載預覽。