1. 程式人生 > >架構設計:系統間通訊(36)——Apache Camel快速入門(上)

架構設計:系統間通訊(36)——Apache Camel快速入門(上)

架構設計:系統間通訊(36)——Apache Camel快速入門(上)

:http://blog.csdn.net/yinwenjie(未經允許嚴禁用於商業用途!) https://blog.csdn.net/yinwenjie/article/details/51692340

1、本專題主旨

1-1、關於技術元件

在這個專題中,我們介紹了相當數量技術元件:Flume、Kafka、ActiveMQ、Rabbitmq、Zookeeper、Thrift 、Netty、DUBBO等等,還包括本文要進行介紹的Apache Camel。有的技術元件講得比較深入,有的技術元件則是點到為止。於是一些讀者朋友發來資訊向我提到,這個專題的文章感覺就像一個技術名詞的大雜燴,並不清楚作者的想要通過這個專題表達什麼思想。

提出這個質疑的朋友不在少數,所以我覺得有必要進行一個統一的說明。這個專題的名字叫做“系統間通訊”,在這個專題中我的講解主要是圍繞自己在實際工作中所總結的理論結構。從系統間通訊最基本的網路IO模型、訊息格式、協議標準開始,再到搭建在其上的通訊元件,然後再關注更上層的進行通訊過程協調、統一通訊過程的中介軟體。這就是本專題的主旨。要說到本專題中筆者介紹的每一個技術元件,如果要做更深入的研究都可以再開新的專題,行業內各自都有很多的著作進行專門介紹。

那麼為什麼本專題中還要介紹這些技術元件呢?無非有三個原因:其一,是為了說清楚理論結構而服務。介紹JAVA對BIO、NIO和AIO三種網路IO模型的支援,是為了讓讀者對這三種網路模型的工作過程從感性過渡到理性;介紹RMI、Thrift是為了例項化筆者介紹的,搭建在網路IO模型上的RPC協議。其二,是筆者認為一些技術元件的設計思路,可以為各位架構師朋友在實際工作中所借鑑

,例如筆者對DUBBO中一些功能模組的設計進行講解、對Apache Thrift中序列化方式進行詳細講解就是出於第二個原因。其三,是因為本專題中設計的實戰內容需要對將要用到的技術元件預先進行講解。在本專題中,我們基於Thrift、Zookeeper自己設計了一個服務治理框架;我們還基於ActiveMQ、Kafka和Flume闡述了不同場景的日誌系統設計方案。所以為了讓更多的讀者能夠看懂這些技術方案,最好的方式就是將它們快速的介紹一下。

感謝那些在本專題中為了點“贊”的朋友,感謝那些在留言中向我提出修改意見、指出錯誤的朋友,特別是RMI的那篇文章犯的錯誤,太低階了。有你們的支援,讓我覺得付出是有收穫的;另外,目前這個系列的35片文章一共收到了7個“踩”,點“踩”的朋友能否在留言中為我指出文章的錯誤和不足,幫助我重新整理自己的思路,修正知識點的問題。謝謝。

1-2、關於程式碼示例

本專題中筆者貼出了佔文章相當篇幅的程式碼片段,貼出這些程式碼片段主要也有三種可能的情況。其一,是為了各位讀者快速瞭解某一種技術元件的基本使用;其二,是為了實現文章中描述的設計思路;其三,是為了進行技術驗證,例如《架構設計:系統間通訊(29)——Kafka及場景應用(中2)》文章中4-4-5小節列出的程式碼,是為了驗證2013年2月2日,Kafka的主要參與者Neha Narkhede發表的一篇講解Kafka Replication過程的技術文件。

有的讀者向我提出,這些程式碼片段過於冗長,甚至有故意加長文章篇幅的目的。這個問題可能是我寫作經驗不足,給大家造成了困擾。不過,在文章中的貼出的程式碼為了達到相應的目的,都配有比較詳細的註釋說明,也為了避免各位讀者在閱讀程式碼時打瞌睡^_^。另外,我將在對這個專題文章的第二次整理時,會去掉類似於get/set這種性質的程式碼片段——它們確實有“佔篇幅”之嫌。

1-3、關於抄襲

最後筆者認為抄襲是最卑劣的行為,所以本專題中的內容均是來源於筆者對鍵盤做功,涉及到的圖例也是筆者一個一個畫出來的,所有引用的圖片、說明均註明出處。筆者歡迎轉載,但未經授權請勿用於商業用途。

2、Apache Camel 快速入門

那麼這裡我們為什麼又要花兩篇文章的篇幅來介紹Apache Camel呢?因為後續文章中,在我們進行一款簡單的ESB中介軟體設計時,我們將會依靠Apache Camel提供的協議轉換、訊息路由等核心能力。那麼,就讓我們開始吧!

2-1、Apache Camel介紹

Apache Camel的官網地址是http://camel.apache.org/,在本篇文章成文時最新的版本是V2.17.1,您可以通過多種手段進行下載。Apache Camel的官網並沒有把Camel定義成一個ESB中介軟體服務,因為Camel並不是服務:

Camel empowers you to define routing and mediation rules in a variety of domain-specific languages, including a Java-based Fluent API, Spring or Blueprint XML Configuration files, and a Scala DSL. This means you get smart completion of routing rules in your IDE, whether in a Java, Scala or XML editor.

Apache Camel uses URIs to work directly with any kind of Transport or messaging model such as HTTP, ActiveMQ, JMS, JBI, SCA, MINA or CXF, as well as pluggable Components and Data Format options. Apache Camel is a small library with minimal dependencies for easy embedding in any Java application. Apache Camel lets you work with the same API regardless which kind of Transport is used - so learn the API once and you can interact with all the Components provided out-of-box.

Apache Camel provides support for Bean Binding and seamless integration with popular frameworks such as CDI, Spring, Blueprint and Guice. Camel also has extensive support for unit testing your routes.

以上引用是Apache Camel官方對它的定義。domain-specific languages指代的是DSL(領域特定語言),首先Apache Camel支援DSL,這個問題已經在上一篇文章中說明過了。Apache Camel支援使用JAVA語言和Scala語言進行DSL規則描述,也支援使用XML檔案進行的規則描述。這裡提一下,JBOSS提供了一套工具“Tools for Apache Camel”可以圖形化Apache Camel的規則編排過程。

Apache Camel在編排模式中依託URI描述規則,實現了傳輸協議和訊息格式的轉換:HTTP, ActiveMQ, JMS, JBI, SCA, MINA or CXF等等。Camel還可以嵌入到任何java應用程式中:看到了吧,Apache Camel不是ESB中介軟體服務,它需要依賴於相應的二次開發才能被當成ESB服務的核心部分進行使用。

2-2、快速使用示例

說了那麼多,那麼我們來看看Apache Camel最簡單的使用方式吧:

package com.yinwenjie.test.cameltest.helloworld;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.http.HttpMessage;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.model.ModelCamelContext;

/**
 * 鄭重其事的寫下 helloworld for Apache Camel
 * @author yinwenjie
 */
public class HelloWorld extends RouteBuilder {
    public static void main(String[] args) throws Exception {
        // 這是camel上下文物件,整個路由的驅動全靠它了。
        ModelCamelContext camelContext = new DefaultCamelContext();
        // 啟動route
        camelContext.start();
        // 將我們編排的一個完整訊息路由過程,加入到上下文中
        camelContext.addRoutes(new HelloWorld());

        /* 
         * ==========================
         * 為什麼我們先啟動一個Camel服務
         * 再使用addRoutes新增編排好的路由呢?
         * 這是為了告訴各位讀者,Apache Camel支援動態載入/解除安裝編排的路由
         * 這很重要,因為後續設計的Broker需要依賴這種能力
         * ==========================
         * */

        // 通用沒有具體業務意義的程式碼,只是為了保證主執行緒不退出
        synchronized (HelloWorld.class) {
            HelloWorld.class.wait();
        }
    }

    @Override
    public void configure() throws Exception {
        // 在本程式碼段之下隨後的說明中,會詳細說明這個構造的含義
        from("jetty:http://0.0.0.0:8282/doHelloWorld")
        .process(new HttpProcessor())
        .to("log:helloworld?showExchangeId=true");
    }

    /**
     * 這個處理器用來完成輸入的json格式的轉換
     * @author yinwenjie
     */
    public class HttpProcessor implements Processor {

        /* (non-Javadoc)
         * @see org.apache.camel.Processor#process(org.apache.camel.Exchange)
         */
        @Override
        public void process(Exchange exchange) throws Exception {
            // 因為很明確訊息格式是http的,所以才使用這個類
            // 否則還是建議使用org.apache.camel.Message這個抽象介面
            HttpMessage message = (HttpMessage)exchange.getIn();
            InputStream bodyStream =  (InputStream)message.getBody();
            String inputContext = this.analysisMessage(bodyStream);
            bodyStream.close();

            // 存入到exchange的out區域
            if(exchange.getPattern() == ExchangePattern.InOut) {
                Message outMessage = exchange.getOut();
                outMessage.setBody(inputContext + " || out");
            }
        }

        /**
         * 從stream中分析字串內容
         * @param bodyStream
         * @return
         */
        private String analysisMessage(InputStream bodyStream) throws IOException {
            ByteArrayOutputStream outStream = new ByteArrayOutputStream();
            byte[] contextBytes = new byte[4096];
            int realLen;
            while((realLen = bodyStream.read(contextBytes , 0 ,4096)) != -1) {
                outStream.write(contextBytes, 0, realLen);
            }

            // 返回從Stream中讀取的字串
            try {
                return new String(outStream.toByteArray() , "UTF-8");
            } finally {
                outStream.close();
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98

以上程式碼可以直接拿來使用,它展示了一個簡單的可以實際執行的訊息路由規則:首先from語句中填寫的“jetty:http://0.0.0.0:8282/doHelloWorld”表示這個編排好的路由的訊息入口:使用http傳輸協議,訪問本物理節點上任何IP(例如127.0.0.1或者192.168.1.1),在埠8282上的請求,都可以將HTTP攜帶的訊息傳入這個路由。

接下來訊息會在HttpProcessor這個自定義的處理器中被進行轉換。為了讓各位讀者看清楚原理,HttpProcessor 中的訊息轉換很簡單:在HTTP傳入的json字串的末尾,加上一個” || out”並進行字串輸出。Apache Camel中自帶了很多處理器,並且可以自行實現Processor介面來實現自己的處理邏輯。

最後,訊息被傳輸到最後一個endPoint控制端點,這個endPoint控制端點的URI描述(log:helloworld?showExchangeId=true)表明它是一個Log4j的實現,所以訊息最終會以Log日誌的方式輸出到控制檯上。到此,整個編排的路由就執行完成了。

2-3、EIP

Camel supports most of the Enterprise Integration Patterns from the excellent book by Gregor Hohpe and Bobby Woolf.

EIP的概念在之前的文章中已經進行了介紹,它來源於Gregor Hohpe 和Bobby Woolf合著的一本書《Enterprise Integration Patterns》。在書中Gregor Hohpe 和Bobby Woolf闡述瞭如何對企業內的各個業務系統整合進行設計,包括:如何進行路由設計、如何進行訊息傳遞等等。Apache Camel的設計方案就源於這本書中提出的解決思路,下面我們就對Camel中的重點要素進行講解。

3、Camel要素

3-1、Endpoint 控制端點

Apache Camel中關於Endpoint最直白的解釋就是,Camel作為系統整合的基礎服務元件,在已經編排好的路由規則中,和其它系統進行通訊的設定點。這個“其它系統”,可以是存在於本地或者遠端的檔案系統,可以是進行業務處理的訂單系統,可以是訊息佇列服務,可以是提供了訪問地址、訪問ip、訪問路徑的任何服務。Apache Camel利用自身提供的廣泛的通訊協議支援,使這裡的“通訊”動作可以採用大多數已知的協議,例如各種RPC協議、JMS協議、FTP協議、HTTP協議。。。

Camel中的Endpoint控制端點使用URI的方式描述對目標系統的通訊。例如以下URI描述了對外部MQ服務的通訊,訊息格式是Stomp:

// 以下程式碼表示從名為test的MQ佇列中接收訊息,訊息格式為stomp
// 使用者名稱為username,監聽本地埠61613
from("stomp:queue:test?tcp://localhost:61613&login=username")

// 以下程式碼表示將訊息傳送到名為test的MQ佇列中,訊息格式為stomp
to("stomp:queue:test?tcp://localhost:61613&login=username");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

更多的stomp控制端點的說明可參見Camel中的說明:http://camel.apache.org/stomp.html。再例如,我們可以使用Http協議和某一個外部系統進行通訊,更多關於Http控制端點的說明也可參見Camel中的說明:http://camel.apache.org/http.html

// 主動向http URI描述的路徑發出請求(http的URI筆者不需要再介紹了吧)
from("http://localhost:8080/dbk.manager.web/queryOrgDetailById")

// 將上一個路由元素上Message Out中訊息作為請求內容,
// 向http URI描述的路徑發出請求
// 注意,Message Out中的Body內容將作為資料流對映到Http Request Body中
to("http://localhost:8080/dbk.manager.web/queryOrgDetailById")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

以上的示例中,請注意“from”部分的說明。它並不是等待某個Http請求匹配描述的URI傳送到路由路徑上,而是主動向http URI描述的路徑傳送請求。如果想要達到前者的效果,請使用Jetty/Servlet開頭的相關通訊方式:http://camel.apache.org/servlet.htmlhttp://camel.apache.org/jetty.html。而通過Apache Camel官網中 http://camel.apache.org/uris.html 路徑可以檢視大部分Camel通過URI格式所支援的Endpoint。

Camel makes extensive use of URIs to allow you to refer to endpoints which are lazily created by a Component if you refer to them within Routes.
  • 1

以上引用是Apache Camel官方文件中,關於endpoint和URI之間關係的描述。從這段官方描述可以看出,不同的endpoint都是通過URI格式進行描述的,並且通過Camel中的org.apache.camel.Component(endpoint構建器)介面的響應實現進行endpoint例項的建立。需要注意的是,Camel通過plug方式提供對某種協議的endpoint支援,所以如果讀者需要使用某種Camel的endpoint,就必須確定自己已經在工程中引入了相應的plug。例如,如果要使用Camel對Netty4-Endpoint的支援,就需要在工程中引入Camel對Netty4的支援,如下:

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-netty4</artifactId>
    <version>x.x.x</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

在這個camel-plug引用中,就包含了Netty4對Endpoint的實現和Netty4對Component的實現:org.apache.camel.component.netty4.NettyEndpoint、org.apache.camel.component.netty4.NettyComponent。

3-2、Exchange和Message訊息格式

訊息在我們已經編排好的業務路徑上進行傳遞,通過我們自定義的訊息轉換方式或者Apache Camel提供的訊息轉換方式進行訊息格式轉換。那麼為了完成這些訊息傳遞、訊息轉換過程Camel中的訊息必須使用統一的訊息描述格式,並且保證路徑上的控制端點都能存取訊息

Camel提供的Exchange要素幫助開發人員在控制端點到處理器、處理器到處理器的路由過程中完成訊息的統一描述。一個Exchange元素的結構如下圖所示:

這裡寫圖片描述

3-2-1、Exchange中的基本屬性

  • ExchangeID

一個Exchange貫穿著整個編排的路由規則,ExchangeID就是它的唯一編號資訊,同一個路由規則的不同例項(對路由規則分別獨立的兩次執行),ExchangeID不相同。

  • fromEndpoint

表示exchange例項初始來源的Endpoint控制端點(類的例項),一般來說就是開發人員設定路由時由“from”關鍵字所表達的Endpoint。例如本文2-2小節中的程式碼示例,from關鍵字填寫的URI資訊就是”jetty:http://0.0.0.0:8282/doHelloWorld“,而實現Jetty協議頭支援的org.apache.camel.Endpoint介面實現類就是org.apache.camel.component.jetty.JettyHttpEndpoint。所以在2-2小節中,Exchange物件中的fromEndpoint屬性就是JettyHttpEndpoint類的一個例項化物件。

  • properties

Exchange物件貫穿整個路由執行過程中的控制端點、處理器甚至還有表示式、路由條件判斷。為了讓這些元素能夠共享一些開發人員自定義的引數配置資訊,Exchange以K-V結構提供了這樣的引數配置資訊儲存方式。在org.apache.camel.impl.DefaultExchange類中,對應properties的實現程式碼如下所示:

......
public Map<String, Object> getProperties() {
    if (properties == null) {
        properties = new ConcurrentHashMap<String, Object>();
    }
    return properties;
}
......
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • Pattern

Exchange中的pattern屬性非常重要,它的全稱是:ExchangePattern(交換器工作模式)。其實現是一個列舉型別:org.apache.camel.ExchangePattern。可以使用的值包括:InOnly, RobustInOnly, InOut, InOptionalOut, OutOnly, RobustOutOnly, OutIn, OutOptionalIn。從Camel官方已公佈的文件來看,這個屬性描述了Exchange中訊息的傳播方式。

例如Event Message型別的訊息,其ExchangePattern預設設定為InOnly。Request/Reply Message型別的訊息,其ExchangePattern設定為InOut。但是筆者通過程式碼排查,發現並不是ExchangePattern都被Camel-Core核心實現部分所使用(並不能說明沒有被諸如 Camel-CXF這些pluin所使用),而且Camel的官方文件對於它們的介紹也只有寥寥數筆(http://camel.apache.org/exchange-pattern.html)。例如RobustOutOnly、OutOptionalIn、OutOnly這些列舉值就沒有在Camel-Core實現部分發現引用。

  • Exception

如果在處理器Processor的處理過程中,開發人員需要丟擲異常並終止整個訊息路由的執行過程,可以通過設定Exchange中的exception屬性來實現。

3-2-2、Exchange中的Message

Exchange中還有兩個重要屬性inMessage和outMessage。這兩個屬性分別代表Exchange在某個處理元素(處理器、表示式等)上的輸入訊息和輸出訊息。

當控制端點和處理器、處理器和處理器間的Message在Exchange中傳遞時(雖然ExchangePattern列舉中存在isInCapable()、isInCapable()這樣的判斷方法,但是通過程式碼排查,筆者並沒有發現在camel-core中有關於這些方法的任何使用),Exchange會自動將上一個元素的輸出值作為作為這個元素的輸入值進行使用。但是如果在上一個處理器中,開發人員沒有在Exchange中設定任何out message內容(即Excahnge中out屬性為null),那麼上一個處理器中的in message內容將作為這個處理器的in message內容。

這裡需要注意一個問題,在DefaultExchange類中關於getOut()方法的實現,有這樣的程式碼片段:

......
public Message getOut() {
    // lazy create
    if (out == null) {
        out = (in != null && in instanceof MessageSupport)
            ? ((MessageSupport)in).newInstance() : new DefaultMessage();
        configureMessage(out);
    }
    return out;
}
......
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

所以,在處理器中對out message屬性的賦值,並不需要開發人員明確的“new”一個Message物件。只需要呼叫getOut()方法,就可以完成out message屬性賦值。以下路由程式碼片段在fromEndpoint後,連續進入兩個Processor處理器,且Exchange的ExchangePattern為InOut。我們來觀察從第一個處理處理完後,到第二個處理收到訊息時Exchange物件中的各個屬性產生的變化:

......
from("jetty:http://0.0.0.0:8282/doHelloWorld")
.process(new HttpProcessor())
.process(new OtherProcessor())
......
  • 1
  • 2
  • 3
  • 4
  • 5
  • 第一個HttpProcessor執行末尾時,Exchange中的屬性

這裡寫圖片描述

上圖顯示了當前記憶體區域中,Exchange物件的id為452,fromEndpoint屬性是一個JettyHttpEndpoint的例項,物件id為479。注意兩個重要的inMessage和outMessage,它們分別是HttpMessage的例項(物件id467)和DefaultMessage的例項(物件id476),這裡說明一下無論是HttpMessage還是DefaultMessage,它們都是org.apache.camel.Message介面的實現。

outMessage中的body部分儲存了一個字串資訊,我們隨後驗證一下資訊在下一個OtherProcessor處理器中的記錄方式。

  • 第二個OtherProcessor開始執行時,Exchange中的屬性

這裡寫圖片描述

可以看到HttpProcessor處理器中outMessage的Message物件作為了這個OtherProcessor處理器的inMessage屬性,物件的id編號都是476,說明他們使用的記憶體區域都是相同的,是同一個物件。Excahnge物件的其它資訊也從HttpProcessor處理器原封不動的傳遞到了OtherProcessor處理器。

每一個Message(無論是inMessage還是outMessage)物件主要包括四個屬性:MessageID、Header、Body和Attachment。

  • MessageID

在系統開發階段,提供給開發人員使用的標示訊息物件唯一性的屬性,這個屬性可以沒有值。

  • Header

訊息結構中的“頭部”資訊,在這個屬性中的資訊採用K-V的方式進行儲存,並可以隨著Message物件的傳遞將資訊帶到下一個參與路由的元素中。

主要注意的是在org.apache.camel.impl.DefaultMessage中對headers屬性的實現是一個名叫org.apache.camel.util.CaseInsensitiveMap的類。看這個類的名字就知道:headers屬性的特點是忽略大小寫。也就是說:

......
outMessage.setHeader("testHeader", "headerValue");
outMessage.setHeader("TESTHEADER", "headerValue");
outMessage.setHeader("testheader", "HEADERVALUE");
......
  • 1
  • 2
  • 3
  • 4
  • 5

以上程式碼片段設定後,Message中的Headers屬性中只有一個K-V鍵值對資訊,且以最後一次設定的testheader為準。

  • Body

Message的業務訊息內容存放在這裡

  • Attachment

Message中使用attachment屬性儲存各種檔案內容資訊,以便這些檔案內容在Camel路由的各個元素間進行流轉。attachment同樣使用K-V鍵值對形式進行檔案內容的儲存。但不同的是,這裡的V是一個javax.activation.DataHandler型別的物件。

=====================================
(接下文)