Spark定製班第1課:通過案例對Spark Streaming透徹理解三板斧之一:解密Spark Streaming另類實驗及Spark Streaming本質解析
阿新 • • 發佈:2019-01-23
把程式的Batch Interval設定從30秒改成300秒:package com.dt.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.Seconds object OnlineBlackListFilter { def main(args: Array[String]){ /** * 第1步:建立Spark的配置物件SparkConf,設定Spark程式的執行時的配置資訊, * 例如說通過setMaster來設定程式要連結的Spark叢集的Master的URL,如果設定 * 為local,則代表Spark程式在本地執行,特別適合於機器配置條件非常差(例如 * 只有1G的記憶體)的初學者。 */ // 建立SparkConf物件 val conf = new SparkConf() // 設定應用程式的名稱,在程式執行的監控介面可以看到名稱 conf.setAppName("OnlineBlackListFilter") // 此時,程式在Spark叢集 conf.setMaster("spark://Master:7077") val ssc = new StreamingContext(conf, Seconds(30)) /** * 黑名單資料準備,實際上黑名單一般都是動態的,例如在Redis或者資料庫中, * 黑名單的生成往往有複雜的業務邏輯,具體情況演算法不同, * 但是在Spark Streaming進行處理的時候每次都能夠訪問完整的資訊。 */ val blackList = Array(("Spy", true),("Cheater", true)) val blackListRDD = ssc.sparkContext.parallelize(blackList, 8) val adsClickStream = ssc.socketTextStream("Master", 9999) /** * 此處模擬的廣告點選的每條資料的格式為:time、name * 此處map操作的結果是name、(time,name)的格式 */ val adsClickStreamFormatted = adsClickStream.map { ads => (ads.split(" ")(1), ads) } adsClickStreamFormatted.transform(userClickRDD => { // 通過leftOuterJoin操作既保留了左側使用者廣告點選內容的RDD的所有內容, // 又獲得了相應點選內容是否在黑名單中 val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD) /** * 進行filter過濾的時候,其輸入元素是一個Tuple:(name,((time,name), boolean)) * 其中第一個元素是黑名單的名稱,第二元素的第二個元素是進行leftOuterJoin的時候是否存在的值。 * 如果存在的話,表明當前廣告點選是黑名單,需要過濾掉,否則的話是有效點選內容; */ val validClicked = joinedBlackListRDD.filter(joinedItem => { if(joinedItem._2._2.getOrElse(false)) { false } else { true } }) validClicked.map(validClick => {validClick._2._1}) }).print /** * 計算後的有效資料一般都會寫入Kafka中,下游的計費系統會從kafka中pull到有效資料進行計費 */ ssc.start() ssc.awaitTermination() } }