Apache Flink 流處理例項
維基百科在 IRC 頻道上記錄 Wiki 被修改的日誌,我們可以通過監聽這個 IRC 頻道,來實時監控給定時間視窗內的修改事件。Apache Flink 作為流計算引擎,非常適合處理流資料,並且,類似於 Hadoop MapReduce 等框架,Flink 提供了非常良好的抽象,使得業務邏輯程式碼編寫非常簡單。我們通過這個簡單的例子來感受一下 Flink 的程式的編寫。
通過 Flink Quickstart 構建 Maven 工程
Flink 提供了 flink-quickstart-java
和 flink-quickstart-scala
外掛,允許使用 Maven 的開發者建立統一的專案模版,應用專案模板可以規避掉很多部署上的坑。
構建這次工程的命令如下
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \
-DarchetypeVersion=1.6-SNAPSHOT \
-DgroupId=wiki-edits \
-DartifactId=wiki-edits \
-Dversion=0.1 \
-Dpackage=wikiedits \
-DinteractiveMode=false
注意高版本的 Maven 不支援 -DarchetypeCatalog
引數,可以將第一行改為 mvn org.apache.maven.plugins:maven-archetype-plugin:2.4::generate \
或者去掉 -DarchetypeCatalog
行,並將 .m2/settings.xml
修改如下,其中主要是在 //profiles/profile/repositories
下設定好搜尋 archetype
的倉庫地址
<settings xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
<profiles>
<profile>
<id>acme</id>
<repositories>
<repository>
<id>archetype</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
</profile>
</profiles>
<activeProfiles>
<activeProfile>acme</activeProfile>
</activeProfiles>
</settings>
成功下載專案模板後,在當前目錄下應當能看到 wiki-edit
目錄。執行命令 rm wiki-edits/src/main/java/wikiedits/*.java
清除模板自帶的 Java 檔案。
為了監聽維基百科的 IRC 頻道,在 pom.xml
檔案下新增如下依賴,分別是 Flink 的客戶端和 WikiEdit 的聯結器
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-wikiedits_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
編寫 Flink 程式
接下來的程式碼編寫工作假定你是在 IDE 下編寫的,主要是為了避免囉嗦的 import
語句。包含 import
等模板程式碼的全部程式碼在末尾給出。
首先我們建立用於執行的主程式程式碼 src/main/java/wikiedits/WikipediaAnalysis.java
package wikiedits;
public class WikipediaAnalysis {
publicstaticvoidmain(String[] args) throws Exception {
}
}
流處理的 Flink 程式的第一步是建立流處理執行上下文 StreamExecutionEnvironment
,它類似於其他框架內的 Configuration 類,用於配製 Flink 程式和執行時的各個引數,對應的語句如下
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
下一步我們以維基百科 IRC 頻道的日誌作為資料來源建立連線
DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
這個語句建立了填充 WikipediaEditEvent
的 DataStream
,拿到資料流之後我們就可以對它做進一步的操作了。
我們的目標是統計給定時間視窗內,比如說五秒內,使用者對維基百科的修改位元組數。因此我們對每個 WikipediaEditEvent
以使用者名稱作為鍵來標記(keyed)。Flink 相容 Java 1.6 版本,因此古老的版本中 Flink 提供 KeySelector
函式式介面來標記
KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
.keyBy(new KeySelector<WikipediaEditEvent, String>() {
@Override
public String getKey(WikipediaEditEvent event) {
return event.getUser();
}
});
當前版本的 Flink 主要支援的是 Java 8 版本,因此我們也可以用 Lambda 表示式來改寫這段較為繁瑣的程式碼
KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
.keyBy(WikipediaEditEvent::getUser);
這個語句定義了 keyedEdits
變數,它是一個概念上形如(String, WikipediaEditEvent)
的資料流,即以字串(使用者名稱)為鍵,WikipediaEditEvent
為值的資料的流。這一步驟類似於 MapReduce 的 Shuffle 過程,針對 keyedEdits
的處理將自動按照鍵分組,因此我們可以直接對資料進行 fold
操作以摺疊聚合同一使用者名稱的修改位元組數
DataStream<Tuple2<String, Long>> result = keyedEdits
.timeWindow(Time.seconds(5))
.fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
acc.f0 = event.getUser();
acc.f1 += event.getByteDiff();
return acc;
}
});
在新版的 Flink 中,FoldFunction
因為無法支援部分聚合被廢棄了,如果對程式有強迫症,我們可以採用類似於 MapReduce 的辦法來改寫上邊的程式碼,各個方法呼叫的作用與它們的名字一致,其中,為了繞過型別擦除導致的問題使用了 returns
函式
DataStream<Tuple2<String, Long>> result = keyedEdits
.map((event) -> new Tuple2<>(event.getUser(), Long.valueOf(event.getByteDiff())))
.returns(new TypeHint<Tuple2<String, Long>>(){})
.timeWindowAll(Time.seconds(5))
.reduce((acc, a) -> new Tuple2<>(a.f0, acc.f1+a.f1));
經過處理後的資料流 result
中就包含了我們所需要的資訊,具體地說是填充了 Tuple2<String, Long>
,即(使用者名稱,修改位元組數)元組的流,我們可以使用 result.print()
來列印它。
程式至此主要處理邏輯就寫完了,但是 Flink 還需要在 StreamExecutionEnvironment
型別的變數上呼叫 execute
方法以實際執行整個 Flink 程式,該方法執行時將整個 Flink 程式轉化為任務圖並提交到 Flink 叢集中。
整個程式的程式碼,包括模板程式碼,如下所示
package wikiedits;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
import org.apache.flink.api.java.tuple.Tuple2;
public class WikipediaAnalysis {
publicstaticvoidmain(String[] args) throws Exception {
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
.keyBy(WikipediaEditEvent::getUser);
DataStream<Tuple2<String, Long>> result = keyedEdits
.map((event) -> new Tuple2<>(event.getUser(), Long.valueOf(event.getByteDiff())))
.returns(new TypeHint<Tuple2<String, Long>>(){})
.timeWindowAll(Time.seconds(5))
.reduce((acc, a) -> new Tuple2<>(a.f0, acc.f1+a.f1));
result.print();
see.execute();
}
}
可以通過 IDE 執行程式,在控制檯看到類似下面格式的輸出,每一行前面的數字代表了這是由 print
的並行例項中的編號為幾的例項執行的結果
1> (LilHelpa,1966)
2> (1.70.80.5,2066)
3> (Beyond My Ken,-6550)
4> (Aleksandr Grigoryev,725)
1> (6.77.155.31,1943)
2> (Serols,1639)
3> (ClueBot NG,1907)
4> (GSS,3155)