1. 程式人生 > 讀取大檔案資料進入 Redis 作為快取(附贈:廣播變數版本)

讀取大檔案資料進入 Redis 作為快取(附贈:廣播變數版本)

在專案中使用 Redis 快取字典檔案(作用等同於廣播變數,但字典存放在外部儲存而非各 Executor 的記憶體中):

package com.app
import com.utils.{JedisConnectionPool, RptUtils}
import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
  * Per-app report that uses Redis as the app-dictionary cache (the same role a broadcast
  * variable plays, but the dictionary is kept in Redis instead of being shipped to executors).
  *
  * Arguments: `inputPath` (parquet log data), an output path (currently unused) and
  * `resultPath` (tab-separated dictionary file).
  */
object AppRpt2 {

  /** Fallback app name when it cannot be resolved; same value the broadcast version uses. */
  private val UnknownApp = "unknow"

  def main(args: Array[String]): Unit = {
    if (args.length != 3) {
      println("目錄不存在,請重新輸入")
      // Non-zero status so scripts/schedulers see a bad invocation as a failure.
      sys.exit(1)
    }
    // The second argument (output path) is accepted but not used yet.
    val Array(inputPath, _, resultPath) = args
    val conf = new SparkConf().setAppName(s"${this.getClass.getName}").setMaster("local[*]")
      // Use Kryo for Spark's data serialization.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)
      // Compress Parquet output with snappy.
      sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

      // Dictionary lines are tab-separated: field 4 (0-based) is the id matched against a
      // row's "appid", field 1 is the app name (e.g. com.123.cn -> 愛奇藝).
      val dicMap = sc.textFile(resultPath)
        .map(_.split("\t", -1))
        .filter(_.length >= 5)
        .map(arr => (arr(4), arr(1)))

      // Load the dictionary into Redis using one connection per partition.
      // NOTE(review): keys are written without a prefix, so they share the keyspace of
      // whichever Redis DB JedisConnectionPool points at.
      dicMap.foreachPartition(part => {
        val jedis = JedisConnectionPool.getConnection()
        try {
          part.foreach { case (appId, appName) => jedis.set(appId, appName) }
        } finally {
          // Release the connection. Assumes getConnection() hands out a pooled Jedis whose
          // close() returns it to the pool — confirm in JedisConnectionPool.
          jedis.close()
        }
      })

      val df = sqlContext.read.parquet(inputPath)
      // `.rdd` keeps this an RDD of pairs (needed for reduceByKey) on Spark 1.6 and 2.x alike;
      // on 2.x Dataset.mapPartitions would require an Encoder and offers no reduceByKey.
      df.rdd.mapPartitions(rows => {
        val jedis = JedisConnectionPool.getConnection()
        val keyed = rows.map(row => {
          val appname = row.getAs[String]("appname")
          // Blank app name: look it up in the Redis dictionary by appid. A null appid or a
          // missing Redis entry falls back to UnknownApp rather than a Jedis error / null key.
          val resolved =
            if (StringUtils.isNotBlank(appname)) appname
            else Option(row.getAs[String]("appid"))
              .flatMap(appid => Option(jedis.get(appid)))
              .getOrElse(UnknownApp)
          (resolved, metrics(row))
        })
        // `keyed` is lazy, so the connection may only be released once it has been drained.
        closeWhenExhausted(keyed)(jedis.close())
      }).reduceByKey((list1, list2) => {
        // Element-wise sum: List(0,2,1,5) + List(2,5,4,7) -> List(2,7,5,12)
        list1.zip(list2).map(t => t._1 + t._2)
      }).map(t => {
        t._1 + "," + t._2.mkString(",")
      }).take(10).foreach(println)
    } finally {
      sc.stop()
    }
  }

  /**
    * Reads the raw counter fields of one log row and returns the concatenation of the
    * RptUtils results. Per the original comments these are presumably request counts
    * (calculateReq), bid/win/cost figures (calculateRtb) and impression/click counts
    * (calculateTimes); the exact list layout is defined by RptUtils.
    */
  private def metrics(row: Row) = {
    val requestmode = row.getAs[Int]("requestmode")
    val processnode = row.getAs[Int]("processnode")
    val iseffective = row.getAs[Int]("iseffective")
    val isbilling = row.getAs[Int]("isbilling")
    val isbid = row.getAs[Int]("isbid")
    val iswin = row.getAs[Int]("iswin")
    val adorderid = row.getAs[Int]("adorderid")
    val winPrice = row.getAs[Double]("winprice")
    val adpayment = row.getAs[Double]("adpayment")

    val reqlist = RptUtils.calculateReq(requestmode, processnode)
    val rtblist = RptUtils.calculateRtb(iseffective, isbilling, isbid, iswin, adorderid, winPrice, adpayment)
    val cliklist = RptUtils.calculateTimes(requestmode, iseffective)
    reqlist ++ rtblist ++ cliklist
  }

  /**
    * Wraps `it` so that `onDone` runs exactly once, the first time `hasNext` returns false.
    * Used to release a per-partition resource after a lazy iterator has been consumed.
    * NOTE: if the task fails before the iterator is exhausted, `onDone` does not run.
    */
  private def closeWhenExhausted[T](it: Iterator[T])(onDone: => Unit): Iterator[T] = new Iterator[T] {
    private var done = false

    override def hasNext: Boolean = {
      val more = it.hasNext
      if (!more && !done) {
        done = true
        onDone
      }
      more
    }

    override def next(): T = it.next()
  }
}

附贈:利用廣播變數廣播小型字典檔案(字典夠小、可以 collect 到 Driver 時適用):

package com.app

import com.utils.RptUtils
import org.apache.commons.lang.StringUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

/**
  * Per-app report that resolves missing app names through a broadcast dictionary
  * (suitable when the dictionary file is small enough to collect to the driver).
  *
  * Arguments: `inputPath` (parquet log data), an output path (currently unused) and
  * `resultPath` (tab-separated dictionary file).
  */
object AppRpt {

  import org.apache.spark.sql.Row

  def main(args: Array[String]): Unit = {
    if (args.length != 3) {
      println("目錄不存在,請重新輸入")
      // Non-zero status so scripts/schedulers see a bad invocation as a failure.
      sys.exit(1)
    }
    // The second argument (output path) is accepted but not used yet.
    val Array(inputPath, _, resultPath) = args
    val conf = new SparkConf().setAppName(s"${this.getClass.getName}").setMaster("local[*]")
      // Use Kryo for Spark's data serialization.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)
      // Spark 1.6 does not default Parquet compression to snappy (2.0+ does), so set it explicitly.
      sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

      // Dictionary lines are tab-separated: field 4 (0-based) is the id matched against a
      // row's "appid", field 1 is the app name (e.g. com.123.cn -> 愛奇藝). The map is
      // collected to the driver; for duplicate ids the later entry wins (toMap semantics).
      val dicMap: Map[String, String] = sc.textFile(resultPath)
        .map(_.split("\t", -1))
        .filter(_.length >= 5)
        .map(arr => (arr(4), arr(1)))
        .collect()
        .toMap
      // Ship the small dictionary to every executor once instead of with every task.
      val broadcast = sc.broadcast(dicMap)

      val df = sqlContext.read.parquet(inputPath)
      // `.rdd` keeps this an RDD of pairs (needed for reduceByKey) on Spark 1.6 and 2.x alike;
      // on 2.x Dataset.map would require an Encoder and offers no reduceByKey.
      df.rdd.map(row => {
        val appname = row.getAs[String]("appname")
        // Blank app name: look it up in the dictionary by appid, defaulting to "unknow".
        val resolved =
          if (StringUtils.isNotBlank(appname)) appname
          else broadcast.value.getOrElse(row.getAs[String]("appid"), "unknow")
        (resolved, metrics(row))
      }).reduceByKey((list1, list2) => {
        // Element-wise sum: List(0,2,1,5) + List(2,5,4,7) -> List(2,7,5,12)
        list1.zip(list2).map(t => t._1 + t._2)
      }).map(t => {
        t._1 + "," + t._2.mkString(",")
      }).take(1000).foreach(println)
    } finally {
      sc.stop()
    }
  }

  /**
    * Reads the raw counter fields of one log row and returns the concatenation of the
    * RptUtils results. Per the original comments these are presumably request counts
    * (calculateReq), bid/win/cost figures (calculateRtb) and impression/click counts
    * (calculateTimes); the exact list layout is defined by RptUtils.
    */
  private def metrics(row: Row) = {
    val requestmode = row.getAs[Int]("requestmode")
    val processnode = row.getAs[Int]("processnode")
    val iseffective = row.getAs[Int]("iseffective")
    val isbilling = row.getAs[Int]("isbilling")
    val isbid = row.getAs[Int]("isbid")
    val iswin = row.getAs[Int]("iswin")
    val adorderid = row.getAs[Int]("adorderid")
    val winPrice = row.getAs[Double]("winprice")
    val adpayment = row.getAs[Double]("adpayment")

    val reqlist = RptUtils.calculateReq(requestmode, processnode)
    val rtblist = RptUtils.calculateRtb(iseffective, isbilling, isbid, iswin, adorderid, winPrice, adpayment)
    val cliklist = RptUtils.calculateTimes(requestmode, iseffective)
    reqlist ++ rtblist ++ cliklist
  }
}