1. 程式人生 > >spark學習記錄(十三、SparkStreaming)

spark學習記錄(十三、SparkStreaming)

一、SparkStreaming簡介

SparkStreaming是流式處理框架,是Spark API的擴充套件,支援可擴充套件、高吞吐量、容錯的實時資料流處理,實時資料的來源可以是:Kafka, Flume, Twitter, ZeroMQ或者TCP sockets,並且可以使用高階功能的複雜運算元來處理流資料。例如:map,reduce,join,window 。最終,處理後的資料可以存放在檔案系統,資料庫等,方便實時展現。

SparkStreaming與Storm的區別:

  1. Storm是純實時的流式處理框架,SparkStreaming是準實時的處理框架(微批處理)。因為微批處理,SparkStreaming的吞吐量比Storm要高。Storm是來一條資料就處理一次,SparkStreaming是處理某段時間內來的資料。
  2. Storm 的事務機制要比SparkStreaming的要完善。
  3. Storm支援動態資源排程。(spark1.2開始和之後也支援)
  4. SparkStreaming擅長複雜的業務處理,Storm不擅長複雜的業務處理,擅長簡單的彙總型計算。

 

  • receiver  task是7*24小時一直在執行,一直接收資料,將一段時間內接收來的資料儲存到batch中。假設batchInterval為5s,那麼會將接收來的資料每隔5秒封裝到一個batch中,batch沒有分散式計算特性,這一個batch的資料又被封裝到一個RDD中,RDD最終封裝到一個DStream中。

例如:假設batchInterval為5秒,每隔5秒通過SparkStreamin將得到一個DStream,在第6秒的時候計算這5秒的資料,假設執行任務的時間是3秒,那麼第6~9秒一邊在接收資料,一邊在計算任務,9~10秒只是在接收資料。然後在第11秒的時候重複上面的操作。

  • 如果job執行的時間大於batchInterval會有什麼樣的問題?

如果接受過來的資料設定的級別是僅記憶體,接收來的資料會越堆積越多,最後可能會導致OOM(如果設定StorageLevel包含disk, 則記憶體存放不下的資料會溢寫至disk, 加大延遲 ) 

二、java程式碼

新增依賴:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>2.4.0</version>
    <scope>provided</scope>
</dependency>

在hadoop1上輸入命令:

nc -lk 9999
hello Sam
hello Tom
hello Jetty
public class JavaExample {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf();
        conf.setMaster("local[2]");
        conf.setAppName("SparkStreamingTest");

        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));

        JavaReceiverInputDStream<String> stream = jsc.socketTextStream("192.168.30.141", 9999);

        final JavaDStream<String> words = stream.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
        JavaPairDStream<String, Integer> pair = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        JavaPairDStream<String, Integer> reduceByKey = pair.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        //列印輸出
        reduceByKey.print();
        /**
         * foreachRDD可以拿到DStream中的RDD,對拿到的RDD可以使用RDD的transformations運算元轉換,
         * 要對拿到的RDD使用action觸發執行,否則foreachRDD也不會執行
         * foreachRDD中call方法內,拿到RDD的運算元外,程式碼在Driver執行,
         * 可以使用這個運算元實現動態改變廣播變數,即廣播讀取一個檔案,只修改檔案內容,不停止程式碼
         */
        reduceByKey.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
            public void call(JavaPairRDD<String, Integer> rdd) throws Exception {
                //SparkStreaming廣播變數
                SparkContext context = rdd.context();
                JavaSparkContext javaSparkContext = new JavaSparkContext(context);
                Broadcast<String> broadcast = javaSparkContext.broadcast("hello");
                System.out.println(broadcast.getValue());

                JavaPairRDD<String, Integer> mapToPair = rdd.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
                    public Tuple2<String, Integer> call(Tuple2<String, Integer> tuple2) throws Exception {
                        return new Tuple2<String, Integer>(tuple2._1 + "~", tuple2._2);
                    }
                });
                mapToPair.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                    public void call(Tuple2<String, Integer> tuple2) throws Exception {
                        System.out.println(tuple2);
                    }
                });
            }
        });
        jsc.start();
        jsc.awaitTermination();
    }
}

三、運算元 

/**
 * updateStateByKey:
 * 返回一個新的“狀態”Dstream,通過給定的func來更新之前的每個狀態的key對應的value值,這也可以用於維護key的任意狀態資料。
 * 注意:作用在(K,V)格式的DStream上
 * <p>
 * updateStateByKey的主要功能:
 * 1、Spark Streaming中為每一個Key維護一份state狀態,state型別可以是任意型別的的, 可以是一個自定義的物件,那麼更新函式也可以是自定義的。
 * 2、通過更新函式對該key的狀態不斷更新,對於每個新的batch而言,Spark Streaming會在使用updateStateByKey的時候為已經存在的key進行
 * state的狀態更新
 * (對於每個新出現的key,會同樣的執行state的更新函式操作),
 * 如果要不斷的更新每個key的state,就一定涉及到了狀態的儲存和容錯,這個時候就需要開啟checkpoint機制和功能
 *
 * @author root
 */
public class Operate_updateStateByKey {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("Operate_count");
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
        /**
         * checkpoint儲存地址
         * 如果batchInterval小於10,那麼10s會將記憶體中的資料寫入硬碟中
         * 如果batchInterval大於10,那麼以batchInterval為準
         *
         * 這樣做是為了防止頻繁的寫hdfs
         */
        jsc.checkpoint("checkpoint");
        //讀取檔案儲存地址
        JavaDStream<String> textFileStream = jsc.textFileStream("data");
        /**
         * 實現一個累加統計word的功能
         */
        JavaPairDStream<String, Integer> mapToPair = textFileStream.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            public Iterator<String> call(String t) throws Exception {

                return Arrays.asList(t.split(" ")).iterator();
            }
        }).mapToPair(new PairFunction<String, String, Integer>() {

            private static final long serialVersionUID = 1L;

            public Tuple2<String, Integer> call(String t) throws Exception {
                return new Tuple2<String, Integer>(t.trim(), 1);
            }
        });

        JavaPairDStream<String, Integer> updateStateByKey = mapToPair.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
            private static final long serialVersionUID = 1L;

            public Optional<Integer> call(List<Integer> values, Optional<Integer> state)
                    throws Exception {
                /**
                 * values:經過分組最後 這個key所對應的value  [1,1,1,1,1]
                 * state:這個key在本次之前之前的狀態
                 */
                Integer updateValue = 0;

                if (state.isPresent()) {
                    updateValue = state.get();
                }
                for (Integer i : values) {
                    updateValue += i;
                }
                return Optional.of(updateValue);
            }
        });

        updateStateByKey.print();

        jsc.start();
        jsc.awaitTermination();
        jsc.close();
    }

}
public class Operate_window {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf();
        conf.setMaster("local[2]");
        conf.setAppName("Operate_window");
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
        /**
         * 設定日誌級別
         */
        jsc.sparkContext().setLogLevel("WARN");
        //設定checkpoint
        jsc.checkpoint("./checkpoint");
        JavaReceiverInputDStream<String> textStream = jsc.socketTextStream("192.168.30.141", 9999);
//        切分
        JavaDStream<String> flatMap = textStream.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
//        計數
        JavaPairDStream<String, Integer> mapToPair = flatMap.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        /**
         *每個10s,計算最近60s內的資料,那麼這個視窗大小為60s,裡面有12個rdd,在沒有計算之前,這些rdd是不會計算的
         * 那麼在計算的時候會將這12個rdd聚合起來,然後一起執行reduceByKeyAndWindow,
         * reduceByKeyAndWindow是針對視窗操作而不是DStream
         */
        JavaPairDStream<String, Integer> reduceByKeyAndWindow = mapToPair.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                //加上新進視窗的批次
                return v1 + v2;
            }
            //每隔5s計算最近15s的資料,這兩個引數要是bathInterval的整數倍即上面設定的5s的整數倍
        }, new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                //減去新離開視窗的批次
                return v1 - v2;
            }
        }, Durations.seconds(15), Durations.seconds(5));

        reduceByKeyAndWindow.print();
        jsc.start();
        jsc.awaitTermination();
        jsc.close();
    }
}

public class Operate_transform {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf();
        conf.setMaster("local[2]").setAppName("Operate_transform");
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
        //黑名單
        List<String> list = Arrays.asList("Sam");
        final Broadcast<List<String>> broadcast = jsc.sparkContext().broadcast(list);

        JavaReceiverInputDStream<String> stream = jsc.socketTextStream("192.168.30.141", 9999);

        JavaPairDStream<String, String> mapToPair = stream.mapToPair(new PairFunction<String, String, String>() {
            public Tuple2<String, String> call(String s) throws Exception {
                return new Tuple2<String, String>(s.split(" ")[1], s);
            }
        });
        /**
         * 過濾黑名單上的名字
         * transform可以拿到DStream中的RDD,做RDD到RDD之間的轉換,不需要Action運算元觸發,需要返回型別RDD
         * 注意:transform call方法內,拿到RDD運算元外的程式碼在Driver端執行,也可以做到動態改變廣播變數
         */
        JavaDStream<String> transform = mapToPair.transform(new Function<JavaPairRDD<String, String>, JavaRDD<String>>() {
            public JavaRDD<String> call(JavaPairRDD<String, String> nameRDD) throws Exception {
                //資料過濾
                JavaPairRDD<String, String> filter = nameRDD.filter(new Function<Tuple2<String, String>, Boolean>() {
                    public Boolean call(Tuple2<String, String> tuple2) throws Exception {
                        return !broadcast.getValue().contains(tuple2._1);
                    }
                });

                JavaRDD<String> map = filter.map(new Function<Tuple2<String, String>, String>() {
                    public String call(Tuple2<String, String> tuple2) throws Exception {
                        return tuple2._2;
                    }
                });

                return map;
            }
        });

        transform.print();

        jsc.start();
        jsc.awaitTermination();
        jsc.close();

    }
}