1. 程式人生 > >c#開源訊息佇列中介軟體EQueue 教程

c#開源訊息佇列中介軟體EQueue 教程

一、簡介

EQueue是一個參照RocketMQ實現的開源訊息佇列中介軟體,相容Mono,具體可以參看作者的文章《分享一個c#寫的開源分散式訊息佇列equeue》。專案開源地址:https://github.com/tangxuehua/equeue,專案中包含了佇列的全部原始碼以及如何使用的示例。 二、安裝EQueue Producer、Consumer、Broker支援分散式部署,安裝EQueue需要.NET 4, Visual Studio 2010/2012/2013. 目前EQueue是個類庫,需要自己實現Broker的宿主,可以參照QuickStart,建立一個QuickStart.BrokerServer專案,通過Visual Studio的Nuget 查詢equeue
using System;
using System.Text;
using ECommon.Autofac;
using ECommon.Configurations;
using ECommon.JsonNet;
using ECommon.Log4Net;
using EQueue.Broker;
using EQueue.Configurations;
using EQueue.Protocols;

namespace QuickStart.BrokerServer {
    class Program {
        static void Main(string[] args) {
            InitializeEQueue();
            var setting = new BrokerSetting();
            setting.NotifyWhenMessageArrived = false;
            setting.DeleteMessageInterval = 1000;
            new BrokerController(setting).Initialize().Start();
            Console.ReadLine();
        }

        static void InitializeEQueue() {
            Configuration.Create().UseAutofac().RegisterCommonComponents().UseLog4Net().UseJsonNet().RegisterEQueueComponents();
        }
    }
}

InitializeEQueue方法初始化EQueue的環境,使用了Autofac作為IOC容器,使用log4Net記錄日誌, 我們看一下RegisterEQueueComponents方法:

public static class ConfigurationExtensions {
    public static Configuration RegisterEQueueComponents(this Configuration configuration) {
        configuration.SetDefault < IAllocateMessageQueueStrategy,
        AverageAllocateMessageQueueStrategy > ();
        configuration.SetDefault < IQueueSelector,
        QueueHashSelector > ();
        configuration.SetDefault < ILocalOffsetStore,
        DefaultLocalOffsetStore > ();
        configuration.SetDefault < IMessageStore,
        InMemoryMessageStore > ();
        configuration.SetDefault < IMessageService,
        MessageService > ();
        configuration.SetDefault < IOffsetManager,
        InMemoryOffsetManager > ();
        return configuration;
    }
}
程式碼中涉及到6個元件:
IAllocateMessageQueueStrategy
IQueueSelector
ILocalOffsetStore
IMessageStore
IMessageService
IOffsetManager
DeleteMessageInterval 這個屬性是用來設定equeue的定時刪除間隔,單位為毫秒,預設值是一個小時。另外還有ProducerSocketSetting 和 ConsumerSocketSetting 分別用於設定Producer連線Broker和Consumer連線Broker的IP和埠,預設埠是5000和5001。
public class BrokerSetting {
    public SocketSetting ProducerSocketSetting {
        get;
        set;
    }
    public SocketSetting ConsumerSocketSetting {
        get;
        set;
    }
    public bool NotifyWhenMessageArrived {
        get;
        set;
    }
    public int DeleteMessageInterval {
        get;
        set;
    }

    public BrokerSetting() {
        ProducerSocketSetting = new SocketSetting {
            Address = SocketUtils.GetLocalIPV4().ToString(),
            Port = 5000,
            Backlog = 5000
        };
        ConsumerSocketSetting = new SocketSetting {
            Address = SocketUtils.GetLocalIPV4().ToString(),
            Port = 5001,
            Backlog = 5000
        };
        NotifyWhenMessageArrived = true;
        DeleteMessageInterval = 1000 * 60 * 60;
    }
}
執行專案,如果顯示下面類似內容,說明Broker啟動成功:
2014-03-23 20:10:30,255  INFO BrokerController - Broker started, producer:[169.254.80.80:5000], consumer:[169.254.80.80:5001] 三、在Visual Studio中開發測試 1.建立一個VS專案 QuickStart.ProducerClient,通過Nuget引用EQueue,編寫下面Producer程式碼
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ECommon.Autofac;
using ECommon.Configurations;
using ECommon.IoC;
using ECommon.JsonNet;
using ECommon.Log4Net;
using ECommon.Scheduling;
using EQueue.Clients.Producers;
using EQueue.Configurations;
using EQueue.Protocols;

namespace QuickStart.ProducerClient {
    class Program {
        static void Main(string[] args) {
            InitializeEQueue();

            var scheduleService = ObjectContainer.Resolve < IScheduleService > ();
            var producer = new Producer().Start();
            var total = 1000;
            var parallelCount = 10;
            var finished = 0;
            var messageIndex = 0;
            var watch = Stopwatch.StartNew();

            var action = new Action(() = >{
                for (var index = 1; index <= total; index++) {
                    var message = "message" + Interlocked.Increment(ref messageIndex);
                    producer.SendAsync(new Message("SampleTopic", Encoding.UTF8.GetBytes(message)), index.ToString()).ContinueWith(sendTask = >{
                        var finishedCount = Interlocked.Increment(ref finished);
                        if (finishedCount % 1000 == 0) {
                            Console.WriteLine(string.Format("Sent {0} messages, time spent:{1}", finishedCount, watch.ElapsedMilliseconds));
                        }
                    });
                }
            });

            var actions = new List < Action > ();
            for (var index = 0; index < parallelCount; index++) {
                actions.Add(action);
            }

            Parallel.Invoke(actions.ToArray());

            Console.ReadLine();
        }

        static void InitializeEQueue() {
            Configuration.Create().UseAutofac().RegisterCommonComponents().UseLog4Net().UseJsonNet().RegisterEQueueComponents();
        }
    }
}
Producer物件在使用之前必須要呼叫Start初始化,初始化一次即可, 注意:切記不可以在每次傳送訊息時,都呼叫Start方法。Producer 預設連線本機的5000埠,可以通過ProducerSetting 進行設定,可以參看下面的程式碼:
public class ProducerSetting {
    public string BrokerAddress {
        get;
        set;
    }
    public int BrokerPort {
        get;
        set;
    }
    public int SendMessageTimeoutMilliseconds {
        get;
        set;
    }
    public int UpdateTopicQueueCountInterval {
        get;
        set;
    }

    public ProducerSetting() {
        BrokerAddress = SocketUtils.GetLocalIPV4().ToString();
        BrokerPort = 5000;
        SendMessageTimeoutMilliseconds = 1000 * 10;
        UpdateTopicQueueCountInterval = 1000 * 5;
    }
2、建立一個VS專案 QuickStart.ConsumerClient,通過Nuget引用EQueue,編寫下面Consumer程式碼
using System;
using System.Linq;
using System.Text;
using System.Threading;
using ECommon.Autofac;
using ECommon.Configurations;
using ECommon.IoC;
using ECommon.JsonNet;
using ECommon.Log4Net;
using ECommon.Scheduling;
using EQueue.Broker;
using EQueue.Clients.Consumers;
using EQueue.Configurations;
using EQueue.Protocols;

namespace QuickStart.ConsumerClient {
    class Program {
        static void Main(string[] args) {
            InitializeEQueue();

            var messageHandler = new MessageHandler();
            var consumer1 = new Consumer("Consumer1", "group1").Subscribe("SampleTopic").Start(messageHandler);
            var consumer2 = new Consumer("Consumer2", "group1").Subscribe("SampleTopic").Start(messageHandler);
            var consumer3 = new Consumer("Consumer3", "group1").Subscribe("SampleTopic").Start(messageHandler);
            var consumer4 = new Consumer("Consumer4", "group1").Subscribe("SampleTopic").Start(messageHandler);

            Console.WriteLine("Start consumer load balance, please wait for a moment.");
            var scheduleService = ObjectContainer.Resolve < IScheduleService > ();
            var waitHandle = new ManualResetEvent(false);
            var taskId = scheduleService.ScheduleTask(() = >{
                var c1AllocatedQueueIds = consumer1.GetCurrentQueues().Select(x = >x.QueueId);
                var c2AllocatedQueueIds = consumer2.GetCurrentQueues().Select(x = >x.QueueId);
                var c3AllocatedQueueIds = consumer3.GetCurrentQueues().Select(x = >x.QueueId);
                var c4AllocatedQueueIds = consumer4.GetCurrentQueues().Select(x = >x.QueueId);
                if (c1AllocatedQueueIds.Count() == 1 && c2AllocatedQueueIds.Count() == 1 && c3AllocatedQueueIds.Count() == 1 && c4AllocatedQueueIds.Count() == 1) {
                    Console.WriteLine(string.Format("Consumer load balance finished. Queue allocation result: c1:{0}, c2:{1}, c3:{2}, c4:{3}", string.Join(",", c1AllocatedQueueIds), string.Join(",", c2AllocatedQueueIds), string.Join(",", c3AllocatedQueueIds), string.Join(",", c4AllocatedQueueIds)));
                    waitHandle.Set();
                }
            },
            1000, 1000);

            waitHandle.WaitOne();
            scheduleService.ShutdownTask(taskId);

            Console.ReadLine();
        }

        static void InitializeEQueue() {
            Configuration.Create().UseAutofac().RegisterCommonComponents().UseLog4Net().UseJsonNet().RegisterEQueueComponents();
        }
    }

    class MessageHandler: IMessageHandler {
        private int _handledCount;

        public void Handle(QueueMessage message, IMessageContext context) {
            var count = Interlocked.Increment(ref _handledCount);
            if (count % 1000 == 0) {
                Console.WriteLine("Total handled {0} messages.", count);
            }
            context.OnMessageHandled(message);
        }
    }
}
使用方式給使用者感覺是訊息從EQueue伺服器推到了應用客戶端。 但是實際Consumer內部是使用長輪詢Pull方式從EQueue伺服器拉訊息,然後再回呼叫戶Listener方法。Consumer預設連線本機的5001埠,可以通過ConsumerSetting 進行設定,可以參看下面的程式碼:
public class ConsumerSetting {
    public string BrokerAddress {
        get;
        set;
    }
    public int BrokerPort {
        get;
        set;
    }
    public int RebalanceInterval {
        get;
        set;
    }
    public int UpdateTopicQueueCountInterval {
        get;
        set;
    }
    public int HeartbeatBrokerInterval {
        get;
        set;
    }
    public int PersistConsumerOffsetInterval {
        get;
        set;
    }
    public PullRequestSetting PullRequestSetting {
        get;
        set;
    }
    public MessageModel MessageModel {
        get;
        set;
    }
    public MessageHandleMode MessageHandleMode {
        get;
        set;
    }

    public ConsumerSetting() {
        BrokerAddress = SocketUtils.GetLocalIPV4().ToString();
        BrokerPort = 5001;
        RebalanceInterval = 1000 * 5;
        HeartbeatBrokerInterval = 1000 * 5;
        UpdateTopicQueueCountInterval = 1000 * 5;
        PersistConsumerOffsetInterval = 1000 * 5;
        PullRequestSetting = new PullRequestSetting();
        MessageModel = MessageModel.Clustering;
        MessageHandleMode = MessageHandleMode.Parallel;
    }

相關推薦

c#開源訊息佇列中介軟體EQueue 教程

一、簡介 EQueue是一個參照RocketMQ實現的開源訊息佇列中介軟體,相容Mono,具體可以參看作者的文章《分享一個c#寫的開源分散式訊息佇列equeue》。專案開源地址:https://github.com/tangxuehua/equeue,專案中包含了佇列

訊息佇列中介軟體??

  訊息佇列中介軟體是分散式系統中重要的元件,主要解決應用耦合,非同步訊息,流量削鋒等問題 二、訊息佇列應用場景 以下介紹訊息佇列在實際應用中常用的使用場景。非同步處理,應用解耦,流量削鋒和訊息通訊四個場景 2.1非同步處理 場景說明:使用者註冊後,需要發註冊郵件和註冊簡訊。傳統的做法有兩

Delayer 基於 Redis 的延遲訊息佇列中介軟體

Delayer 基於 Redis 的延遲訊息佇列中介軟體,採用 Golang 開發,支援 PHP、Golang 等多種語言客戶端。 參考 有贊延遲佇列設計 中的部分設計,優化後實現。 專案連結:https://github.com/mixstart/d... ,有需要的朋友加 Star 哦。 應用場景

訊息佇列中介軟體(一)介紹

訊息佇列中介軟體(一)介紹 訊息佇列介紹 訊息佇列中介軟體是大型系統中的重要元件,已經逐漸成為企業系統內部通訊的核心手段。它具有鬆耦合、非同步訊息、流量削峰、可靠投遞、廣播、流量控制、最終一致性等一系列功能,已經成為非同步RPC的主要手段之一。 目前常見的訊息中介軟體有ActiveMQ、Ra

訊息佇列中介軟體(二)使用 ActiveMQ

ActiveMQ 介紹 Active MQ 是由 Apache 出品的一款流行的功能強大的開源訊息中介軟體,它速度快,支援跨語言的客戶端,具有易於使用的企業整合模式和許多的高階功能,同時完全支援 JSM1.1 和 J2EE1.4 。 官方下載地址: http://activemq.apache.or

訊息佇列中介軟體(三)Kafka 入門指南

Kafka 來源 Kafka的前身是由LinkedIn開源的一款產品,2011年初開始開源,加入了 Apache 基金會,2012年從 Apache Incubator 畢業變成了 Apache 頂級開源專案。同時LinkedIn還有許多著名的開源產品。如: 分散式資料同步系統Databus

什麼是訊息佇列中介軟體詳解

Apache kafka是訊息中介軟體的一種,我發現很多人不知道訊息中介軟體是什麼,在開始學習之前,我這邊就先簡單的解釋一下什麼是訊息中介軟體,只是粗略的講解,目前kafka已經可以做更多的事情。舉個例子,生產者消費者,生產者生產雞蛋,消費者消費雞蛋,生產者生產一個雞蛋,消費者就消費一個雞蛋,假設消費者消費雞

常用的訊息佇列中介軟體mq對比

訊息佇列中介軟體是分散式系統中重要的元件,主要解決應用耦合,非同步訊息,流量削鋒等問題 實現高效能,高可用,可伸縮和最終一致性架構 使用較多的訊息佇列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ 二、訊息佇列應用場景

訊息佇列中介軟體之RabbitMQ簡介

Spring boot RabbitMQ詳解 訊息佇列中介軟體簡介 當下流行:RabbitMQ、ActiveMQ、Kafka 速度:Kafka(大資料)> RabbitMQ > ActiveMQ(電商會用) 安全:Kafka(大資料)< 

08 如果讓你來開發一個訊息佇列中介軟體,你會怎麼設計架構?

目錄   1、面試題 2、面試官心裡分析 3、面試題剖析 1、面試題 如果讓你寫一個訊息佇列,該如何進行架構設計啊?說一下你的思路 2、面試官心裡分析 其實聊到這個問題,一般面試官要考察兩塊: (1)你有沒有對某一個訊息佇列做過較為深入的原理的瞭解,

在 CentOS7 上安裝 RabbitMQ 訊息佇列中介軟體

add_user <UserName> <Password> delete_user <UserName> change_password <UserName> <NewPassword> list_users add_vhost <

訊息佇列中介軟體記錄

僅作為個人學習筆記,遊客勿噴。 訊息佇列 訊息佇列有無數開源實現,一般沒必要自己實現。zmq也好rabbitmq也好甚至redis也好,找一個合適的裝上用就行 就好像rdbms/nosql一樣。 技術都是解決問題的,訊息佇列解決的是將突發大量請求轉換為後端能承受的佇列請

C#分散式訊息佇列 EQueue 2.0 釋出啦

前言 最近花了我幾個月的業餘時間,對EQueue做了一個重大的改造,訊息持久化採用本地寫檔案的方式。到現在為止,總算完成了,所以第一時間寫文章分享給大家這段時間我所積累的一些成果。 昨天,我寫過一篇關於EQueue 2.0效能測試結果的文章,有興趣的可以看看。 為什麼要改為檔案儲存? SQL

MyCat:開源分散式資料庫中介軟體

為什麼需要MyCat? 雖然雲端計算時代,傳統資料庫存在著先天性的弊端,但是NoSQL資料庫又無法將其替代。如果傳統資料易於擴充套件,可切分,就可以避免單機(單庫)的效能缺陷。 MyCat的目標就是:低成本地將現有的單機資料庫和應用平滑遷移到“雲”端,解決資料儲存和業務規模迅速

中介軟體系列ActiveMQ,Rocketmq,Rabbitmq,Kafka,Mycat讓你深入理解學習中介軟體視訊教程網盤

中介軟體系列ActiveMQ,Rocketmq,Rabbitmq,Kafka,Mycat讓你深入理解學習中介軟體視訊教程網盤39套Java架構師,高併發,高效能,高可用,分散式,叢集,電商,快取,微服務,微信支付寶支付,公眾號開發,java8新特性,P2P金融專案,程式設計,功能設計,資料庫設計,第

MySQL開源資料傳輸中介軟體架構設計實踐

本文根據洪斌10月27日在「3306π」技術 Meetup - 武漢站現場演講內容整理而成。 主要內容: 本次分享將介紹目前資料遷移、資料同步、資料消費,多IDC架構中資料複製技術所面臨問題及現有的產品和方案,並分享新開源的能在異構資料儲存之間提供高效能和強大複製功能的DTLE相關技術

RabbitMQ訊息通訊中介軟體中的那些概念

本章主要內容 瞭解rabbitmq的誕生 環境設定與安裝 AMQP協議 訊息通訊概念-----生產者與消費者 訊息持久化   瞭解rabbitmq的誕生         20世紀80年代,IBM、微軟

愛可生開源資料傳輸中介軟體DTLE亮相「3306π」深圳站年會

12月15日,上海愛可生資訊科技股份有限公司贊助的「3306π」年會-深圳站成功舉辦,此次年會圍繞MySQL核心技術,邀請各大行業一線大咖分享最新鮮的前沿技術與最生動的實踐案例。愛可生技術服務總監洪斌現場分享了開源資料傳輸中介軟體DTLE的相關技術。     技

愛可生MySQL開源資料傳輸中介軟體DTLE首次技術分享

10月27日,上海愛可生資訊科技股份有限公司贊助的「3306π」技術 Meetup - 武漢站成功舉辦,愛可生技術服務總監洪斌分享了《MySQL 開源資料傳輸中介軟體架構設計實踐》的主題演講,並對愛可生10月24日最新開源專案 DTLE 相關技術細節進行了詳細講解。   D

開源訊息佇列QMQ的設計與實現理念

文章概要 背 景 2012 年,隨著公司業務的快速增長,公司當時的單體應用架構很難滿足業務快速增長的要求,和其他很多公司一樣,去哪兒網也開始了服務化改造,按照業務等要素將原來龐大的單體應用拆分成不同的服務。那麼在進行服務化改造之前首先就是面臨是服務化基礎設施的技術選型,其中最重要的就是服