1. 程式人生 > >在.net core3.0中使用SignalR實現實時通訊

在.net core3.0中使用SignalR實現實時通訊

  最近用.net core3.0重構網站,老大想做個站內信功能,就是有些耗時的後臺任務的結果需要推送給使用者。一開始我想簡單點,客戶端每隔1分鐘呼叫一下我的介面,看看是不是有新訊息,有的話就告訴使用者有新推送,但老大不幹了,他就是要實時通訊,於是我只好上SignalR了。

  說幹就幹,首先去Nuget搜尋

 

 

 但是隻有Common是有3.0版本的,後來發現我需要的是Microsoft.AspNetCore.SignalR.Core,然而這個停更的狀態?於是我一臉矇蔽,搗鼓了一陣發現,原來.net core的SDK已經內建了Microsoft.AspNetCore.SignalR.Core,,右鍵專案,開啟C:\Program Files\dotnet\packs\Microsoft.AspNetCore.App.Ref\3.0.0\ref\netcoreapp3.0 資料夾搜尋SignalR,新增引用即可。

 

 

  接下來注入SignalR,如下程式碼:

            //注入SignalR實時通訊,預設用json傳輸
            services.AddSignalR(options =>
            {
                //客戶端發保持連線請求到服務端最長間隔,預設30秒,改成4分鐘,網頁需跟著設定connection.keepAliveIntervalInMilliseconds = 12e4;即2分鐘
                options.ClientTimeoutInterval = TimeSpan.FromMinutes(4);
                //服務端發保持連線請求到客戶端間隔,預設15秒,改成2分鐘,網頁需跟著設定connection.serverTimeoutInMilliseconds = 24e4;即4分鐘
                options.KeepAliveInterval = TimeSpan.FromMinutes(2);
            });

 

  這個解釋一下,SignalR預設是用Json傳輸的,但是還有另外一種更短小精悍的傳輸方式MessagePack,用這個的話效能會稍微高點,但是需要另外引入一個DLL,JAVA端呼叫的話也是暫時不支援的。但是我其實是不需要這點效能的,所以我就用預設的json好了。另外有個概念,就是實時通訊,其實是需要發“心跳包”的,就是雙方都需要確定對方還在不在,若掛掉的話我好重連或者把你幹掉啊,所以就有了兩個引數,一個是發心跳包的間隔時間,另一個就是等待對方心跳包的最長等待時間。一般等待的時間設定成發心跳包的間隔時間的兩倍即可,預設KeepAliveInterval是15秒,ClientTimeoutInterval是30秒,我覺得不需要這麼頻繁的確認對方“死掉”了沒,所以我改成2分鐘發一次心跳包,最長等待對方的心跳包時間是4分鐘,對應的客戶端就得設定

connection.keepAliveIntervalInMilliseconds = 12e4;
connection.serverTimeoutInMilliseconds = 24e4;
  注入了SignalR之後,接下來需要使用WebSocket和SignalR,對應程式碼如下:
            //新增WebSocket支援,SignalR優先使用WebSocket傳輸
            app.UseWebSockets();
            //app.UseWebSockets(new WebSocketOptions
            //{
            //    //傳送保持連線請求的時間間隔,預設2分鐘
            //    KeepAliveInterval = TimeSpan.FromMinutes(2)
            //});
            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
                endpoints.MapHub<MessageHub>("/msg");
            });

這裡提醒一下,WebSocket只是實現SignalR實時通訊的一種手段,若這個走不通的情況下,他還可以降級使用SSE,再不行就用輪詢的方式,也就是我最開始想的那種辦法。

  另外得說一下的是假如前端呼叫的話,他是需要測試的,這時候其實需要跨域訪問,不然每次打包好放到伺服器再測這個實時通訊的話有點麻煩。新增跨域的程式碼如下:

#if DEBUG
            //注入跨域
            services.AddCors(option => option.AddPolicy("cors",
                policy => policy.AllowAnyHeader().AllowAnyMethod().AllowCredentials()
                    .WithOrigins("http://localhost:8001", "http://localhost:8000", "http://localhost:8002")));
#endif

  然後加上如下程式碼即可。

#if DEBUG
            //允許跨域,不支援向所有域名開放了,會有錯誤提示
            app.UseCors("cors");
#endif

  好了,可以開始動工了。建立一個MessageHub:

    public class MessageHub : Hub
    {
        private readonly IUidClient _uidClient;

        public MessageHub(IUidClient uidClient)
        {
            _uidClient = uidClient;
        }

        public override async Task OnConnectedAsync()
        {
            var user = await _uidClient.GetLoginUser();
            //將同一個人的連線ID繫結到同一個分組,推送時就推送給這個分組
            await Groups.AddToGroupAsync(Context.ConnectionId, user.Account);
        }
    }

  由於每次連線的連線ID不同,所以最好把他和登入使用者的使用者ID繫結起來,推送時直接推給繫結的這個使用者ID即可,做法可以直接把連線ID和登入使用者ID繫結起來,把這個使用者ID作為一個分組ID。

  然後使用時就如下:

    public class MessageService : BaseService<Message, ObjectId>, IMessageService
    {
        private readonly IUidClient _uidClient;
        private readonly IHubContext<MessageHub> _messageHub;

        public MessageService(IMessageRepository repository, IUidClient uidClient, IHubContext<MessageHub> messageHub) : base(repository)
        {
            _uidClient = uidClient;
            _messageHub = messageHub;
        }

        /// <summary>
        /// 新增並推送站內信
        /// </summary>
        /// <param name="dto"></param>
        /// <returns></returns>
        public async Task Add(MessageDTO dto)
        {
            var now = DateTime.Now;
            
            var log = new Message
            {
                Id = ObjectId.GenerateNewId(now),
                CreateTime = now,
                Name = dto.Name,
                Detail = dto.Detail,
                ToUser = dto.ToUser,
                Type = dto.Type
            };

            var push = new PushMessageDTO
            {
                Id = log.Id.ToString(),
                Name = log.Name,
                Detail = log.Detail,
                Type = log.Type,
                ToUser = log.ToUser,
                CreateTime = now
            };

            await Repository.Insert(log);
            //推送站內信
            await _messageHub.Clients.Groups(dto.ToUser).SendAsync("newmsg", push);
            //推送未讀條數
            await SendUnreadCount(dto.ToUser);

            if (dto.PushCorpWeixin)
            {
                const string content = @"<font color='blue'>{0}</font>
<font color='comment'>{1}</font>
系統:**CMS**
站內信ID:<font color='info'>{2}</font>
詳情:<font color='comment'>{3}</font>";

                //把站內信推送到企業微信
                await _uidClient.SendMarkdown(new CorpSendTextDto
                {
                    touser = dto.ToUser,
                    content = string.Format(content, dto.Name, now, log.Id, dto.Detail)
                });
            }
        }

        /// <summary>
        /// 獲取本人的站內信列表
        /// </summary>
        /// <param name="name">標題</param>
        /// <param name="detail">詳情</param>
        /// <param name="unread">只顯示未讀</param>
        /// <param name="type">型別</param>
        /// <param name="createStart">建立起始時間</param>
        /// <param name="createEnd">建立結束時間</param>
        /// <param name="pageIndex">當前頁</param>
        /// <param name="pageSize">每頁個數</param>
        /// <returns></returns>
        public async Task<PagedData<PushMessageDTO>> GetMyMessage(string name, string detail, bool unread = false, EnumMessageType? type = null, DateTime? createStart = null, DateTime? createEnd = null, int pageIndex = 1, int pageSize = 10)
        {
            var user = await _uidClient.GetLoginUser();
            Expression<Func<Message, bool>> exp = o => o.ToUser == user.Account;

            if (unread)
            {
                exp = exp.And(o => o.ReadTime == null);
            }

            if (!string.IsNullOrEmpty(name))
            {
                exp = exp.And(o => o.Name.Contains(name));
            }

            if (!string.IsNullOrEmpty(detail))
            {
                exp = exp.And(o => o.Detail.Contains(detail));
            }

            if (type != null)
            {
                exp = exp.And(o => o.Type == type.Value);
            }

            if (createStart != null)
            {
                exp.And(o => o.CreateTime >= createStart.Value);
            }

            if (createEnd != null)
            {
                exp.And(o => o.CreateTime < createEnd.Value);
            }

            return await Repository.FindPageObjectList(exp, o => o.Id, true, pageIndex,
                pageSize, o => new PushMessageDTO
                {
                    Id = o.Id.ToString(),
                    CreateTime = o.CreateTime,
                    Detail = o.Detail,
                    Name = o.Name,
                    ToUser = o.ToUser,
                    Type = o.Type,
                    ReadTime = o.ReadTime
                });
        }

        /// <summary>
        /// 設定已讀
        /// </summary>
        /// <param name="id">站內信ID</param>
        /// <returns></returns>
        public async Task Read(ObjectId id)
        {
            var msg = await Repository.First(id);

            if (msg == null)
            {
                throw new CmsException(EnumStatusCode.ArgumentOutOfRange, "不存在此站內信");
            }

            if (msg.ReadTime != null)
            {
                //已讀的不再更新讀取時間
                return;
            }

            msg.ReadTime = DateTime.Now;
            await Repository.Update(msg, "ReadTime");
            await SendUnreadCount(msg.ToUser);
        }

        /// <summary>
        /// 設定本人全部已讀
        /// </summary>
        /// <returns></returns>
        public async Task ReadAll()
        {
            var user = await _uidClient.GetLoginUser();

            await Repository.UpdateMany(o => o.ToUser == user.Account && o.ReadTime == null, o => new Message
            {
                ReadTime = DateTime.Now
            });

            await SendUnreadCount(user.Account);
        }

        /// <summary>
        /// 獲取本人未讀條數
        /// </summary>
        /// <returns></returns>
        public async Task<int> GetUnreadCount()
        {
            var user = await _uidClient.GetLoginUser();
            return await Repository.Count(o => o.ToUser == user.Account && o.ReadTime == null);
        }

        /// <summary>
        /// 推送未讀數到前端
        /// </summary>
        /// <returns></returns>
        private async Task SendUnreadCount(string account)
        {
            var count = await Repository.Count(o => o.ToUser == account && o.ReadTime == null);
            await _messageHub.Clients.Groups(account).SendAsync("unread", count);
        }
    }
IHubContext<MessageHub>可以直接注入並且使用,然後呼叫_messageHub.Clients.Groups(account).SendAsync即可推送。接下來就簡單了,在MessageController裡把這些介面暴露出去,通過HTTP請求新增站內信,或者直接內部呼叫新增站內信介面,就可以新增站內信並且推送給前端頁面了,當然除了站內信,我們還可以做得更多,比如比較重要的順便也推送到第三方app,比如企業微信或釘釘,這樣你還會怕錯過重要資訊?
  接下來到了客戶端了,客戶端只說網頁端的,程式碼如下:
<body>
    <div class="container">
        <input type="button" id="getValues" value="Send" />
        <ul id="discussion"></ul>
    </div>
    <script
        src="https://cdn.jsdelivr.net/npm/@microsoft/[email protected]/dist/browser/signalr.min.js"></script>

    <script type="text/javascript">
        var connection = new signalR.HubConnectionBuilder()
            .withUrl("/message")
            .build();
        connection.serverTimeoutInMilliseconds = 24e4; 
        connection.keepAliveIntervalInMilliseconds = 12e4;

        var button = document.getElementById("getValues");

        connection.on('newmsg', (value) => {
            var liElement = document.createElement('li');
            liElement.innerHTML = 'Someone caled a controller method with value: ' + value;
            document.getElementById('discussion').appendChild(liElement);
        });

        button.addEventListener("click", event => {
            fetch("api/message/sendtest")
                .then(function (data) {
                    console.log(data);
                })
                .catch(function (error) {
                    console.log(err);
                });

        });
        
        var connection = new signalR.HubConnectionBuilder()
            .withUrl("/message")
            .build();

        connection.on('newmsg', (value) => {
            console.log(value);
        });

        connection.start();
    </script>
</body>    

  上面的程式碼還是需要解釋下的,serverTimeoutInMilliseconds和keepAliveIntervalInMilliseconds必須和後端的配置保持一致,不然分分鐘出現下面異常:

 

 這是因為你沒有在我規定的時間內向我傳送“心跳包”,所以我認為你已經“陣亡”了,為了避免不必要的傻傻連線,我停止了連線。另外需要說的是重連機制,有多種重連機制,這裡我選擇每隔10秒重連一次,因為我覺得需要重連,那一般是因為伺服器掛了,既然掛了,那我每隔10秒重連也是不會浪費伺服器效能的,浪費的是瀏覽器的效能,客戶端的就算了,忽略不計。自動重連程式碼如下:

        async function start() {
            try {
                await connection.start();
                console.log(connection)
            } catch (err) {
                console.log(err);
                setTimeout(() => start(), 1e4);
            }
        };
        connection.onclose(async () => {
            await start();
        });
        start();

  當然還有其他很多重連的方案,可以去官網看看。

  當然若你的客戶端是用vue寫的話,寫法會有些不同,如下:

import '../../public/signalR.js'
const wsUrl = process.env.NODE_ENV === 'production' ? '/msg' :'http://xxx.net/msg'
var connection = new signalR.HubConnectionBuilder().withUrl(wsUrl).build()
connection.serverTimeoutInMilliseconds = 24e4
connection.keepAliveIntervalInMilliseconds = 12e4
Vue.prototype.$connection = connection

接下來就可以用this.$connection 愉快的使用了。

  到這裡或許你覺得大功告成了,若沒看瀏覽器的控制檯輸出,我也是這麼認為的,然後控制檯出現了紅色!:

 

 雖然出現了這個紅色,但是依然可以正常使用,只是降級了,不使用WebSocket了,心跳包變成了一個個的post請求,如下圖:

 

 

   這個是咋回事呢,咋就用不了WebSocket呢,我的是谷歌瀏覽器呀,肯定是支援WebSocket的,咋辦,只好去群裡討教了,後來大神告訴我,需要在ngnix配置如下紅框內的:

 

 真是一語驚醒夢中人,我加上如下程式碼就正常了:

        location /msg  {
          proxy_connect_timeout   300;
          proxy_read_timeout        300;
          proxy_send_timeout        300;
          proxy_pass http://xxx.net;
          proxy_http_version 1.1;
          proxy_set_header Upgrade $http_upgrade;
          proxy_set_header Connection "upgrade";
          proxy_set_header Host $host;
          proxy_cache_bypass $http_upgrade;
        }