1. 程式人生 > >Apache Flink 零基礎入門(三)編寫最簡單的helloWorld

Apache Flink 零基礎入門(三)編寫最簡單的helloWorld

實驗環境

JDK 1.8

IDE Intellij idea

Flink 1.8.1

實驗內容

建立一個Flink簡單Demo,可以從流資料中統計單詞個數。

實驗步驟

首先建立一個maven專案,其中pom.xml檔案內容如下:

    <properties>
        <flink.version>1.8.1</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-wikiedits_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.1.4.RELEASE</version>
                <configuration>
                    <mainClass>wikiedits.StreamingJob</mainClass>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
    </build>

建立一個包com.vincent,並且建立一個類StreamingJob.java

public class WikipediaAnalysis {
	public static void main(String[] args) throws Exception {

	}
}

Flink 程式的第一步是建立一個StreamExecutionEnvironment。StreamExecutionEnvironment可以設定引數並且匯入一些外部系統的資料來源。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

接下來建立一個外部資料來源,外部資料來源使用nc -l 9000 表示伺服器端開啟監聽9000埠,並可以傳送資料。

DataStream<String> text = env.socketTextStream("192.168.152.45", 9000);

這樣就添加了一個流文字資料來源,有了DataStream就可以獲取資料了,然後對資料進行分析:

        DataStream<Tuple2<String, Integer>> dataStream = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] tokens = s.toLowerCase().split("\\W+");

                for (String token : tokens) {
                    if (token.length() > 0) {
                        collector.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(5)).sum(1);

flatMap表示將巢狀集合轉換並平鋪成非巢狀集合,字串是s,返回值是Collector<Tuple2<String, Integer>>。並且根據keyBy(0)即第0個欄位進行統計加一操作。.timeWindow()指定視窗大小是5秒。

所以整體程式碼如下:

public class StreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> text = env.socketTextStream("192.168.152.45", 9000);
        DataStream<Tuple2<String, Integer>> dataStream = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] tokens = s.toLowerCase().split("\\W+");

                for (String token : tokens) {
                    if (token.length() > 0) {
                        collector.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(5)).sum(1);

        dataStream.print();
        // execute program
        env.execute("Java WordCount from SocketTextStream Example");
    }
}

執行

執行main方法,然後在伺服器端執行nc -l 9000 並且輸入文字:

iie4bu@swarm-manager:~$ nc -l 9000
a b d d e f

然後在intellij控制檯將輸出:

1> (b,1)
3> (a,1)
1> (f,1)
3> (d,2)
1> (e,1)

可以統計出