1. 程式人生 > >《Flink官方文件》監控Wikipedia 編輯流

《Flink官方文件》監控Wikipedia 編輯流

原文連結 譯者:ivansong

在本指南中,我們會從頭開始,從從建立一個Flink專案到在一個Flink叢集上執行一個流分析程式。

Wikipedia 提供了一個記錄所有wiki的編輯的IRC通道。我們將會接入這個通道,計算每個使用者在給定的時間視窗上編輯的位元組數。用Flink能足夠簡單地在短時間內實現,但是給了你一個建立更復雜的分析程式的好的基礎。

構建一個maven專案

我們將會用一個Flink Maven Archetype來建立我們的專案結構。關於這個請看Java API Quickstart章節來獲取更多的細節。為了我們的目的,在命令視窗中執行如下:

$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-java \ -DarchetypeVersion=1.2.0 \ -DgroupId=wiki-edits \ -DartifactId=wiki-edits \ -Dversion=0.1 \ -Dpackage=wikiedits \ -DinteractiveMode=false

如果你原意,你可以編輯groupId,artifactId和 package

 。用上面的引數,Maven會建立一個如下的專案結構:

$ tree wiki-edits
wiki-edits/
├── pom.xml
└── src
    └── main
        ├── java
        │   └── wikiedits
        │       ├── BatchJob.java
        │       ├── SocketTextStreamWordCount.java
        │       ├── StreamingJob.java
        │       └── WordCount.java
        └── resources
            └── log4j.properties

在根目錄的pom.xml已經添加了 Flink的依賴,同時在src/main/java目錄中用幾個Flink程式的示例。我們要從頭開始,所以可以刪除這些示例程式:

$ rm wiki-edits/src/main/java/wikiedits/*.java

最後一步,我們為了能在程式中使用Wikipedia需要新增Flink Wikipedia聯結器的依賴。編輯pom.xml中依賴的部分,如下:

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-wikiedits_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

注意新增的這個flink-connector-wikiedits_2.11依賴(這個示例和Wikipedia 聯結器是受Apache Samza的Hello Samza示例啟發

下面是編碼時間。啟動你最喜歡的IDE並引入這個maven專案或者開啟一個文字編輯器,建立檔案src/main/java/wikiedits/WikipediaAnalysis.java:

package wikiedits;
public class WikipediaAnalysis {
  public static void main(String[] args) throws Exception {
  }
}

這個程式非常簡單,但接下來我們會填充它。注意的是我不會在這裡引入一些宣告,因為IDE能自動引入。如果你簡單地想跳過前面在你的編輯器裡直接輸入,在這塊兒內容的結束的地方,我會展示有引入的宣告的完整的程式碼。
一個Flink程式的第一步是建立一個StreamExecutionEnvironment物件(如果你寫的是一個批處理任務那就是ExecutionEnvironment物件)。這個物件可以用來設定執行引數,為從外部系統讀取建立資源。因此,我們新增這個到main方法中:

   StreamExecutionEnvironme see = StreamExecutionEnvironment.getExecutionEnvironment();

接下來我們會建立一個讀取 Wikipedia IRC 日誌的資源:

DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());

這行程式碼建立了一個我們能進一步執行的WikipediaEditEvent元素DateStream物件。為了本示例的目的,假設五秒鐘,我們監控這段時間內在特定視窗中使用者產生的新增或者刪除的位元組數。為此,我們首先應該指定此使用者名稱上的我們想要的輸入流,意思是在這個流的操作上我們應該繫結使用者名稱。在我們的示例中,視窗中編輯的位元組總數應該是每一個唯一的使用者。對於鍵入的流我們應該提供一個KeySelector,如下:

KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    .keyBy(new KeySelector<WikipediaEditEvent, String>() {
        @Override
        public String getKey(WikipediaEditEvent event) {
            return event.getUser();
        }
    });

這段程式碼給我們一個以String作為key即使用者名稱作為key的WikipediaEditEvent 的Stream。現在我們可以指定想要的加在這個l流 上的視窗了,並且可以統計一個基於在這些視窗上的元素的結果。一個視窗指定了一個在其上執行運算的Steam的一個片段。在計算一個無窮流的元素的聚合時,視窗是很有必要的。在示例中,我們會假設我們想要聚合五秒中編輯的位元組總數:

DataStream<Tuple2<String, Long>> result = keyedEdits
    .timeWindow(Time.seconds(5))
    .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
            acc.f0 = event.getUser();
            acc.f1 += event.getByteDiff();
            return acc;
        }
    });

第一個呼叫,.timeWindow(),指定了我們想要有五秒鐘的滾動(非重疊)視窗。第二個呼叫為每個唯一鍵在每個視窗片段上指定了一個Fold 轉換。在例子中,我們從一個初始值為(“”, 0L)開始,為每個使用者新增在該時間內每次編輯的位元組碼差異。結果流現在包含了一個每個使用者五秒中產生的Tuple2<String, Long>。

下面僅僅要做的是將這個流輸出到控制檯,並開始執行:

result.print();

see.execute();

最後一個呼叫對啟動一個真實的Flink任務是必要的。所有的操作,如建立資源,轉換,聚合,這些僅是建立了內部操作的圖。僅僅是當呼叫execute()方法的時候,這個操作圖才會被扔到一個叢集中執行或者在你本機執行。

下面是完整的程式碼:

package wikiedits;

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;

public class WikipediaAnalysis {

  public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());

    KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
      .keyBy(new KeySelector<WikipediaEditEvent, String>() {
        @Override
        public String getKey(WikipediaEditEvent event) {
          return event.getUser();
        }
      });

    DataStream<Tuple2<String, Long>> result = keyedEdits
      .timeWindow(Time.seconds(5))
      .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
          acc.f0 = event.getUser();
          acc.f1 += event.getByteDiff();
          return acc;
        }
      });

    result.print();

    see.execute();
  }
}

你可以用maven在你的IDE或者命令列中執行這個示例:

$ mvn clean package
$ mvn exec:java -Dexec.mainClass=wikiedits.WikipediaAnalysis

第一個命令是構建我們的專案,第二個命令是執行我們的主要類。輸出的內容跟下面相似:

1> (Fenix down,114)
6> (AnomieBOT,155)
8> (BD2412bot,-3690)
7> (IgnorantArmies,49)
3> (Ckh3111,69)
5> (Slade360,0)
7> (Narutolovehinata5,2195)
6> (Vuyisa2001,79)
4> (Ms Sarah Welch,269)
4> (KasparBot,-245)

每行前面的數字告訴你每個並行的例項產生的輸出。

本示例應該能讓你開始寫自己的Flink程式。為了學習更多,你可以開啟我們的基本概念DataStream API的指南。如果你想學習如何構建一個Flink叢集在自己機器上並將結果寫入Kafka,請看接下來的激勵練習。

激勵練習:在一個Flink叢集上執行,並將結果寫入Kafka

請按照我們的快速開始裡面的內容來在你的機器上構建一個Flink分散式,再參考Kafka的快速開始來安裝Kafka,然後我們繼續。

第一步,我們為了能使用Kafka聯結器,需要新增Flink Kafka聯結器的依賴。將這個新增的pom.xml檔案的依賴模組中:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

接下來,我們需要修改我們的程式。我們會移除print(),替換成使用Kafka 接收器。新的程式碼示例如下:

result
    .map(new MapFunction<Tuple2<String,Long>, String>() {
        @Override
        public String map(Tuple2<String, Long> tuple) {
            return tuple.toString();
        }
    })
    .addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new SimpleStringSchema()));

也需要引入相關的類:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.api.common.functions.MapFunction;

注意我們首先是如何使用一個MapFunction將Tuple2<String, Long>的流轉換成一個字串流的。我們就是在做這個,因為這個對寫入簡單的字串到Kafka更容易。因而,我們建立了一個Kafka接收器。你肯能要適配一下你設定的主機名和埠號。”wiki-result”是執行我們程式之前我們將會建立的Kafka 流的名字。因為我們需要一個在叢集上執行的jar檔案,故用Maven 命令構建這個專案:

$ mvn clean package

產生的jar檔案會在target的子資料夾中: target/wiki-edits-0.1.jar。我們接下來會用到這個。現在我們準備安裝一個Flink叢集,並在其上執行寫入到Kafka的程式。到你安裝的Flink目錄下,開啟一個本地的叢集:

$ cd my/flink/directory
$ bin/start-local.sh

我們也需要建立這個Kafka Topic,以便我們的程式能寫入:

$ cd my/kafka/directory
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic wiki-results

現在我們準備在本地的Flink叢集上執行我們的jar檔案:

$ cd my/flink/directory
$ bin/flink run -c wikiedits.WikipediaAnalysis path/to/wikiedits-0.1.jar

如果一切按照計劃執行,命令列輸出會跟下面的相似:

03/08/2016 15:09:27 Job execution switched to status RUNNING.
03/08/2016 15:09:27 Source: Custom Source(1/1) switched to SCHEDULED
03/08/2016 15:09:27 Source: Custom Source(1/1) switched to DEPLOYING
03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED
03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING
03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to RUNNING
03/08/2016 15:09:27 Source: Custom Source(1/1) switched to RUNNING

你可以看到各個操作是如何開始執行的。只有兩個操作是因為由於效能原因視窗後面的操作摺疊成了一個。在Flink中,我們稱這個為chaining。

你可以用Kafka 控制檯消費者通過檢測Kafka主題來觀察程式的輸出:

bin/kafka-console-consumer.sh  --zookeeper localhost:2181 --topic wiki-result

你可以檢視執行在 http://localhost:8081上面的Flink儀表盤。你可以對你的叢集資源和執行的任務有個整體的感知:


如果你點選執行的任務,你會看到一個可以觀察單個操作的檢視,例如,看到執行的元素的數量:

結束了我們的Flink之旅,如果你有如何問題,請不要猶豫在我們的Mailing Lists提問。