1. 程式人生 > >架構設計:系統間通訊(38)——Apache Camel快速入門(下1)

架構設計:系統間通訊(38)——Apache Camel快速入門(下1)

3-5-2-3迴圈動態路由 Dynamic Router

動態迴圈路由的特點是開發人員可以通過條件表示式等方式,動態決定下一個路由位置。在下一路由位置處理完成後Exchange將被重新返回到路由判斷點,並由動態迴圈路由再次做出新路徑的判斷。如此迴圈執行直到動態迴圈路由不能再找到任何一條新的路由路徑為止。下圖來源於官網(http://camel.apache.org/dynamic-router.html),展示了動態迴圈路由的工作效果:

這裡寫圖片描述

這裡可以看出動態迴圈路由(dynamicRouter)和之前介紹的動態路由(recipientList)在工作方式上的差異。dynamicRouter一次選擇只能確定一條路由路徑,而recipientList只進行一次判斷並確定多條路由分支路徑;dynamicRouter確定的下一路由在執行完成後,Exchange物件還會被返回到dynamicRouter中以便開始第二次迴圈判斷,而recipientList會為各個分支路由複製一個獨立的Exchange物件,並且各個分支路由執行完成後Exchange物件也不會返回到recipientList;下面我們還是通過原始碼片段,向各位讀者展示dynamicRouter的使用方式。在程式碼中,我們編排了三個路由DirectRouteA主要負責通過Http協議接收處理請求,並執行dynamicRouter。DirectRouteB和DirectRouteC兩個路由是可能被dynamicRouter選擇的分支路徑:

  • DirectRouteA
/**
 * 第一個路由,主要用於定義整個路由的起點
 * 通過Http協議接收處理請求
 * @author yinwenjie
 */
public class DirectRouteA extends RouteBuilder {

    /* (non-Javadoc)
     * @see org.apache.camel.builder.RouteBuilder#configure()
     */
    @Override
    public void configure() throws Exception {
        from("jetty:http://0.0.0.0:8282/dynamicRouterCamel"
) // 使用dynamicRouter,進行“動態路由”迴圈, // 直到指定的下一個元素為null為止 .dynamicRouter().method(this, "doDirect") .process(new OtherProcessor()); } /** * 該方法用於根據“動態迴圈”的次數,確定下一個執行的Endpoint * @param properties 通過註解能夠獲得的Exchange中properties屬性,可以進行操作,並反映在整個路由過程中 * @return
*/
public String doDirect(@Properties Map<String, Object> properties) { // 在Exchange的properties屬性中,取出Dynamic Router的迴圈次數 AtomicInteger time = (AtomicInteger)properties.get("time"); if(time == null) { time = new AtomicInteger(0); properties.put("time", time); } else { time = (AtomicInteger)time; } LOGGER.info("這是Dynamic Router迴圈第:【" + time.incrementAndGet() + "】次執行!執行執行緒:" + Thread.currentThread().getName()); // 第一次選擇DirectRouteB if(time.get() == 1) { return "direct:directRouteB"; } // 第二次選擇DirectRouteC else if(time.get() == 2) { return "direct:directRouteC"; } // 第三次選擇一個Log4j-Endpoint執行 else if(time.get() == 3) { return "log:DirectRouteA?showExchangeId=true&showProperties=ture&showBody=false"; } // 其它情況返回null,終止 dynamicRouter的執行 return null; } }

在DirectRouteA中我們使用“通過一個method方法返回資訊”的方式確定dynamicRouter“動態迴圈路由”的下一個Endpoint。當然在實際使用中,開發人員還可以有很多方式向dynamicRouter“動態迴圈路由”返回指定的下一Endpoint。例如使用JsonPath指定JSON格式資料中的某個屬性值,或者使用XPath指定XML資料中的某個屬性值,又或者使用header方法指定Exchange中Header部分的某個屬性。但是無論如何請開發人員確定一件事情:向dynamicRouter指定下一個Endpoint的方式中是會返回null進行迴圈終止的,否則整個dynamicRouter會無限的執行下去。

以上doDirect方法中,我們將一個計數器儲存在了Exchange物件的properties區域,以便在同一個Exchange物件執行doDirect方法時進行計數操作。當同一個Exchange物件第一次執行動態迴圈路由判斷時,選擇directRouteB最為一下路由路徑;當Exchange物件第二次執行動態迴圈路由判斷時,選擇DirectRouteC作為下一路由路徑;當Exchange物件第三次執行時,選擇一個Log4j-Endpoint作為下一個路由路徑;當Exchange物件第四次執行時,作為路由路徑判斷的方法doDirect返回null,以便終止dynamicRouter的執行。

不能在DirectRouteA類中定義一個全域性變數作為迴圈路由的計數器,因為由Jetty-HttpConsumer生成的執行緒池中,執行緒數量和執行緒物件是固定的,並且Camel也不是為每一個Exchange物件的執行建立新的DirectRouteA物件例項。所以如果在DirectRouteA類中定義全域性變數作為迴圈路由的計數器,各位讀者自己想想會發生什麼樣的結果吧。別罵娘……

  • DirectRouteB 和 DirectRouteC
/**
 * 這是另一條路由分支
 * @author yinwenjie
 */
public class DirectRouteC extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("direct:directRouteC")
        .to("log:DirectRouteC?showExchangeId=true&showProperties=ture&showBody=false");
    }
}

由於DirectRouteB和DirectRouteC兩個路由定義的程式碼非常類似,所以這裡只貼出其中一個。

  • 啟動Camel應用程式,並將路由加入CamelContext
......
public static void main(String[] args) throws Exception { 
    // 這是camel上下文物件,整個路由的驅動全靠它了。
    ModelCamelContext camelContext = new DefaultCamelContext();
    // 啟動route
    camelContext.start();
    // 將我們編排的一個完整訊息路由過程,加入到上下文中
    DynamicRouterCamel dynamicRouterCamel = new DynamicRouterCamel();
    camelContext.addRoutes(dynamicRouterCamel.new DirectRouteA());
    camelContext.addRoutes(dynamicRouterCamel.new DirectRouteB());
    camelContext.addRoutes(dynamicRouterCamel.new DirectRouteC());

    // 通用沒有具體業務意義的程式碼,只是為了保證主執行緒不退出
    synchronized (DynamicRouterCamel.class) {
        DynamicRouterCamel.class.wait();
    } 
} 
......
  • 執行效果
[2016-06-27 20:44:52] INFO  qtp1392999621-16 這是Dynamic Router迴圈第:【1】次執行!執行執行緒:qtp1392999621-16 (DynamicRouterCamel.java:105)

[2016-06-27 20:44:56] INFO  qtp1392999621-16 Exchange[Id: ID-yinwenjie-240-57818-1467030193866-0-3, ExchangePattern: InOut, BodyType: org.apache.camel.converter.stream.InputStreamCache] (MarkerIgnoringBase.java:96)

[2016-06-27 20:44:56] INFO  qtp1392999621-16 這是Dynamic Router迴圈第:【2】次執行!執行執行緒:qtp1392999621-16 (DynamicRouterCamel.java:105)

[2016-06-27 20:44:56] INFO  qtp1392999621-16 Exchange[Id: ID-yinwenjie-240-57818-1467030193866-0-3, ExchangePattern: InOut, BodyType: org.apache.camel.converter.stream.InputStreamCache] (MarkerIgnoringBase.java:96)

[2016-06-27 20:44:56] INFO  qtp1392999621-16 這是Dynamic Router迴圈第:【3】次執行!執行執行緒:qtp1392999621-16 (DynamicRouterCamel.java:105)

[2016-06-27 20:44:56] INFO  qtp1392999621-16 Exchange[Id: ID-yinwenjie-240-57818-1467030193866-0-3, ExchangePattern: InOut, BodyType: org.apache.camel.converter.stream.InputStreamCache] (MarkerIgnoringBase.java:96)

[2016-06-27 20:44:56] INFO  qtp1392999621-16 這是Dynamic Router迴圈第:【4】次執行!執行執行緒:qtp1392999621-16 (DynamicRouterCamel.java:105)

[2016-06-27 20:44:56] INFO  qtp1392999621-16 最後exchangeID = ID-yinwenjie-240-57818-1467030193866-0-3 | org.apache.camel.converter.stream.InputStreamCache@2abaa89c || 被OtherProcessor處理 | time = 4 (DynamicRouterCamel.java:150)

從以上執行效果看,無論dynamicRouter執行的是第幾次迴圈判斷,Exchange都是同一個(ID號為【ID-yinwenjie-240-57818-1467030193866-0-3】)。

3-6、Service與生命週期

在Apache Camel中有一個比Endpoint、Component、CamelContext等元素更基礎的概念元素:Service。Camel官方文件中對Service的解釋是:

Camel uses a simple lifecycle interface called Service which has a single start() and stop() method.

Various classes implement Service such as CamelContext along with a number of Component and Endpoint classes.

When you use Camel you typically have to start the CamelContext which will start all the various components and endpoints and activate the routing rules until the context is stopped again.

......

包括Endpoint、Component、CamelContext等元素在內的大多數工作在Camel中的元素,都是一個一個的Service。例如,我們雖然定義了一個JettyHttpComponent(就是在程式碼中使用DSL定義的”jetty:http://0.0.0.0:8282/directCamel“頭部所表示的Component),但是我們想要在Camel應用程式執行階段使用這個Component,就需要利用start方法將這個Component啟動起來。

實際上通過閱讀org.apache.camel.component.jetty.JettyHttpComponent的原始碼,讀者可以發現JettyHttpComponent的啟動過程起始大多數情況下什麼都不會做,只是在org.apache.camel.support.ServiceSupport中更改了JettyHttpComponent物件的一些狀態屬性。倒是HttpConsumer這個Service,在啟動的過程中啟動了JettyHttpComponent物件的連線監聽,並建立了若干個名為【qtp-*】的處理執行緒。下圖為讀者展示了org.apache.camel.Service介面的主要繼承/實現體系:

這裡寫圖片描述

Service有且只有兩個介面方法定義:start()和stop(),這兩個方法的含義顯而易見,啟動服務和終止服務。另外繼承自Service的另外兩個子級介面SuspendableService、ShutdownableService分別還定義了另外幾個方法:suspend()、resume()和shutdown()方法,分別用來暫停服務、恢復服務和徹底停止服務(徹底停止服務意味著在Camel應用程式執行的有生之年不能再次啟動了)。

Camel應用程式中的每一個Service都是獨立執行的,各個Service的關聯銜接通過CamelContext上下文物件完成。每一個Service通過呼叫start()方法被啟用並參與到Camel應用程式的工作中,直到它的stop()方法被呼叫。也就是說,每個Service都有獨立的生命週期。(http://camel.apache.org/lifecycle.html

那麼問題來了,既然每個Service都有獨立的生命週期,我們啟動Camel應用程式時就要啟動包括Route、Endpoint、Component、Producer、Consumer、LifecycleStrategy等概念元素在內的無數多個Service實現,那麼作為開發人員不可能編寫程式碼一個一個的Service來進行啟動(大多數開發人員不瞭解Camel的內部結構,也根本不知道要啟動哪些Service)。那麼作為Camel應用程式肯定需要提供一個辦法,在應用程式啟動時分析應用程式所涉及到的所有的Service,並統一管理這些Service啟動和停止的動作。這就是CamelContext所設計的另一個功能。

4、CamelContext上下文

CamelContext從英文字面上理解,是Camel服務上下文的意思。CamelContext在Apache Camel中的重要性,就像ApplicationContext之於Spring、ServletContext之於Servlet…… 但是包括Camel官方文件在內的,所有讀者能夠在網際網路上找到的資料對於CamelContext的介紹都只有聊聊數筆。

The context component allows you to create new Camel Components from a CamelContext with a number of routes which is then treated as a black box, allowing you to refer to the local endpoints within the component from other CamelContexts.

First you need to create a CamelContext, add some routes in it, start it and then register the CamelContext into the Registry (JNDI, Spring, Guice or OSGi etc).

………

以上是Camel官方文件(http://camel.apache.org/context.html)對於CamelContext作用的一些說明,大致的意思是說CamelContext橫跨了Camel服務的整個生命週期,並且為Camel服務的工作環境提供支撐。

4-1、CamelContext實現結構

那麼CamelContext中到底儲存了哪些重要的元素,又是如何工作的呢?看樣子官方的使用手冊中並沒有說明,我們還是通過分析CamelContext的原始碼來看看它的一些什麼內容吧。下面我們應用已經講解過的Apache Camel相關知識,對org.apache.camel.CamelContext介面以及它的主要實現類進行分析,以便儘可能的去理解為什麼CamelContext非常重要:

這裡寫圖片描述

上圖是Apache Camel中實現了org.apache.camel.CamelContext介面的主要類。其中有兩個實現類需要特別說明一下:SpringCamelContext和DefaultCamelContext。Camel可以和Spring框架進行無縫整合,例如可以將您的某個Processor處理器以Spring Bean的形式注入到Spring Ioc容器中,然後Camel服務就可以通過在Spring Ioc容器中定義的bean id(XML方式或者註解方式都行)取得這個Processor處理器的例項。

為了實現以上描述的功能,需要Camel服務能夠從Spring的ApplicationContext取得Bean,而SpringCamelContext可以幫助Camel服務完成這個關鍵動作:通過SpringCamelContext中重寫的createRegistry方法建立一個ApplicationContextRegistry例項,並通過後者從ApplicationContext的“getBean”方法中獲取Spring Ioc容器中符合指定的Bean id的例項。這就是Camel服務和Spring進行無縫整合的一個關鍵點,如以下程式碼片段所示:

public class SpringCamelContext extends DefaultCamelContext implements InitializingBean, DisposableBean, ApplicationContextAware {
    ......
     @Override
    protected Registry createRegistry() {
        return new ApplicationContextRegistry(getApplicationContext());
    }
    ......
}

public class ApplicationContextRegistry implements Registry {
    ......

    @Override
    public Object lookupByName(String name) {
        try {
            return applicationContext.getBean(name);
        } catch (NoSuchBeanDefinitionException e) {
            return null;
        }
    }
    ......
}

另外一個需要說明的是DefaultCamelContext類,這個類是我們在前文涉及到Camel示例程式碼時使用最多的CamelContext實現。而我們將要分析的CamelContext工作原理也基本上是在這個類中進行了完整的實現——其子類只是根據不同的Camel執行環境重寫了其中某些方法(例如之前提到的createRegistry方法)。

4-2、DefaultCamelContext結構和啟動過程

如果我們翻閱DefaultCamelContext的原始碼,首先就會發現在其中定義了許多全域性變數,數量在70個左右(實際上根據《程式碼大全》的描述,一個類中不應該有這麼多全域性變數。究竟這個類的作者當時是怎樣的想法,就不清楚了)。其中一些變數負責記錄CamelContext的狀態屬性、一些負責引用輔助工具還有一些記錄關聯的頂層工作物件(例如Endpoint、Servcie、Routes、)Components等等)。很明顯我們無法對這些變數逐一進行深入分析講解,但是經過前兩篇文章的介紹至少以下變數資訊我們是需要理解其作用的:

public class DefaultCamelContext extends ServiceSupport implements ModelCamelContext, SuspendableService {
    ......

    // java的基礎概念:類載入器,一般進行執行緒操作時會用到它
    private ClassLoader applicationContextClassLoader;
    // 已定義的endpoint URI(完整的)和Endpoint物件的對映關係
    private Map<EndpointKey, Endpoint> endpoints;
    // 已使用的元件名稱(即Endpoint URI頭所代表的元件名稱)和元件物件的對應關係
    private final Map<String, Component> components = new HashMap<String, Component>();
    // 針對原始路由編排所分析出的路由物件,路由物件是作為CamelContext從路由中的一個元素傳遞到下一個元素的依據
    //  路由物件中還包含了,將路由定義中各元素連線起來的其它Service。例如DefaultChannel
    private final Set<Route> routes = new LinkedHashSet<Route>();
    // 由DSL或者XML描述的原始路由編排。每一個RouteDefinition元素中都包含了參與這個路由的所有Service定義。
    private final List<RouteDefinition> routeDefinitions = new ArrayList<RouteDefinition>();
    // 生命週期策略,實際上是一組監聽,文章後面的內容會重點講到
    private List<LifecycleStrategy> lifecycleStrategies = new CopyOnWriteArrayList<LifecycleStrategy>();
    // 這是一個計數器,記錄當前每一個不同的Routeid中正在執行的的Exchange數量
    private InflightRepository inflightRepository = new DefaultInflightRepository();
    // 服務停止策略
    private ShutdownStrategy shutdownStrategy = new DefaultShutdownStrategy(this);
    ......
}

看來CamelContext是挺重要的,它基本將Camel應用程式執行所需要的所有基本資訊都記錄在案。另外,Apache Camel中還有一個名叫org.apache.camel.CamelContextAware的介面,只要實現該介面的就必須實現這個介面定義的兩個方法:setCamelContext和getCamelContext。而實際上在Camel中的大多數元素都實現了這個介面,所以我們在閱讀程式碼時可以發現DefaultCamelContext在一邊啟動各個Service的時候,順便將自己所為引數賦給了正在啟動的Service,最終實現了各個Service之間的共享上下文資訊的效果

// 這是CamelContextAware介面的定義
public interface CamelContextAware {
    /**
     * Injects the {@link CamelContext}
     *
     * @param camelContext the Camel context
     */
    void setCamelContext(CamelContext camelContext);

    /**
     * Get the {@link CamelContext}
     *
     * @return camelContext the Camel context
     */
    CamelContext getCamelContext();
}

............

// 這是DefaultCamelContext的doAddService方法中
// 對實現了CamelContextAware介面的Service
// 進行CamelContext設定的程式碼
private void doAddService(Object object, boolean closeOnShutdown) throws Exception {
    ......
    if (object instanceof CamelContextAware) {
        CamelContextAware aware = (CamelContextAware) object;
        aware.setCamelContext(this);
    }
    ......
}

為了和本文3-6小節的內容向呼應,這裡我們著重分析一下DefaultCamelContext的啟動過程:DefaultCamelContext是如何幫助整個Camel應用程式中若干Service完成啟動過程的?首先說明DefaultCamelContext 也是一個Service,所以它必須實現Service介面的start()方法和stop()方法。而DefaultCamelContext對於start()方法的實現就是“啟動其它已知的Service”。

更具體的來說,DefaultCamelContext將所有需要啟動的Service按照它們的作用型別進行區分,例如負責策略管理的Service、負責Components元件描述的Service、負責註冊管理的Service等等,然後再按照順序啟動這些Service。以下程式碼片段提取自DefaultCamelContext的doStartCamel()私有方法,並加入了筆者的中文註釋(原有作者的註釋依然保留),這個私有方法由DefaultCamelContext中的start()方法間接呼叫,用於完成上述各Service啟動操作。

// 為了呼叫該私有方法,之前的方法執行棧分別為:
// start()
// super.start()
// doStart()
......
private void doStartCamel() throws Exception {
    // 獲取classloader是有必要的,這樣保證了Camel服務中的classloader和環境中的其他元件(例如spring)一致
    if (applicationContextClassLoader == null) {
       // Using the TCCL as the default value of ApplicationClassLoader
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        if (cl == null) {
            // use the classloader that loaded this class
            cl = this.getClass().getClassLoader();
        }
        setApplicationContextClassLoader(cl);
    }

    ......

    // 首先啟動的是ManagementStrategy策略管理器,它的預設實現是DefaultManagementStrategy。
    // 還記得我們在分析DUBBO時提到的Java spi機制吧,Camel-Core也使用了這個機制,並進行了二次封裝。詳見org.apache.camel.spi程式碼包。
    // 啟動ManagementStrategy,可以幫助Camel實現第三方元件包(例如Camel-JMS)的動態載入
    // start management strategy before lifecycles are started
    ManagementStrategy managementStrategy = getManagementStrategy();
    // inject CamelContext if aware
    if (managementStrategy instanceof CamelContextAware) {
        ((CamelContextAware) managementStrategy).setCamelContext(this);
    }
    ServiceHelper.startService(managementStrategy);

    ......
    // 然後啟動的是 生命週期管理策略 
    // 這個lifecycleStrategies變數是一個LifecycleStrategy泛型的List集合。
    // 實際上LifecycleStrategy是指是一組監聽,詳見程式碼片段後續的描述
    ServiceHelper.startServices(lifecycleStrategies);

    ......
    // 接著做一系列的Service啟動動作
    // 首先是Endpoint註冊管理服務,要進行重點介紹的是org.apache.camel.util.LRUSoftCache
    // 它使用了java.lang.ref.SoftReference進行實現,這是Java提供的
    endpoints = new EndpointRegistry(this, endpoints);
        addService(endpoints);

    ......
    // 啟動執行緒池管理策略和一些列其它服務
    // 基本上這些Service已經在上文中提到過
    doAddService(executorServiceManager, false);
    addService(producerServicePool);
    addService(inflightRepository);
    addService(shutdownStrategy);
    addService(packageScanClassResolver);
    addService(restRegistry);

    ......
    // start components
    startServices(components.values());
    // 啟動路由定義,路由定義RouteDefinition本身並不是Service,但是其中包含了參與路由的各種元素,例如Endpoint。
    // start the route definitions before the routes is started
    startRouteDefinitions(routeDefinitions);

    ......
}
......

以上程式碼片段已經做了比較詳細的註釋。下文中,我們將以上程式碼片段中無法用幾句話在程式碼註釋中表達的關鍵知識點再進行說明:

========================
(接下文)

4-2-1、LifecycleStrategy
4-2-2、CopyOnWriteArrayList與監聽者模式
4-2-3、LRUSoftCache和SoftReference