1. 程式人生 > >Spark定製班第1課:通過案例對Spark Streaming透徹理解三板斧之一:解密Spark Streaming另類實驗及Spark Streaming本質解析

Spark定製班第1課:通過案例對Spark Streaming透徹理解三板斧之一:解密Spark Streaming另類實驗及Spark Streaming本質解析

package com.dt.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds

object OnlineBlackListFilter {
    def main(args: Array[String]){
      /**
       * 第1步:建立Spark的配置物件SparkConf,設定Spark程式的執行時的配置資訊,
       * 例如說通過setMaster來設定程式要連結的Spark叢集的Master的URL,如果設定
       * 為local,則代表Spark程式在本地執行,特別適合於機器配置條件非常差(例如
       * 只有1G的記憶體)的初學者。
       */
      // 建立SparkConf物件
      val conf = new SparkConf()
      // 設定應用程式的名稱,在程式執行的監控介面可以看到名稱
      conf.setAppName("OnlineBlackListFilter")
      // 此時,程式在Spark叢集
      conf.setMaster("spark://Master:7077")

      val ssc = new StreamingContext(conf, Seconds(30))

      /**
       * 黑名單資料準備,實際上黑名單一般都是動態的,例如在Redis或者資料庫中,
       * 黑名單的生成往往有複雜的業務邏輯,具體情況演算法不同,
       * 但是在Spark Streaming進行處理的時候每次都能夠訪問完整的資訊。
       */
      val blackList = Array(("Spy", true),("Cheater", true))
      val blackListRDD = ssc.sparkContext.parallelize(blackList, 8)

      val adsClickStream = ssc.socketTextStream("Master", 9999)

      /**
       * 此處模擬的廣告點選的每條資料的格式為:time、name
       * 此處map操作的結果是name、(time,name)的格式
       */
      val adsClickStreamFormatted = adsClickStream.map { ads => (ads.split(" ")(1), ads) }
      adsClickStreamFormatted.transform(userClickRDD => {
        // 通過leftOuterJoin操作既保留了左側使用者廣告點選內容的RDD的所有內容,
        // 又獲得了相應點選內容是否在黑名單中
        val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD)

        /**
         * 進行filter過濾的時候,其輸入元素是一個Tuple:(name,((time,name), boolean))
         * 其中第一個元素是黑名單的名稱,第二元素的第二個元素是進行leftOuterJoin的時候是否存在的值。
         * 如果存在的話,表明當前廣告點選是黑名單,需要過濾掉,否則的話是有效點選內容;
         */
        val validClicked = joinedBlackListRDD.filter(joinedItem => {
          if(joinedItem._2._2.getOrElse(false))
          {
            false
          } else {
            true
          }

        })

        validClicked.map(validClick => {validClick._2._1})
      }).print

      /**
       * 計算後的有效資料一般都會寫入Kafka中,下游的計費系統會從kafka中pull到有效資料進行計費
       */
      ssc.start()
      ssc.awaitTermination()

    }
}
   把程式的Batch Interval設定從30秒改成300秒: