NetCore下搭建websocket叢集方案
介紹
最近在做一個基於netcore的實時訊息服務。最初選用的是ASP.NET Core SignalR,但是後來發現目前它並沒有支援IOS的客戶端,所以自己只好又基於websocket重新搭建了一套服務。
因為前期已經使用了SignalR,所以我直接在原本的專案裡面重新擴充套件了一套自定義websocket服務。
在網上有一篇博文介紹瞭如何在Asp.net Core中使用中介軟體來管理websocket,我的大部分程式碼也是參考這篇文章。在這兒貼個連結
ofollow,noindex" target="_blank">在Asp.net Core中使用中介軟體來管理websocket
自定義Socket/">WebSocket 中介軟體
要閱讀ASP.NET Core中的WebSockets支援,可以在此處檢視 。如果你的專案跟我一樣,已經使用了Signalr,那麼你不需要在安裝Microsoft.AspNetCore.WebSockets包,否則在專案開始前,
需要安裝此Nuget包。現在你可以自定義你自己的中介軟體了。
/// <summary> /// websocket 協議擴充套件中介軟體 /// </summary> public class CustomWebSocketMiddlewarr { private readonly RequestDelegate _next; public CustomWebSocketMiddlewarr(RequestDelegate next) { _next = next; } public async Task Invoke(HttpContext context, ICustomWebSocketFactory wsFactory, ICustomWebSocketMessageHandler wsmHandler) { if (context.WebSockets.IsWebSocketRequest) { string ConId = context.Request.Query["sign"]; if (!string.IsNullOrEmpty(ConId)) { WebSocket webSocket = await context.WebSockets.AcceptWebSocketAsync(); CustomWebSocket userWebSocket = new CustomWebSocket() { WebSocket = webSocket, ConId = ConId }; wsFactory.Add(userWebSocket); //await wsmHandler.SendInitialMessages(userWebSocket); await Listen(context, userWebSocket, wsFactory, wsmHandler); } } else { context.Response.StatusCode = 400; } await _next(context); } //監聽客戶端傳送過來的訊息 private async Task Listen(HttpContext context, CustomWebSocket userWebSocket, ICustomWebSocketFactory wsFactory, ICustomWebSocketMessageHandler wsmHandler) { WebSocket webSocket = userWebSocket.WebSocket; var buffer = new byte[1024 * 4]; WebSocketReceiveResult result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None); while (!result.CloseStatus.HasValue) { await wsmHandler.HandleMessage(result, buffer, userWebSocket, wsFactory); buffer = new byte[1024 * 4]; result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None); } wsFactory.Remove(userWebSocket.ConId); await webSocket.CloseAsync(result.CloseStatus.Value, result.CloseStatusDescription, CancellationToken.None); } }
在自定義的中介軟體中,首先判斷是否是websocket請求,如果是的話,在檢視是否有對應的sign標識,滿足條件後進入後續的處理環節。
簡單講解一下這裡面的處理邏輯。因為我的專案中同時存在Signalr,而Signalr也會使用到websocket協議。但是Signalr的websocket請求傳入的引數是id,所以我在這兒自定義了一個引數sign為了和Signalr
做區分。那麼這個sign是做什麼用的呢? 其實sign是前端傳過來的唯一標識,和此次連線對應,也可以理解為Signalr裡面的connectionId。然後會把標識和對應websocket類到存入到一個list集合中。即程式碼
中的 wsFactory.Add(userWebSocket)。
CustomWebSocket是一個包含WebSocket和標識的類:
publicclass CustomWebSocket { public string ConId { get; set; } public WebSocket WebSocket { get; set; } }
然後定義了一個Websocket工廠類,用來存取連線到服務的Websocket例項。
//介面 publicinterface ICustomWebSocketFactory { void Add(CustomWebSocket uws); void Remove(string conId); List<CustomWebSocket> All(); List<CustomWebSocket> Others(CustomWebSocket client); CustomWebSocket Client(string conId); }
具體實現
public class CustomWebSocketFactory: ICustomWebSocketFactory { List<CustomWebSocket> List; public CustomWebSocketFactory() { List = new List<CustomWebSocket>(); } public void Add(CustomWebSocket uws) { List.Add(uws); } public void Remove(string conId) { List.Remove(Client(conId)); } public List<CustomWebSocket> All() { return List; } public List<CustomWebSocket> Others(CustomWebSocket client) { return List.Where(c => c.ConId != client.ConId).ToList(); } public CustomWebSocket Client(string conId) { var uws= List.FirstOrDefault(c => c.ConId == conId); return uws; } }
可以看到最終我們存取websocket都是通過list來進行,所以在注入的時候一定要注意。注入成單例模式。
services.AddSingleton<ICustomWebSocketFactory, CustomWebSocketFactory>();
CustomWebSocketMessageHandle包含有關訊息處理的邏輯(傳送,接收)
public interface ICustomWebSocketMessageHandler { Task SendInitialMessages(CustomWebSocket userWebSocket); Task HandleMessage(WebSocketReceiveResult result, byte[] buffer, CustomWebSocket userWebSocket, ICustomWebSocketFactory wsFactory); Task SendMessageInfo(string conId, object data, ICustomWebSocketFactory wsFactory); } publicclass CustomWebSocketMessageHandler:ICustomWebSocketMessageHandler { public async Task SendInitialMessages(CustomWebSocket userWebSocket) { WebSocket webSocket = userWebSocket.WebSocket; var msg = new CustomWebSocketMessage { MessagDateTime = DateTime.Now, Type = WSMessageType.連線響應 }; string serialisedMessage = JsonConvert.SerializeObject(msg); byte[] bytes = Encoding.ASCII.GetBytes(serialisedMessage); await webSocket.SendAsync(new ArraySegment<byte>(bytes, 0, bytes.Length), WebSocketMessageType.Text, true, CancellationToken.None); } /// <summary> /// 推送訊息到客戶端 /// </summary> /// <returns></returns> public async Task SendMessageInfo(string conId,object data, ICustomWebSocketFactory wsFactory) { var uws = wsFactory.Client(conId); CustomWebSocketMessage message = new CustomWebSocketMessage(); message.DataInfo = data; message.Type = WSMessageType.任務數量; message.MessagDateTime = DateTime.Now; if (uws == null) { //廣播到其他叢集節點 var listpush = new List<PushMsg>(); var push = new PushMsg() { sendjsonMsg = new WebSocketFanoutDto() { conId = conId, data = message }, exchangeName = "saas.reltimewsmes.exchange", sendEnum = SendEnum.訂閱模式 }; listpush.Add(push); BTRabbitMQManage.PushMessageAsync(listpush); return; } var mesbuffer = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)); var mescount = Encoding.UTF8.GetByteCount(JsonConvert.SerializeObject(message)); await uws.WebSocket.SendAsync(new ArraySegment<byte>(mesbuffer, 0, mescount), WebSocketMessageType.Text, true, CancellationToken.None); } /// <summary> /// 處理接收到的客戶端資訊 /// </summary> /// <param name="result"></param> /// <param name="buffer"></param> /// <param name="userWebSocket"></param> /// <param name="wsFactory"></param> /// <returns></returns> public async Task HandleMessage(WebSocketReceiveResult result, byte[] buffer, CustomWebSocket userWebSocket, ICustomWebSocketFactory wsFactory) { string msg = Encoding.UTF8.GetString(buffer); try { var message = JsonConvert.DeserializeObject<CustomWebSocketMessage>(msg); if (message.Type == WSMessageType.使用者資訊) { var logdto = JsonConvert.DeserializeObject<LoginInfoDto>(message.DataInfo.ToJsonString()); await InitUserInfo(logdto, userWebSocket, wsFactory); } } catch (Exception e) { var exbuffer = Encoding.UTF8.GetBytes(e.Message); var excount = Encoding.UTF8.GetByteCount(e.Message); await userWebSocket.WebSocket.SendAsync(new ArraySegment<byte>(exbuffer, 0, excount), result.MessageType, result.EndOfMessage, CancellationToken.None); } } /// <summary> /// 初始化使用者連線關係 /// </summary> /// <param name="dto"></param> /// <param name="userWebSocket"></param> /// <param name="wsFactory"></param> /// <returns></returns> private async Task InitUserInfo(LoginInfoDto dto, CustomWebSocket userWebSocket, ICustomWebSocketFactory wsFactory) { if (dto.userId == 0) return; var contectid = userWebSocket.ConId; var key = ""; if (dto.tenantId.HasValue) key += "T_" + dto.userId + "_" + dto.tenantId + "_" + "tenant_"; if (dto.bankId.HasValue) key += "B_" + dto.userId + "_" + dto.bankId + "_" + "bank_"; key += dto.fromeType; //新增快取 CacheInstace<string>.GetRedisInstanceDefaultMemery().AddOrUpdate(key, contectid, r => { r = contectid; return r; }); CacheInstace<string>.GetRedisInstanceDefaultMemery().Expire(key, new TimeSpan(12, 0, 0)); } }
在這裡面,推送訊息到客戶端的時候,如果未找到標識對應的Websocket物件,則將訊息廣播到所有的叢集節點上。我們知道Signalr裡面的叢集實現通過redis來做的,但在此處,因為 我專案裡面已經搭建了Rabbitmq的高可用叢集,所以我直接通過Rabbitmq來進行廣播。這樣不管我是在叢集的那個節點上來推送訊息,都可以保證訊息被正確推送到客戶端。 關於廣播訊息的訂閱實現:
public class WebSocketFanoutDto { public string conId { get; set; } public CustomWebSocketMessage data { get; set; } } public class FanoutMesConsume : IMessageConsume { public void Consume(string message) { var condto = JsonConvert.DeserializeObject<WebSocketFanoutDto>(message); var wsFactory = IOCManage.ServiceProvider.GetService<ICustomWebSocketFactory>(); var uws = wsFactory.Client(condto.conId); if (uws != null) { //傳送訊息 var mesbuffer = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(condto.data)); var mescount = Encoding.UTF8.GetByteCount(JsonConvert.SerializeObject(condto.data)); uws.WebSocket.SendAsync(new ArraySegment<byte>(mesbuffer, 0, mescount), WebSocketMessageType.Text, true, CancellationToken.None); } } }
最後在擴充套件類裡面新增訊息監視和注入Websocket中介軟體。
當然不要忘記 訊息處理類的依賴注入
services.AddSingleton<ICustomWebSocketMessageHandler, CustomWebSocketMessageHandler>();
public static IApplicationBuilder UseCustomWebSocketManager(this IApplicationBuilder app) { //新增針對分散式叢集的訊息監視 RabbitMQManage.Subscribe<FanoutMesConsume>(new MesArgs() { exchangeName = "reltimewsmes.exchange", sendEnum = SendEnum.訂閱模式 }); return app.UseMiddleware<CustomWebSocketMiddlewarr>(); }
至此這個框架搭建完成,最後在startup類中注入。
關於Rabbitmq的使用,傳送和接收是我基於easynetq封裝的一個幫助類,大家可以自行實現。
這裡面最主要的邏輯就是每一個websocket例項都有一個對應的標識,然後在連線成功後,前端會發送使用者資訊,後端服務再把使用者資訊和連線標識關聯。這樣如果想推送資訊到某個使用者的話,就可以通過
使用者資訊來找到使用者對應的連線資訊。至於為什麼整個流程會這麼複雜的,就一言難盡(我能怎麼辦,我也很絕望啊)。大多數時候大家都可以直接通過token認證來繫結使用者和socket連線。
目前還有幾個問題一個廣播訊息的時候,傳送訊息方也會收到這個訊息,這挺尷尬,目前我還沒想到太好的解決辦法。
第二個是採用單例list欄位儲存連線的websocket例項,少的時候還好,如果多的話,感覺可能會存在堆疊溢位的問題,但沒實際測試過,所以目前還不知道最大的連線數多少。