1. 程式人生 > >SparkStreaming(二)入門案例

SparkStreaming(二)入門案例

2、入門案例

2.1、計算單詞的數量

Java版本jdk.1.8以下:

public class WordCountOnline {
	public static void main(String[] args) throws InterruptedException {
		SparkConf conf = new SparkConf();
		/*
		 * 1、配置應用名稱以及配置兩個執行緒(注意要大於等於兩個執行緒)
		 */
		conf.setAppName("WordCountOnline").setMaster("local[2]");
		
		/*
		 *2、 建立SparkStreamingContext
		 * 	可以基於SparkConf引數,也可以基於持久化的SparkStreamingContext進行狀態恢復。
		 * 	典型的場景是Driver崩潰後由於SparkStreaming具有連續不斷的24小時不間斷的執行,所以需要再Driver
		 * 	重現啟動後從上次執行的狀態恢復過來,此時的狀態需要基於曾經的CheckPoint。
		 */
		 JavaStreamingContext jssc = new JavaStreamingContext(conf,Durations.seconds(1));
	
		 /*
		  * 3、建立SparkStreaming輸入資料來源
		  * 	a.資料輸入來源可以基於File,HDFS,Flume,Kafka,Socket等。
		  * 	b.在這裡我們指定資料來源於網路Socket埠,SparkStreaming連線上該埠,並在執行的時候一直監聽該埠的資料,
		  * 		並且後續根據業務需要不斷的有資料產生。
		  * 	c.如果經常在每隔5秒沒有資料就不斷啟動空的job其實是對資源的浪費,因為沒有接受到資料,仍然提交了job。
		  * 		實際的做法是提交job會判斷是否有資料,如果沒有的話就不再提交job。
		  */
		 JavaReceiverInputDStream<String> lines = jssc.socketTextStream("local", 9999);
		 
		 /*
		  * 4、我們就像對RDD程式設計一樣,基於DStream進行程式設計,原因是DStream是RDD產生的模板,在SparkStreaming發生計算之前,其實質
		  * 	是把每個Batch的DStream的操作翻譯成為了RDD操作
		  */
		 
		 //4.1、faltMap操作:將遍歷每一行,並且將每一行分割單詞返回String的Iterator
		JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
			private static final long serialVersionUID = 1L;

			@Override
			public Iterable<String> call(String line) throws Exception {
				return Arrays.asList(line.split(","));
			}
		});
		
		//4.2、mapToPair操作:將每個單詞計數標記為1
		 JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

			@Override
			public Tuple2<String, Integer> call(String word) throws Exception {
				return new Tuple2<String, Integer>(word, 1);
			}
		});
		 
		//4.3、reduceByKey操作:將每個相同單詞的計數標記1相加
		JavaPairDStream<String, Integer> word_count = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
			@Override
			public Integer call(Integer v1, Integer v2) throws Exception {
				return v1 + v2;
			}
		});
	
		/*
		 * 4.4、print操作:此處的print方法並不會觸發job執行,因為目前程式碼還處於SparkStreaming框架的控制之下,
		 * 	具體是否觸發時取決於設定的Duration時間的間隔。
		 */
		word_count.print();
		
		/*
		 * 5、開始計算:SparkStreaming引擎開始執行,也就是Driver開始執行,Driver啟動時位於一條執行緒中,
		 * 	當然內部當然還有訊息迴圈體,接收應用程式本身或者Executor傳送過來的訊息。
		 */
		jssc.start();
		
		//6、等待程式執行結束
		jssc.awaitTermination();
	}
}

Java版本jdk1.8:可以使用lambda表示式簡化程式碼:

public class WordCount {
	public static void main(String[] args) throws InterruptedException {
		//1、建立一個帶有兩個執行執行緒的本地StreamingContext,並且設定流資料每批的間隔為1秒
		/**
		 * appName引數是應用程式在叢集UI上顯示的名稱。
		 * master是Spark,Mesos或YARN叢集URL,或者是在本地模式下執行的特殊"local[*]"字串。
		 * 實際上,當在叢集上執行時,不希望在程式中對master進行硬編碼,而是使用spark-submit啟動應用程式並在那裡接收它。
		 * 但是,對於本地測試和單元測試,您可以傳遞"local[*]"以在程序中執行Spark Streaming。
		 * 請注意,這會在內部建立一個JavaSparkContext(所有Spark功能的起點),可以作為ssc.sparkContext訪問。
		 */
		SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
		JavaStreamingContext jssc = new JavaStreamingContext(conf,Durations.seconds(1));
		
		/**
		 * 定義上下文後,需要執行以下操作:
		 * 	1.通過建立輸入DStreams來定義輸入源
		 * 	2.通過將轉換和輸出操作應用於DStream來定義流式計算。
		 * 	3.開始接收資料並使用streamingContext.start()處理它。
		 * 	4.等待使用streamingContext.awaitTermination()停止處理(手動或由於任何錯誤)。
		 * 	5.可以使用streamingContext.stop()手動停止處理。
		 * 要記住的要點:
		 *  1.一旦啟動了上下文,就不能設定或新增新的流式計算。
		 *  2.上下文停止後,無法重新啟動。
		 *  3.在JVM中只能同時啟用一個StreamingContext。
		 *  4.StreamingContext上的stop()也會停止SparkContext。要僅停止StreamingContext,請將名為stopSparkContext的stop()的可選引數設定為false。
		 *  5.只要在建立下一個StreamingContext之前停止前一個StreamingContext(不停止SparkContext),就可以重複使用SparkContext來建立多個StreamingContexts。
		 */
		//2、使用此context,我們可以建立一個DStream,它表示來自特定主機名(例如localhost)和埠(例如9999)TCP源的流資料。
		JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
		//3、將每行文字以空格符切分成一個個單詞
		JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
		//4、計算每批單詞的數量
		JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
		JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1,i2) -> i1 + i2);
		wordCounts.print();
		//5、開始計算
		jssc.start();
		//6、等待計算終止
		jssc.awaitTermination();
	}
}

2.2、流式篩選並打印出包含”error”的行

public class WordFilter {
	public static void main(String[] args) throws InterruptedException {
		//建立一個Java版本的Spark Context 
		SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WordFilter");
		//從SparkConf建立StreamingContext並指定1秒鐘的批處理大小
		JavaStreamingContext jssc = new JavaStreamingContext(conf,Durations.seconds(1));
		//以埠7777作為輸入來源建立DStream
		JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 7777);
		//從DStream中篩選出包含字串"error"的行
		JavaDStream<String> errorLines = lines.filter(new Function<String,Boolean>(){
			@Override
			public Boolean call(String line) throws Exception {
				return line.contains("error");
			}
		});
		//打印出有"error"的行
		errorLines.print();
		//啟動流計算環境StreamingContext並等待它"完成"
		jssc.start();
		//等待作業完成
		jssc.awaitTermination();
	}
}