1. 程式人生 > >簡化軟體整合:一個Apache Camel教程

簡化軟體整合:一個Apache Camel教程

本文來自於阮一峰,文章主要講解了構建的流程,每個步驟介紹的較為詳細,希望對大家有幫助。

軟體很少(如果有的話)存在於資訊真空中。至少,這是我們軟體工程師可以為我們開發的大多數應用程式做出的假設。

在任何規模上,每種軟體都以某種方式與其他軟體進行通訊,出於各種原因:從某處獲取參考資料,傳送監控訊號,與其他服務保持聯絡,同時作為分散式的一部分系統等等。

簡化軟體整合:一個Apache Camel教程

在本教程中,您將瞭解整合大型軟體的一些最大挑戰,以及Apache Camel如何輕鬆解決這些難題。

問題:系統整合的體系結構設計

在您的軟體工程中,您可能至少做了一次以下操作:

1.確定應啟動資料傳送的業務邏輯片段。

2.在相同的應用程式層,根據收件人的期望寫入資料轉換。

3.將資料封裝在適合通過網路傳輸和路由的結構中。

4.使用適當的驅動程式或客戶端SDK開啟到目標應用程式的連線。

5.傳送資料並處理響應。

為什麼這是一個不好的行為?

雖然你只有這種幾個連線,它仍然是可管理的。隨著系統之間關係的增加,應用程式的業務邏輯與整合邏輯混合在一起,即整合資料,補償兩個系統之間的技術差異,並通過SOAP,REST或更多異常請求將資料傳輸到外部系統。

如果您要整合多個應用程式,那麼在這樣的程式碼中追溯依賴關係的整個畫面是非常困難的:資料產生在哪裡以及哪些服務使用它?您將有許多地方整合邏輯重複,以引導。

有了這樣的方法,雖然這個任務在技術上已經完成,但是我們在整合的可維護性和可伸縮性方面遇到了很大的問題。這個系統中資料流的快速重組幾乎是不可能的,更不用說更深層次的問題,比如缺少監視,斷路,資料恢復等等。

當將軟體整合到一個相當大的企業的範圍時,這一點尤為重要。要處理企業整合,就意味著要與一組應用程式一起工作,這些應用程式執行在廣泛的平臺上,並且存在於不同的位置。在這樣一個軟體環境中,資料交換是相當苛刻的。它必須符合行業的高安全標準,並提供可靠的資料傳輸方式。在企業環境中,系統整合需要一個獨立的、全面的架構設計。

本文將向您介紹軟體整合面臨的獨特困難,併為整合任務提供一些經驗驅動的解決方案。我們將熟悉Apache Camel,這是一個有用的框架,可以減輕整合開發人員頭痛的最壞情況。我們將以駱駝如何幫助建立由Kubernetes提供支援的微服務叢集中的通訊為例。

整合困難

解決該問題的一個廣泛使用的方法是在應用程式中分離一個整合層。它可以存在於同一個應用程式中,也可以作為一個獨立執行的專用軟體 - 在後一種情況下稱為中介軟體。

在開發和支援中介軟體時,您通常會遇到什麼問題?一般來說,你有以下關鍵點:

所有資料通道在一定程度上都不可靠。資料強度低到中等時,可能不會出現由此不可靠性引起的問題。從應用程式記憶體到下面的快取和裝置的每個儲存級別都可能出現故障。只有大量的資料才會出現一些罕見的錯誤。即使成熟的生產就緒供應商產品也有未解決的與資料丟失有關的錯誤跟蹤器問題。一箇中間件系統應該能夠通知你這些資料的傷亡,並及時提供訊息重新傳遞。

應用程式使用不同的協議和資料格式。這意味著整合系統是資料轉換和介面卡到其他參與者的帷幕,並利用了各種技術。這些方法可以包括簡單的REST API呼叫,但也可以訪問佇列代理,通過FTP傳送CSV命令,或者將資料批量拖到資料庫表中。這是一張長長的單子,它不會變短的。

資料格式和路由規則的變化是不可避免的。應用程式開發過程中的每個步驟都會改變資料結構,這通常會導致整合資料格式和轉換的變化。有時候,重組企業資料流的基礎設施變化是必要的。例如,引入一個驗證參考資料的單點時,可能會發生這些更改,這些參考資料必須處理整個公司的所有主資料條目。有了N系統,我們最終可能N^2在它們之間有最大的連線,所以必須應用更改的地方的數量增長得相當快。這將像雪崩一樣。為了保持可維護性,中介軟體層必須通過多種路由和資料轉換提供清晰的依賴關係圖。

在設計整合和選擇最合適的中介軟體解決方案時,應該牢記這些想法。處理這個問題的可能方法之一是利用企業服務匯流排(ESB)。但是主要供應商提供的ESB通常過於沉重,而且往往比他們的價值更麻煩:ESB幾乎不可能快速啟動,它的學習曲線相當陡峭,而且它的靈活性被犧牲於一長串的功能和內建工具。在我看來,輕量級的開源整合解決方案要優越得多 - 它們更具彈性,易於部署到雲中,並且易於擴充套件。

軟體整合並不容易。今天,當我們構建微服務架構並處理大量的小型服務時,我們對於它們應該如何有效溝通也抱有很高的期望。

企業整合模式

正如所料,像一般的軟體開發一樣,資料路由和轉換的發展涉及重複的操作。經過一段時間的處理整合問題的專業人員對這方面的經驗進行了總結和系統化。在結果中,有一組稱為企業整合模式的提取模板,用於設計資料流。這些整合方法在Gregor Hophe和Bobby Wolfe的同名書中有描述,這很像“四人幫”的書,但是在膠合軟體方面。

舉一個例子,規範化模式引入了一個元件,它將具有不同資料格式的語義相同的訊息對映到單個規範模型,或者聚合器是一個將一系列訊息合併為一個的EIP。

由於它們是用於解決架構問題的技術無關的抽象,所以EIP有助於編寫一個架構設計,它不會深入到程式碼級別,而是足夠詳細地描述資料流。這種描述整合路線的符號不僅使設計簡潔,而且在解決與各業務領域的團隊成員的整合任務的背景下,設定了一個通用的術語和通用的語言,這是非常重要的。

介紹Apache Camel

整合路由被寫成由塊組成的管道。它建立了一個完全透明的影象來幫助追蹤資料流。

駱駝有許多流行的API介面卡。例如,從Apache Kafka獲取資料,監控AWS EC2例項,與Salesforce整合 - 所有這些任務都可以使用現成的元件來解決。

幾年前,我正在一個大型食品雜貨零售網路中建立一個企業整合體系,商店分佈廣泛。我從一個專有的ESB解決方案開始,後來證明這個方案過於繁瑣。然後,我們的團隊遇到了Apache Camel,在做了一些“概念驗證”工作之後,我們很快地將所有的資料流改寫成了Camel路由。

Apache Camel可以被描述為一個“中介路由器”,它是一個面向訊息的中介軟體框架,實現了我熟悉的EIP列表。它利用這些模式,支援所有常見的傳輸協議,並且包含了大量有用的介面卡。駱駝能夠處理大量的整合例程,而無需編寫自己的程式碼。

除此之外,我會選出下面的Apache Camel特性:

整合路由被寫成由塊組成的管道。它建立了一個完全透明的影象來幫助追蹤資料流。

Camel有許多流行的API介面卡。例如,從Apache Kafka獲取資料,監控AWS EC2例項,與Salesforce整合 - 所有這些任務都可以使用現成的元件來解決。

Apache Camel路由可以用Java或Scala DSL編寫。(XML配置也可用,但過於冗長,除錯功能更差)。它不會對通訊服務的技術堆疊施加限制,但是如果您使用Java或Scala編寫,則可以將Camel嵌入到應用程式中獨立執行。

Camel使用的路由符號可以用下面的簡單虛擬碼來描述:


from(Source)
.transform(Transformer)
.to(Destination)

的Source,Transformer以及Destination是指由其uri指向實現元件的端點。

是什麼讓Camel解決了我之前描述的整合問題?我們來看一下。首先,路由和轉換邏輯現在只能用於專門的Apache Camel配置。其次,通過簡潔自然的DSL結合EIP的使用,出現了系統之間的依賴關係圖。它由易理解的抽象構成,路由邏輯易於調整。最後,我們不必編寫轉換程式碼的堆,因為適當的介面卡可能已經包含在內。

我應該補充一點,Apache Camel是一個成熟的框架,並定期更新。它有一個偉大的社群和相當龐大的知識庫。

它確實有它自己的缺點。駱駝不應該被視為一個複雜的整合套件。這是一個沒有高階功能(如業務流程管理工具或活動監視器)的工具箱,但可用於建立此類軟體。

替代系統可能是,例如Spring Integration或Mule ESB。對於Spring Integration來說,儘管它被認為是輕量級的,但根據我的經驗,把它放在一起並編寫大量的XML配置檔案可能會變得異常複雜,並且不是一個簡單的出路。Mule ESB是一個功能強大且功能強大的工具集,但顧名思義,它是一種企業服務匯流排,因此它屬於不同的權重類別。Mule可以與Fuse ESB進行比較,Fuse ESB是一款基於Apache Camel的類似產品,具有豐富的功能。對我來說,使用Apache Camel來貼上服務是一件不容易的事情。它很容易使用,併產生一個乾淨的描述,在什麼地方,同時,它的功能足夠建設複雜的整合。

編寫一個示例路線

我們開始編寫程式碼。我們將從一個同步資料流開始,這個資料流將訊息從單一來源路由到收件人列表。路由規則將用Java DSL編寫。

我們將使用Maven構建專案。首先將以下依賴項新增到pom.xml:


<dependencies>
...
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>2.20.0</version>
</dependency>
</dependencies>

或者,應用程式可以建立在camel-archetype-java原型之上。

Camel路徑定義在RouteBuilder.configure方法中宣告。

public void configure() {

errorHandler(defaultErrorHandler().maximum

Redeliveries(0));

from("file:orders?noop=true").routeId("main")
.log("Incoming File: ${file:onlyname}")
.unmarshal().json(JsonLibrary.Jackson,

Order.class)

// unmarshal JSON to Order class containing

List

<OrderItem>
.split().simple("body.items") // split list to

process one by one
.to("log:inputOrderItem")
.choice()
.when().simple("${body.type} == 'Drink'")
.to("direct:bar")
.when().simple("${body.type} == 'Dessert'")
.to("direct:dessertStation")
.when().simple("${body.type} == 'Hot Meal'")
.to("direct:hotMealStation")
.when().simple("${body.type} == 'Cold Meal'")
.to("direct:coldMealStation")
.otherwise()
.to("direct:others");

from("direct:bar").routeId("bar").log

("Handling Drink");
from("direct:dessertStation").routeId

("dessertStation").

log("Handling Dessert");
from("direct:hotMealStation").routeId

("hotMealStation").

log("Handling Hot Meal");
from("direct:coldMealStation").routeId

("coldMealStation").

log("Handling Cold Meal");
from("direct:others").routeId("others")

.log("Handling

Something Other");
}

在這個定義中,我們建立了一個從JSON檔案中獲取記錄的路徑,將它們拆分成條目,並根據訊息內容路由到一組處理程式。

讓我們在準備好的測試資料上執行它。我們將得到輸出:


INFO | Total 6 routes, of which 6 are started
INFO | Apache Camel 2.20.0 (CamelContext:

camel-1) started in 10.716 seconds
INFO | Incoming File: order1.json
INFO | Exchange[ExchangePattern: InOnly,

BodyType: com.antongoncharov.camel.example.model

.OrderItem, Body: OrderItem{id='1', type='Drink', name='Americano', qty='1'}]
INFO | Handling Drink
INFO | Exchange[ExchangePattern: InOnly,

BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{id='2', type='Hot Meal',

name='French Omelette', qty='1'}]
INFO | Handling Hot Meal
INFO | Exchange[ExchangePattern: InOnly,

BodyType: com.antongoncharov.camel.example.

model.

OrderItem, Body: OrderItem{id='3', type='Hot

Meal', name='Lasagna', qty='1'}]
INFO | Handling Hot Meal
INFO | Exchange[ExchangePattern: InOnly,

BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{id='4', type='Hot Meal',

name='Rice Balls', qty='1'}]
INFO | Handling Hot Meal
INFO | Exchange[ExchangePattern: InOnly,

BodyType: com.antongoncharov.camel.example.model.

OrderItem, Body: OrderItem{id='5', type='Dessert', name='Blueberry Pie', qty='1'}]
INFO | Handling Dessert

正如所料,Camel路由訊息到目的地。

資料傳輸選擇

在上面的示例中,元件之間的互動是同步的,並通過應用程式記憶體執行。但是,當我們處理不共享記憶體的單獨應用程式時,還有更多的通訊方式:

1.檔案交換。一個應用程式產生共享資料檔案供另一個使用。這是老派精神的生存之地。這種溝通方式帶來了諸多後果:缺乏交易和一致性,效能較差,系統之間的孤立協調。許多開發人員最終編寫了自制的整合解決方案,使這個過程或多或少地可以管理。

2.通用資料庫。讓應用程式將他們希望共享的資料儲存在單個數據庫的通用模式中。設計統一模式和處理併發訪問表是這種方法最突出的挑戰。與檔案交換一樣,這很容易成為永久的瓶頸。

3.遠端API呼叫。提供一個介面,允許應用程式與另一個正在執行的應用程式進行互動,如典型的方法呼叫。應用程式通過API呼叫共享功能,但是它在過程中緊密耦合它們。

4.訊息。讓每個應用程式連線到一個通用的訊息傳遞系統,並使用訊息非同步交換資料和呼叫行為。傳送者和接收者都不必同時啟動並執行訊息。

有更多的互動方式,但是我們應該記住,從廣義上講,有兩種型別的互動:同步和非同步。第一個就像在你的程式碼中呼叫一個函式 - 執行流程將一直等待,直到它執行並返回一個值。使用非同步方法,相同的資料通過中間訊息佇列或訂閱主題傳送。非同步遠端函式呼叫可以作為請求 - 回覆EIP來實現。

非同步訊息傳遞不是萬能的,它涉及到一定的限制。您很少在網路上看到訊息API; 同步REST服務更受歡迎。但是訊息中介軟體被廣泛用於企業內部網或分散式系統後端基礎設施。

使用訊息佇列

讓我們的示例非同步。管理佇列和訂閱主題的軟體系統稱為訊息代理。這就像一個表和列的RDBMS。佇列用作點對點整合,而主題用於與許多接收者的釋出 - 訂閱通訊。我們將使用Apache ActiveMQ作為JMS訊息代理,因為它是可靠且可嵌入的。

新增以下依賴項。有時activemq-all,向專案中新增包含所有ActiveMQ jar 的過度,但我們會保持我們的應用程式的依賴關係不復雜。


<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.2</version>
</dependency>

然後以程式設計方式啟動代理。在Spring Boot中,通過插入spring-boot-starter-activemqMaven依賴關係,我們得到了一個自動配置。

使用以下命令執行新的訊息代理,只指定聯結器的端點:


BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:61616"); 
broker.start();

並將以下配置片段新增到configure方法體:


ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
this.getContext().addComponent("activemq", ActiveMQComponent.jmsComponent(connectionFactory));

現在我們可以使用訊息佇列來更新前面的例子。佇列將自動建立訊息傳遞。

public void configure() {

errorHandler(defaultErrorHandler().maximumRede

liveries(0));
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
this.getContext().addComponent("activemq", ActiveMQComponent.jmsComponent(connectionFactory));

from("file:orders?noop=true").routeId("main")
.log("Incoming File: ${file:onlyname}")
.unmarshal().json(JsonLibrary.Jackson, Order.class)

// unmarshal JSON to Order class containing

List<OrderItem>
.split().simple("body.items") // split list

to process one by one
.to("log:inputOrderItem")
.choice()
.when().simple("${body.type} == 'Drink'")
.to("activemq:queue:bar")
.when().simple("${body.type} == 'Dessert'")
.to("activemq:queue:dessertStation")
.when().simple("${body.type} == 'Hot Meal'")
.to("activemq:queue:hotMealStation")
.when().simple("${body.type} == 'Cold Meal'")
.to("activemq:queue:coldMealStation")
.otherwise()
.to("activemq:queue:others");

from("activemq:queue:bar").routeId("barAsync").

log("Drinks");
from("activemq:queue:dessertStation").routeId

("dessertAsync").log("Dessert");
from("activemq:queue:hotMealStation").routeId
from("activemq:queue:coldMealStation").routeId

("coldMealAsync").log("Cold Meals");
from("activemq:queue:others").routeId("othersAsync")

.log("Others");

好了,現在互動已經變得非同步了。這些資料的潛在消費者在準備好時可以訪問它。這是一個鬆耦合的例子,我們試圖在一個被動的架構中實現。其中一項服務不可用將不會阻止其他服務。而且,消費者可以並行地從佇列中縮放和讀取。佇列本身可以擴充套件和分割槽。持久佇列可以將資料儲存在磁碟上,等待處理,即使所有參與者都關閉了。因此,這個系統更容錯。

一個驚人的事實是,CERN使用Apache Camel和ActiveMQ來監視大型強子對撞機(LHC)的系統。還有一個有趣的碩士論文解釋了為這個任務選擇合適的中介軟體解決方案。所以,正如他們在主題演講中所說:“沒有JMS-沒有粒子物理學!”

監控

在前面的例子中,我們建立了兩個服務之間的資料通道。這是架構中一個額外的潛在失敗點,所以我們必須照顧它。我們來看看Apache Camel提供的監視功能。基本上,它通過JMX提供有關其路由的統計資訊。ActiveMQ以相同的方式公開佇列統計資訊。

我們開啟應用程式中的JMX伺服器,使其能夠使用命令列選項執行:


-Dorg.apache.camel.jmx.createRmiConne

ctor=true
-Dorg.apache.camel.jmx.mbeanObjectDoma

inName=org.apache.camel
-Dorg.apache.camel.jmx.rmiConnector.

registryPort=1099
-Dorg.apache.camel.jmx.serviceUrlPath

=camel

現在執行該應用程式,以便該路線已完成其工作。開啟標準jconsole工具並連線到應用程式程序。連線到網址service:jmx:rmi:///jndi/rmi://localhost:1099/camel。轉到MBeans樹中的org.apache.camel域。

我們可以看到,關於路由的一切都在控制之中。我們有正在進行的訊息的數量,錯誤計數和佇列中的訊息計數。這些資訊可以通過流水線連線到一些監視工具集,如Graphana或Kibana。你可以通過實現知名的ELK棧來做到這一點。

還有一個可插拔和可擴充套件的Web控制檯提供了一個使用者介面,用於管理駱駝的ActiveMQ,和更多的人,叫hawt.io。

測試路線

Apache Camel具有相當廣泛的功能,可以用模擬元件編寫測試路由。這是一個強大的工具,但是為了測試而編寫單獨的路由是一個耗時的過程。在生產線上執行測試而不修改管線會更有效率。駱駝有這個功能,可以使用AdviceWith元件來實現。

讓我們在我們的示例中啟用測試邏輯並執行示例測試。


<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-test</artifactId>
<version>2.20.0</version>
<scope>test</scope>
</dependency>

測試類是:


public class AsyncRouteTest extends CamelTestSupport {

@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new AsyncRouteBuilder();
}

@Before
public void mockEndpoints() throws Exception {
context.getRouteDefinition("main").adviceWith(context, new AdviceWithRouteBuilder() {
@Override
public void configure() throws Exception {
// we substitute all actual queues with mock endpoints
mockEndpointsAndSkip("activemq:queue:bar");
mockEndpointsAndSkip("activemq:queue:dessertStation");
mockEndpointsAndSkip("activemq:queue:hotMealStation");
mockEndpointsAndSkip("activemq:queue:coldMealStation");
mockEndpointsAndSkip("activemq:queue:others");
// and replace the route's source with test endpoint
replaceFromWith("file://testInbox");
}
});
}

@Test
public void testSyncInteraction() throws InterruptedException {
String testJson = "{\"id\": 1, \"order\": [{\"id\": 1, \"name\": \"Americano\", \"type\": \"Drink\", \"qty\": \"1\"}, {\"id\": 2, \"name\": \"French Omelette\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 3, \"name\": \"Lasagna\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 4, \"name\": \"Rice Balls\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 5, \"name\": \"Blueberry Pie\", \"type\": \"Dessert\", \"qty\": \"1\"}]}";
// get mocked endpoint and set an expectation
MockEndpoint mockEndpoint = getMockEndpoint("mock:activemq:queue:hotMealStation");
mockEndpoint.expectedMessageCount(3);
// simulate putting file in the inbox folder
template.sendBodyAndHeader("file://testInbox", testJson, Exchange.FILE_NAME, "test.json");
//checks that expectations were met
assertMockEndpointsSatisfied();
}

}

現在執行與應用程式的測試mvn test。我們可以看到,我們的路線已經成功地通過了測試建議。沒有訊息通過實際的佇列傳遞,測試已經通過。

INFO | Route: main started and consuming from: file://testInbox
<...>
INFO | Incoming File: test.json
<...>
INFO | Asserting: mock://activemq:queue:hotMealStation is satisfied

今天的一個整合問題是應用程式不再是靜態的。在雲基礎架構中,我們同時處理在多個節點上執行的虛擬服務。它使得微服務架構能夠與小型,輕量級服務網路相互作用。這些服務的壽命是不可靠的,我們必須動態地發現它們。

將雲服務合併在一起是Apache Camel可以解決的任務。特別有趣的是,由於EIP的風格和駱駝有足夠的介面卡和支援多種協議的事實。最近的2.18版本添加了ServiceCall元件,該元件引入了呼叫API並通過叢集發現機制解析其地址的功能。目前,它支援Consul,Kubernetes,Ribbon等。可以很容易地找到代理的一些例子,其中ServiceCall用Consul配置。我們將在這裡使用Kubernetes,因為這是我最喜歡的叢集解決方案。

整合架構如下:

該Order服務和Inventory服務將是一個簡單的Spring Boot應用程式返回靜態資料。我們不是繫結在這裡的一個特定的技術堆疊。這些服務正在產生我們想要處理的資料。

訂購服務控制器:

@RestController
public class OrderController {

private final OrderStorage orderStorage;

@Autowired
public OrderController(OrderStorage orderStorage) {
this.orderStorage = orderStorage;
}

@RequestMapping("/info")
public String info() {
return "Order Service UUID = " + OrderApplication.serviceID;
}

@RequestMapping("/orders")
public List<Order> getAll() {
return orderStorage.getAll();
}

@RequestMapping("/orders/{id}")
public Order getOne(@PathVariable Integer id) {
return orderStorage.getOne(id);
}
}

它以如下格式產生資料:

[{"id":1,"items":[2,3,4]},{"id":2,"items":[5,3]}]

該Inventory服務控制器是完全類似Order服務的:

@RestController
public class InventoryController {

private final InventoryStorage inventoryStorage;

@Autowired
public InventoryController(InventoryStorage inventoryStorage) {
this.inventoryStorage = inventoryStorage;
}

@RequestMapping("/info")
public String info() {
return "Inventory Service UUID = " + InventoryApplication.serviceID;
}

@RequestMapping("/items")
public List<InventoryItem> getAll() {
return inventoryStorage.getAll();
}

@RequestMapping("/items/{id}")
public InventoryItem getOne(@PathVariable Integer id) {
return inventoryStorage.getOne(id);
}

}

InventoryStorage是儲存資料的通用儲存庫。在這個例子中,它返回靜態預定義的物件,這些物件被封送到下面的格式。

[{"id":1,"name":"Laptop","description":"Up to 12-hours battery life","price":499.9},{"id":2,"name"

讓我們編寫一個連線它們的閘道器路由,但在這個步驟中沒有ServiceCall:



rest("/orders")
.get("/").description("Get all orders with details").outType(TestResponse.class)
.route()
.setHeader("Content-Type", constant("application

/json"))
.setHeader("Accept", constant("application/json"))
.setHeader(Exchange.HTTP_METHOD, constant("GET"))
.removeHeaders("CamelHttp*")
.to("http4://localhost:8082/orders?bridgeEndpoint

=true")
.unmarshal(formatOrder)
.enrich("direct:enrichFromInventory", new OrderAggregationStrategy())
.to("log:result")
.endRest();

from("direct:enrichFromInventory")
.transform().simple("${null}")
.setHeader("Content-Type", constant("application/

json"))
.setHeader("Accept", constant("application/json"))
.setHeader(Exchange.HTTP_METHOD, constant("GET"))
.removeHeaders("CamelHttp*")
.to("http4://localhost:8081/items?bridgeEndpoint=

true")
.unmarshal(formatInventory);

現在想象一下,每個服務不再是一個特定的例項,而是一個執行一個例項的雲。我們將使用Minikube在本地嘗試Kubernetes叢集。

配置網路路由以在本地檢視Kubernetes節點(



# remove existing routes
sudo route -n delete 10/24 > /dev/null 2>&1
# add routes
sudo route -n add 10.0.0.0/24 $(minikube ip) 
# 172.17.0.0/16 ip range is used by docker

in minikube 
sudo route -n add 172.17.0.0/16 $(minikube ip) 
ifconfig 'bridge100' | grep member | awk '

{print $2}’ 
# use interface name from the output of the

previous command 
# needed for xhyve driver, which I'm using

for testing
sudo ifconfig bridge100 -hostfilter en5

 

給出的示例適用於Mac / Linux環境):

使用Dockerfile配置將服務包裝在Docker容器中,如下所示:


FROM openjdk:8-jdk-alpine
VOLUME /tmp
ADD target/order-srv-1.0-SNAPSHOT.jar app.jar
ADD target/lib lib
ENV JAVA_OPTS=""
ENTRYPOINT exec java $JAVA_OPTS -Djava.security.egd=file:/dev/./urandom

-jar /app.jar

構建並將服務映像推送到Docker登錄檔。現在執行本地Kubernetes叢集中的節點。

Kubernetes.yaml部署配置:

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: inventory
spec:
replicas: 3
selector:
matchLabels:
app: inventory
template:
metadata:
labels:
app: inventory
spec:
containers:
- name: inventory
image: inventory-srv:latest
imagePullPolicy: Never
ports:
- containerPort: 8081

將這些部署作為叢集中的服務公開:


kubectl expose deployment order-srv --type=NodePort
kubectl expose deployment inventory-srv --type=NodePort

現在我們可以檢查請求是否由叢集中隨機選擇的節點提供服務。curl -X http://192.168.99.100:30517/info依次執行幾次以訪問minikube NodePort以獲得公開的服務(使用您的主機和埠)。在輸出中,我們看到我們已經實現了請求平衡。

Inventory Service UUID = 22f8ca6b-f56b-4984-927b-cbf9fcf81da5
Inventory Service UUID = b7a4d326-1e76-4051-a0a6-1016394fafda
Inventory Service UUID = b7a4d326-1e76-4051-a0a6-1016394fafda
Inventory Service UUID = 22f8ca6b-f56b-4984-927b-cbf9fcf81da5
Inventory Service UUID = 50323ddb-3ace-4424-820a-6b4e85775af4

新增camel-kubernetes和camel-netty4-http依賴專案的pom.xml。然後將ServiceCall元件配置為使用共享路徑定義中的所有服務呼叫的Kubernetes主節點發現:


KubernetesConfiguration kubernetesConfiguration =

new KubernetesConfiguration();
kubernetesConfiguration.setMasterUrl("https://192

.168.64.2:8443");
kubernetesConfiguration.setClientCertFile("/Users

/antongoncharov/.minikube/client.crt");
kubernetesConfiguration.setClientKeyFile("/Users

/antongoncharov/.minikube/client.key");
kubernetesConfiguration.setNamespace("default”);

ServiceCallConfigurationDefinition config = new

ServiceCallConfigurationDefinition();
config.setServiceDiscovery(new KubernetesClientSe

rviceDiscovery(kubernetesCon

figuration));
context.setServiceCallConfiguration(config);

ServiceCall EIP完善了Spring Boot。大多數選項可以直接在application.properties檔案中配置。

使用ServiceCall元件授權Camel 路由:

rest("/orders")
.get("/").description("Get all orders with details").outType(TestResponse.class)
.route()
.hystrix()
.setHeader("Content-Type", constant("application/json"))
.setHeader("Accept", constant("application/json"))
.setHeader(Exchange.HTTP_METHOD, constant("GET"))
.removeHeaders("CamelHttp*")
.serviceCall("customer-srv","http4:customer-deployment?bridgeEndpoint=true")
.unmarshal(formatOrder)
.enrich("direct:enrichFromInventory", new OrderAggregationStrategy())
.to("log:result")
.endRest();

from("direct:enrichFromInventory")
.transform().simple("${null}")
.setHeader("Content-Type", constant("application/json"))
.setHeader("Accept", constant("application/json"))
.setHeader(Exchange.HTTP_METHOD, constant("GET"))
.removeHeaders("CamelHttp*")
.serviceCall("order-srv","http4:order-srv?bridgeEndpoint=true")
.unmarshal(formatInventory);

我們還啟動了路線中的斷路器。這是一個整合掛鉤,允許在傳送錯誤或收件人不可用的情況下暫停遠端系統呼叫。這旨在避免級聯絡統故障。Hystrix元件通過實現斷路器模式來幫助實現這一點。

讓我們執行它併發送測試請求; 我們會得到這兩個服務聚合的響應。

[{"id":1,"items":[{"id":2,"name":"Monitor","description":"27-inch, response time: 7ms","price":200

結果如預期。

其他用例

我展示了Apache Camel如何在一個叢集中整合微服務。這個框架的其他用途是什麼?一般來說,在基於規則的路由可能是解決方案的任何地方都是有用的。例如,Apache Camel可以成為Eclipse Kura介面卡的物聯網中介軟體。它可以處理來自各種元件和服務的日誌訊號的監視,就像在CERN系統中一樣。它也可以是企業級SOA的整合框架,也可以是批量資料處理的管道,雖然它在這方面與Apache Spark沒有很好的競爭。

結論

你可以看到系統整合不是一個簡單的過程。我們很幸運,因為收集了很多經驗。正確應用它來構建靈活和容錯的解決方案非常重要。

為了確保正確的應用,我建議有一個重要的整合方面的清單。必須具備的專案包括:

1.是否有單獨的整合層?

2.是否有整合測試?

3.我們知道預期的峰值資料強度嗎?

4.我們是否知道預期的資料交付時間?

5.訊息相關性是否重要?如果序列中斷?

6.我們應該以同步還是非同步的方式來做?

7.格式和路由規則更頻繁地變化在哪裡?

8.我們有辦法監督這個過程嗎?

在本文中,我們嘗試了Apache Camel,這是一個輕量級整合框架,可幫助您在解決整合問題時節省時間和精力。正如我們所展示的,它可以作為一個工具,支援相關的微服務體系結構,全面負責微服務之間的資料交換。

from: http://www.uml.org.cn/zjjs/201801181.asp