1. 程式人生 > >Apache Flink 流處理例項

Apache Flink 流處理例項

維基百科在 IRC 頻道上記錄 Wiki 被修改的日誌,我們可以通過監聽這個 IRC 頻道,來實時監控給定時間視窗內的修改事件。Apache Flink 作為流計算引擎,非常適合處理流資料,並且,類似於 Hadoop MapReduce 等框架,Flink 提供了非常良好的抽象,使得業務邏輯程式碼編寫非常簡單。我們通過這個簡單的例子來感受一下 Flink 的程式的編寫。

通過 Flink Quickstart 構建 Maven 工程

Flink 提供了 flink-quickstart-java 和 flink-quickstart-scala 外掛,允許使用 Maven 的開發者建立統一的專案模版,應用專案模板可以規避掉很多部署上的坑。

構建這次工程的命令如下

$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \
    -DarchetypeVersion=1.6-SNAPSHOT \
    -DgroupId=wiki-edits \
    -DartifactId=wiki-edits \
    -Dversion=0.1
\ -Dpackage=wikiedits \ -DinteractiveMode=false

注意高版本的 Maven 不支援 -DarchetypeCatalog 引數,可以將第一行改為  mvn org.apache.maven.plugins:maven-archetype-plugin:2.4::generate \ 或者去掉 -DarchetypeCatalog 行,並將 .m2/settings.xml 修改如下,其中主要是在 //profiles/profile/repositories 下設定好搜尋 archetype 的倉庫地址

<settings xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd"> <profiles> <profile> <id>acme</id> <repositories> <repository> <id>archetype</id> <name>Apache Development Snapshot Repository</name> <url>https://repository.apache.org/content/repositories/snapshots/</url> <releases> <enabled>false</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> </repositories> </profile> </profiles> <activeProfiles> <activeProfile>acme</activeProfile> </activeProfiles> </settings>

成功下載專案模板後,在當前目錄下應當能看到 wiki-edit 目錄。執行命令 rm wiki-edits/src/main/java/wikiedits/*.java 清除模板自帶的 Java 檔案。

為了監聽維基百科的 IRC 頻道,在 pom.xml 檔案下新增如下依賴,分別是 Flink 的客戶端和 WikiEdit 的聯結器

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-wikiedits_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

編寫 Flink 程式

接下來的程式碼編寫工作假定你是在 IDE 下編寫的,主要是為了避免囉嗦的 import 語句。包含 import 等模板程式碼的全部程式碼在末尾給出。

首先我們建立用於執行的主程式程式碼 src/main/java/wikiedits/WikipediaAnalysis.java

package wikiedits;

public class WikipediaAnalysis {
    publicstaticvoidmain(String[] args) throws Exception {

    }
}

流處理的 Flink 程式的第一步是建立流處理執行上下文 StreamExecutionEnvironment,它類似於其他框架內的 Configuration 類,用於配製 Flink 程式和執行時的各個引數,對應的語句如下

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

下一步我們以維基百科 IRC 頻道的日誌作為資料來源建立連線

DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());

這個語句建立了填充 WikipediaEditEvent 的 DataStream,拿到資料流之後我們就可以對它做進一步的操作了。

我們的目標是統計給定時間視窗內,比如說五秒內,使用者對維基百科的修改位元組數。因此我們對每個 WikipediaEditEvent 以使用者名稱作為鍵來標記(keyed)。Flink 相容 Java 1.6 版本,因此古老的版本中 Flink 提供 KeySelector 函式式介面來標記

KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    .keyBy(new KeySelector<WikipediaEditEvent, String>() {
        @Override
        public String getKey(WikipediaEditEvent event) {
            return event.getUser();
        }
    });

當前版本的 Flink 主要支援的是 Java 8 版本,因此我們也可以用 Lambda 表示式來改寫這段較為繁瑣的程式碼

KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
        .keyBy(WikipediaEditEvent::getUser);

這個語句定義了 keyedEdits 變數,它是一個概念上形如(String, WikipediaEditEvent) 的資料流,即以字串(使用者名稱)為鍵,WikipediaEditEvent 為值的資料的流。這一步驟類似於 MapReduce 的 Shuffle 過程,針對 keyedEdits 的處理將自動按照鍵分組,因此我們可以直接對資料進行 fold 操作以摺疊聚合同一使用者名稱的修改位元組數

DataStream<Tuple2<String, Long>> result = keyedEdits
    .timeWindow(Time.seconds(5))
    .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
            acc.f0 = event.getUser();
            acc.f1 += event.getByteDiff();
            return acc;
        }
    });

在新版的 Flink 中,FoldFunction 因為無法支援部分聚合被廢棄了,如果對程式有強迫症,我們可以採用類似於 MapReduce 的辦法來改寫上邊的程式碼,各個方法呼叫的作用與它們的名字一致,其中,為了繞過型別擦除導致的問題使用了 returns 函式

DataStream<Tuple2<String, Long>> result = keyedEdits
        .map((event) -> new Tuple2<>(event.getUser(), Long.valueOf(event.getByteDiff())))
        .returns(new TypeHint<Tuple2<String, Long>>(){})
        .timeWindowAll(Time.seconds(5))
        .reduce((acc, a) -> new Tuple2<>(a.f0, acc.f1+a.f1));

經過處理後的資料流 result 中就包含了我們所需要的資訊,具體地說是填充了 Tuple2<String, Long>,即(使用者名稱,修改位元組數)元組的流,我們可以使用 result.print() 來列印它。

程式至此主要處理邏輯就寫完了,但是 Flink 還需要在 StreamExecutionEnvironment 型別的變數上呼叫 execute 方法以實際執行整個 Flink 程式,該方法執行時將整個 Flink 程式轉化為任務圖並提交到 Flink 叢集中。

整個程式的程式碼,包括模板程式碼,如下所示

package wikiedits;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
import org.apache.flink.api.java.tuple.Tuple2;

public class WikipediaAnalysis {
    publicstaticvoidmain(String[] args) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
        KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
                .keyBy(WikipediaEditEvent::getUser);
        DataStream<Tuple2<String, Long>> result = keyedEdits
                .map((event) -> new Tuple2<>(event.getUser(), Long.valueOf(event.getByteDiff())))
                .returns(new TypeHint<Tuple2<String, Long>>(){})
                .timeWindowAll(Time.seconds(5))
                .reduce((acc, a) -> new Tuple2<>(a.f0, acc.f1+a.f1));
        result.print();
        see.execute();
    }
}

可以通過 IDE 執行程式,在控制檯看到類似下面格式的輸出,每一行前面的數字代表了這是由 print 的並行例項中的編號為幾的例項執行的結果

1> (LilHelpa,1966)
2> (1.70.80.5,2066)
3> (Beyond My Ken,-6550)
4> (Aleksandr Grigoryev,725)
1> (6.77.155.31,1943)
2> (Serols,1639)
3> (ClueBot NG,1907)
4> (GSS,3155)