1. 程式人生 > >事件驅動模型和觀察者模式

事件驅動模型和觀察者模式

你有一件事情,做這件事情的過程包含了許多職責單一的子過程。這樣的情況及其常見。當這些子過程有如下特點時,我們應該考慮設計一種合適的框架,讓框架來完成一些業務無關的事情,從而使得各個子過程的開發可以專注於自己的業務。

  •  這些子過程有一定的執行次序;
  •  這些子過程之間需要較靈活的跳轉;
  •  這些子過程也許需要圍繞同一個上下文做操作;

此時可以考慮使用事件驅動的方式來組織這些子過程,此時這些子過程可以被稱之為事件處理器(或監聽器),而將事件處理器組織起來的管理者,叫做事件中心。最顯而易見的實現方式,是觀察者模式,或者監聽者模式。作為一個例子,考慮一個訊息轉發系統,它從上游接收訊息,然後轉發給正確的下游使用者。整個過程可以拆分為訊息解析、訊息儲存、訊息傳送等步驟。

事件Event

首先定義事件Event。事件將作為一個基本元素,在處理器和事件中心之間建立其連線。這裡為了能夠統一處理異常。以及針對異常打出日誌,除了業務相關的事件,還增加了異常事件和日誌事件。當然相應的也應該新增與之對應的事件處理器。

複製程式碼
 1 package me.test.eventcenter;
 2 
 3 /**
 4  * Created by chng on 2015/12/18.
 5  */
 6 public class EventName {
 7 
 8     private final String name;
 9     public EventName(String name) {
10 this.name = name; 11 } 12 13 public static EventName msg_received = new EventName("msg_received"); 14 public static EventName msg_resolved = new EventName("msg_resolved"); 15 public static EventName msg_stored = new EventName("msg_stored"); 16 public static EventName msg_pushed = new
EventName("msg_pushed"); 17 public static EventName exception_occured = new EventName("exception_occured"); 18 public static EventName end_and_log = new EventName("end_and_log"); 19 20 public String getName() { 21 return name; 22 } 23 }
複製程式碼

事件處理器 EventHandler

隨後,定義一個簡單的事件處理器的抽象類,其中包含一個單例的事件中心,每個處理器通過持有這個事件中心來執行註冊自己(即訂閱一個事件)和呼起下一個事件的操作。

複製程式碼
package me.test.eventcenter.handler;

import me.test.eventcenter.EventCenter;
import org.springframework.beans.factory.InitializingBean;

import javax.annotation.Resource;

/**
 * Created by chng on 2015/12/18.
 */
public abstract class EventHandler implements InitializingBean {
    @Resource
    EventCenter eventCenter;

    public abstract void handle(Object ... param);
}
複製程式碼

事件中心 EventCenter

有了事件和事件處理器,接下來定義一個事件中心,將二者粘起來。

複製程式碼
package me.test.eventcenter;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import me.test.eventcenter.handler.EventHandler;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.Map;

/**
 * Created by chng on 2015/12/18.
 */
@Component
public class EventCenter {

    private Map<EventName, List<EventHandler>> regTable = Maps.newHashMap();

    /**
     * 向事件中心廣播一個時間,驅使事件中心執行該事件的處理器
     * @param eventName
     * @param param
     */
    public void fire(EventName eventName, Object ... param) {
        System.out.println(eventName.getName());
        List<EventHandler> handlerList = regTable.get(eventName);
        if(CollectionUtils.isEmpty(handlerList)) {
            // log
            return;
        }
        for(EventHandler handler: handlerList) {
            try {
                handler.handle(param);
            } catch (Exception e) {
                fire(EventName.exception_occured, e);
            }
        }
    }

    /**
     * 將自己註冊為事件中心的某個事件的處理器
     * @param eventName
     * @param handler
     */
    public void register(EventName eventName, EventHandler handler) {

        List<EventHandler> handlerList = regTable.get(eventName);
        if(null == handlerList) {
            handlerList = Lists.newLinkedList();
        }

        handlerList.add(handler);
        regTable.put(eventName, handlerList);
    }
}
複製程式碼

在事件中心中,事件和處理器之間的關係表示為一個HashMap,每個事件可以被多個處理器監聽,而一個處理器只能監聽一個事件(這樣的關係並非是固定的,也可在執行時動態地改變)。當呼起一個事件時,事件中心找到該事件的監聽者,逐個呼叫他們的處理方法。將各子模組的執行集中在這裡管理,還有兩個額外的好處:

1 如果發生異常,則呼起異常處理器。這樣,一旦業務模組發生了不得不終止整個過程的時候,不需要自己寫try/catch子句,而只需要將異常往上拋,直到拋給框架層,由它來做這些統一的事情。然而這並不意味著各業務模組徹徹底底地擺脫了難看的try/catch/finally,執行時發生的異常被catch後,並非都可以直接END,何去何從仍然視情況而定,直接將異常吞掉也未嘗不可能。

2 打日誌的活兒交給EventCenter就好了,沒人比它更清楚當前執行到了哪一步。而各子模組裡面,可以省去許多散佈在各處的日誌語句。對於散彈式日誌的問題,解決方法不止一種,AOP也是個不錯的選擇。

測試

為了讓整個過程跑起來,我們只需要發起一個初始的事件,將所有的事件處理器都依次驅動起來:

複製程式碼
/**
 * Created by OurEDA on 2015/12/18.
 */
public class TestEventCenter extends BaseTest {

    @Resource
    EventCenter eventCenter;

    @Test
    public void test() {
        RawMessage rawMessage = new RawMessage("NotifyType: amq");
        rawMessage.setType(RawMessage.MessageType.amq);
        eventCenter.fire(EventName.msg_received, notify);
    }
}
複製程式碼

以測試通過為目標,我們開始定義一系列的EventHandler,並將這些Handler註冊到合適的事件上。例如一個訊息解析的Handler,對msg_receive事件感興趣,解析完成後將發起msg_store事件,那麼: 

複製程式碼
package me.test.eventcenter.handler;

import me.test.eventcenter.*;
import me.test.messagedo.Message;
import me.test.messagedo.RawMessage;
import me.test.resolvers.MsgResolverList;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * Created by chng on 2015/12/18.
 */
@Component
public class MsgResolveHandler extends EventHandler implements InitializingBean {

    @Resource
    private MsgResolverList resolverList;

    @Override
    public void handle(Object... param) {

        /**
         * Resolver
         */
        RawMessage rm = (RawMessage) param[0];
        Message message = resolverList.resolve(rm);
        eventCenter.fire(EventName.msg_resolved, message);
    }

    public void afterPropertiesSet() throws Exception {
        eventCenter.register(EventName.msg_received, this);
    }
}
複製程式碼

可以看到,物件在初始階段把自己(this)註冊到了事件中心裡。handler方法則只關心如何解析訊息,不需要關係別的事情。針對不同型別的訊息,解析器可以寫成Map的形式,一種型別對應一個解析器;如果訊息的分類比較複雜,還可以寫成職責鏈的形式當然這都無關緊要,我們需要知道的是,這個模組只解析訊息,與其他子模組之間是完全解耦的。

例如,一種可能的解析器組合體是這樣的:

MsgResolver.java (interface)
複製程式碼
package me.test.resolvers;

import me.test.messagedo.Message;
import me.test.messagedo.RawMessage;

/**
 * Created by OurEDA on 2015/12/18.
 */
public interface MsgResolver {

    public boolean canResolve(RawMessage rm);

    public Message resolve(RawMessage rm);

}
複製程式碼
MsgResolverList.java
複製程式碼
package me.test.resolvers;

import me.test.messagedo.Message;
import me.test.messagedo.RawMessage;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Created by chng on 2015/12/18.
 */
@Component
public class MsgResolverList implements MsgResolver{

    //職責鏈
    private List<MsgResolver> resolvers;
    public List<MsgResolver> getResolvers() {
        return resolvers;
    }
    public void setResolvers(List<MsgResolver> resolvers) {
        this.resolvers = resolvers;
    }

    public boolean canResolve(RawMessage rawMessage) {
        return true;
    }

    public Message resolve(RawMessage rawMessage) {
        for(MsgResolver resolver: resolvers) {
            if(resolver.canResolve(rawMessage)) {
                System.out.println("NotifyType: "+rawMessage.type);
                return resolver.resolve(rawMessage);
            }
        }
        return null;
    }
}
複製程式碼

 不必額外打日誌,用例的輸出是這樣的:

哪一步出了問題,出了什麼問題,通通一目瞭然。

其他:

1 上下文 Context

各個處理器都圍繞一個上下文做處理,此例為了體現通用性,上下文直接用Object表示。在實際的場景下,則需要一個統一的結構體。不同的Handler將對該統一上下文的不同內容感興趣。

2 執行緒封閉 ThreadLocal

當有多個執行緒都在事件中心中進行週轉時,還需要考慮執行緒安全問題,保證執行緒的排程不會對事件處理器的呼起次序造成干擾。因此整個事件中心和上下文,都需要做隔離。

3 反思

上面這種寫法有兩個明確的缺點:事件的註冊操作寫死在每個處理器的初始化程式碼中,一來缺乏靈活性,二來對於各Handler是如何組織起來的,沒有一個統一而清晰的bigmap。