spark學習記錄(十三、SparkStreaming)
阿新 • • 發佈:2019-01-13
一、SparkStreaming簡介
SparkStreaming是流式處理框架,是Spark API的擴充套件,支援可擴充套件、高吞吐量、容錯的實時資料流處理,實時資料的來源可以是:Kafka, Flume, Twitter, ZeroMQ或者TCP sockets,並且可以使用高階功能的複雜運算元來處理流資料。例如:map,reduce,join,window 。最終,處理後的資料可以存放在檔案系統,資料庫等,方便實時展現。
SparkStreaming與Storm的區別:
- Storm是純實時的流式處理框架,SparkStreaming是準實時的處理框架(微批處理)。因為微批處理,SparkStreaming的吞吐量比Storm要高。Storm是來一條資料就處理一次,SparkStreaming是處理某段時間內來的資料。
- Storm 的事務機制要比SparkStreaming的要完善。
- Storm支援動態資源排程。(spark1.2開始和之後也支援)
- SparkStreaming擅長複雜的業務處理,Storm不擅長複雜的業務處理,擅長簡單的彙總型計算。
- receiver task是7*24小時一直在執行,一直接收資料,將一段時間內接收來的資料儲存到batch中。假設batchInterval為5s,那麼會將接收來的資料每隔5秒封裝到一個batch中,batch沒有分散式計算特性,這一個batch的資料又被封裝到一個RDD中,RDD最終封裝到一個DStream中。
例如:假設batchInterval為5秒,每隔5秒通過SparkStreamin將得到一個DStream,在第6秒的時候計算這5秒的資料,假設執行任務的時間是3秒,那麼第6~9秒一邊在接收資料,一邊在計算任務,9~10秒只是在接收資料。然後在第11秒的時候重複上面的操作。
- 如果job執行的時間大於batchInterval會有什麼樣的問題?
如果接受過來的資料設定的級別是僅記憶體,接收來的資料會越堆積越多,最後可能會導致OOM(如果設定StorageLevel包含disk, 則記憶體存放不下的資料會溢寫至disk, 加大延遲 )
二、java程式碼
新增依賴:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>2.4.0</version>
<scope>provided</scope>
</dependency>
在hadoop1上輸入命令:
nc -lk 9999
hello Sam
hello Tom
hello Jetty
public class JavaExample {
public static void main(String[] args) throws InterruptedException {
SparkConf conf = new SparkConf();
conf.setMaster("local[2]");
conf.setAppName("SparkStreamingTest");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
JavaReceiverInputDStream<String> stream = jsc.socketTextStream("192.168.30.141", 9999);
final JavaDStream<String> words = stream.flatMap(new FlatMapFunction<String, String>() {
public Iterator<String> call(String s) throws Exception {
return Arrays.asList(s.split(" ")).iterator();
}
});
JavaPairDStream<String, Integer> pair = words.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<String, Integer>(s, 1);
}
});
JavaPairDStream<String, Integer> reduceByKey = pair.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
//列印輸出
reduceByKey.print();
/**
* foreachRDD可以拿到DStream中的RDD,對拿到的RDD可以使用RDD的transformations運算元轉換,
* 要對拿到的RDD使用action觸發執行,否則foreachRDD也不會執行
* foreachRDD中call方法內,拿到RDD的運算元外,程式碼在Driver執行,
* 可以使用這個運算元實現動態改變廣播變數,即廣播讀取一個檔案,只修改檔案內容,不停止程式碼
*/
reduceByKey.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
public void call(JavaPairRDD<String, Integer> rdd) throws Exception {
//SparkStreaming廣播變數
SparkContext context = rdd.context();
JavaSparkContext javaSparkContext = new JavaSparkContext(context);
Broadcast<String> broadcast = javaSparkContext.broadcast("hello");
System.out.println(broadcast.getValue());
JavaPairRDD<String, Integer> mapToPair = rdd.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
public Tuple2<String, Integer> call(Tuple2<String, Integer> tuple2) throws Exception {
return new Tuple2<String, Integer>(tuple2._1 + "~", tuple2._2);
}
});
mapToPair.foreach(new VoidFunction<Tuple2<String, Integer>>() {
public void call(Tuple2<String, Integer> tuple2) throws Exception {
System.out.println(tuple2);
}
});
}
});
jsc.start();
jsc.awaitTermination();
}
}
三、運算元
/**
* updateStateByKey:
* 返回一個新的“狀態”Dstream,通過給定的func來更新之前的每個狀態的key對應的value值,這也可以用於維護key的任意狀態資料。
* 注意:作用在(K,V)格式的DStream上
* <p>
* updateStateByKey的主要功能:
* 1、Spark Streaming中為每一個Key維護一份state狀態,state型別可以是任意型別的的, 可以是一個自定義的物件,那麼更新函式也可以是自定義的。
* 2、通過更新函式對該key的狀態不斷更新,對於每個新的batch而言,Spark Streaming會在使用updateStateByKey的時候為已經存在的key進行
* state的狀態更新
* (對於每個新出現的key,會同樣的執行state的更新函式操作),
* 如果要不斷的更新每個key的state,就一定涉及到了狀態的儲存和容錯,這個時候就需要開啟checkpoint機制和功能
*
* @author root
*/
public class Operate_updateStateByKey {
public static void main(String[] args) throws InterruptedException {
SparkConf conf = new SparkConf().setMaster("local").setAppName("Operate_count");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
/**
* checkpoint儲存地址
* 如果batchInterval小於10,那麼10s會將記憶體中的資料寫入硬碟中
* 如果batchInterval大於10,那麼以batchInterval為準
*
* 這樣做是為了防止頻繁的寫hdfs
*/
jsc.checkpoint("checkpoint");
//讀取檔案儲存地址
JavaDStream<String> textFileStream = jsc.textFileStream("data");
/**
* 實現一個累加統計word的功能
*/
JavaPairDStream<String, Integer> mapToPair = textFileStream.flatMap(new FlatMapFunction<String, String>() {
private static final long serialVersionUID = 1L;
public Iterator<String> call(String t) throws Exception {
return Arrays.asList(t.split(" ")).iterator();
}
}).mapToPair(new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 1L;
public Tuple2<String, Integer> call(String t) throws Exception {
return new Tuple2<String, Integer>(t.trim(), 1);
}
});
JavaPairDStream<String, Integer> updateStateByKey = mapToPair.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
private static final long serialVersionUID = 1L;
public Optional<Integer> call(List<Integer> values, Optional<Integer> state)
throws Exception {
/**
* values:經過分組最後 這個key所對應的value [1,1,1,1,1]
* state:這個key在本次之前之前的狀態
*/
Integer updateValue = 0;
if (state.isPresent()) {
updateValue = state.get();
}
for (Integer i : values) {
updateValue += i;
}
return Optional.of(updateValue);
}
});
updateStateByKey.print();
jsc.start();
jsc.awaitTermination();
jsc.close();
}
}
public class Operate_window {
public static void main(String[] args) throws InterruptedException {
SparkConf conf = new SparkConf();
conf.setMaster("local[2]");
conf.setAppName("Operate_window");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
/**
* 設定日誌級別
*/
jsc.sparkContext().setLogLevel("WARN");
//設定checkpoint
jsc.checkpoint("./checkpoint");
JavaReceiverInputDStream<String> textStream = jsc.socketTextStream("192.168.30.141", 9999);
// 切分
JavaDStream<String> flatMap = textStream.flatMap(new FlatMapFunction<String, String>() {
public Iterator<String> call(String s) throws Exception {
return Arrays.asList(s.split(" ")).iterator();
}
});
// 計數
JavaPairDStream<String, Integer> mapToPair = flatMap.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<String, Integer>(s, 1);
}
});
/**
*每個10s,計算最近60s內的資料,那麼這個視窗大小為60s,裡面有12個rdd,在沒有計算之前,這些rdd是不會計算的
* 那麼在計算的時候會將這12個rdd聚合起來,然後一起執行reduceByKeyAndWindow,
* reduceByKeyAndWindow是針對視窗操作而不是DStream
*/
JavaPairDStream<String, Integer> reduceByKeyAndWindow = mapToPair.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
//加上新進視窗的批次
return v1 + v2;
}
//每隔5s計算最近15s的資料,這兩個引數要是bathInterval的整數倍即上面設定的5s的整數倍
}, new Function2<Integer, Integer, Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
//減去新離開視窗的批次
return v1 - v2;
}
}, Durations.seconds(15), Durations.seconds(5));
reduceByKeyAndWindow.print();
jsc.start();
jsc.awaitTermination();
jsc.close();
}
}
public class Operate_transform {
public static void main(String[] args) throws InterruptedException {
SparkConf conf = new SparkConf();
conf.setMaster("local[2]").setAppName("Operate_transform");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
//黑名單
List<String> list = Arrays.asList("Sam");
final Broadcast<List<String>> broadcast = jsc.sparkContext().broadcast(list);
JavaReceiverInputDStream<String> stream = jsc.socketTextStream("192.168.30.141", 9999);
JavaPairDStream<String, String> mapToPair = stream.mapToPair(new PairFunction<String, String, String>() {
public Tuple2<String, String> call(String s) throws Exception {
return new Tuple2<String, String>(s.split(" ")[1], s);
}
});
/**
* 過濾黑名單上的名字
* transform可以拿到DStream中的RDD,做RDD到RDD之間的轉換,不需要Action運算元觸發,需要返回型別RDD
* 注意:transform call方法內,拿到RDD運算元外的程式碼在Driver端執行,也可以做到動態改變廣播變數
*/
JavaDStream<String> transform = mapToPair.transform(new Function<JavaPairRDD<String, String>, JavaRDD<String>>() {
public JavaRDD<String> call(JavaPairRDD<String, String> nameRDD) throws Exception {
//資料過濾
JavaPairRDD<String, String> filter = nameRDD.filter(new Function<Tuple2<String, String>, Boolean>() {
public Boolean call(Tuple2<String, String> tuple2) throws Exception {
return !broadcast.getValue().contains(tuple2._1);
}
});
JavaRDD<String> map = filter.map(new Function<Tuple2<String, String>, String>() {
public String call(Tuple2<String, String> tuple2) throws Exception {
return tuple2._2;
}
});
return map;
}
});
transform.print();
jsc.start();
jsc.awaitTermination();
jsc.close();
}
}