Spring Cloud資料流中的組合函式支援
Spring Cloud Stream最近添加了一項Function ,可將函式定義組合到現有的Spring Cloud Stream應用程式中。在本部落格中,我們將看到Spring Cloud Data Flow如何利用此功能在Streaming管道中組合函式。
它有什麼不同?
在Spring Cloud Data Flow中,流資料管道由Spring Cloud Stream應用程式組成,開發人員可以選擇開箱即用的流應用程式,其中包含許多常見用例。開發人員還可以使用Spring Cloud Stream框架擴充套件這些開箱即用的應用程式或建立自定義應用程式。
Spring Cloud Stream 2.1.0 GA 已經集成了一個基於Spring Cloud Function -based程式設計模型,可以使用java.util.Function,一個java.util.Consumer,和一個java.util.Supplier表示業務邏輯,其相應的對應的角色是Spring Cloud Stream中的Processor,Sink和Source。
鑑於這種兩者結合對映的靈活性,Spring Cloud Stream框架現在支援一種簡單但功能強大的函式組合方法。這種函式組合可以是源Source和處理器Processor組合成一個單個應用程式:“新源Source”;或者,它可能是處理器Processor+接收器Sink組合到一個新的應用程式中:“新的Sink”。這種靈活性為流應用程式開發人員開闢了有趣的新方式。
讓我們看看如何通過三個應用程式建立管道來執行簡單轉換,然後使用兩個使用函式組合的應用程式來了解如何將其實現為管道。
Streaming Pipeline有三個應用程式
對於第一個流,
我們將使用開箱即用的http-source,transform-processor和log-sink的三個應用程式。
首先,啟動Spring Cloud Data Flow local伺服器:
java -jar spring-cloud-dataflow-server-local-1.7.3.RELEASE.jar
然後,啟動Spring Cloud資料流shell:
java -jar spring-cloud-dataflow-shell-1.7.3.RELEASE.jar
現在讓我們使用RabbitMQ繫結器(或Kafka繫結器)分別註冊HTTP源source,變換器處理器processor和日誌接收器sink 作為應用程式:
dataflow:>app register --name http --type source --uri https:<font><i>//repo.spring.io/milestone/org/springframework/cloud/stream/app/http-source-rabbit/2.1.0.M2/http-source-rabbit-2.1.0.M2.jar</i></font><font> </font>
註冊處理器:
dataflow:>app register --name transformer --type processor --uri https:<font><i>//repo.spring.io/milestone/org/springframework/cloud/stream/app/transform-processor-rabbit/2.1.0.M2/transform-processor-rabbit-2.1.0.M2.jar</i></font><font> </font>
註冊接收器sink:
dataflow:>app register --name log --type sink --uri https:<font><i>//repo.spring.io/milestone/org/springframework/cloud/stream/app/log-sink-rabbit/2.1.0.M2/log-sink-rabbit-2.1.0.M2.jar</i></font><font> </font>
現在我們可以建立一個沒有函式組合的簡單流:
dataflow:>stream create hello --definition <font>"http --server.port=9000 | transformer --expression=(\"Hello \"+payload.toString().toUpperCase()) | log"</font><font> </font>
然後我們可以部署流:
dataflow:>stream deploy hello --properties <font>"deployer.*.local.inheritLogging=true"</font><font> dataflow:>http post --data </font><font>"friend"</font><font> --target </font><font>"http://localhost:9000"</font><font> POST (text/plain) http:</font><font><i>//localhost:9000 friend</i></font><font> 202 ACCEPTED </font>
您可以在log應用程式中看到以下日誌訊息:
[sformer.hello-1] log-sink: Hello FRIEND
在此流中,我們將http(Source),轉換器(Processor)和日誌(Sink)這個應用程式部署為目標平臺中的獨立的應用程式(在本例中,它是local)。對於某些用例,對於這種簡單的有效負載轉換邏輯,我們可能希望將Processor應用程式與Source或Sink應用程式結合使用。例如,在源輸出資料中遮蔽某些特定使用者特定欄位的轉換方案不一定需要部署為單獨的獨立應用程式。相反,它可以在Source或Sink應用程式中組合。
為了將Processor函式組合到源或接收器應用程式中,我們使用Spring Cloud Stream的函式組合支援。
Cloud Stream的函式組合
在Spring Cloud Stream中的函式組合的支援是基於Spring Cloud 函式的,讓java.util.Supplier,java.util.Consumer以及java.util.Function註冊作為春季@Bean的定義。這些函式@Bean定義可在執行時用於組合。
Spring Cloud Stream引入了一個名為的新屬性,spring.cloud.stream.function.definition它對應於Spring Cloud Function中的函式定義DSL。設定此屬性後,在執行時將自動連結所需的函式@bean。
函式組合以下列方式發生:
- 當Spring Cloud Stream應用程式是Source型別時,在源Source之後作為output應用組合函式。
- 當Spring Cloud Stream應用程式是Sink型別時,組合函式應用在接收器sink之前作為input。
這使得能夠將函式(在Spring Cloud Function DSL中定義)組合到現有的Spring Cloud Stream應用程式中,然後由Spring Cloud Data Flow在流資料管道中進行編排。
函式組合案例
讓我們建立並部署一個流,該流將前一個示例的變換器表示式組合進入Source應用程式本身。變換器邏輯通過使用兩個java.util.Function實現來完成。
我們將建立一個新的源應用程式,我們將其稱為http-transformer擴充套件開箱即用的http源應用程式。可以在此處 找到新源應用程式的原始碼。
該http-transformer應用程式包含upper和concat函式bean,定義如下:
@SpringBootApplication @Import(org.springframework.cloud.stream.app.http.source.HttpSourceConfiguration.<b>class</b>) <b>public</b> <b>class</b> HttpSourceRabbitApplication { @Bean <b>public</b> Function<String, String> upper() { <b>return</b> value -> value.toUpperCase(); } @Bean <b>public</b> Function<String, String> concat() { <b>return</b> value -> <font>"Hello "</font><font>+ value; } <b>public</b> <b>static</b> <b>void</b> main(String[] args) { SpringApplication.run(HttpSourceRabbitApplication.<b>class</b>, args); } } </font>
clone githubrepo後 ,您可以使用maven構建應用程式:
cd function-composition/http-transformer
./mvnw clean package
現在http-transformer使用Data Flow Shell 註冊應用程式。
注意:對於以下app註冊--uri選項,請使用適合您系統的值替換工件的目錄名稱和路徑。
dataflow:>app register --name http-transformer --type source --uri file:<font><i>///Users/igopinathan/dev/git/ilayaperumalg/sandbox/function-composition/http-transformer/target/http-transformer-2.1.0.BUILD-SNAPSHOT.jar</i></font><font> </font>
現在讓我們建立一個流:
dataflow:>stream create helloComposed --definition <font>"http-transformer --server.port=9001 | log"</font><font> </font>
在部署流時,我們傳遞spring.cloud.stream.function.definition屬性以定義組合函式DSL(在Spring Cloud Function中定義)。在這種情況下,它是:
dataflow:>stream deploy helloComposed --properties <font>"app.http-transformer.spring.cloud.stream.function.definition=upper|concat,deployer.*.local.inheritLogging=true"</font><font> </font>
上面的部署將upper和concat函式bean組合到http源應用程式中。
然後我們可以將負載(測試內容)傳送到http應用程式:
dataflow:>http post --data <font>"friend"</font><font> --target </font><font>"http://localhost:9001"</font><font> > POST (text/plain) http:</font><font><i>//localhost:9001 friend</i></font><font> > 202 ACCEPTED </font>
然後你可以在log應用程式中看到輸出:
[helloComposed-1] log-sink: Hello FRIEND
請注意,函式組合支援不適用於開箱即用的Spring Cloud Stream Processor應用程式,因為在現有處理器的應用程式邏輯之前或之後是否需要應用該功能存在不確定性。
但是,您可以使用標準java.util.Function API建立自己的處理器應用程式,使用函式組合,如以下示例所示:
@Configuration <b>public</b> <b>static</b> <b>class</b> FunctionProcessorConfiguration { @Bean <b>public</b> Function<String, String> upperAndConcat() { <b>return</b> upper().andThen(concat()); } @Bean <b>public</b> Function<String, String> upper() { <b>return</b> value -> value.toUpperCase(); } @Bean <b>public</b> Function<String, String> concat() { <b>return</b> value -> <font>"Hello "</font><font>+ value; } } </font>
然後,您需要使用以下屬性進行部署:
spring.cloud.stream.function.definition=upperAndConcat
Kotlin支援
另一個有趣的特性是Spring Cloud Function支援Kotlin函式的功能組合。這允許我們將任何Kotlin函式bean新增到組合函式Source或Sink應用程式中。
要看到這個工作,讓我們使用http-transformer-kotlin-processor我們的示例github儲存庫中 的應用程式。
Kotlin函式bean配置為處理器。這裡,Kotlin函式bean是transform如下定義的函式:
@Bean open fun transform(): (String) -> String { <b>return</b> { <font>"How are you "</font><font>.plus(it) } } </font>
此外,該專案還具有spring-cloud-function-kotlin依賴性,可以對Kotlin函式應用函式配置支援,定義如下:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-function-kotlin</artifactId> <version>2.0.0.RELEASE</version> </dependency>
cd function-composition/http-transformer-kotlin
./mvnw clean package
對於以下app註冊--uri選項,請使用適合您系統的值替換工件的目錄名稱和路徑:
dataflow:>app register --name http-transformer-kotlin --type source --uri file:<font><i>///Users/igopinathan/dev/git/ilayaperumalg/sandbox/function-composition/http-transformer-kotlin/target/http-transformer-kotlin-2.1.0.BUILD-SNAPSHOT.jar</i></font><font> </font>
要使用此應用程式建立流,請執行以下操作Source:
dataflow:>stream create helloComposedKotlin --definition <font>"http-transformer-kotlin --server.port=9002 | log"</font><font> </font>
正如我們在http-transformer示例中所做的那樣,我們可以使用該spring.cloud.stream.function.definition屬性來指定任何有效的組合函式DSL來建構函式組合。在這種情況下,讓我們將通過Java配置註冊的函式bean與來自Kotlin處理器配置的函式bean結合起來。
dataflow:>stream deploy helloComposedKotlin --properties <font>"app.http-transformer-kotlin.spring.cloud.stream.function.definition=upper|transform|concat,deployer.*.local.inheritLogging=true"</font><font> </font>
這裡,函式名transform對應於Kotlin函式。
注意:我們可以在Kotlin函式和Java函式之間執行組合,因為Kotlin函式在內部轉換為java.util.Function。
dataflow:>http post --data <font>"friend"</font><font> --target </font><font>"http://localhost:9002"</font><font> > POST (text/plain) http:</font><font><i>//localhost:9002 friend</i></font><font> > 202 ACCEPTED </font>
並且,您可以在log應用程式中看到輸出為:
[omposedKotlin-1] log-sink : Hello How are you FRIEND
在此示例中,http-transformer還包含函式的原始碼。但是,您可以通過在單獨的工件中定義函式bean來使應用程式更加模組化。然後,您可以通過僅向專案新增maven依賴項並設定spring.cloud.stream.function.definition屬性來構建應用程式。通過這種方式,您可以將大部分業務邏輯編碼為函式,並且如果需要,可以使用Source或Sink組合它。