1. 程式人生 > >dotnet core使用IO合併技巧輕鬆實現千萬級訊息推送

dotnet core使用IO合併技巧輕鬆實現千萬級訊息推送

之前講述過多路複用實現單服百萬級別RPS吞吐,但在文中有一點是沒有說的就是訊息IO合併,如果缺少了訊息IO合併即使怎樣多路複用也很難達到百萬級別的請求響畢竟所有應用層面的網路IO讀寫都是非常損耗效能的(需要硬體配置很高的伺服器)。這一章主要講述的是IO合併的應用,並通過這個特性實現普通單服務千萬級別的訊息推送測試。

什麼是訊息IO合併

所謂的訊息IO合併即是由原來一個訊息對應一個網路讀寫設計成多個訊息共享一個網路讀寫。那這樣的設計到底會帶來多大的效能提升,最簡單的對比場就是每次執行1條SQL執行1萬次和直接批執行1萬條SQL的差別,相信做過的朋友一定非常清楚其效能提升的幅度。那在網路通訊中如何設計才能讓多個訊息進行IO合併呢?作者在實際實踐中的方式有兩種:1)通過定時器把佇列中的所有訊息定期合併傳送,2)通過一個狀態機歸遞訊息佇列,一旦佇列存在訊息一次過合併傳送。定時器這種比較損耗效能,在連線量大的情況存在延時間相互影響;對於後者則比較好控制很多也不存在延時性,原理髮送訊息進佇列後和網路傳送完成再回到狀態機檢測訊息佇列狀態即可。 

訊息推送相對於請求響應來說相還是簡單很多的,畢竟訊息推送是單向並不需要有高效的響應機制。不過對於普通伺服器間實現千萬級的訊息推送還是需要做些規劃,畢竟是需要在有限的IO讀寫量的情況來達到這麼大規模的訊息處理。還有這麼大量的訊息序列化和反序列化也是一非常損耗效能的事情,所以這次實踐並沒有使用Protobuf,而是採用自定義序列化。測試的通訊元件選擇Beetlex因為它具備了自動訊息合併能力,並配合高效的多復路用機制在服務之間進行千萬級別的訊息推變得簡單。

測試簡述

這一次測試主要是向服務端推著一個簡單的訂單資訊,由客戶每次生成不同的訂單資訊推送給服務端,伺服器接收訂單訊息後進行統計,並計算每秒接收的訂單數量。

訊息結構

    public class Order
    {
        public long ID;
        public string Product;
        public int Quantity;
        public double Price;
        public double Total;
    }

建立訂單

private static long mId;
private static string[] mProducts = new string[] { "Apple", "Orange", "
Banana", "Citrus", "Mango" }; private static int[] mQuantity = new int[] { 3, 10, 20, 23, 6, 9, 21 }; private static double[] mPrice = new double[] { 2.3, 1.6, 3.2, 4.6, 20, 4 }; public static Order CreateOrder() { Order order = new Order(); order.ID = System.Threading.Interlocked.Increment(ref mId); order.Product = mProducts[order.ID % mPrice.Length]; order.Quantity = mQuantity[order.ID % mQuantity.Length]; order.Price = mPrice[order.ID % mPrice.Length]; order.Total = order.Quantity * order.Price; return order; }

基於測試資源有限,這次的測試並沒像之前跑PRS那樣採用Protobuf,因為這量的物件處理量實在太大,測試的硬體環境不變所以採了自定義的序列化方式,具體可以參考原始碼。

接收端程式碼

        public override void SessionPacketDecodeCompleted(IServer server, PacketDecodeCompletedEventArgs e)
        {
            PushMessages.Order order = (PushMessages.Order)e.Message;
            if (order.ID > MaxOrerID)
                MaxOrerID = order.ID;
            System.Threading.Interlocked.Increment(ref Count);
        }

由於是接收推送的訊息,服務端接收訊息後統計相關數量即可完成,對於之前的RPS測試所需處理的東西就少很多了。

推送端壓測程式碼

        public void Run()
        {
            foreach (var item in mClients)
                System.Threading.ThreadPool.QueueUserWorkItem(OnRun, item);
        }
        private void OnRun(object state)
        {
            AsyncTcpClient item = (AsyncTcpClient)state;
            while (true)
            {
                Order order = OrderFactory.CreateOrder();
                item.Send(order);
                if (order.ID > MaxOrerID)
                    MaxOrerID = order.ID;
                System.Threading.Interlocked.Increment(ref Count);
                if (item.Count > 2000)
                    System.Threading.Thread.Sleep(1);
            }
        }

為了防止壓爆連線內部的訊息佇列,壓測端當連線佇列超過2000個訊息的時候停止一下。由於採用了訊息合併機制所以並不需要太多連線,在整個測試過程中開啟了三個壓測例項,每個例項使用5個連線,換句話說BeetleX通過15個連線,實現千萬級訊息的推送能力。

測試伺服器資源

這次測試使用了兩家雲伺服器,第一家名字就不說了,開啟了V16核的虛擬伺服器,內部頻寬6G和100萬pps,結果實際壓測2G頻寬就壓不上去了,剛開始以為是linux系統要配置問題,換了windows系統試一下還是不行……,最終還是換回了阿里雲測,在v12核的虛擬伺服器上順利完成了這一次測試。

服務端: v12核,24G記憶體,作業系統ubuntu16.04 一臺,內網最大頻寬4Gb.

壓測端: v12核,24G記憶體,作業系統ubuntu16.04,內網最大頻寬4Gb, 兩臺(主要測試方式有些暴力一臺無法達到壓測目標)

測試結果

二臺壓測機共開啟了3個例項,每個例項5個連線,每個連線應用層處理的buffer 32k;整個測結果訊息推送量達到了1000萬個/秒。服務端記錄接收IO每秒15000次,平均每次receive得到的訊息大概在600個左右。以下是測試情況的截圖:

服務程式統計情況

服務端CPU情況

網路使用情況