1. 程式人生 > >spark streaming中transform過濾廣告黑名單

spark streaming中transform過濾廣告黑名單

/*
transform操作,應用在DStream上時,可以用於執行任意的RDD到RDD的轉換操作。它可以用於實現,DStream API中
所沒有提供的操作。比如說,DStream API中,並沒有提供將一個DStream中的每個batch,與一個特定的RDD進行join
的操作。但是我們自己就可以使用transform操作來實現該功能。
DStream.join(),只能join其他DStream。在DStream每個batch的RDD計算出來之後,會去跟其他DStream的RDD進行
join
 */
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 過濾廣告黑名單案例
  * 使用者對我們的網站上的廣告可以進行點選
  * 點選之後,是不是要進行實時計費,點一下,算一次錢
  * 但是,對於那些幫助某些無良商家刷廣告的人,那麼我們有一個黑名單
  * 只要是黑名單中的使用者點選的廣告,我們就給過濾掉
  * transform操作,應用在DStream上,可以用於執行任意的RDD到RDD的轉換操作。
  *他可以用於實現,DStream API中所沒有提供的操作。
  *
  */
object transformDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("windowOpObj").setMaster("local[*]")
    val ssc= new StreamingContext(conf,Seconds(5))

    //定義一個黑名單
    val blackList=List(("tom",true),("jerry",true))
    val blRDD = ssc.sparkContext.parallelize(blackList)
    //使用socketTestStream來監聽埠,也就是接收到的實時資料
   val sockrtData =  ssc.socketTextStream("192.168.88.130",8888)
    //解析出使用者的資訊,將接收到的資料轉換成和我們黑名單對應得形式資料格式
    val user: DStream[(String, String)] = sockrtData.map(line=>(line.split("_")(1),line))
    //將兩個結果集放到一個RDD裡面

    //使用Spark Streaming中的transform運算元操作,實現過濾
    //transform轉換之後DStream變為RDD
    val fllterRDD = user.transform(u=>{
      //(KEY,("收到的行資訊",null/true))
      val joinrdd=u.leftOuterJoin(blRDD)
      //我們將黑名單過濾後,將處理真正的白名單資料
val filterrdd =  joinrdd.filter(tuple=>{
  if(tuple._2._2.getOrElse(false)){
  false
}else{
  true
}
})
  val valedRDD = filterrdd.map(tuple=>tuple._2._1)
  valedRDD
})
  fllterRDD.print()
  ssc.start()
  ssc.awaitTermination()
}
}
/*
Time: 1541755835000 ms
Time: 1541755840000 ms
 */