
[Project] Real-Time Statistics of Meituan Ad Traffic

1. Project Analysis

Technology:

    Spark Streaming or Storm

Data:

    ad click-stream data

Requirements:

   1) In [real time], compute the [daily] [top] ads for [each province] (a grouped top-N by ad click count)

   2) In real time, track the click trend of ads over a given period

Data schema:

timestamp: the time at which the user clicked the ad

province: the province in which the user clicked the ad

city: the city in which the user clicked the ad

userid: the unique identifier of the user

advid: the id of the ad that was clicked

The source data is already available in Kafka.
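
Each record is assumed below to be one comma-separated line with the fields in the order timestamp,province,city,userid,advid. A minimal sketch of a parser for that format (the case class name, field types and sample values are illustrative, not part of the original project):

//Illustrative model of one click record, matching the comma-separated format used throughout this post
case class AdClickLog(timestamp: Long, province: String, city: String, userid: Long, advid: Long)

object AdClickLog {
  //parses e.g. "1527426000000,guangdong,shenzhen,1001,55"
  def parse(line: String): AdClickLog = {
    val fields = line.split(",")
    AdClickLog(fields(0).toLong, fields(1), fields(2), fields(3).toLong, fields(4).toLong)
  }
}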

2. Blacklist Filtering

import kafka.serializer.StringDecoder
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by jenrey on 2018/5/27 21:07
  */
object AdvApplicationTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("AdvApplicationTest")
    conf.setMaster("local")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") //serialization: use Kryo
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))

    /**
      * TODO: Step 1: read data from Kafka (direct approach)
      */
    /* K: ClassTag,
       V: ClassTag,
       KD <: Decoder[K]: ClassTag,
       VD <: Decoder[V]: ClassTag] (
       ssc: StreamingContext,
       kafkaParams: Map[String, String],
       topics: Set[String]*/
    val kafkaParams = Map("metadata.broker.list" -> "hadoop04:9092")
    val topics = Set("aura")
    val logDStream: DStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)

    //TODO: if [one user] clicks [one ad] more than [100 times] [within one day], that user is a blacklist user and their clicks are excluded from the statistics
    /**
      * TODO: Step 2: filter out blacklisted users
      */
    val filterLogDStream: DStream[String] = blackListFilter(logDStream, ssc)
    /**
      * TODO: Step 3: dynamically generate the blacklist
      */
    /**
      * TODO: Step 4: count daily ad clicks per province and city in real time
      */
    /**
      * TODO: Step 5: compute the daily top ads per province in real time
      */
    /**
      * TODO: Step 6: track each ad's clicks over a one-hour sliding window in real time
      */
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }

  /**
    * Filters out records produced by blacklisted users.
    *
    * @param logDStream the data read from Kafka
    * @return the data with blacklisted users' records removed
    */
  def blackListFilter(logDStream: DStream[String], ssc: StreamingContext): DStream[String] = {
    //In production the blacklist should be read from a persistent store; Redis, HBase and MySQL are the usual choices
    val blackList = List((1L, true), (2L, true), (3L, true))
    //turn the blacklist into an RDD
    val blackListRDD: RDD[(Long, Boolean)] = ssc.sparkContext.parallelize(blackList)
    //broadcast the blacklist
    val blackListBroadcast: Broadcast[Array[(Long, Boolean)]] = ssc.sparkContext.broadcast(blackListRDD.collect())
    //transform operates on each RDD of the incoming DStream
    logDStream.transform(rdd => {
      //split every incoming line and build (userid, line) pairs
      val user_lineRDD: RDD[(Long, String)] = rdd.map(line => {
        val fields: Array[String] = line.split(",")
        (fields(3).toLong, line)
      })
      //note: after broadcasting, use .value to read the broadcast value back
      val blackRDD: RDD[(Long, Boolean)] = rdd.sparkContext.parallelize(blackListBroadcast.value)
      /**
        * leftOuterJoin requires both RDDs to be key-value pairs.
        * Joining List((22L, "qwe"), (2L, "asd"), (3L, "zxc"))
        * with    List((1L, true), (2L, true), (3L, true)) yields:
        * (22,(qwe,None))
        * (3,(zxc,Some(true)))
        * (2,(asd,Some(true)))
        */
      val resultRDD: RDD[(Long, (String, Option[Boolean]))] = user_lineRDD.leftOuterJoin(blackRDD)
      //return value: keep only the records whose userid did not match the blacklist
      resultRDD.filter(tuple => {
        tuple._2._2.isEmpty
      }).map(_._2._1)
    })
  }
}
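
The blacklist above is hard-coded for demonstration. A minimal sketch of loading it from MySQL instead, assuming a SparkSession like the one created in step 3 and reusing the same JDBC options and black_list table that appear there (everything else is an assumption):

import org.apache.spark.sql.SparkSession

//Sketch: read the blacklist from the black_list table instead of hard-coding it
def loadBlackList(spark: SparkSession): Array[(Long, Boolean)] = {
  spark.read.format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306/aura")
    .option("user", "aura")
    .option("password", "aura")
    .option("dbtable", "black_list")
    .load()
    .select("userid")
    .rdd
    .map(row => (row.getLong(0), true)) //assumes userid is stored as BIGINT
    .collect()
}

//usage inside blackListFilter: val blackList = loadBlackList(spark)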

3. Dynamically Generating the Blacklist

import java.util.{Date, Properties}

import kafka.serializer.StringDecoder
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import utils.{ConnectionPool, DateUtils}

/**
  * Created by jenrey on 2018/5/27 21:07
  */
object AdvApplicationTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("AdvApplicationTest")
    conf.setMaster("local")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") //serialization: use Kryo
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    /**
      * TODO: Step 1: read data from Kafka (direct approach)
      */
    /* K: ClassTag,
       V: ClassTag,
       KD <: Decoder[K]: ClassTag,
       VD <: Decoder[V]: ClassTag] (
       ssc: StreamingContext,
       kafkaParams: Map[String, String],
       topics: Set[String]*/
    val kafkaParams = Map("metadata.broker.list" -> "hadoop04:9092")
    val topics = Set("aura")
    val logDStream: DStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)

    //TODO: if [one user] clicks [one ad] more than [100 times] [within one day], that user is a blacklist user and their clicks are excluded from the statistics
    /**
      * TODO: Step 2: filter out blacklisted users
      */
    val filterLogDStream: DStream[String] = blackListFilter(logDStream, ssc)

    /**
      * TODO: Step 3: dynamically generate the blacklist in real time
      */
    DynamicGenerationBlacklists(filterLogDStream, spark)
    /**
      * TODO: Step 4: count daily ad clicks per province and city in real time
      */
    /**
      * TODO: Step 5: compute the daily top ads per province in real time
      */
    /**
      * TODO: Step 6: track each ad's clicks over a one-hour sliding window in real time
      */
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }

  /**
    * TODO: filters out records produced by blacklisted users
    *
    * @param logDStream the data read from Kafka
    * @return the data with blacklisted users' records removed
    */
  def blackListFilter(logDStream: DStream[String], ssc: StreamingContext): DStream[String] = {
    //In production the blacklist should be read from a persistent store; Redis, HBase and MySQL are the usual choices
    val blackList = List((1L, true), (2L, true), (3L, true))
    //turn the blacklist into an RDD
    val blackListRDD: RDD[(Long, Boolean)] = ssc.sparkContext.parallelize(blackList)
    //broadcast the blacklist
    val blackListBroadcast: Broadcast[Array[(Long, Boolean)]] = ssc.sparkContext.broadcast(blackListRDD.collect())
    //transform operates on each RDD of the incoming DStream
    logDStream.transform(rdd => {
      //split every incoming line and build (userid, line) pairs
      val user_lineRDD: RDD[(Long, String)] = rdd.map(line => {
        val fields: Array[String] = line.split(",")
        (fields(3).toLong, line)
      })
      //note: after broadcasting, use .value to read the broadcast value back
      val blackRDD: RDD[(Long, Boolean)] = rdd.sparkContext.parallelize(blackListBroadcast.value)
      /**
        * leftOuterJoin requires both RDDs to be key-value pairs.
        * Joining List((22L, "qwe"), (2L, "asd"), (3L, "zxc"))
        * with    List((1L, true), (2L, true), (3L, true)) yields:
        * (22,(qwe,None))
        * (3,(zxc,Some(true)))
        * (2,(asd,Some(true)))
        */
      val resultRDD: RDD[(Long, (String, Option[Boolean]))] = user_lineRDD.leftOuterJoin(blackRDD)
      //return value: keep only the records whose userid did not match the blacklist
      resultRDD.filter(tuple => {
        tuple._2._2.isEmpty
      }).map(_._2._1)
    })
  }

  /**
    * TODO: dynamically generate the blacklist
    *
    * @param filterLogDStream the data left after blacklist filtering
    *                         If [one user] clicks [one ad] more than [100 times] [within one day], that user becomes a blacklist user.
    *                         Three possible approaches: 1) updateStateByKey 2) reduceByKey and accumulate in HBase 3) MySQL (used below)
    */
  def DynamicGenerationBlacklists(filterLogDStream: DStream[String], spark: SparkSession): Unit = {
    val date_userid_advid_ds: DStream[(String, Long)] = filterLogDStream.map(line => {
      val fields: Array[String] = line.split(",")
      val time = new Date(fields(0).toLong)
      val date: String = DateUtils.formatDateKey(time)
      val userid: String = fields(3)
      val advid: String = fields(4)
      (date + "_" + userid + "_" + advid, 1L)
    }).reduceByKey(_ + _)

    date_userid_advid_ds.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        //ConnectionPool is a pre-written utility class that hands out MySQL connections
        val connection = ConnectionPool.getConnection()
        val statement = connection.createStatement()
        partition.foreach {
          case (date_userid_advid, count) => {
            val fields = date_userid_advid.split("_")
            val date = fields(0)
            val userid = fields(1).toLong
            val advid = fields(2).toLong
            val sql = s"insert into aura.tmp_advclick_count values('$date',$userid,$advid,$count)"
            statement.execute(sql)
          }
        }
        ConnectionPool.returnConnection(connection)
      })
    })

    /**
      * Generate the blacklist
      */
    val df: DataFrame = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/aura")
      .option("user", "aura")
      .option("password", "aura")
      .option("dbtable", "tmp_advclick_count")
      .load()
    df.createOrReplaceTempView("tmp_advclick_count")
    val sql =
      """
         select
              userid
         from
         (
              select
                   date, userid, advid, sum(click_count) c_count
              from
                   tmp_advclick_count
              group by
                   date, userid, advid
         ) t
         where
              t.c_count > 100
      """
    val blacklistdf = spark.sql(sql).distinct()
    val properties = new Properties()
    properties.put("user","aura")
    properties.put("password","aura")
    blacklistdf.write.mode(SaveMode.Append)
      .jdbc("jdbc:mysql://localhost:3306/aura","black_list",properties)

  }
}
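
The code above assumes that the two MySQL tables already exist. A one-time setup sketch that matches the columns referenced by the SQL; only the names (date, userid, advid, click_count, and userid for black_list) come from the code, while the column types are assumptions:

//One-time setup: create the two MySQL tables the job writes to and reads from
val connection = ConnectionPool.getConnection()
val statement = connection.createStatement()
//per-batch click counts written by DynamicGenerationBlacklists
statement.execute(
  """CREATE TABLE IF NOT EXISTS aura.tmp_advclick_count (
    |  date        VARCHAR(8),
    |  userid      BIGINT,
    |  advid       BIGINT,
    |  click_count BIGINT
    |)""".stripMargin)
//the generated blacklist, appended to by blacklistdf.write
statement.execute(
  """CREATE TABLE IF NOT EXISTS aura.black_list (
    |  userid BIGINT
    |)""".stripMargin)
ConnectionPool.returnConnection(connection)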

4. Real-Time Daily Ad Click Counts per Province and City

Just append the following code to the code above.

  /**
    * Counts daily ad clicks per province and city in real time.
    *
    * @param filterLogDStream the data left after blacklist filtering
    */
  def ProvinceCityAdvClick_Count(filterLogDStream: DStream[String]): DStream[(String, Long)] = {

    val f = (input: Seq[Long], state: Option[Long]) => {
      val current_count = input.sum
      val last_count = state.getOrElse(0L)
      Some(current_count + last_count)
    }

    filterLogDStream.map(line => {
      val fields = line.split(",")
      val time = fields(0).toLong
      val mydate = new Date(time)
      val date = DateUtils.formatDateKey(mydate)
      val province = fields(1)
      val city = fields(2)
      val advid = fields(4)
      (date + "_" + province + "_" + city + "_" + advid, 1L)
    }).updateStateByKey(f)

    /**
      * If required, these results can also be written out to MySQL or HBase.
      */
  }
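
updateStateByKey keeps a running total across batches, so Spark Streaming requires a checkpoint directory to be set on the StreamingContext before the job starts. A minimal sketch (the path is illustrative; use a reliable file system such as HDFS in a real deployment):

  //set once in main(), right after creating the StreamingContext
  ssc.checkpoint("hdfs://hadoop04:9000/spark/checkpoint/adv")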

5. Real-Time Top Ads per Province

  /**
    * Computes the top ads per province in real time.
    *
    * transform : rdd -> dataframe -> table -> sql
    *
    * @param date_province_city_advid_count the per-day/province/city/ad counts produced by step 4
    */
  def ProvinceAdvClick_Count(date_province_city_advid_count: DStream[(String, Long)], spark: SparkSession): Unit = {
    date_province_city_advid_count.transform(rdd => {
      val date_province_advid_count = rdd.map {
        case (date_province_city_advid, count) => {
          val fields = date_province_city_advid.split("_")
          val date = fields(0)
          val province = fields(1)
          val advid = fields(3)


          (date + "_" + province + "_" + advid, count)
        }
      }.reduceByKey(_ + _)

      val rowRDD = date_province_advid_count.map(tuple => {
        val fields = tuple._1.split("_")
        val date = fields(0)
        val provnice = fields(1)
        val advid = fields(2).toLong
        val count = tuple._2
        Row(date, provnice, advid, count)
      })

      val schema = StructType(
        StructField("date", StringType, true) ::
          StructField("province", StringType, true) ::
          StructField("advid", LongType, true) ::
          StructField("count", LongType, true) :: Nil

      )

      val df = spark.createDataFrame(rowRDD, schema)

      df.createOrReplaceTempView("temp_date_province_adv_count")

      val sql =
        """
           select
                *
           from
           (
           select
                date, province, advid, count,
                row_number() over(partition by province order by count desc) rank
           from
                temp_date_province_adv_count
           ) temp
           where temp.rank < 10

        """

      /**
        * Persist the result to a database (see the sketch after this method).
        */
      spark.sql(sql)

      rdd

    })

  }
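
The ranking SQL above is executed, but its result is never written anywhere, and transform by itself is lazy, so nothing runs until an output operation consumes the stream. A minimal sketch of persisting the top-N result, following the same JDBC pattern used for the blacklist in step 3 (the target table name top_ads_per_province is illustrative); it would replace the bare spark.sql(sql) call and uses the Properties and SaveMode imports shown in the complete code below:

      val topDF = spark.sql(sql)
      val properties = new Properties()
      properties.put("user", "aura")
      properties.put("password", "aura")
      //overwrite on every batch so the table always holds the latest ranking
      topDF.write.mode(SaveMode.Overwrite)
        .jdbc("jdbc:mysql://localhost:3306/aura", "top_ads_per_province", properties)

In practice this write is usually moved into foreachRDD rather than transform, so that it counts as an output operation and actually triggers execution.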

6. Complete Code

package sparkstreaming.lesson09

import java.sql.Date
import java.util.Properties

import kafka.serializer.StringDecoder
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import sparkstreaming.demo.lesson01.ConnectionPool
import sparkstreaming.demo.utils.DateUtils

/**
  * Created by Administrator on 2018/5/12.
  *
  * timestamp:
  * the time at which the user clicked the ad
  * province:
  * the province in which the user clicked the ad
  * city:
  * the city in which the user clicked the ad
  * userid:
  * the unique identifier of the user
  * advid:
  * the id of the ad that was clicked
  */
object AdvApplicationTest {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("AdvApplicationTest")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  //serialization: use Kryo

    val sc = new SparkContext(conf)

    val ssc = new StreamingContext(sc,Seconds(5))

    val spark = SparkSession.builder()
      .config(conf).getOrCreate()

    /**
      * Step 1: read data from Kafka (direct approach)
      *   K: ClassTag,
      *   V: ClassTag,
      *   KD <: Decoder[K]: ClassTag,
      *   VD <: Decoder[V]: ClassTag] (
      *   ssc: StreamingContext,
      *   kafkaParams: Map[String, String],
      *   topics: Set[String]
      */
    val kafkaParams = Map("metadata.broker.list" -> "hadoop1:9092")
    val topics = Set("aura")
    val logDstream: DStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics).map(_._2)

    /**
      * Step 2: filter out blacklisted users
      */
    val filterLogDStream: DStream[String] = blackListFilter(logDstream,ssc)


    /**
      * If [one user] clicks [one ad] more than [100 times] [within one day], that user is a blacklist user.
      *
      * zhangsan:
      *          A:50  B:60         -> not a blacklist user (no single ad exceeds 100 clicks)
      * lisi:
      *          A:50  A:20  A:40   -> a blacklist user (110 clicks on ad A)
      * Is a user who is blacklisted today still blacklisted tomorrow?
      * That depends on the business rules.
      *
      * Step 3: dynamically generate the blacklist in real time
      */
    DynamicGenerationBlacklists(filterLogDStream,spark)

    /**
      * Step 4:
      *        count daily ad clicks per province and city in real time
      */
    val dateProvinceCityAdvClick_Count = ProvinceCityAdvClick_Count(filterLogDStream)
    /**
      * Step 5:
      *       compute the daily top ads per province in real time
      *       (a grouped top-N)
      *
      *   transform / foreachRDD
      *   rdd => dataframe
      *   Spark SQL:
      *     SQL
      */


    /**
      * Step 6:
      *     track each ad's clicks over a one-hour sliding window in real time
      */

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }

  /**
    * Filters out records produced by blacklisted users.
    * @param logDstream  the data read from Kafka
    * @return  the data with blacklisted users' records removed
    */
  def blackListFilter(logDstream: DStream[String],ssc:StreamingContext):DStream[String]={

    /**
      * In production this should read the blacklist from the
      * black_list table in the database.
      */

    val blackList = List((1L, true), (2L, true), (3L, true))
    val blackListRDD = ssc.sparkContext.parallelize(blackList)
    val blackListBroadcast = ssc.sparkContext.broadcast(blackListRDD.collect())

    /**
      * In production the blacklist should be read from a persistent store; three common choices:
      * 1) Redis   (look it up if needed)
      * 2) HBase   (look it up if needed)
      * 3) MySQL   (demonstrated in class)
      * It can be read the Spark Core way,
      * or with Spark SQL -> dataframe -> rdd
      */

    logDstream.transform( rdd =>{
     val user_lineRDD=rdd.map( line =>{
       val fields = line.split(",")
       (fields(3).toLong,line)
     })
      val blackRDD = rdd.sparkContext.parallelize(blackListBroadcast.value)
      val resultRDD: RDD[(Long, (String, Option[Boolean]))] = user_lineRDD.leftOuterJoin(blackRDD)
      resultRDD.filter( tuple =>{
        tuple._2._2.isEmpty
      }).map(_._2._1)

    })

  }

  /**
    * Dynamically generates the blacklist.
    * @param filterLogDStream  the data left after blacklist filtering
    * If [one user] clicks [one ad] more than [100 times] [within one day], that user is a blacklist user.
    *
    * The idea:
    *   This requirement is very similar to word count: count, in real time, how many times each key occurs;
    *   once a key reaches 100, it belongs to the blacklist.
    *   Approach 1:
    *   map to (date_userid_advid, 1)
    *    keep a running count per key with updateStateByKey (higher memory pressure),
    *    e.g. zhangsan A 80, lisi B 99 ... 100,
    *    then filter out the keys whose count exceeds 100 and write them to MySQL, Redis or HBase.
    *   Approach 2:
    *   map to (date_userid_advid, 1)
    *    reduceByKey only over the current batch (lower memory pressure), then accumulate externally:
    *    HBase:
    *        rowkey date_userid_advid: existing value 2
    *          + this batch 3
    *            = 5
    *    or Redis.
    *   Approach 3:
    *        the MySQL way (implemented below).
    */
  def DynamicGenerationBlacklists(filterLogDStream: DStream[String],spark:SparkSession):Unit={

    val date_userid_advid_ds = filterLogDStream.map( line =>{
      val fields = line.split(",")
      val time = new Date(fields(0).toLong)
      val date = DateUtils.formatDateKey(time)
      val userid = fields(3)
      val advid = fields(4)
      //the key looks like 20180512_userid_advid
      (date+"_"+userid+"_"+advid,1L)
    }).reduceByKey(_+_)

    date_userid_advid_ds.foreachRDD( rdd =>{
      rdd.foreachPartition( partition =>{
        val connection = ConnectionPool.getConnection()
        val statement = connection.createStatement()
        partition.foreach{
          case(date_userid_advid,count) =>{
            val fields = date_userid_advid.split("_")
            val date = fields(0)
            val userid = fields(1).toLong
            val advid = fields(2).toLong
            val sql = s"insert into aura.tmp_advclick_count values('$date',$userid,$advid,$count)"
            statement.execute(sql)
          }
        }
        ConnectionPool.returnConnection(connection)

      })
    })

    /**
      * Generate the blacklist
      */

    val df: DataFrame = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/aura")
      .option("user", "aura")
      .option("password", "aura")
      .option("dbtable", "tmp_advclick_count")
      .load()

    df.createOrReplaceTempView("tmp_advclick_count")

    val sql =
      """
         SELECT
              userid
         FROM
         (
              SELECT
                   date, userid, advid, sum(click_count) c_count
              FROM
                   tmp_advclick_count
              GROUP BY
                   date, userid, advid
         ) t
         WHERE
              t.c_count > 100
      """

    //the computed blacklist
    val blacklistdf = spark.sql(sql).distinct()
    val properties = new Properties()
    properties.put("user","aura")
    properties.put("password","aura")
    blacklistdf.write.mode(SaveMode.Append)
        .jdbc("jdbc:mysql://localhost:3306/aura","black_list",properties)
  }

  /**
    * Counts daily ad clicks per province and city in real time.
    * @param filterLogDStream the data left after blacklist filtering
    */
  def ProvinceCityAdvClick_Count(filterLogDStream: DStream[String]):DStream[(String,Long)]={
    /**
      * Idea:
      * map  => (k,v)  => (date_province_city_advid, 1)
      *                then accumulate with updateStateByKey
      */
    val f = (input: Seq[Long], state: Option[Long]) => {
      val current_count = input.sum
      val last_count = state.getOrElse(0L)
      Some(current_count + last_count)
    }

    filterLogDStream.map( line =>{
      val fields = line.split(",")
      val time = fields(0).toLong
      val mydate = new Date(time)
      val date = DateUtils.formatDateKey(mydate)
      val province = fields(1)
      val city = fields(2)
      val advid = fields(4)
      (date+"_"+province+"_"+city+"_"+advid,1L)
    }).updateStateByKey(f)
    /**
      * If required, these results can also be written out to MySQL or HBase.
      */
  }

  /**
    * Computes the top ads per province in real time.
    *
    * transform : rdd -> dataframe -> table -> sql
    * @param date_province_city_advid_count the per-day/province/city/ad counts produced by step 4
    */
  def ProvinceAdvClick_Count(date_province_city_advid_count: DStream[(String, Long)], spark: SparkSession): Unit = {
    date_province_city_advid_count.transform( rdd =>{
    val date_province_advid_count = rdd.map{
        case(date_province_city_advid,count) =>{
          val fields = date_province_city_advid.split("_")
          val date = fields(0)
          val province = fields(1)
          val advid = fields(3)


          (date+"_"+province+"_"+advid,count)
        }
      }.reduceByKey(_+_)

     val rowRDD=date_province_advid_count.map( tuple =>{
        val fields = tuple._1.split("_")
        val date = fields(0)
        val provnice = fields(1)
        val advid = fields(2).toLong
        val count = tuple._2
        Row(date,provnice,advid,count)
      })

      val schema=StructType(
        StructField("date",StringType,true)::
          StructField("province",StringType,true)::
          StructField("advid",LongType,true)::
          StructField("count",LongType,true):: Nil

      )

      val df = spark.createDataFrame(rowRDD,schema)

      df.createOrReplaceTempView("temp_date_province_adv_count")

      val sql =
        """
           select
                *
           from
           (
           select
                date, province, advid, count,
                row_number() over(partition by province order by count desc) rank
           from
                temp_date_province_adv_count
           ) temp
           where temp.rank < 10

        """

      /**
        * Persist the result to a database (see the persistence sketch in section 5).
        */
      spark.sql(sql)

     rdd

    })



  }

}
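
Step 6 (each ad's click trend over the last hour) is only listed as a TODO above. A minimal sketch using reduceByKeyAndWindow, assuming the same comma-separated record format and the 5-second batch interval used above; the per-minute key, the one-hour window and the 10-second slide are illustrative choices:

  import java.text.SimpleDateFormat

  /**
    * Sketch: per-ad, per-minute click counts over a one-hour sliding window.
    */
  def AdvClickTrend_LastHour(filterLogDStream: DStream[String]): DStream[(String, Long)] = {
    filterLogDStream.map(line => {
      val fields = line.split(",")
      //a SimpleDateFormat is created per record only to keep the sketch simple
      val minute = new SimpleDateFormat("yyyyMMddHHmm").format(new java.util.Date(fields(0).toLong))
      val advid = fields(4)
      (advid + "_" + minute, 1L)
    }).reduceByKeyAndWindow(
      (a: Long, b: Long) => a + b,
      Seconds(3600), //window length: the last hour
      Seconds(10)    //slide interval; must be a multiple of the batch interval
    )
  }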