程式人生 > Flink之WordCount

Flink之WordCount

pom.xml 依賴

<!-- Maven dependencies for the WindowWordCount example below.
     All Flink artifacts are pinned to the same version (1.2.0). -->
<dependencies>
    <!-- Test-scoped only; not referenced by WindowWordCount below. -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>

    <!-- Flink core Java API. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.2.0</version>
    </dependency>
    <!-- Flink streaming API (presumably the source of StreamExecutionEnvironment /
         DataStream used below). The _2.10 suffix is the Scala binary version;
         keep it identical across all Flink artifacts. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.10</artifactId>
        <version>1.2.0</version>
    </dependency>
    <!-- Flink client artifact; NOTE(review): presumably needed so the job can be
         executed from the IDE. Confirm. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>1.2.0</version>
    </dependency>

    <!-- NOTE(review): none of the dependencies below are referenced by
         WindowWordCount; they were presumably added to work around missing
         classes at runtime. Confirm before removing or upgrading them. -->
    <dependency>
        <groupId>org.codehaus.jackson</groupId>
        <artifactId>jackson-core-asl</artifactId>
        <version>1.9.0</version>
    </dependency>

    <dependency>
        <groupId>com.google.code.findbugs</groupId>
        <artifactId>jsr305</artifactId>
        <version>3.0.2</version>
    </dependency>

    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>3.2.0</version>
    </dependency>

    <dependency>
        <groupId>com.sun.jersey</groupId>
        <artifactId>jersey-core</artifactId>
        <version>1.19.3</version>
    </dependency>

    <dependency>
        <groupId>org.uncommons.maths</groupId>
        <artifactId>uncommons-maths</artifactId>
        <version>1.2.1</version>
    </dependency>

    <dependency>
        <groupId>com.typesafe</groupId>
        <artifactId>config</artifactId>
        <version>1.2.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.4</version>
    </dependency>
</dependencies>
/**
 * Streaming word count over a socket text source.
 *
 * <p>Reads lines from a TCP socket, splits each line into words, and prints, for every
 * 5-second time window, how many times each word occurred in that window.
 *
 * <p>Usage: {@code WindowWordCount [host] [port]}; defaults to {@code localhost 9999}.
 */
public class WindowWordCount {

    /** Host to read from when no host argument is given. */
    private static final String DEFAULT_HOST = "localhost";

    /** Port to read from when no port argument is given. */
    private static final int DEFAULT_PORT = 9999;

    /**
     * Builds and runs the windowed word-count job.
     *
     * @param args optional {@code [host] [port]}; missing values fall back to
     *     {@code localhost} and {@code 9999}
     * @throws NumberFormatException if a port argument is given but is not an integer
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        String host = args.length > 0 ? args[0] : DEFAULT_HOST;
        int port = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_PORT;

        // 1. Obtain the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Define the source and the processing pipeline.
        DataStream<Tuple2<String, Integer>> dataStreaming = env
                .socketTextStream(host, port)
                .flatMap(new Splitter())
                // Group by tuple field 0 (the word) ...
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                // ... and add up field 1 (the 1 emitted per occurrence).
                .sum(1);

        dataStreaming.print();

        // 3. Trigger execution of the pipeline defined above.
        env.execute("window wordcount");
    }

    /**
     * Splits each input line into words and emits {@code (word, 1)} for every word.
     *
     * <p>Runs of whitespace (spaces, tabs, ...) count as a single separator, and empty
     * tokens, for example from leading whitespace or blank lines, are dropped. Without
     * this, an empty string would be counted as a word.
     */
    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        /** Compiled once; String.split with a multi-char regex recompiles it per record. */
        private static final java.util.regex.Pattern WHITESPACE =
                java.util.regex.Pattern.compile("\\s+");

        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : WHITESPACE.split(sentence)) {
                // A leading separator (or an empty line) yields an empty first token; skip it.
                if (!word.isEmpty()) {
                    out.collect(new Tuple2<String, Integer>(word, 1));
                }
            }
        }

    }
}

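在啟動程式之前，先開啟 netcat 工具執行 `nc -lp 9999`（部分 netcat 版本，例如 OpenBSD 版，需改用 `nc -l 9999`）在 9999 埠監聽。程式啟動並連上該埠後，在 netcat 中輸入一些以空白分隔的單詞。每個 5 秒的時間視窗結束時，IDE 的控制檯會印出該視窗內各單詞的計數以及相應的日誌資訊。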