1. 程式人生 > >eShopOnContainers學習系列(三):RabbitMQ訊息匯流排實踐

eShopOnContainers學習系列(三):RabbitMQ訊息匯流排實踐

今天研究了下eShopOnContainers裡的RabbitMQ的使用,在專案裡是以封裝成訊息匯流排的方式使用的,但是仍然是以其釋出、訂閱兩個方法作為基礎封裝的,我們今天就來實際使用一下。

為了簡單起見,就在同一個API專案裡實現釋出訂閱。

新建API專案 RabbitMQ_Bus_Test ,類庫 EventBus、EventBusRabbitMQ,這兩個類庫中將會實現訊息匯流排最主要的方法、釋出訂閱。

在EventBus中新增訊息事件類:IntegrationEvent,這個類在事件裡是作為一個訊息父類,所有的訊息類都需要繼承這個類,在實際專案裡按需修改。

public
class IntegrationEvent { public IntegrationEvent() { Id = Guid.NewGuid(); CreationDate = DateTime.UtcNow; } [JsonConstructor] public IntegrationEvent(Guid id, DateTime createDate) { Id = id; CreationDate
= createDate; } [JsonProperty] public Guid Id { get; private set; } [JsonProperty] public DateTime CreationDate { get; private set; } }

新增泛型介面類:IIntegrationEventHandler.cs,通過訊息佇列傳送的訊息需要有處理程式,而這個介面則是作為一個約束類存在,所有的處理程式都需要繼承這個泛型介面類:

    public interface
IIntegrationEventHandler<in TIntegrationEvent> : IIntegrationEventHandler where TIntegrationEvent : IntegrationEvent { Task Handle(TIntegrationEvent @event); } public interface IIntegrationEventHandler { }

新增事件訂閱管理介面:IEventBusSubscriptionsManager,從類名就可以看出這個是專門用來管理訊息處理方法的,我們這裡主要看AddSubscription這個訂閱方法,它的作用是繫結訊息類和訊息處理程式,即哪一個訊息被哪一個方法消費,當然這裡有兩個約束,訊息類需要繼承IntegrationEvent,處理類需要繼承IIntegrationEventHandler:

public interface IEventBusSubscriptionsManager
    {
        bool IsEmpty { get; }
        event EventHandler<string> OnEventRemoved;
        void AddDynamicSubscription<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler;

        void AddSubscription<T, TH>()
           where T : IntegrationEvent
           where TH : IIntegrationEventHandler<T>;

        void RemoveSubscription<T, TH>()
            where TH : IIntegrationEventHandler<T>
            where T : IntegrationEvent;

        void RemoveDynamicSubscription<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler;

        bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent;
        bool HasSubscriptionsForEvent(string eventName);
        Type GetEventTypeByName(string eventName);
        void Clear();
        IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent;
        IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName);
        string GetEventKey<T>();
    }

接著新增它的實現類InMemoryEventBusSubscriptionsManager.cs,從類名可以看出訂閱的管理都是在記憶體中處理的,在生產專案裡我們可以使用Redis來進行儲存:

public partial class InMemoryEventBusSubscriptionsManager: IEventBusSubscriptionsManager
    {
        private readonly Dictionary<string, List<SubscriptionInfo>> _handlers;
        private readonly List<Type> _eventTypes;

        public event EventHandler<string> OnEventRemoved;

        public InMemoryEventBusSubscriptionsManager()
        {
            _handlers = new Dictionary<string, List<SubscriptionInfo>>();
            _eventTypes = new List<Type>();
        }

        public bool IsEmpty => !_handlers.Keys.Any();
        public void Clear() => _handlers.Clear();

        public bool HasSubscriptionsForEvent(string eventName) => _handlers.ContainsKey(eventName);

        public void AddDynamicSubscription<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler
        {
            DoAddSubscription(typeof(TH), eventName, isDynamic: true);
        }

        public void AddSubscription<T,TH>()
            where T:IntegrationEvent
            where TH:IIntegrationEventHandler<T>
        {
            var eventName = GetEventKey<T>();

            DoAddSubscription(typeof(TH), eventName, isDynamic: false);

            if(!_eventTypes.Contains(typeof(T)))
            {
                _eventTypes.Add(typeof(T));
            }
        }

        public void DoAddSubscription(Type handlerType,string eventName,bool isDynamic)
        {
            if(!HasSubscriptionsForEvent(eventName))
            {
                _handlers.Add(eventName, new List<SubscriptionInfo>());
            }

            if(_handlers[eventName].Any(s=>s.HandlerType==handlerType))
            {
                throw new ArgumentException($"Handler Type {handlerType.Name} already registered for '{eventName}'", nameof(handlerType));
            }

            if(isDynamic)
            {
                _handlers[eventName].Add(SubscriptionInfo.Dynamic(handlerType));
            }
            else
            {
                _handlers[eventName].Add(SubscriptionInfo.Typed(handlerType));
            }
        }

        public void RemoveDynamicSubscription<TH>(string eventName)
            where TH: IDynamicIntegrationEventHandler
        {
            var handlerToRemove = FindDynamicSubscriptionToRemove<TH>(eventName);
            
        }

        private SubscriptionInfo FindDynamicSubscriptionToRemove<TH>(string eventName)
            where TH:IDynamicIntegrationEventHandler
        {
            return DoFindSubscriptionToRemove(eventName, typeof(TH));
        }


        private SubscriptionInfo DoFindSubscriptionToRemove(string eventName,Type handlerType)
        {
            if(!HasSubscriptionsForEvent(eventName))
            {
                return null;
            }

            return _handlers[eventName].SingleOrDefault(s => s.HandlerType == handlerType);
        }

        public void DoRemoveHandler(string eventName, SubscriptionInfo subsToRemove)
        {
            if(subsToRemove!=null)
            {
                _handlers[eventName].Remove(subsToRemove);
                if(!_handlers[eventName].Any())
                {
                    _handlers.Remove(eventName);
                    var eventType = _eventTypes.SingleOrDefault(e => e.Name == eventName);
                    if(eventType!=null)
                    {
                        _eventTypes.Remove(eventType);
                    }
                    RaiseOnEventRemoved(eventName);
                }
            }
        }

        public void RaiseOnEventRemoved(string eventName)
        {
            var handler = OnEventRemoved;
            if(handler!=null)
            {
                OnEventRemoved(this, eventName);
            }
        }

        public void RemoveSubscription<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>
        {
            var handlerToRemove = FindSubscriptionToRemove<T, TH>();
            var eventName = GetEventKey<T>();
            DoRemoveHandler(eventName, handlerToRemove);
        }

        private SubscriptionInfo FindSubscriptionToRemove<T,TH>()
            where T:IntegrationEvent
            where TH:IIntegrationEventHandler<T>
        {
            var eventName = GetEventKey<T>();
            return DoFindSubscriptionToRemove(eventName, typeof(TH));
        }

        public Type GetEventTypeByName(string eventName) => _eventTypes.SingleOrDefault(t => t.Name == eventName);

        public IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent
        {
            var key = GetEventKey<T>();
            return GetHandlersForEvent(key);
        }

        public IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName) => _handlers[eventName];

        public string GetEventKey<T>()
        {
            return typeof(T).Name;
        }

        public bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent
        {
            var key = GetEventKey<T>();
            return HasSubscriptionsForEvent(key);
        }
    }

這個實現類裡有許多關於訂閱事件的處理方法,我們本次需要關注的有兩個地方,這裡使用字典Dictionary<string, List<SubscriptionInfo>>儲存訊息類和訊息處理類,這裡有個事件資源類訂閱資訊類,儲存兩個屬性,一個是否是動態的,另一個是處理程式型別:

public class SubscriptionInfo
    {
        public bool IsDynamic { get; }
        public Type HandlerType { get; }

        private SubscriptionInfo(bool isDynamic, Type handlerType)
        {
            IsDynamic = isDynamic;
            HandlerType = handlerType;
        }

        public static SubscriptionInfo Dynamic(Type handlerType)
        {
            return new SubscriptionInfo(true, handlerType);
        }

        public static SubscriptionInfo Typed(Type handlerType)
        {
            return new SubscriptionInfo(false, handlerType);
        }
    }

簡單描述一下訊息的處理流程,在程式初次載入時會將訊息類和對應的處理類進行繫結,當訊息傳送的時候從根據傳送的訊息類從字典裡查詢對應的處理類,通過反射進行載入呼叫。

 

 

OK,關於訊息資源方面的方法就這些,下面我們看下關於RabbitMQ的封裝,新增事件匯流排介面,這裡面有兩個最重要的方法,釋出和訂閱:

public interface IEventBus
    {
        void Publish(IntegrationEvent @event);

        void Subscribe<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>;

        void SubscribeDynamic<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler;

        void UnsubscribeDynamic<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler;

        void Unsubscribe<T, TH>()
            where TH : IIntegrationEventHandler<T>
            where T : IntegrationEvent;
    }

 

看一下它的實現類EventBusRabbitMQ.cs,這裡我們只看最重要的兩個方法Publish和Subscribe:

public class EventBusRabbitMQ : IEventBus, IDisposable
    {
        const string BROKER_NAME = "eshop_event_bus";

        private readonly IRabbitMQPersistentConnection _persistentConnection;
        private readonly ILogger<EventBusRabbitMQ> _logger;
        private readonly IEventBusSubscriptionsManager _subsManager;
        private readonly ILifetimeScope _autofac;
        private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus";
        private readonly int _retryCount;

        private IModel _consumerChannel;
        private string _queueName;


        public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger,
    ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, string queueName = null, int retryCount = 5)
        {
            _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();
            _queueName = queueName;
            _consumerChannel = CreateConsumerChannel();
            _autofac = autofac;
            _retryCount = retryCount;
            _subsManager.OnEventRemoved += SubsManager_OnEventRemoved;
        }

        private void SubsManager_OnEventRemoved(object sender, string eventName)
        {
            if (!_persistentConnection.IsConnected)
            {
                _persistentConnection.TryConnect();
            }

            using (var channel = _persistentConnection.CreateModel())
            {
                channel.QueueUnbind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName);

                if (_subsManager.IsEmpty)
                {
                    _queueName = string.Empty;
                    _consumerChannel.Close();
                }
            }
        }

        public void Publish(IntegrationEvent @event)
        {
            if (!_persistentConnection.IsConnected)
            {
                _persistentConnection.TryConnect();
            }

            var policy = RetryPolicy.Handle<BrokerUnreachableException>()
                .Or<SocketException>()
                .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
                {
                    _logger.LogWarning(ex.ToString());
                });

            using (var channel = _persistentConnection.CreateModel())
            {
                var eventName = @event.GetType()
                    .Name;

                channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");

                var message = JsonConvert.SerializeObject(@event);
                var body = Encoding.UTF8.GetBytes(message);

                try
                {
                    policy.Execute(() =>
                    {
                        var properties = channel.CreateBasicProperties();
                        properties.DeliveryMode = 2; // persistent
                        channel.BasicPublish(exchange: BROKER_NAME, routingKey: eventName, mandatory: false, basicProperties: properties, body: body);
                    });
                }
                catch(Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }
        }

        /// <summary>
        /// 訂閱動態
        /// </summary>
        public void SubscribeDynamic<TH>(string eventName)
    where TH : IDynamicIntegrationEventHandler
        {
            DoInternalSubscription(eventName);
            _subsManager.AddDynamicSubscription<TH>(eventName);
        }


        /// <summary>
        /// 訂閱
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <typeparam name="TH"></typeparam>
        public void Subscribe<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>
        {
            var eventName = _subsManager.GetEventKey<T>();
            DoInternalSubscription(eventName);
            _subsManager.AddSubscription<T, TH>();
        }

       /// <summary>
        /// 進行內部訂閱
        /// </summary>
        /// <param name="eventName"></param>
        private void DoInternalSubscription(string eventName)
        {
            var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
            if(!containsKey)
            {
                if(!_persistentConnection.IsConnected)
                {
                    _persistentConnection.TryConnect();
                }

                using (var channel = _persistentConnection.CreateModel())
                {
                    channel.QueueBind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName);
                }
            }
        }

        /// <summary>
        /// 取消訂閱
        /// </summary>
        public void Unsubscribe<T,TH>()
            where TH:IIntegrationEventHandler<T>
            where T:IntegrationEvent
        {
            _subsManager.RemoveSubscription<T, TH>();
        }

        /// <summary>
        /// 取消訂閱動態
        /// </summary>
        public void UnsubscribeDynamic<TH>(string eventName)
    where TH : IDynamicIntegrationEventHandler
        {
            _subsManager.RemoveDynamicSubscription<TH>(eventName);
        }

        /// <summary>
        /// 釋放資源
        /// </summary>
        public void Dispose()
        {
            if(_consumerChannel!=null)
            {
                _consumerChannel.Dispose();
            }

            _subsManager.Clear();
        }

          /// <summary>
        /// 建立消費者通道
        /// </summary>
        private IModel CreateConsumerChannel()
        {
            if (!_persistentConnection.IsConnected)
            {
                _persistentConnection.TryConnect();
            }

            var channel = _persistentConnection.CreateModel();

            channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");

            channel.QueueDeclare(queue: _queueName
                , durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += async (model, ea) =>
            {
                var eventName = ea.RoutingKey;
                var message = Encoding.UTF8.GetString(ea.Body);

                await ProcessEvent(eventName, message);

                channel.BasicAck(ea.DeliveryTag, multiple: false);
            };
            channel.BasicConsume(queue: _queueName, autoAck: false, consumer: consumer);

            channel.CallbackException += (sender, ea) =>
            {
                _consumerChannel.Dispose();
                _consumerChannel = CreateConsumerChannel();
            };

            return channel;
        }

/// <summary>
        /// 流程事件
        /// </summary>
        private async Task ProcessEvent(string eventName, string message)
        {
            if (_subsManager.HasSubscriptionsForEvent(eventName))
            {
                using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
                {
                    var subscriptions = _subsManager.GetHandlersForEvent(eventName);
                    foreach (var subscription in subscriptions)
                    {
                        if (subscription.IsDynamic)
                        {
                            var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
                            if (handler == null) continue;
                            dynamic eventData = JObject.Parse(message);
                            await handler.Handle(eventData);
                        }
                        else
                        {
                            var handler = scope.ResolveOptional(subscription.HandlerType);
                            if (handler == null) continue;
                            var eventType = _subsManager.GetEventTypeByName(eventName);
                            var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
                            var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
                            await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
                        }
                    }
                }
            }
        }
    }

首先是釋出流程,當呼叫釋出方法的時候首先會檢查RabbitMQ是否連線,關於RabbitMQ連線的操作,專案裡又封裝了一個單獨和介面和實現,下面貼一下程式碼:

    public interface IRabbitMQPersistentConnection:IDisposable
    {
        bool IsConnected { get; }
        bool TryConnect();
        IModel CreateModel();
    }
 public class DefaultRabbitMQPersistentConnection:IRabbitMQPersistentConnection
    {
        private readonly IConnectionFactory _connectionFactory;
        private readonly ILogger<DefaultRabbitMQPersistentConnection> _logger;
        private readonly int _retryCount;
        IConnection _connection;
        bool _disposed;

        object sync_root = new object();

        public DefaultRabbitMQPersistentConnection(IConnectionFactory connectionFactory,ILogger<DefaultRabbitMQPersistentConnection> logger,int retryCount=5)
        {
            _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _retryCount = retryCount;
        }

        public bool IsConnected
        {
            get
            {
                return _connection != null && _connection.IsOpen && !_disposed;
            }
        }

        public IModel CreateModel()
        {
            if(!IsConnected)
            {
                throw new InvalidOperationException("No RabbitMQ connections are available to perform this action");
            }

            return _connection.CreateModel();
        }


        public void Dispose()
        {
            if (_disposed) return;

            _disposed = true;

            try
            {
                _connection.Dispose();
            }
            catch(IOException ex)
            {
                _logger.LogCritical(ex.ToString());
            }
        }


        public bool TryConnect()
        {
            _logger.LogInformation("RabbitMQ Client is trying to connect");

            lock(sync_root)
            {
                var policy = RetryPolicy.Handle<SocketException>()
                    .Or<BrokerUnreachableException>()
                    .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
                    {
                        _logger.LogWarning(ex.ToString());
                    });

                policy.Execute(() =>
                {
                    _connection = _connectionFactory
                          .CreateConnection();
                });

                if(IsConnected)
                {
                    _connection.ConnectionShutdown += OnConnectionShutdown;
                    _connection.CallbackException += OnCallbackException;
                    _connection.ConnectionBlocked += OnConnectionBlocked;

                    _logger.LogInformation($"RabbitMQ persistent connection acquired a connection {_connection.Endpoint.HostName} and is subscribed to failure events");

                    return true;
                }
                else
                {
                    _logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened");

                    return false;
                }
            }
        }


        private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
        {
            if (_disposed) return;

            _logger.LogWarning("A RabbitMQ connection is shutdown. Trying to re-connect...");

            TryConnect();
        }

        void OnCallbackException(object sender, CallbackExceptionEventArgs e)
        {
            if (_disposed) return;

            _logger.LogWarning("A RabbitMQ connection throw exception. Trying to re-connect...");

            TryConnect();
        }

        void OnConnectionShutdown(object sender,ShutdownEventArgs reason)
        {
            if (_disposed) return;

            _logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect...");

            TryConnect();
        }
    }

首在程式載入的時候會建立一個消費者通道,同時給通道的Received委託註冊了一個消費方法,這個消費方法的關鍵點是呼叫了ProcessEvent方法,完成後就傳送一個ack確認:

private IModel CreateConsumerChannel()
        {
            if (!_persistentConnection.IsConnected)
            {
                _persistentConnection.TryConnect();
            }

            var channel = _persistentConnection.CreateModel();

            channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");

            channel.QueueDeclare(queue: _queueName
                , durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += async (model, ea) =>
            {
                var eventName = ea.RoutingKey;
                var message = Encoding.UTF8.GetString(ea.Body);

                await ProcessEvent(eventName, message);

                channel.BasicAck(ea.DeliveryTag, multiple: false);
            };
            channel.BasicConsume(queue: _queueName, autoAck: false, consumer: consumer);

            channel.CallbackException += (sender, ea) =>
            {
                _consumerChannel.Dispose();
                _consumerChannel = CreateConsumerChannel();
            };

            return channel;
        }

我們看一下ProcessEvent方法,這個方法是專門用來消費MQ的,前面我們看過,通過字典記錄了每個訊息類對應的消費類Handler,這個方法的邏輯就是從字典里根據訊息類查詢對應的消費Handler並通過反射呼叫:

private async Task ProcessEvent(string eventName, string message)
        {
            if (_subsManager.HasSubscriptionsForEvent(eventName))
            {
                using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
                {
                    var subscriptions = _subsManager.GetHandlersForEvent(eventName);
                    foreach (var subscription in subscriptions)
                    {
                        if (subscription.IsDynamic)
                        {
                            var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
                            if (handler == null) continue;
                            dynamic eventData = JObject.Parse(message);
                            await handler.Handle(eventData);
                        }
                        else
                        {
                            var handler = scope.ResolveOptional(subscription.HandlerType);
                            if (handler == null) continue;
                            var eventType = _subsManager.GetEventTypeByName(eventName);
                            var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
                            var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
                            await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
                        }
                    }
                }
            }
        }

方法看起來一堆變數名有點亂,其實梳理一下就會發現很簡單,方法需要傳入一個訊息類名eventName和訊息內容message,首先會根據eventName判斷字典裡是註冊了當前訊息體,如果沒有,則直接跳過,並返回ack確認,相當於丟棄該訊息。如果字典裡存在,則從字典中取出這些處理方法,通過反射載入呼叫。

 

OK,那問題來了,字典裡的內容是什麼時候存進去的呢,那就是再呼叫訂閱方法的時候存進去的,這個訂閱方法是在啟動類裡呼叫的,我們看一下:

        public void Configure(IApplicationBuilder app, IHostingEnvironment env)
        {
            ConfigureEventBus(app);

            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseMvcWithDefaultRoute();
        }
        private void ConfigureEventBus(IApplicationBuilder app)
        {
            var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
            eventBus.Subscribe<PublisherIntegrationEvent, OrderStatusChangedValidationIntegrationEventHandle>();
        }

這個是在Startup.cs的Configure方法裡呼叫的,這裡就是通過呼叫訂閱方法Subscribe繫結訊息類和處理類:

public void Subscribe<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>
        {
            var eventName = _subsManager.GetEventKey<T>();
            DoInternalSubscription(eventName);
            _subsManager.AddSubscription<T, TH>();
        }

可以看到這裡呼叫了兩個方法,DoInternalSubscription方法是繫結routingKey到指定的交換器和佇列:

private void DoInternalSubscription(string eventName)
        {
            var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
            if(!containsKey)
            {
                if(!_persistentConnection.IsConnected)
                {
                    _persistentConnection.TryConnect();
                }

                using (var channel = _persistentConnection.CreateModel())
                {
                    channel.QueueBind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName);
                }
            }
        }

而AddSubscription方法則是將訊息類和處理類新增到字典中:

public void AddSubscription<T,TH>()
            where T:IntegrationEvent
            where TH:IIntegrationEventHandler<T>
        {
            var eventName = GetEventKey<T>();

            DoAddSubscription(typeof(TH), eventName, isDynamic: false);

            if(!_eventTypes.Contains(typeof(T)))
            {
                _eventTypes.Add(typeof(T));
            }
        }
public void DoAddSubscription(Type handlerType,string eventName,bool isDynamic)
        {
            if(!HasSubscriptionsForEvent(eventName))
            {
                _handlers.Add(eventName, new List<SubscriptionInfo>());
            }

            if(_handlers[eventName].Any(s=>s.HandlerType==handlerType))
            {
                throw new ArgumentException($"Handler Type {handlerType.Name} already registered for '{eventName}'", nameof(handlerType));
            }

            if(isDynamic)
            {
                _handlers[eventName].Add(SubscriptionInfo.Dynamic(handlerType));
            }
            else
            {
                _handlers[eventName].Add(SubscriptionInfo.Typed(handlerType));
            }
        }

這裡將訊息類名和處理類的型別Type加到了字典裡,後面會通過反射來進行呼叫。

 

OK,到這裡主要的釋出訂閱方法就梳理完成了,寫的有點亂,不過不要緊!!!後面有時間我再重新排版一下!!!!我們現在在新建的專案的試一下,我再ValueController裡呼叫了釋出方法,通過MQ傳送一個訂單號OrderId:

        [HttpGet]
        public IEnumerable<string> Get()
        {
            var @event = new PublisherIntegrationEvent(5);
            _eventBus.Publish(@event);
            _logger.LogError("傳送MQ成功!");

            return new string[] { "value1", "value2" };
        }

訊息體PublisherIntegrationEvent是這樣的:

    public class PublisherIntegrationEvent : IntegrationEvent
    {
        public int OrderId { get; }

        public PublisherIntegrationEvent(int orderId) => OrderId = orderId;
    }

我在啟動類裡註冊了對當前訊息類的處理類OrderStatusChangedValidationIntegrationEventHandle:

public class OrderStatusChangedValidationIntegrationEventHandle: IIntegrationEventHandler<PublisherIntegrationEvent>
    {
        private readonly ILogger<OrderStatusChangedValidationIntegrationEventHandle> _logger = null;
        public OrderStatusChangedValidationIntegrationEventHandle(ILogger<OrderStatusChangedValidationIntegrationEventHandle> logger)
        {
            _logger = logger;
        }
        public async Task Handle(PublisherIntegrationEvent @event)
        {
            _logger.LogError("收到MQ" + @event.OrderId);
        }
    }

這裡釋出和訂閱都在一個API裡,直接啟動:

 

 

OK,eShopOnContainers的梳理就到這裡,接下來會在.NET Core商城系列裡加入訊息匯流排,但是會改一下,使用資料庫+Redis儲存訊息的資訊。

在我公司專案裡現在也是使用RabbitMQ來實現服務與服務之間的非同步通訊,但是方式肯定和這裡是不一樣的,我們是根據RoutingKey與服務地址來繫結,通過windows服務處理訊息的釋出,根據RoutingKey查詢對應的服務地址並進行呼叫,沒有使用RabbitMQ的訂閱方法,我們提供了一個web介面專門用來註冊,這種方式還是挺好用的。

OK,大功告成,下面我會在商城專案里加入RabbitMQ的使用。