Flink之WordCount
阿新 • • 發佈:2018-11-30
pom依賴
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.2.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.10</artifactId> <version>1.2.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.10</artifactId> <version>1.2.0</version> </dependency> <dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-core-asl</artifactId> <version>1.9.0</version> </dependency> <dependency> <groupId>com.google.code.findbugs</groupId> <artifactId>jsr305</artifactId> <version>3.0.2</version> </dependency> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.2.0</version> </dependency> <dependency> <groupId>com.sun.jersey</groupId> <artifactId>jersey-core</artifactId> <version>1.19.3</version> </dependency> <dependency> <groupId>org.uncommons.maths</groupId> <artifactId>uncommons-maths</artifactId> <version>1.2.1</version> </dependency> <dependency> <groupId>com.typesafe</groupId> <artifactId>config</artifactId> <version>1.2.0</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.4</version> </dependency> </dependencies>
public class WindowWordCount { public static void main(String[] args) throws Exception { // 1.獲取execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2.載入資料,指定資料的處理流程 DataStream<Tuple2<String, Integer>> dataStreaming = env .socketTextStream("localhost", 9999) .flatMap(new Splitter()) .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1); dataStreaming.print(); // 3.出發程式的執行 env.execute("window worcount"); } public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> { public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception { for (String word : sentence.split(" ")) { out.collect(new Tuple2<String, Integer>(word, 1)); } } } }
在啟動程式之前,先開啟netcat工具執行nc -lp 9999
,然後輸入一些內容,在IDE的控制檯會出現相應的日誌資訊。