Big Data with Spark (6) --- Spark Streaming Introduction, DStream, Receiver, Integrating Streaming with Kafka, Window Operations, and Fault Tolerance
阿新 · Published: 2018-11-19
I. Spark Streaming Introduction
-----------------------------------------------------------
1. Overview
    Spark Streaming is an extension of Spark Core for real-time processing of live data streams.
    It is scalable, high-throughput, and fault-tolerant.
    Data can come from Kafka, Flume, TCP sockets, etc.; it is processed with high-level functions (map, reduce, filter, join, window), and the results can be pushed to databases or HDFS.
    Stream processing can also be combined with machine learning and graph computation.
2. Internal execution flow
    Spark receives the live data stream, splits it into batches, processes each batch, and produces a result stream for every batch.
3. Discretized stream (DStream)
    A DStream represents a continuous stream of data.
    A DStream can be created from input sources such as Kafka or Flume, or by applying high-level transformations to other DStreams (similar to RDDs).
    Internally, a DStream is represented as a sequence of RDDs.

II. Programming Spark Streaming
------------------------------------------------------------
1. Add the dependency
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
2. Write the Scala code -- SparkStreamingDemo
package com.test.spark.streaming.scala

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object SparkStreamingDemo {
    def main(args: Array[String]): Unit = {
        //note: use local[n] with n > 1 (one thread for the receiver, at least one for processing)
        val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
        //create the streaming context with a 1-second batch interval
        val ssc = new StreamingContext(conf, Seconds(1))
        //create the socket text stream
        val lines = ssc.socketTextStream("localhost", 9999)
        val words = lines.flatMap(_.split(" "))
        val pairs = words.map(word => (word, 1))
        val wordCounts = pairs.reduceByKey(_ + _)
        wordCounts.print()
        //start the computation
        ssc.start()
        //wait for the computation to terminate
        ssc.awaitTermination()
    }
}
3. Start the nc server first to provide the data source
    cmd> nc -l -p 9999
4. Run the Scala program and check the results
    run
5. Package the jar and run it on Ubuntu
    $> nc -lk 9999
    $> spark-submit --master spark://s100:7077 --name netwc --class com.test.spark.streaming.scala.SparkStreamingDemo TestSpark-1.0-SNAPSHOT.jar
6. Java version
package com.test.spark.streaming.java;
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import java.util.Arrays;
public class WCDemoJava {
public static void main(String [] args)
{
SparkConf conf = new SparkConf().setMaster("spark://s100:7077).setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("s100", 9999);
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
wordCounts.print();
jssc.start(); // Start the computation
try {
jssc.awaitTermination(); // Wait for the computation to terminate
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
III. DStream -- Discretized Streams
------------------------------------------------------------
[Notes]
1. Once a context has been started, no new streaming computations can be set up or added to it.
2. Once a context has been stopped, it cannot be restarted.
3. Only one StreamingContext can be active in a JVM at the same time.
4. Stopping the StreamingContext also stops the SparkContext by default; to stop only the StreamingContext, pass false:
    ssc.stop(false)
5. A SparkContext can be reused to create multiple StreamingContexts, as long as the previous StreamingContext is stopped before the next one is created.

IV. Receiver
----------------------------------------------------------
1. Overview
    A Receiver receives data from a source and stores it in memory for Spark to process.
2. Sources
    Basic sources: file systems and sockets, supported by the built-in API.
    Advanced sources: Kafka, Flume, etc., which require extra dependencies in pom.xml.
3. Note
    In local mode, do not run with a single thread; use local[n] with n greater than the number of receivers.

V. Integrating Spark Streaming with Kafka
-------------------------------------------------------------
1. Start the Kafka cluster
    a. Start ZooKeeper.
    b. Start Kafka on [s200, s300, s400]
        $> /soft/kafka/bin/kafka-server-start.sh -daemon /soft/kafka/config/server.properties
    c. Verify that Kafka started
        $> netstat -ano | grep 9092
    d. Create a Kafka topic
        $> kafka-topics.sh --create --zookeeper s100:2181 --replication-factor 3 --partitions 3 --topic sparktopic1
2. Add the dependency to pom.xml
    ...
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
3. Write the Java code
package com.test.spark.kafka;
import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;
public class SparkKafkaDemo {
public static void main(String [] args)
{
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "s200:9092,s300:9092,s400:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "g6");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("sparktopic1");
JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
JavaDStream<String> js = stream.map(
new Function<ConsumerRecord<String, String>, String>() {
@Override
public String call(ConsumerRecord<String, String> v1) throws Exception {
return v1.value();
}
}
);
JavaDStream<String> js1 = js.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
List<String> list = new ArrayList<String>();
String [] strs = s.split(" ");
for (String ss : strs){
list.add(ss);
}
return list.iterator();
}
});
JavaPairDStream<String,Integer> js2 = js1.mapToPair( x -> new Tuple2<>(x,1));
JavaPairDStream<String,Integer> js3 = js2.reduceByKey((x, y) -> x + y);
js3.print();
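// Hedged addition, not in the original post: since enable.auto.commit is set to false above,
// the offsets of processed batches can be committed back to Kafka asynchronously with the
// commitAsync API of the 0-10 integration (OffsetRange, HasOffsetRanges and CanCommitOffsets
// come from org.apache.spark.streaming.kafka010, already imported above).
stream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});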
jssc.start();
try {
jssc.awaitTermination();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
4. Start a Kafka console producer and test
    $> kafka-console-producer.sh --broker-list s200:9092 --topic sparktopic1

VI. The updateStateByKey Function
-----------------------------------------------------------------
1. It applies an update function to each key of a (key, value) DStream, producing a new, stateful DStream.
2. Two steps
    a. Define the state -- the state can be of any type.
    b. Define the state update function.
3. Explanation
    Take word count as an example: mapToPair turns each word into a tuple <"word", 1>, and updateStateByKey is applied to the resulting DStream.
    The update rule keeps the existing state of the key <"word">; whenever the key <"word"> appears again, the state is incremented, so the pair becomes <"word", 2> and is returned.
    Because stream computation runs batch by batch, a key's state is carried across batches and is continually updated by this rule.
    The counts produced are therefore not per-batch counts but running totals over all batches since the start.
4. Code demo
package com.test.spark.streaming.java;
import org.apache.spark.*;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class WCDemoJava {
public static void main(String [] args)
{
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
JavaDStream<String> words = lines.flatMap(
new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
List<String> list = new ArrayList<String>();
String [] strs = s.split(" ");
for (String ss : strs){
list.add(ss);
}
return list.iterator();
}
}
);
jssc.checkpoint("file:///d:\\share\\spark\\checkpoint");
JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairDStream<String, Integer> pairs1 = pairs.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
public Optional<Integer> call(List<Integer> v1, Optional<Integer> v2) throws Exception {
//get the previous state
Integer newState = v2.isPresent()? v2.get() : 0;
System.out.println("oldState : " + newState );
for (Integer i : v1) {
//update the old state (add the occurrences in this batch)
newState = newState + i;
}
return Optional.of(newState);
}
});
pairs1.print();
jssc.start(); // Start the computation
try {
jssc.awaitTermination(); // Wait for the computation to terminate
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
VII. Streaming Window Operations (across batches)
-----------------------------------------------------
1. batch interval : the batch interval, the smallest unit of time.
2. window length  : the window length; it spans batches and must be a whole multiple of the batch interval.
3. slide interval : the sliding interval, i.e. how often a window is computed; also a whole multiple of the batch interval. In other words, the gap between adjacent windows, measured in batches.
4. Common window operations:
    a. window(windowLength, slideInterval)
        //Return a new DStream which is computed based on windowed batches of the source DStream.
        //Applied to a non-pair DStream; returns a new DStream containing all batches inside the window.
    b. countByWindow(windowLength, slideInterval)
        //Return a sliding window count of elements in the stream.
        //Applied to a non-pair DStream; counts the elements of all batches inside the window.
    c. reduceByWindow(func, windowLength, slideInterval)
        //Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using func.
        //The function should be associative and commutative so that it can be computed correctly in parallel.
        //Applied to a non-pair DStream; aggregates all elements of the batches inside the window with func.
    d. reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
        //When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs
        //where the values for each key are aggregated using the given reduce function func over batches in a sliding window.
        //Note: By default, this uses Spark's default number of parallel tasks (2 for local mode,
        //and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping.
        //You can pass an optional numTasks argument to set a different number of tasks.
        //Applied to a pair <k,v> DStream; values of the same key inside the window are aggregated with func.
    e. reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
        //A more efficient version of the above reduceByKeyAndWindow()
        //where the reduce value of each window is calculated incrementally using the reduce values of the previous window.
        //This is done by reducing the new data that enters the sliding window,
        //and "inverse reducing" the old data that leaves the window.
        //An example would be that of "adding" and "subtracting" counts of keys as the window slides.
        //However, it is applicable only to "invertible reduce functions",
        //that is, those reduce functions which have a corresponding "inverse reduce" function (taken as parameter invFunc).
        //Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument.
        //Note that checkpointing must be enabled for using this operation.
        //(A hedged sketch of this incremental form appears after the code below.)
    f. countByValueAndWindow(windowLength, slideInterval, [numTasks])
        //When called on a DStream of (K, V) pairs,
        //returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window.
        //Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument.
        //Applied to a pair <k,v> DStream; each pair <k,v> from the batches inside the window is used as the key, and its number of occurrences is returned as the value.
5. Examples
    a. reduceByKeyAndWindow():
        // Reduce last 30 seconds of data, every 10 seconds
        // window length = 30, slide interval = 10
        scala> val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
        // Reduce last 30 seconds of data, every 10 seconds
        java>  JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow((i1, i2) -> i1 + i2, Durations.seconds(30), Durations.seconds(10));
    b. Code
package com.test.spark.streaming.java;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Some;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class WCSparkDemo1 {
public static void main(String[] args) throws Exception {
SparkConf conf = new SparkConf();
conf.setAppName("wc");
conf.setMaster("local[4]");
//create the Spark streaming application context with a 5-second batch interval
JavaStreamingContext jsc = new JavaStreamingContext(conf, Seconds.apply(5));
jsc.checkpoint("file:///d:/share/spark/check");
//create the socket text stream
JavaReceiverInputDStream<String> sock = jsc.socketTextStream("localhost",9999);
//flatten: split each line into words
JavaDStream<String> wordsDS = sock.flatMap(new FlatMapFunction<String,String>() {
public Iterator<String> call(String str) throws Exception {
List<String> list = new ArrayList<String>() ;
String[] arr = str.split(" ");
for(String s : arr){
list.add(s);
}
return list.iterator();
}
});
JavaDStream<String> wordsDS1 = wordsDS.window(Seconds.apply(15), Seconds.apply(10));
wordsDS1.print();
// //total word count inside the window [1 batch = 5 s, 3 batches per window, window slide of 2 batches: computed every 10 s, each time over the last 15 s of batches]
// JavaDStream js1 = wordsDS.countByWindow(Seconds.apply(15), Seconds.apply(10));
// js1.print();
//map each word to a tuple
// JavaPairDStream<String,Integer> pairDS = wordsDS.mapToPair(new PairFunction<String, String, Integer>() {
// public Tuple2<String, Integer> call(String s) throws Exception {
// return new Tuple2<String,Integer>(s,1);
// }
// }) ;
//countByValueAndWindow: each incoming pair is treated as an element, and the value is the number of times that element appears inside the window
//input
//hello world
//hello world
//hello world
//hello world
//hello world
//output
// ((hello,1),5)
// ((world,1),5)
// JavaPairDStream countDS = pairDS.countByValueAndWindow(Seconds.apply(15), Seconds.apply(10));
// countDS.print();
// //per-key aggregation over the window [1 batch = 5 s, 3 batches per window, window slide of 2 batches: computed every 10 s, each time over the last 15 s of batches]
// JavaPairDStream<String,Integer> countDS = pairDS.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
// public Integer call(Integer v1, Integer v2) throws Exception {
// return v1 + v2;
// }
// },Seconds.apply(15), Seconds.apply(10));
//print the result
// countDS.print();
jsc.start();
jsc.awaitTermination();
jsc.stop();
}
}
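The incremental form of reduceByKeyAndWindow (operation 4.e above) is not exercised in the listing, so here is a hedged, self-contained sketch rather than code from the original post; the host, port, window sizes, and checkpoint path mirror the demo above and are assumptions. Because old batches are "inverse reduced" out of the window, checkpointing must be enabled.

package com.test.spark.streaming.java;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.Arrays;
public class IncrementalWindowWCDemo {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("incWindowWC");
        //1 batch = 5 seconds
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
        //checkpointing is mandatory for the invertible form
        jssc.checkpoint("file:///d:/share/spark/check-inc");
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
        JavaPairDStream<String, Integer> pairs = lines
                .flatMap(s -> Arrays.asList(s.split(" ")).iterator())
                .mapToPair(w -> new Tuple2<>(w, 1));
        //window = 15 s (3 batches), slide = 10 s (2 batches):
        //counts entering the window are added, counts leaving the window are subtracted,
        //so each window is derived from the previous one instead of being recomputed from scratch
        JavaPairDStream<String, Integer> windowedCounts = pairs.reduceByKeyAndWindow(
                (a, b) -> a + b,   //reduce: add
                (a, b) -> a - b,   //inverse reduce: subtract
                Durations.seconds(15),
                Durations.seconds(10));
        windowedCounts.print();
        jssc.start();
        jssc.awaitTermination();
    }
}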
VIII. Fault Tolerance for Spark Streaming in Production
-------------------------------------------------------------
1. Two concepts
    a. Spark Driver    //the driver, i.e. the host that runs the user's program code.
    b. Executors       //run the jobs submitted by the Spark driver and host additional components such as receivers.
        //A receiver receives data and stores it in memory as blocks; the blocks are also replicated to other executors for fault tolerance.
        //At the end of every batch interval a new batch of the DStream is produced and handed to the downstream processing.
        //If a receiver fails, a receiver on another executor is started to continue receiving data.
2. Failure types and how to handle them
    a. If an executor fails, all data received but not yet processed is lost. The fix is a write-ahead log (WAL, the same idea used by HBase and the HDFS /WALs directory) that persists received data to HDFS or S3 before it is processed (see the commented conf setting in the code below).
    b. If the driver fails, the driver program stops, all executors lose their connection, and computation stops. The fix requires both configuration and code changes.
3. Handling driver failure
    a. Step 1: configure the driver program to restart automatically; this is done through the specific cluster manager (see the submit example after the code below).
    b. Step 2: on restart, resume from where the crashed driver left off; this is achieved with the checkpoint mechanism.
        1) Create a checkpoint
            //the directory can be local or on HDFS
            jsc.checkpoint("d://....");
        2) Create the context with JavaStreamingContext.getOrCreate
            //Instead of creating the JavaStreamingContext with new,
            //create it through the factory method JavaStreamingContext.getOrCreate(),
            //which first checks the checkpoint directory for an existing job and only creates a new context if none is found.
            JavaStreamingContext jsc = JavaStreamingContext.getOrCreate("file:///d:/scala/check",
                new Function0<JavaStreamingContext>() {
                    public JavaStreamingContext call() throws Exception {
                        JavaStreamingContext jsc = new JavaStreamingContext(conf, Seconds.apply(2));
                        jsc.checkpoint("file:///d:/scala/check");
                        return jsc;
                    }
                });
    c. Fault-tolerance test code: the whole computation must be written inside Function0's call method.
package com.test.spark.streaming.java;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
public class SparkStreamingFaultTolerant {
public static void main(String [] args)
{
//factory that creates the JavaStreamingContext
Function0<JavaStreamingContext> contextFactory = new Function0<JavaStreamingContext>() {
//called only when the context is created for the first time (no checkpoint data found)
public JavaStreamingContext call() {
SparkConf conf = new SparkConf();
conf.setMaster("local[4]");
conf.setAppName("wc");
JavaStreamingContext jssc = new JavaStreamingContext(conf,new Duration(2000));
JavaDStream<String> lines = jssc.socketTextStream("localhost",9999);
/******* put the transformation code here ***********/
JavaDStream<Long> dsCount = lines.countByWindow(new Duration(24 * 60 * 60 * 1000),new Duration(2000));
dsCount.print();
//set the checkpoint directory
jssc.checkpoint("file:///d:/share/spark/checkpoint");
return jssc;
}
};
//on recovery after a failure, the context is rebuilt from the checkpoint
JavaStreamingContext context = JavaStreamingContext.getOrCreate("file:///d:/share/spark/checkpoint", contextFactory);
context.start();
try {
context.awaitTermination();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
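For step 3.a (automatic driver restart) on a Spark standalone cluster, the driver can be submitted in cluster deploy mode with the --supervise flag so the master restarts it after a failure. This is a hedged example; the master URL, class, and jar name simply reuse the names from the earlier submit command and are assumptions here:
    $> spark-submit --master spark://s100:7077 --deploy-mode cluster --supervise --class com.test.spark.streaming.java.SparkStreamingFaultTolerant TestSpark-1.0-SNAPSHOT.jar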