Apache Flink from Scratch (3): Writing the Simplest Hello World
阿新 • Published: 2019-08-27
Environment
JDK 1.8
IDE: IntelliJ IDEA
Flink 1.8.1
Goal
Build a simple Flink demo that counts the words in a stream of text.
Steps
First create a Maven project. The relevant part of pom.xml is as follows:
<properties>
    <flink.version>1.8.1</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-wikiedits_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <version>2.1.4.RELEASE</version>
            <configuration>
                <mainClass>com.vincent.StreamingJob</mainClass>
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <goal>repackage</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <configuration>
                <skip>true</skip>
            </configuration>
        </plugin>
    </plugins>
</build>
Create a package com.vincent and, inside it, a class StreamingJob (StreamingJob.java):
package com.vincent;

public class StreamingJob {
    public static void main(String[] args) throws Exception {
    }
}
The first step in a Flink program is to create a StreamExecutionEnvironment. It is used to set execution parameters and to create sources that read data from external systems.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
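Next, create an external data source. Here the source is a socket: running nc -l 9000 on the server opens a listener on port 9000, and each line typed there is sent to the connected client.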
DataStream<String> text = env.socketTextStream("192.168.152.45", 9000);
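To illustrate what travels over that socket, here is a minimal plain-Java sketch with no Flink involved. It runs a one-shot line server (a stand-in for nc -l) and a client that connects and reads newline-delimited text, which is roughly what a socket text source consumes. The class name SocketLineDemo and its helper methods are hypothetical, and the loopback address and ephemeral port are assumptions made so the sketch runs in a single process.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch (not Flink code): a line server and a line-reading client.
public class SocketLineDemo {

    // Hypothetical helper: accept one connection, send each line, then close.
    public static Thread serveOnce(ServerSocket server, List<String> lines) {
        Thread thread = new Thread(() -> {
            try (Socket client = server.accept();
                 PrintWriter out = new PrintWriter(new OutputStreamWriter(
                         client.getOutputStream(), StandardCharsets.UTF_8))) {
                for (String line : lines) {
                    out.print(line + "\n");
                }
                out.flush();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        thread.start();
        return thread;
    }

    // Connect as a client and collect lines until the server closes the connection.
    public static List<String> readLines(String host, int port) {
        List<String> result = new ArrayList<>();
        try (Socket socket = new Socket(host, port);
             BufferedReader in = new BufferedReader(new InputStreamReader(
                     socket.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                result.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    // Run server and client together on an ephemeral loopback port.
    public static List<String> demo(List<String> lines) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback)) {
            Thread serverThread = serveOnce(server, lines);
            List<String> received =
                    readLines(loopback.getHostAddress(), server.getLocalPort());
            serverThread.join();
            return received;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(Arrays.asList("a b d d e f")));
    }
}
```

The server must already be listening when the client connects, otherwise the connection is refused. The same ordering applies to nc and the Flink job.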
This adds a streaming text source. With the DataStream in hand, the incoming data can be read and analyzed:
DataStream<Tuple2<String, Integer>> dataStream = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
    @Override
    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
        String[] tokens = s.toLowerCase().split("\\W+");
        for (String token : tokens) {
            if (token.length() > 0) {
                collector.collect(new Tuple2<String, Integer>(token, 1));
            }
        }
    }
}).keyBy(0).timeWindow(Time.seconds(5)).sum(1);
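flatMap maps each input element to zero or more output elements. Here the input is the line s, and each word is emitted through the Collector<Tuple2<String, Integer>> as a (word, 1) pair; the method itself returns nothing. keyBy(0) groups the pairs by field 0 (the word), .timeWindow(Time.seconds(5)) collects each group into 5-second windows, and sum(1) adds up field 1 (the count) within each window.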
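To make these semantics concrete, here is a minimal plain-Java sketch that mimics the pipeline without Flink: it tokenizes each line the same way, assigns it to a 5-second tumbling window, and sums the counts per word within each window. The class WindowedWordCountSketch, its methods, and the explicit timestamps are hypothetical. It assumes non-negative timestamps and windows aligned to the epoch, whereas the real job relies on Flink's own windowing and clock.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (not Flink code) of flatMap + keyBy + tumbling window + sum.
public class WindowedWordCountSketch {

    // Start of the tumbling window containing the timestamp
    // (assumes timestampMillis >= 0 and no window offset).
    public static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    // Same tokenization as the flatMap above; each word adds 1 to its count.
    public static void addLine(Map<String, Integer> counts, String line) {
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                counts.merge(token, 1, Integer::sum);
            }
        }
    }

    // Returns window start -> (word -> count) for the given timestamped lines.
    public static Map<Long, Map<String, Integer>> count(
            long[] timestamps, String[] lines, long windowSizeMillis) {
        Map<Long, Map<String, Integer>> result = new TreeMap<>();
        for (int i = 0; i < lines.length; i++) {
            long start = windowStart(timestamps[i], windowSizeMillis);
            addLine(result.computeIfAbsent(start, k -> new TreeMap<>()), lines[i]);
        }
        return result;
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000L, 4_999L, 5_000L};
        String[] lines = {"a b d d e f", "d", "a"};
        // prints {0={a=1, b=1, d=3, e=1, f=1}, 5000={a=1}}
        System.out.println(count(timestamps, lines, 5_000L));
    }
}
```

The first two lines fall into the window starting at 0 ms, so their counts are merged; the third arrives at 5000 ms and starts a new window with fresh counts.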
The complete code is as follows:
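package com.vincent;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class StreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // read lines of text from the nc listener
        DataStream<String> text = env.socketTextStream("192.168.152.45", 9000);
        DataStream<Tuple2<String, Integer>> dataStream = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                // split the line into lowercase words and emit (word, 1) for each
                String[] tokens = s.toLowerCase().split("\\W+");
                for (String token : tokens) {
                    if (token.length() > 0) {
                        collector.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(5)).sum(1);
        dataStream.print();
        // execute program
        env.execute("Java WordCount from SocketTextStream Example");
    }
}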
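Running
Start nc -l 9000 on the server first, since the job connects to it as a client. Then run the main method and type some text into nc: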
iie4bu@swarm-manager:~$ nc -l 9000
a b d d e f
The IntelliJ console then prints:
1> (b,1)
3> (a,1)
1> (f,1)
3> (d,2)
1> (e,1)
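The job has counted the occurrences of each word: d appears twice and every other word once. The number before > identifies the parallel subtask that printed the record.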