1. 程式人生 > >效能提升五十倍:訊息佇列延時聚合通知的重要性

效能提升五十倍:訊息佇列延時聚合通知的重要性

前言

這個話題對我而言,是影響很久的事情。從第一次使用訊息佇列開始,業務背景是報名系統通知到我們的系統。正常流量下資料都能正常通知過來,但遇到匯入報名人時,採用了Task非同步通知,資料量一大,佇列就死了。當時是儘量採用同步方式,減少併發量。

 後來業務上有了專門的營銷系統,各種資料的增刪改都要進營銷系統,我採用的方式在倉儲層對需要通知的表的任何更新都通知到佇列,這樣的方式幾乎對其他業務無侵犯。

好處有,壞處也有。很多批量任務的更新如果採用同步方式頻繁通知是十分浪費速度的,既影響資料的更新速度,也對佇列帶來了挑戰。我曾經專門拉了個分支來優化批量任務,但由於需要涉及很多批量任務最後不了了之。更合理的推送模型應該是這樣,更新訊息先到記憶體佇列,積累一段時間(5秒或30秒)後,聚合到一起推送到訊息佇列,如下圖:

挑戰過去

其實也說不上是問題,原因知道,解決方法也知道。只是現狀還能支撐,就沒有去解決,但這些事情總要面對的。挑戰過去的糟糕程式碼,優化提升效能,本身就是一個技術成長的過程。

邁出第一步

第一步當然是Demo,先列出程式碼。先貼上一個基於Rabbitmq.Client的客戶端幫助程式碼,用於推送單條資料和多條資料。

public class RabbitProvider
    {
        public const string RABBITMQURL = "amqp://test:[email protected]
:5672/test"; private static IConnection conn; /// <summary> /// 獲取連線。 /// </summary> /// <param name="url"></param> /// <returns></returns> public static IConnection CreateConnection(string url) { ConnectionFactory factory = new ConnectionFactory(); factory.Uri = new Uri(url); factory.AutomaticRecoveryEnabled = true; IConnection conn = factory.CreateConnection(); return conn; } /// <summary> /// 單個 /// </summary> /// <param name="data"></param> public static void Publish<T>(string exchange, string queue, string route, T data) { if (conn == null || !conn.IsOpen) { conn = CreateConnection(RABBITMQURL); } using (IModel model = conn.CreateModel()) { model.ExchangeDeclare(exchange, ExchangeType.Direct); model.QueueDeclare(queue, false, false, false, null); model.QueueBind(queue, exchange, route, null); //IBasicProperties props = ch.CreateBasicProperties(); //FillInHeaders(props); // or similar // byte[] body = ComputeBody(props); // or similar model.BasicPublish(exchange, route, null, System.Text.Encoding.Default.GetBytes(data.ToString())); } } /// <summary> /// 多條資料 /// </summary> /// <param name="data"></param> public static void Publish<T>(string exchange, string queue, string route, List<T> data) { if (conn == null || !conn.IsOpen) { conn = CreateConnection(RABBITMQURL); } using (IModel model = conn.CreateModel()) { model.ExchangeDeclare(exchange, ExchangeType.Direct); model.QueueDeclare(queue, false, false, false, null); model.QueueBind(queue, exchange, route, null); //IBasicProperties props = ch.CreateBasicProperties(); //FillInHeaders(props); // or similar // byte[] body = ComputeBody(props); // or similar foreach (var item in data) { model.BasicPublish(exchange, route, null, System.Text.Encoding.Default.GetBytes(item.ToString())); } } } }

也許在部分人眼裡能提供支援單條和多條推送的方式已經能解決絕大多數問題,看起來確實如此。但單純的推送批量資料是有業務方發起,是對每個批量任務都有較大侵入的,雖然它很好,但不夠好。接下來我們貼上基於BlockingCollection<T>提供的執行緒安全集合來完成的佇列程式碼。

 public class DANQueue<T> : IDANQueue<T>
    {
        private static BlockingCollection<DANMessage<T>> GlobalCollection;

        static DANQueue()
        {
            GlobalCollection = new BlockingCollection<DANMessage<T>>();
        }
        /// <summary>
        /// 新增
        /// </summary>
        /// <param name="item"></param>
        /// <returns></returns>
        public static bool TryAdd(DANMessage<T> item)
        {
            return GlobalCollection.TryAdd(item);
        }
        /// <summary>
        /// 獲取一個
        /// </summary>
        /// <param name="item"></param>
        public static DANMessage<T> TryTake()
        {
            var msg = new DANMessage<T>();
            if (GlobalCollection.TryTake(out msg))
            {
                return msg;
            }
            return null;
        }
        /// <summary>
        /// 獲取所有
        /// </summary>
        /// <returns></returns>
        public static List<DANMessage<T>> TryTakeAll()
        {
            var list = new List<DANMessage<T>>();
            while (true)
            {
                var q = TryTake();
                if (q == null)
                {
                    return list;
                }
                list.Add(q);
            }
        }
        /// <summary>
        /// 統計
        /// </summary>
        public static int Count()
        {
            return GlobalCollection.Count;
        }
    }

測試業務Demo

    /// <summary>
    /// 使用者
    /// </summary>
   public class User
    {
        public string Mobile { get; set; }
        public long CompanyId { get; set; }
    }
   /// <summary>
    /// 倉儲
    /// </summary>
  public class Repository<TDocument> : IRepository<TDocument>
    {
        public bool Update(User user)
        {
            DANQueue<User>.TryAdd(new DANMessage<User>() { Body = user, Key = user.CompanyId + user.Mobile, Type = typeof(User).Name, TimeStamp = DateTime.Now.Ticks });
            return true;
        }
    }

分別測試批量更新資料下迴圈通知和只通知一次耗時,程式碼如下:

  public const string ExchangeStr = "fanTest";
        public const string QueueStr = "fanQueueTest";
        private static string TypeUserName = typeof(User).Name;
        static void Main(string[] args)
        {
            //這裡就不引入依賴注入了。
            Repository<User> repository = new Repository<User>();
            Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start();
            for (var i = 0; i <= 1000; i++)
            {
                var user = new User()
                {
                    CompanyId = 13232,
                    Mobile = "11111" + i
                };
                repository.Update(user);
                RabbitProvider.Publish(ExchangeStr, QueueStr, TypeUserName, DANQueue<User>.TryTake());
            }
            stopwatch.Stop();
            Console.WriteLine($"100000UpdateWithPush-Time:" + stopwatch.ElapsedMilliseconds);


            //批量測試。
            stopwatch.Restart();
            for (var i = 0; i <= 1000; i++)
            {
                var user = new User()
                {
                    CompanyId = 13232,
                    Mobile = "11111" + i
                };
                repository.Update(user);
            }
            RabbitProvider.Publish(ExchangeStr, QueueStr, TypeUserName, DANQueue<User>.TryTakeAll());
            stopwatch.Stop();
            Console.WriteLine($"100000UpdateDelayPush-Time:" + stopwatch.ElapsedMilliseconds);

            Console.ReadLine();
        }
    }

結果如下:

UpdateWithPush-Time:4103
UpdateDelayPush-Time:73

這裡列舉的只是1000條,當我改成1萬條的時候,佇列掛了!這充分說明了延時聚合通知的重要性。相同的環境下,迴圈通知支撐不了1萬,但聚合後只通知一次的情況下,10萬資料也花了9秒。雙方效能對比結果是指數級的。

UpdateDelayPush-Time:9671

引入定時機制

上面已經對比了迴圈通知和聚合通知的效能,但普通的聚合十分侵入業務。每種型別的業務都需要引入程式碼,使用不方便,而且維護起來也麻煩。這時候可以考慮引入定時任務來處理聚合通知。先來個1百萬的更新。

 System.Timers.Timer timer = new System.Timers.Timer(5000);
            timer.Elapsed += Timer_Elapsed;
            timer.Start();
            //批量測試大量資料
            for (var i = 0; i <= 1000000; i++)
            {
                var user = new User()
                {
                    CompanyId = 13232,
                    Mobile = "11111" + i
                };
                repository.Update(user);
               
            }

定時觸發的方法如下:

private static void Timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
        {
            var list = DANQueue<User>.TryTakeAll();
            if (list.Count > 0)
            {
                RabbitProvider.Publish(ExchangeStr, QueueStr, TypeUserName, list);
            }
        }

執行Debug測試,為方便顯示,我減少了一些列,只顯示queue名和釋出速度,能達到每秒1萬左右的量。

| test | [fanQueueTest](/#/queues/test/fanQueueTest) | 10,569/s |
| test | [fanQueueTest](/#/queues/test/fanQueueTest) | 12,336/s |

談到此時的推送速度,再來回顧下剛開始迴圈通知的速度,每秒250左右,可見速度提升了50倍!

| test | [fanQueueTest](/#/queues/test/fanQueueTest) |  249/s |

原始碼

總結

經過以上對比,效能從幾千就掛到支撐到每秒上萬的推送量,並且支撐百萬(更高級別沒測試)以上級更新依然健壯執行。結果如此明顯,如果還沒有動力改變,那還有什麼能拯救你呢?這裡的Timer以後可以替換成hangfire,因為hangfire有UI監控,可以檢視狀態。hangfire貌似不推薦大資料量的引數,這些細節問題以後可以根據測試情況去取捨。

以上僅為了測試,如果要變成通用可複用,還有更長的路需要走,但比起分散式追蹤簡單多了,一步一步來,用目標約束自己慢慢實現。

本篇完畢,謝謝觀看。