1. 程式人生 > >[Abp 源碼分析]九、事件總線

[Abp 源碼分析]九、事件總線

apps throw eof 發布 斷開連接 all 一是 ica header

0.簡介

事件總線就是訂閱/發布模式的一種實現,本質上事件總線的存在是為了降低耦合而存在的。

技術分享圖片

從上圖可以看到事件由發布者發布到事件總線處理器當中,然後經由事件總線處理器調用訂閱者的處理方法,而發布者和訂閱者之間並沒有耦合關系。

像 Windows 本身的設計也是基於事件驅動,當用戶點擊了某個按鈕,那麽就會觸發相應的按鈕點擊事件,而程序只需要監聽這個按鈕點擊事件即可進行相應的處理,而事件被觸發的時候往往都會附帶相應的事件源,事件所產生的數據等。

還是以按鈕被點擊為例,該事件被觸發的時候會裝填上觸發時間,被誰觸發的數據放在一個 EventArgs 內部,然後將其存放到事件處理器中,然後處理器根據事件的類型去查找訂閱了該事件類型的對象,附帶上事件數據去調用這些訂閱者對象的處理方法。

Abp 本身也實現了事件總線,並且在框架內部也實現了豐富的事件類型,例如實體更新事件、異常事件等等。

註意:在下文當中處理器的含義等同於訂閱者,請閱讀的時候自行切換。

0.1.使用方法

在引用了 Abp 框架的項目當中使用事件總線相當簡單,只需要直接註入 IEventBus 即可觸發相應的事件。如果你想要監聽某個事件,並且你也想在事件被觸發的時候進行處理,那麽直接繼承自 IEventHandler<TEventData> / IAsyncEventHandler<TEventData> 實現其接口方法 HandleEvent() 即可。

比如說,我們首先定義了一個 TestEventData

的事件,如下:

/// <summary>
/// 測試事件
/// </summary>
public class TestEventData : EventData
{
    public TestEventData(string code)
    {
        Code = code;
    }

    /// <summary>
    /// 待驗證的編碼
    /// </summary>
    public string Code { get; }
}

很簡單,這個事件觸發的時候會傳遞一個 string 類型的 Code 參數。

之後我們使用 TestEventHandler

訂閱這個事件,當然訂閱的方式很簡單,實現接口即可。

public class TestEventHandler : IAsyncEventHandler<TestEventData>, IEventHandler<TestEventData>
{
    public Task HandleEventAsync(TestEventData eventData)
    {
        if (eventData.Code == "1") Console.WriteLine("# 異步測試,編碼正確");

        Console.WriteLine("# 異步測試,編碼錯誤");
        return Task.FromResult(0);
    }

    public void HandleEvent(TestEventData eventData)
    {
        if (eventData.Code == "1") Console.WriteLine("# 同步測試,編碼正確");

        Console.WriteLine("# 同步測試,編碼錯誤");
    }
}

Abp 在底層會掃描實現了 IEventHandler<TEventData> / IAsyncEventHandler<TEventData> 這兩個接口的類型,將其自動註冊為訂閱者。

當然你也可以手動訂閱:

public class TestAppService : ApplicationService
{
    private readonly IEventBus _eventBus;

    public TestAppService(IEventBus eventBus)
    {
        _eventBus = eventBus;
    }

    public async Task TestMethod()
    {
        // 同步觸發
        _eventBus.Trigger(new TestEventData("Code1"));
        // 異步觸發,3.6.x 版本新增的
        await _eventBus.TriggerAsync(new TestEventData("Code1"));

        // 手動註冊事件範例
        _eventBus.Register<TestEventData, TestEventHandler>();
    }
}

這裏的 Register() 方法會讓你傳入一個事件數據類型,以及該事件對應的處理器。

同一個事件可以被多個對象所訂閱,只要該對象實現 IEventHandler<TEventData> / IAsyncEventHandler<TEventData> 接口或者是顯式地被 IEventBus.Register() 註冊,他們都會在事件被觸發的時候調用。

2.啟動流程

按照慣例我們來分析一下 Abp 針對事件總線的實現,看一下它的整體啟動流程,什麽時候被註入,被初始化。

事件總線比起其他 Abp 基礎設施來說他的註冊點就一個,在 EventBusInstaller 裏面,包含了針對 IEventBus 的註冊以及對實現了 IEventHandler 處理器的註冊。
EventBusInstaller 在 Abp 框架的核心模塊 AbpKernelModuleInitialize 被註冊調用:

public override void Initialize()
{
    // ...其他代碼

    IocManager.IocContainer.Install(new EventBusInstaller(IocManager));

    // ... 其他代碼
}

裏面的 Install() 方法做了兩個動作,第一是根據事件總線配置來決定 IEventBus 的註冊方式,第二則是將訂閱者(事件處理器)通過 IEventBus.Register() 方法自動放到事件總線管理器裏面。

public void Install(IWindsorContainer container, IConfigurationStore store)
{
    // 這裏是註入的配置類
    if (_eventBusConfiguration.UseDefaultEventBus)
    {
        container.Register(
            Component.For<IEventBus>().UsingFactoryMethod(() => EventBus.Default).LifestyleSingleton()
            );
    }
    else
    {
        container.Register(
            Component.For<IEventBus>().ImplementedBy<EventBus>().LifestyleSingleton()
            );
    }

    // 解析事件總線管理器
    _eventBus = container.Resolve<IEventBus>();

    // 註冊訂閱者對象
    container.Kernel.ComponentRegistered += Kernel_ComponentRegistered;
}

Emmmm,具體的代碼分析請看下面:

private void Kernel_ComponentRegistered(string key, IHandler handler)
{
    // 判斷當前註入的對象是否實現了 IEventHandler 接口,沒有實現則跳過
    if (!typeof(IEventHandler).GetTypeInfo().IsAssignableFrom(handler.ComponentModel.Implementation))
    {
        return;
    }

    // 獲得當前註入對象實現的所有接口
    var interfaces = handler.ComponentModel.Implementation.GetTypeInfo().GetInterfaces();
    // 遍歷獲取到的所有接口
    foreach (var @interface in interfaces)
    {
        // 如果當前被遍歷的接口類型不是 IEventHandler 或者不是從 IEventHandler 繼承的,則跳過
        if (!typeof(IEventHandler).GetTypeInfo().IsAssignableFrom(@interface))
        {
            continue;
        }

        // 到這裏獲取這個 IEventHandler 處理器的泛型參數
        var genericArgs = @interface.GetGenericArguments();
        // 並且它的泛型參數應該只有一個
        if (genericArgs.Length == 1)
        {
            // 根據 IEventHandler 的定義,拿到的泛型參數肯定就是事件數據類型啦
            // 第二個參數就是一個 Handler 的工廠咯,每次觸發事件的時候都會從這個
            // 工廠解析出具體的事件處理器來響應事件的操作。
            _eventBus.Register(genericArgs[0], new IocHandlerFactory(_iocResolver, handler.ComponentModel.Implementation));
        }
    }
}

目前看來還是十分簡單的。

3.代碼分析

3.1 事件總線管理器

整個事件總線的核心就是這個管理器(IEventBus/EventBus),事件的註冊,事件的觸發,所有這些東西都是由它來提供的,其實嘛事件總線沒你想象得那麽復雜。

它的基本原理很簡單,就是用戶向事件總線管理器註冊我想要觸發的事件,還有響應我事件的訂閱者,將其放在一個字典裏面。當 A 對象在數據庫 斷開連接 的時候,通過事件總線管理器觸發 斷開連接事件,事件總線管理器就會從之前註冊的字典,根據觸發時候傳遞的類型拿到響應的處理器集合,遍歷這個集合調用對應的方法。

說這麽多,我們來看一下代碼吧,首先看看事件總線管理器的定義(當然接口太多,這裏是精簡過的):

public interface IEventBus
{
    // 註冊並訂閱事件
    IDisposable Register(Type eventType, IEventHandlerFactory factory);
    // 這裏沒有異步註冊的原因是它最後還是會調用上面這個方法

    // 取消事件的訂閱,這裏傳入的是一個 Action
    void Unregister<TEventData>(Action<TEventData> action) where TEventData : IEventData;
    // 這裏傳入的是一個 IEventHandler
    void Unregister<TEventData>(IEventHandler<TEventData> handler) where TEventData : IEventData;
    
    // 異步取消事件訂閱
    void AsyncUnregister<TEventData>(Func<TEventData, Task> action) where TEventData : IEventData;

    // 同樣是取消事件訂閱
    void Unregister(Type eventType, IEventHandler handler);
    void Unregister(Type eventType, IEventHandlerFactory factory);
    void UnregisterAll(Type eventType);

    // 觸發事件
    void Trigger(Type eventType, object eventSource, IEventData eventData);
    // 異步觸發事件
    Task TriggerAsync(Type eventType, object eventSource, IEventData eventData);
}

Emm,看了一下,大概就分為三類,註冊事件取消事件的訂閱觸發事件,其他定義的接口大多都是不同形式的重載,本質上還是會調用到上述方法的。

首先在事件總線管理器內部有一個字典,這個字典就是我們剛才所提到的事件總線維護的事件字典,大概長這個樣子:

private readonly ConcurrentDictionary<Type, List<IEventHandlerFactory>> _handlerFactories;

可以看到,這個字典的 Key 是一個 Type 類型,其實就是我們所註冊的事件類型罷了,後面那個呢就是事件處理器的工廠。那為什麽這個工廠會用一個 List 來存儲呢?

原因有兩點:

  1. 因為我們對應的事件處理器的生命周期與生成方式都有所不同,比如說 Abp 它自己本身就提供了IocHandlerFactoryTransientEventHandlerFactorySingleInstanceHandlerFactory 這三種實現。
  2. 因為一個事件可能會被多個處理器所訂閱,那麽一個處理器擁有一個工廠,所以會是一個集合。

3.1.1 註冊事件

在默認的 Register() 方法裏面就是使用的 IocHandlerFactory 來進行註冊事件的,如果你需要手動註冊事件呢,可以使用簽名為:

public IDisposable Register(Type eventType, IEventHandlerFactory factory);

的方法,來傳入自己實現的處理器工廠或者是 Abp 提供的事件處理器工廠。

看了它的定義之後,我們來看一下它的具體實現,首先來看看註冊事件的 Register() 方法:

public IDisposable Register(Type eventType, IEventHandlerFactory factory)
{
    // 獲得指定事件類型的工廠集合,然後往這個集合添加一個事件處理器工廠
    GetOrCreateHandlerFactories(eventType)
        .Locking(factories => factories.Add(factory));

    // Emm,這裏面就是一個 Dispose 方法,用於釋放創建好的工廠對象,裏面的 Dispose 方法
    // 最終會調用 IEventBus 的 UnRegister 方法來卸載工廠
    return new FactoryUnregistrar(this, eventType, factory);
}

private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType)
{
    // 根據事件類型創建/獲取一個事件處理器工廠集合
    return _handlerFactories.GetOrAdd(eventType, (type) => new List<IEventHandlerFactory>());
}

可以看到調用了註冊方法之後,它返回了一個 FactoryUnregistrar ,查看它的定義如下:

internal class FactoryUnregistrar : IDisposable
{
    private readonly IEventBus _eventBus;
    private readonly Type _eventType;
    private readonly IEventHandlerFactory _factory;

    public FactoryUnregistrar(IEventBus eventBus, Type eventType, IEventHandlerFactory factory)
    {
        _eventBus = eventBus;
        _eventType = eventType;
        _factory = factory;
    }

    public void Dispose()
    {
        _eventBus.Unregister(_eventType, _factory);
    }
}

很簡單的一個類,重點就是在 Dispose() 內部調用了 IEventBusUnregister() 方法,下面就會講解這東西。

3.1.2 取消事件的訂閱

接著是 UnRegister() 方法,UnRegister 方法有很多個,一般分為兩類,第一是取消訂閱,第二就是卸載工廠。

public void Unregister<TEventData>(Action<TEventData> action) where TEventData : IEventData
{
    // 確保不為空
    Check.NotNull(action, nameof(action));

    // 根據類型得到該類型所有的事件處理器集合
    GetOrCreateHandlerFactories(typeof(TEventData))
        // 使用 lock 加鎖,防止線程同步問題
        .Locking(factories =>
        {
            // 調用 List 的 RemoveAll() 方法清除指定條件的工廠
            factories.RemoveAll(
                factory =>
                {
                    // 判斷工廠是否為單例工廠
                    var singleInstanceFactory = factory as SingleInstanceHandlerFactory;
                    // 如果不是單例工廠則不移除
                    if (singleInstanceFactory == null)
                    {
                        return false;
                    }
                    
                    // 如果是單例工廠,拿到其內部的具體事件處理器,並強制換為 ActionEventHandler
                    var actionHandler = singleInstanceFactory.HandlerInstance as ActionEventHandler<TEventData>;
                    // 為空的話,不移除
                    if (actionHandler == null)
                    {
                        return false;
                    }

                   // 判斷傳入的處理邏輯是否與事件處理器邏輯相等,相等則移除
                    return actionHandler.Action == action;
                });
        });
}

// 取消訂閱的另一種實現,只是針對 SingleInstanceHandlerFactory 進行了處理
public void Unregister(Type eventType, IEventHandler handler)
{
    GetOrCreateHandlerFactories(eventType)
        .Locking(factories =>
                    {
                        factories.RemoveAll(
                            factory =>
                            factory is SingleInstanceHandlerFactory &&
                            (factory as SingleInstanceHandlerFactory).HandlerInstance == handler
                        );
                    });
}

// 第二種情況,卸載工廠,也就是 Register() 之後返回的 FactoryUnregistrar 釋放時調用的方法
public void Unregister(Type eventType, IEventHandlerFactory factory)
{
    // 根據傳入的類型,獲得事件處理器工廠集合,移除相應工廠
    GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Remove(factory));
}

在上方代碼可以看到,似乎 Unregister() 方法只針對 SingleInstanceHandlerFactory 工廠進行了處理,而沒有處理 IocHandlerFactoryTransientEventHandlerFactory

這是因為在 IEventBus 當中實現了這兩個方法:

IDisposable Register(Type eventType, IEventHandler handler);
IDisposable Register<TEventData>(Action<TEventData> action) where TEventData : IEventData;

可以看到這兩個方法都沒有傳入工廠,第一個允許你傳入一個事件處理器對象,第二個則是讓你傳入一個 Action 作為其事件訂閱者。

看看實現:

public IDisposable Register<TEventData>(Action<TEventData> action) where TEventData : IEventData
{
    // new 了一個 ActionEventHandler 作為處理器
    return Register(typeof(TEventData), new ActionEventHandler<TEventData>(action));
}

public IDisposable Register<TEventData>(IEventHandler<TEventData> handler) where TEventData : IEventData
{
    // 傳入具體的處理器對象進行註冊
    return Register(typeof(TEventData), handler);
}

public IDisposable Register(Type eventType, IEventHandler handler)
{
    // 使用 SingleInstanceHandlerFactory 工廠進行註冊。
    return Register(eventType, new SingleInstanceHandlerFactory(handler));
}

因為單例工廠與其他兩個工廠不一樣,單例工廠的生命周期貫穿整個程序的生命周期,也就是說除非程序被結束,那麽單例工廠內部的事件處理器就會一直存在,所以在 UnRegister() 方法內部只會針對 SingleInstanceHandlerFactory 工廠進行處理。

TransientEventHandlerFactory

IocHandlerFactory 工廠產生的對象的生命周期是隨著具體類型在被註入時候的生命周期所決定,有可能是瞬時對象,也有可能是單例對象,下文會詳細解說。

3.1.3 觸發事件

當事件的發布者需要發布(觸發)一個事件的時候,會調用 IEventBus 提供的 Trigger()/TriggerAsync() 方法。

然後事件總線管理器從自己的字典內匹配對應的事件,得到對應的事件處理器工廠集合,然後呢使用工廠產生具體的處理器對象,調用其 HandleEvent / HandleEventAsync 方法,執行完成之後釋放對象。

public void Trigger(Type eventType, object eventSource, IEventData eventData)
{
    // 異常集合
    var exceptions = new List<Exception>();

    eventData.EventSource = eventSource;

    // 獲得所有需要觸發的處理器工廠,遍歷傳入的事件類型以及他的子類事件
    foreach (var handlerFactories in GetHandlerFactories(eventType))
    {
        // 遍歷事件類型綁定的工廠集合
        foreach (var handlerFactory in handlerFactories.EventHandlerFactories)
        {
            // 獲得處理器類型
            var handlerType = handlerFactory.GetHandlerType();

            // 如果是異步處理器,以同步方式運行
            if (IsAsyncEventHandler(handlerType))
            {
                AsyncHelper.RunSync(() => TriggerAsyncHandlingException(handlerFactory, handlerFactories.EventType, eventData, exceptions));
            }
            else if (IsEventHandler(handlerType))
            {
                // 調用處理器的處理方法,並回收異常信息
                TriggerHandlingException(handlerFactory, handlerFactories.EventType, eventData, exceptions);
            }
            else
            {
                // 說明這個事件沒有對應的處理器實現,拋出異常
                var message = $"Event handler to register for event type {eventType.Name} does not implement IEventHandler<{eventType.Name}> or IAsyncEventHandler<{eventType.Name}> interface!";
                exceptions.Add(new AbpException(message));
            }
        }
    }

    // 處理繼承事件的情況
    if (eventType.GetTypeInfo().IsGenericType &&
        eventType.GetGenericArguments().Length == 1 &&
        typeof(IEventDataWithInheritableGenericArgument).IsAssignableFrom(eventType))
    {
        var genericArg = eventType.GetGenericArguments()[0];
        var baseArg = genericArg.GetTypeInfo().BaseType;
        if (baseArg != null)
        {
            var baseEventType = eventType.GetGenericTypeDefinition().MakeGenericType(baseArg);
            var constructorArgs = ((IEventDataWithInheritableGenericArgument)eventData).GetConstructorArgs();
            var baseEventData = (IEventData)Activator.CreateInstance(baseEventType, constructorArgs);
            baseEventData.EventTime = eventData.EventTime;
            Trigger(baseEventType, eventData.EventSource, baseEventData);
        }
    }

    if (exceptions.Any())
    {
        // 如果產生的異常數量為 1 個的話,重新拋出具體的異常信息
        if (exceptions.Count == 1)
        {
            exceptions[0].ReThrow();
        }

        // 如果在執行過程中產生了多個異常,將異常集合放在內部異常當中並拋出
        throw new AggregateException("More than one error has occurred while triggering the event: " + eventType, exceptions);
    }
}

// 篩選所有需要觸發的事件類型,並將其封裝為 EventTypeWithEventHandlerFactories
private IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType)
{
    var handlerFactoryList = new List<EventTypeWithEventHandlerFactories>();

    foreach (var handlerFactory in _handlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key)))
    {
        handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value));
    }

    return handlerFactoryList.ToArray();
}

// 判斷傳入的類型是否是事件類型的子類
private static bool ShouldTriggerEventForHandler(Type eventType, Type handlerType)
{
    if (handlerType == eventType)
    {
        return true;
    }

    if (handlerType.IsAssignableFrom(eventType))
    {
        return true;
    }

    return false;
}

// 拿著具體的處理器工廠去執行處理器的處理方法
private void TriggerHandlingException(IEventHandlerFactory handlerFactory, Type eventType, IEventData eventData, List<Exception> exceptions)
{
    // 獲得一個新鮮的處理器對象
    var eventHandler = handlerFactory.GetHandler();
    try
    {
        if (eventHandler == null)
        {
            throw new ArgumentNullException($"Registered event handler for event type {eventType.Name} is null!");
        }

        var handlerType = typeof(IEventHandler<>).MakeGenericType(eventType);

        // 從這個處理器獲取到處理方法
        var method = handlerType.GetMethod(
            "HandleEvent",
            new[] { eventType }
        );

        // 調用處理方法,並傳入事件數據
        method.Invoke(eventHandler, new object[] { eventData });
    }
    // 產生異常進行處理
    catch (TargetInvocationException ex)
    {
        exceptions.Add(ex.InnerException);
    }
    catch (Exception ex)
    {
        exceptions.Add(ex);
    }
    finally
    {
        // 釋放資源
        handlerFactory.ReleaseHandler(eventHandler);
    }
}

3.2 處理器工廠

所有事件所對應的處理器對象都是由工廠所創建的,當一個事件被觸發,事件總線管理器就會從事件類型所對應的工廠產生一個相應的處理器對象執行調用。

簡而言之,每個事件處理器都擁有一個單獨的工廠。

其接口定義如下:

public interface IEventHandlerFactory
{
    // 獲得一個事件處理器對象
    IEventHandler GetHandler();

    // 獲得當前工廠所產生的處理器類型
    Type GetHandlerType();

    // 釋放指定的處理器對象
    void ReleaseHandler(IEventHandler handler);
}
具體實現 生命周期 描述
TransientEventHandlerFactory 瞬時 工廠產生的事件處理器生命周期是瞬時的,是一個標準
的可以被 GC 回收的對象。
SingleInstanceHandlerFactory 單例 該工廠產生的對象都會被保存在一個 Instance 內部,每
次生成對象的時候都會使用該 Instance 的值。
IocHandlerFactory 由類型註冊時決定 在使用 IocHandlerFactory 的時候,會傳入事件處理
器,該工廠在創建事件處理器對象的時候會從 Ioc 容器當中
解析對應的對象出來,而該對象的生命周期取決於註冊時
的定義。

4.擴展

4.1 實體更新事件

Abp 在倉儲每次執行 CRUD 操作的時候都會自動觸發響應的實體更新事件,這些事件的觸發都存放在 EntityChangeEventHelper 類當中,一共有以下幾個事件,你訂閱該這些事件之後就會在實體產生更改的時候被觸發。

  • EntityChangedEventData<TEntity> 實體被更改的時候觸發。

  • EntityCreatedEventData<TEntity> 實體創建完成後觸發。
  • EntityCreatingEventData<TEntity> 實體創建時被觸發。
  • EntityDeletedEventData<TEntity> 實體刪除完成後觸發。
  • EntityDeletingEventData<TEntity> 實體刪除時被觸發。
  • EntityUpdatedEventData<TEntity> 實體更新後觸發。
  • EntityUpdatingEventData<TEntity> 實體更新時被觸發。

public class TestHandler : IEventHandler<EntityChangedEventData<TestEntity>>
{
    public void HandleEvent(EntityChangedEventData<TestEntity> eventData)
    {
        Console.WriteLine($"測試實體,ID為 {eventDate.Entity.Id} 被更改了");
    }
}

4.2 異常事件

Abp 在運行期間遇到的異常也會自動觸發異常事件,其類型為 AbpHandledExceptionDataExceptionData

[Abp 源碼分析]九、事件總線