
Big Data with Spark (Part 6) --- Introduction to Spark Streaming, DStream, Receiver, Streaming Integration with Kafka, Windows, Fault Tolerance

I. Introduction to Spark Streaming
-----------------------------------------------------------
    1. Overview
        An extension of Spark Core for real-time processing of live data streams.
        Scalable, high-throughput and fault-tolerant.
        Data can come from Kafka, Flume or TCP sockets, is processed with high-level functions (map, reduce, filter, join, window), and the results can be pushed to databases or HDFS.
        The processed stream can also be fed into machine learning and graph computation.

    2. Internal execution flow:
        Spark receives the live data stream and splits it into batches; the results are also produced batch by batch, as a stream.

    3. Discretized stream, or DStream
        A discretized stream represents a continuous stream of data.
        A DStream can be created from input streams such as Kafka or Flume, or by applying high-level transformations to other DStreams (similar to RDDs).
        Internally, a DStream is represented as a sequence of RDDs.


II. Programming Spark Streaming
------------------------------------------------------------
    1. Add the dependency
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>

    2. Write the Scala program -- SparkStreamingDemo

package com.test.spark.streaming.scala

        import org.apache.spark._
        import org.apache.spark.streaming._
        import org.apache.spark.streaming.StreamingContext._

        object SparkStreamingDemo {

            def main(args: Array[String]): Unit = {
                //note: local[n] needs n > 1 (one thread for the receiver, at least one for processing)
                val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
                //create the streaming context with a 1-second batch interval
                val ssc = new StreamingContext(conf, Seconds(1))
                //create a socket text stream
                val lines = ssc.socketTextStream("localhost", 9999)
                //split each line into words
                val words = lines.flatMap(_.split(" "))
                //map each word to a (word, 1) pair
                val pairs = words.map(word => (word, 1))
                //sum the counts of each word within the batch
                val wordCounts = pairs.reduceByKey(_ + _)
                //print the first few elements of every batch
                wordCounts.print()
                ssc.start()             // Start the computation
                ssc.awaitTermination()  // Wait for the computation to terminate
            }
        }

    3. Start the nc server first to generate the data source
        cmd> nc -l -p 9999

    4. Run the Scala program and check the output
        run

    5. Package the jar and run it on Ubuntu
        $> nc -lk 9999
        $> spark-submit --master spark://s100:7077 --name netwc --class com.test.spark.streaming.scala.SparkStreamingDemo TestSpark-1.0-SNAPSHOT.jar

    6. Java version
        
package com.test.spark.streaming.java;

        import org.apache.spark.*;
        import org.apache.spark.api.java.function.*;
        import org.apache.spark.streaming.*;
        import org.apache.spark.streaming.api.java.*;
        import scala.Tuple2;
        import java.util.Arrays;

        public class WCDemoJava {
            public static void main(String [] args)
            {
                SparkConf conf = new SparkConf().setMaster("spark://s100:7077").setAppName("NetworkWordCount");
                JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
                JavaReceiverInputDStream<String> lines = jssc.socketTextStream("s100", 9999);
                JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
                JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
                JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
                wordCounts.print();
                jssc.start();              // Start the computation
                try {
                    jssc.awaitTermination();   // Wait for the computation to terminate
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }


III. DStream -- The Discretized Stream
------------------------------------------------------------
[Notes]
    1. Once a context has been started, no new streaming computations can be set up or added to it.
    2. Once a context has been stopped, it cannot be restarted.
    3. Only one StreamingContext can be active in a JVM at the same time.
    4. Stopping the StreamingContext also stops the SparkContext by default; to stop only the StreamingContext, call
      ssc.stop(false)    // stop(stopSparkContext = false)
    5. A SparkContext can be reused to create multiple StreamingContexts, as long as the previous one is stopped before the next one is created (see the sketch below).
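
    A minimal Java sketch of notes 4 and 5 (the class name, the 10-second timeout and the socket source on localhost:9999 are illustrative): stop(false) shuts down only the StreamingContext, so a second one can then be created from the same SparkContext.

        import org.apache.spark.SparkConf;
        import org.apache.spark.api.java.JavaSparkContext;
        import org.apache.spark.streaming.Durations;
        import org.apache.spark.streaming.api.java.JavaStreamingContext;

        public class StreamingContextLifecycleDemo {
            public static void main(String[] args) throws InterruptedException {
                SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("lifecycle");
                JavaSparkContext sc = new JavaSparkContext(conf);

                //first StreamingContext: 1-second batches
                JavaStreamingContext jssc1 = new JavaStreamingContext(sc, Durations.seconds(1));
                jssc1.socketTextStream("localhost", 9999).print();
                jssc1.start();
                jssc1.awaitTerminationOrTimeout(10000);
                //stop only the StreamingContext and keep the SparkContext alive
                jssc1.stop(false, true);

                //a second StreamingContext can now be created from the same SparkContext
                JavaStreamingContext jssc2 = new JavaStreamingContext(sc, Durations.seconds(5));
                jssc2.socketTextStream("localhost", 9999).print();
                jssc2.start();
                jssc2.awaitTermination();
            }
        }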


IV. Receiver
----------------------------------------------------------
    1. Overview
        A receiver pulls data from a source and stores it in memory for Spark to process.

    2. Sources
        Basic sources: file systems | sockets, supported by the built-in API
        Advanced sources: Kafka | Flume | ..., which require extra pom.xml dependencies

    3. Note
        In local mode, do not run with a single thread; use local[n] with n greater than the number of receivers (see the sketch below).
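
    A minimal sketch of note 3 (the class name and the second port 9998 are illustrative): two socket streams mean two receivers, each pinning one thread, so the master must be at least local[3] to leave a thread for processing.

        import org.apache.spark.SparkConf;
        import org.apache.spark.streaming.Durations;
        import org.apache.spark.streaming.api.java.JavaDStream;
        import org.apache.spark.streaming.api.java.JavaStreamingContext;

        public class MultiReceiverDemo {
            public static void main(String[] args) throws InterruptedException {
                //two receivers -> n must be greater than 2
                SparkConf conf = new SparkConf().setMaster("local[3]").setAppName("MultiReceiverDemo");
                JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

                //each socketTextStream call creates its own receiver
                JavaDStream<String> s1 = jssc.socketTextStream("localhost", 9999);
                JavaDStream<String> s2 = jssc.socketTextStream("localhost", 9998);

                //union the two input streams and process them together
                s1.union(s2).print();

                jssc.start();
                jssc.awaitTermination();
            }
        }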


V. Integrating Spark Streaming with Kafka
-------------------------------------------------------------
    1. Start the Kafka cluster
        a. Start ZooKeeper

        b. Start Kafka [s200,s300,s400]
            $> /soft/kafka/bin/kafka-server-start.sh -daemon /soft/kafka/config/server.properties

        c. Verify that Kafka started successfully
            $> netstat -ano | grep 9092

        d. Create a Kafka topic
            $> kafka-topics.sh --create --zookeeper s100:2181 --replication-factor 3 --partitions 3 --topic sparktopic1

    2. Add the pom.xml dependency
        ...
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
          <version>2.1.0</version>
        </dependency>

    3. Write the Java code
        
package com.test.spark.kafka;

        import java.util.*;
        import org.apache.spark.SparkConf;
        import org.apache.spark.api.java.function.FlatMapFunction;
        import org.apache.spark.api.java.function.Function;
        import org.apache.spark.api.java.function.PairFunction;
        import org.apache.spark.streaming.Durations;
        import org.apache.spark.streaming.api.java.*;
        import org.apache.spark.streaming.kafka010.*;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.common.serialization.StringDeserializer;
        import scala.Tuple2;

        public class SparkKafkaDemo {
            public static void main(String [] args)
            {
                SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("NetworkWordCount");
                JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
                Map<String, Object> kafkaParams = new HashMap<>();
                kafkaParams.put("bootstrap.servers", "s200:9092,s300:9092,s400:9092");
                kafkaParams.put("key.deserializer", StringDeserializer.class);
                kafkaParams.put("value.deserializer", StringDeserializer.class);
                kafkaParams.put("group.id", "g6");
                kafkaParams.put("auto.offset.reset", "latest");
                kafkaParams.put("enable.auto.commit", false);

                Collection<String> topics = Arrays.asList("sparktopic1");

                JavaInputDStream<ConsumerRecord<String, String>> stream =
                        KafkaUtils.createDirectStream(
                                jssc,
                                LocationStrategies.PreferConsistent(),
                                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                        );

                JavaDStream<String> js = stream.map(
                        new Function<ConsumerRecord<String, String>, String>() {
                            @Override
                            public String call(ConsumerRecord<String, String> v1) throws Exception {
                                return v1.value();
                            }
                        }
                );

                JavaDStream<String> js1 = js.flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public Iterator<String> call(String s) throws Exception {
                        List<String> list = new ArrayList<String>();
                        String [] strs = s.split(" ");
                        for (String ss : strs){
                            list.add(ss);
                        }
                        return list.iterator();
                    }

                });
                JavaPairDStream<String,Integer> js2 = js1.mapToPair( x -> new Tuple2<>(x,1));
                JavaPairDStream<String,Integer> js3 = js2.reduceByKey((x, y) -> x + y);
                js3.print();
                jssc.start();
                try {
                    jssc.awaitTermination();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
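
    Because enable.auto.commit is set to false above, the consumed offsets are not committed automatically. A hedged sketch of the manual-commit pattern from the spark-streaming-kafka-0-10 integration guide, reusing the stream variable defined above; it would go inside main after js3.print() and before jssc.start() (HasOffsetRanges, CanCommitOffsets and OffsetRange are covered by the org.apache.spark.streaming.kafka010.* import):

                stream.foreachRDD(rdd -> {
                    //the Kafka offset ranges that make up this batch
                    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                    // ... write this batch's results out here ...
                    //then commit the offsets back to Kafka asynchronously
                    ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
                });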

    4. Start a Kafka console producer and test
        $> kafka-console-producer.sh --broker-list s200:9092 --topic sparktopic1


VI. The updateStateByKey Function
-----------------------------------------------------------------
    1. Applies an update function to each key/value of a DStream, producing a new DStream.

    2. Two steps
        a. Define the state --- the state can be of any type
        b. Define the state update function

    3. Explanation
        Take word count: mapToPair turns each word into the tuple <"word", 1>, and updateStateByKey is then applied to the transformed DStream.
        The update rule keeps the existing state (1) for the key <"word">; whenever the key <"word"> appears again, the state value is incremented and the pair is returned as <"word", 2>.
        Because this is stream processing, computed batch by batch, the key's state carries across batches and the rule keeps being applied and iterated.
        The resulting word counts are therefore not per-batch counts but running totals over all batches since the start.

    4. Code demo (updateStateByKey keeps state across batches, so a checkpoint directory must be set, hence the jssc.checkpoint(...) call below)
      
package com.test.spark.streaming.java;

      import org.apache.spark.*;
      import org.apache.spark.api.java.Optional;
      import org.apache.spark.api.java.function.FlatMapFunction;
      import org.apache.spark.api.java.function.Function2;
      import org.apache.spark.streaming.*;
      import org.apache.spark.streaming.api.java.*;
      import scala.Tuple2;

      import java.util.ArrayList;
      import java.util.Iterator;
      import java.util.List;


      public class WCDemoJava {
          public static void main(String [] args)
          {
              SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("NetworkWordCount");
              JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
              JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
              JavaDStream<String> words = lines.flatMap(
                      new FlatMapFunction<String, String>() {
                          @Override
                          public Iterator<String> call(String s) throws Exception {
                              List<String> list = new ArrayList<String>();
                              String [] strs = s.split(" ");
                              for (String ss : strs){
                                  list.add(ss);
                              }
                              return list.iterator();
                          }
                      }
              );

              //updateStateByKey needs running state across batches, so a checkpoint directory is required
              jssc.checkpoint("file:///d:/share/spark/checkpoint");
              JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));

              JavaPairDStream<String, Integer> pairs1 = pairs.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
                  public Optional<Integer> call(List<Integer> v1, Optional<Integer> v2) throws Exception {
                      //fetch the previous state for this key (0 if absent)
                      Integer newState = v2.isPresent()? v2.get() : 0;
                      System.out.println("oldState : " + newState );
                      for (Integer i : v1) {
                          //add the occurrences seen in this batch to the old state
                          newState = newState + i;
                      }
                      return Optional.of(newState);
                  }
              });

              pairs1.print();

              jssc.start();              // Start the computation
              try {
                  jssc.awaitTermination();   // Wait for the computation to terminate
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          }
      }


VII. Streaming Windows: Windowed Operations Across Batches
-----------------------------------------------------
    1. batch interval : the batch interval, the smallest unit of time

    2. window length  : the window length; it spans batches and must be a positive multiple of the batch interval

    3. slide interval : how often the window is computed, also a multiple of the batch interval; it is the gap between adjacent windows [in batches].
       For example, with a 5-second batch interval, a 15-second window length covers 3 batches and a 10-second slide interval recomputes the window every 2 batches.

    4. Common window operations:
        a.window(windowLength, slideInterval)
            //Return a new DStream which is computed based on windowed batches of the source DStream.
            //For non-pair DStreams: returns a new DStream containing all the batches inside the window

        b.countByWindow(windowLength, slideInterval)
            //Return a sliding window count of elements in the stream.
            //For non-pair DStreams: counts the elements of all batches inside the window

        c.reduceByWindow(func, windowLength, slideInterval)
            //Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using func.
            //The function should be associative and commutative so that it can be computed correctly in parallel.
            //For non-pair DStreams: aggregates the elements of all batches inside the window using func

        d.reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
            //When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs
            //where the values for each key are aggregated using the given reduce function func over batches in a sliding window.
            //Note: By default, this uses Spark's default number of parallel tasks (2 for local mode,
            //and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping.
            //You can pass an optional numTasks argument to set a different number of tasks.
            //For pair <K, V> DStreams: within the window, aggregates the values of the same key using func

        e.reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
            //A more efficient version of the above reduceByKeyAndWindow()
            //where the reduce value of each window is calculated incrementally using the reduce values of the previous window.
            //This is done by reducing the new data that enters the sliding window,
            //and “inverse reducing” the old data that leaves the window.
            //An example would be that of “adding” and “subtracting” counts of keys as the window slides.
            //However, it is applicable only to “invertible reduce functions”,
            //that is, those reduce functions which have a corresponding “inverse reduce” function (taken as parameter invFunc).
            //Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument.
            //Note that checkpointing must be enabled for using this operation.
            //For pair <K, V> DStreams: an incremental variant that adds data entering the window and "inverse reduces" data leaving it; requires an invertible reduce function and checkpointing (see the sketch after this list)

        f.countByValueAndWindow(windowLength, slideInterval, [numTasks])
            //When called on a DStream of (K, V) pairs,
            //returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window.
            //Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument.
            //For pair <K, V> DStreams: treats each pair <K, V> inside the window as a key and returns the number of times it occurred as the value
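
        A hedged Java sketch of item e, assuming a pairs DStream of (word, 1) tuples as in the examples below and that jssc.checkpoint(...) has already been set; the second function "inverse reduces" the counts of the batches that slide out of the window:

            //incremental window aggregation: add values entering the window, subtract values leaving it
            JavaPairDStream<String, Integer> windowedCounts = pairs.reduceByKeyAndWindow(
                    (a, b) -> a + b,          //reduce: combine new values
                    (a, b) -> a - b,          //inverse reduce: remove values that left the window
                    Durations.seconds(30),    //window length
                    Durations.seconds(10));   //slide interval
            windowedCounts.print();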

    5. Examples
         a.reduceByKeyAndWindow():
            // Reduce last 30 seconds of data, every 10 seconds //windows length = 30  //slider interval = 10
            scala> val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

            // Reduce last 30 seconds of data, every 10 seconds
            java> JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow((i1, i2) -> i1 + i2, Durations.seconds(30), Durations.seconds(10));

         b. Code
            
package com.test.spark.streaming.java;

            import org.apache.spark.SparkConf;
            import org.apache.spark.api.java.Optional;
            import org.apache.spark.api.java.function.FlatMapFunction;
            import org.apache.spark.api.java.function.Function2;
            import org.apache.spark.api.java.function.PairFunction;
            import org.apache.spark.streaming.Duration;
            import org.apache.spark.streaming.Seconds;
            import org.apache.spark.streaming.api.java.JavaDStream;
            import org.apache.spark.streaming.api.java.JavaPairDStream;
            import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
            import org.apache.spark.streaming.api.java.JavaStreamingContext;
            import scala.Some;
            import scala.Tuple2;

            import java.util.ArrayList;
            import java.util.Iterator;
            import java.util.List;

            public class WCSparkDemo1 {

                public static void main(String[] args) throws Exception {
                    SparkConf conf = new SparkConf();
                    conf.setAppName("wc");
                    conf.setMaster("local[4]");
                    //create the streaming context with a 5-second batch interval
                    JavaStreamingContext jsc = new JavaStreamingContext(conf, Seconds.apply(5));
                    //checkpoint directory (required by the windowed operations below)
                    jsc.checkpoint("file:///d:/share/spark/check");
                    //create the socket text DStream
                    JavaReceiverInputDStream<String> sock = jsc.socketTextStream("localhost",9999);
                    //flatten each line into words
                    JavaDStream<String> wordsDS = sock.flatMap(new FlatMapFunction<String,String>() {
                        public Iterator<String> call(String str) throws Exception {
                            List<String> list = new ArrayList<String>() ;
                            String[] arr = str.split(" ");
                            for(String s : arr){
                                list.add(s);
                            }
                            return list.iterator();
                        }
                    });

                    JavaDStream<String> wordsDS1 = wordsDS.window(Seconds.apply(15), Seconds.apply(10));
                    wordsDS1.print();

            //        //total word count inside the window [one batch = 5 seconds, 3 batches per window, a 2-batch slide: computed every 10 seconds over the last 15 seconds of batches]
            //        JavaDStream js1 =  wordsDS.countByWindow(Seconds.apply(15), Seconds.apply(10));
            //        js1.print();

                    //map to tuples
            //        JavaPairDStream<String,Integer> pairDS = wordsDS.mapToPair(new PairFunction<String, String, Integer>() {
            //            public Tuple2<String, Integer> call(String s) throws Exception {
            //                return new Tuple2<String,Integer>(s,1);
            //            }
            //        }) ;

                    //window count by value: each incoming pair is treated as an element and the value is how many times that pair occurred
                    //input
                    //hello world
                    //hello world
                    //hello world
                    //hello world
                    //hello world
                    //output
                    // ((hello,1),5)
                    // ((world,1),5)
            //        JavaPairDStream countDS = pairDS.countByValueAndWindow(Seconds.apply(15), Seconds.apply(10));
            //        countDS.print();

            //        //windowed aggregation by key [one batch = 5 seconds, 3 batches per window, a 2-batch slide: computed every 10 seconds over the last 15 seconds of batches]
            //        JavaPairDStream<String,Integer> countDS = pairDS.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
            //            public Integer call(Integer v1, Integer v2) throws Exception {
            //                return v1 + v2;
            //            }
            //        },Seconds.apply(15), Seconds.apply(10));


                    //print
            //        countDS.print();

                    jsc.start();

                    jsc.awaitTermination();

                    jsc.stop();
                }
            }


VIII. Implementing Spark Streaming Fault Tolerance in Production
-------------------------------------------------------------
    1. Two concepts
        a. Spark Driver
            //the driver: the host that runs the user's program code.
        b. Executors
            //run the jobs submitted by the Spark driver and host additional components such as receivers
            //a receiver ingests data and stores it as blocks in memory; the blocks are also replicated to other executors for fault tolerance
            //at the end of each batch a new RDD of the DStream is produced and handed to downstream processing
            //if a receiver fails, a receiver on another executor is started to take over data ingestion

    2. Failure types and remedies
        a. If an executor fails, all data that was received but not yet processed is lost. The remedy is a write-ahead log (WAL, the same technique used by HBase and the HDFS /WALs directory): incoming data is first persisted to HDFS or S3 (see the sketch after this list).

        b. If the driver fails, the driver process stops, every executor loses its connection, and the computation halts. The remedy requires both configuration and programming.
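
        A minimal WAL configuration sketch for item a (the class name and the HDFS URI hdfs://s100:8020/spark/checkpoint are assumptions); spark.streaming.receiver.writeAheadLog.enable turns the log on, and a serialized, non-replicated storage level is then sufficient for the receiver because the WAL already provides durability:

            import org.apache.spark.SparkConf;
            import org.apache.spark.storage.StorageLevel;
            import org.apache.spark.streaming.Durations;
            import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
            import org.apache.spark.streaming.api.java.JavaStreamingContext;

            public class WalConfigSketch {
                public static void main(String[] args) throws InterruptedException {
                    SparkConf conf = new SparkConf()
                            .setMaster("local[2]")
                            .setAppName("walDemo")
                            //persist received blocks to a write-ahead log before processing
                            .set("spark.streaming.receiver.writeAheadLog.enable", "true");

                    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
                    //the WAL is stored under the checkpoint directory, so it must be fault tolerant (HDFS/S3)
                    jssc.checkpoint("hdfs://s100:8020/spark/checkpoint");

                    //with the WAL enabled the extra in-memory replica is redundant
                    JavaReceiverInputDStream<String> lines =
                            jssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER());
                    lines.print();

                    jssc.start();
                    jssc.awaitTermination();
                }
            }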

    3. Recovering from a driver failure
        a. Step 1: configure the driver program to restart automatically; this is done through the specific cluster manager (e.g., --supervise in Spark standalone cluster deploy mode; see the submission sketch at the end of this section).

        b. Step 2: on restart, resume from the point of failure, which is achieved with the checkpoint mechanism.
            1) Create a checkpoint
                //the directory can be local or on HDFS.
                jsc.checkpoint("d://....");

            2) Create the context with JavaStreamingContext.getOrCreate
                //instead of creating the JavaStreamingContext with new,
                //create the context object through the factory method JavaStreamingContext.getOrCreate();
                //it first checks the checkpoint directory for a previously running job and only creates a new context if none is found.
                JavaStreamingContext jsc = JavaStreamingContext.getOrCreate("file:///d:/scala/check", new Function0<JavaStreamingContext>() {
                    public JavaStreamingContext call() throws Exception {
                        JavaStreamingContext jsc = new JavaStreamingContext(conf, Seconds.apply(2));
                        jsc.checkpoint("file:///d:/scala/check");
                        return jsc;
                    }
                 });

        c. Write the fault-tolerance test code; the transformation logic must be placed inside the call method of Function0.

package com.test.spark.streaming.java;

            import org.apache.spark.SparkConf;
            import org.apache.spark.api.java.function.Function0;
            import org.apache.spark.streaming.Duration;
            import org.apache.spark.streaming.api.java.JavaDStream;
            import org.apache.spark.streaming.api.java.JavaStreamingContext;

            public class SparkStreamingFaultTolerant {

                public static void main(String [] args)
                {
                    //create a JavaStreamingContext factory
                    Function0<JavaStreamingContext> contextFactory = new Function0<JavaStreamingContext>() {
                        //called only when the context is created for the first time (no checkpoint found)
                        public JavaStreamingContext call() {
                            SparkConf conf = new SparkConf();
                            conf.setMaster("local[4]");
                            conf.setAppName("wc");
                            JavaStreamingContext jssc = new JavaStreamingContext(conf,new Duration(2000));
                            JavaDStream<String> lines = jssc.socketTextStream("localhost",9999);

                            /*******  place the transformation code here ***********/
                            JavaDStream<Long> dsCount = lines.countByWindow(new Duration(24 * 60 * 60 * 1000),new Duration(2000));
                            dsCount.print();
                            //set the checkpoint directory
                            jssc.checkpoint("file:///d:/share/spark/checkpoint");
                            return jssc;
                        }
                    };

                    //on failure and restart, the context is rebuilt from the checkpoint.
                    JavaStreamingContext context = JavaStreamingContext.getOrCreate("file:///d:/share/spark/checkpoint", contextFactory);

                    context.start();
                    try {
                        context.awaitTermination();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
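
        To exercise the recovery path: run the program once so it writes to file:///d:/share/spark/checkpoint, kill it, and start it again; getOrCreate then rebuilds the context and the window state from the checkpoint instead of calling the factory. For the automatic driver restart of step 3.a, a hedged submission sketch (the cluster URL, class and jar names are the ones used earlier in these notes; --supervise applies to standalone/Mesos cluster deploy mode):

            $> spark-submit --master spark://s100:7077 --deploy-mode cluster --supervise --class com.test.spark.streaming.java.SparkStreamingFaultTolerant TestSpark-1.0-SNAPSHOT.jar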