SparkStreaming部分:updateStateByKey運算元(包含從Linux端獲取資料,flatmap切分,maptopair分類,寫入到本地建立的資料夾中)【Java版純程式碼】
阿新 • • 發佈:2019-02-05
package com.bjsxt;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.google.common.base.Optional;

import scala.Tuple2;

/**
 * Spark Streaming example of the {@code updateStateByKey} operator: a running
 * word count over lines of text read from a TCP socket.
 *
 * <p>Pipeline: socket text stream -> {@code flatMap} (split each line on a
 * single space) -> {@code mapToPair} (word -> (word, 1)) ->
 * {@code updateStateByKey} (add this batch's ones to the count kept for the
 * word) -> {@code print}.
 */
public class UpdateStateByKeyOperator {

    /** Host the socket text stream connects to. */
    private static final String SOURCE_HOST = "node01";

    /** Port the socket text stream connects to. */
    private static final int SOURCE_PORT = 9999;

    /** Directory handed to {@code JavaStreamingContext.checkpoint}. */
    private static final String CHECKPOINT_DIR = "./checkpoint";

    /** Length of one streaming batch, in seconds. */
    private static final long BATCH_SECONDS = 5;

    /**
     * Builds the streaming job, starts it and blocks until it terminates.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        // Two local threads. Per the Spark Streaming guide, a receiver-based
        // input needs more than one local thread so batches can be processed.
        SparkConf conf = new SparkConf().setAppName("test").setMaster("local[2]");
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(BATCH_SECONDS));
        try {
            // Only log at ERROR level so the printed batch results stay readable.
            jsc.sparkContext().setLogLevel("ERROR");

            // Checkpoint directory for the per-key state.
            // NOTE(review): the original author states that the state is
            // written out every 10 s when the batch interval is below 10 s,
            // and once per batch interval otherwise, to avoid writing to
            // HDFS too often -- confirm against the Spark version in use.
            // The original also mentions an alternative way to set the
            // directory:
            //   JavaSparkContext sc = jsc.sparkContext();
            //   sc.setCheckpointDir("./checkpoint");
            jsc.checkpoint(CHECKPOINT_DIR);

            // Receive lines of text from the remote socket.
            JavaReceiverInputDStream<String> lines = jsc.socketTextStream(SOURCE_HOST, SOURCE_PORT);

            // Split every line into words on a single space.
            JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterable<String> call(String s) throws Exception {
                    return Arrays.asList(s.split(" "));
                }
            });

            // Pair every word with the count 1.
            JavaPairDStream<String, Integer> ones = words
                    .mapToPair(new PairFunction<String, String, Integer>() {
                        @Override
                        public Tuple2<String, Integer> call(String s) throws Exception {
                            return new Tuple2<String, Integer>(s, 1);
                        }
                    });

            // Fold each batch's values into the count kept for the key.
            JavaPairDStream<String, Integer> counts = ones.updateStateByKey(
                    new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
                        /**
                         * @param values values grouped under this key for the
                         *               current batch, e.g. [1, 1, 1, 1, 1]
                         * @param state  the state of this key before the
                         *               current batch; may be absent
                         * @return the previous state (0 when absent) plus the
                         *         sum of {@code values}
                         */
                        @Override
                        public Optional<Integer> call(List<Integer> values, Optional<Integer> state)
                                throws Exception {
                            // Primitive accumulator avoids re-boxing on every addition.
                            int total = state.isPresent() ? state.get() : 0;
                            for (Integer value : values) {
                                total += value;
                            }
                            return Optional.of(total);
                        }
                    });

            // Output operator: print the counts of each batch.
            counts.print();

            jsc.start();
            jsc.awaitTermination();
        } finally {
            // Release the streaming context even when setup or the job itself fails.
            jsc.close();
        }
    }
}