
Big Data Hands-On Project: China Mobile Operations Analysis Real-Time Monitoring Platform || Implementing the Project Requirements (the data files are at the end of the article)

1. Business overview (total order count, successful order count, total amount, processing time)
2. Business detail (recharge orders per hour, successful recharge orders per hour)
3. Business quality (successful recharge orders per province)
4. Real-time statistics of the recharge amount and order count per minute

Overall flow:
read the offsets stored in the database -> broadcast the province-code mapping -> pull the data from Kafka -> process the data (parse the JSON object: province, time, result, fee)
-> compute the business overview (total orders, successful orders, total amount, processing time) -> business detail (total recharge orders per hour, successful orders per hour)
-> business quality (successful orders per province) -> real-time statistics of the recharge amount and order count per minute

The code is organized into a handful of helper objects: AppParams (configuration), CaculateTools (time calculation), Jpools (Redis connection pool), OffsetManager (Kafka offsets in MySQL) and KpiTools (the KPI computations).

Implementing the project requirements:
1) Collect the data with Flume and push it into Kafka; the detailed configuration is below.

# Name the components of this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe and configure the source component: r1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/datas/flume
a1.sources.r1.fileHeader = true

# Describe and configure the sink component: k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = flumeLogs
a1.sinks.k1.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy


# Describe and configure the channel component; a memory channel is used here
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2) Before consuming the Kafka data with Spark Streaming, set up the Kafka parameters and the configuration needed to write into Redis.

(1) Configure the Kafka and Redis parameters in IDEA, so the data can be pulled from Kafka and stored into Redis.


import com.typesafe.config.{Config, ConfigFactory}
import org.apache.kafka.common.serialization.StringDeserializer

object AppParams {

  /**
    * Parse the application.conf configuration file.
    * Loads the config under resources; default lookup order: application.conf -> application.json -> application.properties
    */
  private lazy val config: Config = ConfigFactory.load()

  /**
    * The topic(s) to subscribe to
    */
  val topic = config.getString("kafka.topic").split(",")

  /**
    * Hosts and ports of the Kafka brokers
    */
  val brokers = config.getString("kafka.broker.list")

  /**
    * Consumer group id
    */
  val groupId = config.getString("kafka.group.id")

  /**
    * Kafka consumer parameters
    */
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> borkers,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> groupId,
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> "false"
  )

  /**
    * Redis server address
    */
  val redisHost = config.getString("redis.host")

  /**
    * Which Redis database index to write to
    */
  val selectDBIndex = config.getInt("redis.db.index")
  /**
    * Mapping from province code to province name
    */
  import scala.collection.JavaConversions._
  val pCode2PName  = config.getObject("pcode2pname").unwrapped().toMap
}

(2) To make it easy to compute how long an order takes to complete, a small helper object is provided.


import org.apache.commons.lang3.time.FastDateFormat


object CaculateTools {
  // SimpleDateFormat is not thread-safe:
  //private val format = new SimpleDateFormat("yyyyMMddHHmmssSSS")
  // FastDateFormat is a thread-safe DateFormat
  private val format = FastDateFormat.getInstance("yyyyMMddHHmmssSSS")
  /**
    * Compute the time difference in milliseconds (the first 17 characters of startTime
    * are a yyyyMMddHHmmssSSS timestamp)
    */
  def caculateTime(startTime:String,endTime:String):Long = {
    val start = startTime.substring(0,17)
    format.parse(endTime).getTime - format.parse(start).getTime
  }

}
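
A quick worked example of the helper (both timestamps below are made up purely for illustration): the requestId carries a 17-character yyyyMMddHHmmssSSS prefix, so the cost is simply the difference between the two parsed instants in milliseconds.

import cn.sheep.utils.CaculateTools // assuming the same package as the other helper objects

object CaculateToolsDemo extends App {
  // Illustrative values only; requestId carries extra digits after the 17-character timestamp prefix.
  val requestId         = "20170412030013441" + "123456789012345"
  val receiveNotifyTime = "20170412030017332"
  // 03:00:17.332 - 03:00:13.441 = 3891 ms
  println(CaculateTools.caculateTime(requestId, receiveNotifyTime)) // 3891
}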

(3) A Redis connection pool for working with the data in Redis.


import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.JedisPool

object Jpools {

  private val poolConfig = new GenericObjectPoolConfig()
  poolConfig.setMaxIdle(5)      // maximum number of idle connections in the pool (default is 8)
  poolConfig.setMaxTotal(2000)  // maximum total number of connections in the pool (default is 8)

  // the pool itself is private and is not exposed outside this object
  private lazy val jedisPool = new JedisPool(poolConfig, AppParams.redisHost)

  def getJedis={
    val jedis = jedisPool.getResource
    jedis.select(AppParams.selectDBIndex)
    jedis
  }


}
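
A minimal usage sketch of the pool (the key name here is made up; note that closing a pooled Jedis returns the connection to the pool rather than destroying it):

import cn.sheep.utils.Jpools // assuming the same package as the other helper objects

object JpoolsSmoke extends App {
  val jedis = Jpools.getJedis
  jedis.set("smoke-test", "ok")     // throwaway key, purely for illustration
  jedis.expire("smoke-test", 60)    // let it expire after a minute
  println(jedis.get("smoke-test"))  // ok
  jedis.close()                     // returns the connection to the pool
}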

(4) Manage the Kafka offsets ourselves and check them before each write into Redis, to avoid reprocessing data and wasting resources.


import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import scalikejdbc.{DB, SQL}
import scalikejdbc.config.DBs

object OffsetManager {
  DBs.setup()

  /**
    * Read back the offsets we persisted for this consumer group
    */
  def getMydbCurrentOffset: Map[TopicPartition, Long] = {
    DB.readOnly(implicit session =>
    SQL("select * from streaming_offset where groupId=?").bind(AppParams.groupId)
          .map(rs =>
            (
              new TopicPartition(rs.string("topicName"),rs.int("partitionId")),
              rs.long("offset")
            )
          ).list().apply().toMap
    )
  }

  /**
    * Persist the offsets of the current batch
    */
  def saveCurrentOffset(offsetRanges: Array[OffsetRange]) = {
    DB.localTx(implicit session =>{
      offsetRanges.foreach(or =>{
        SQL("replace into streaming_offset values (?,?,?,?)")
          .bind(or.topic,or.partition,or.untilOffset,AppParams.groupId)
          .update()
          .apply()
      })
    })
  }
}
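
OffsetManager expects a MySQL table named streaming_offset whose column order matches the positional "replace into" above: (topicName, partitionId, offset, groupId). The original post does not show the DDL, so the sketch below is an assumption (the column types are guesses); the composite primary key is what lets "replace into" overwrite the previous offset for the same topic/partition/group.

import scalikejdbc.{DB, SQL}
import scalikejdbc.config.DBs

object CreateOffsetTable extends App {
  DBs.setup() // reads db.default.* from application.conf (shown in the next step)
  DB.autoCommit { implicit session =>
    // Column names and order follow the reads/writes in OffsetManager; types are assumed.
    SQL("""
        |create table if not exists streaming_offset (
        |  topicName   varchar(255) not null,
        |  partitionId int          not null,
        |  `offset`    bigint       not null,
        |  groupId     varchar(255) not null,
        |  primary key (topicName, partitionId, groupId)
        |)""".stripMargin).execute.apply()
  }
  DBs.close()
}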

(5) The Kafka, MySQL (offset storage) and Redis settings (application.conf):

kafka.topic = "flumeLogs"
kafka.broker.list = "hadoop01:9092,hadoop02:9092,hadoop03:9092"
kafka.group.id = "day2_001"
# MySQL example
db.default.driver="com.mysql.jdbc.Driver"
db.default.url="jdbc:mysql://localhost/bigdata?characterEncoding=utf-8"
db.default.user="root"
db.default.password="926718"
# redis
redis.host="hadoop02"
redis.db.index=10

# province code to province name mapping
pcode2pname {
  100="北京"
  200="廣東"
  210="上海"
  220="天津"
  230="重慶"
  240="遼寧"
  250="江蘇"
  270="湖北"
  280="四川"
  290="陝西"
  311="河北"
  351="山西"
  371="河南"
  431="吉林"
  451="黑龍江"
  471="內蒙古"
  531="山東"
  551="安徽"
  571="浙江"
  591="福建"
  731="湖南"
  771="廣西"
  791="江西"
  851="貴州"
  871="雲南"
  891="西藏"
  898="海南"
  931="甘肅"
  951="寧夏"
  971="青海"
  991="新疆"
}
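
As a quick sanity check that this configuration is picked up (a minimal sketch; it assumes the application.conf above sits under src/main/resources so that ConfigFactory.load() finds it):

import cn.sheep.utils.AppParams // package name as used in the driver below

object ConfigCheck extends App {
  println(AppParams.topic.toList)        // List(flumeLogs)
  println(AppParams.brokers)             // hadoop01:9092,hadoop02:9092,hadoop03:9092
  println(AppParams.selectDBIndex)       // 10
  println(AppParams.pCode2PName("280"))  // 四川
}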

3) With all the configuration in place, we can move on to the core Spark Streaming processing.
First, the meaning of the fields in the log file (illustrated by the sample sketch below).
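
Here is a hedged sketch of a single log line and of how each field is used later in baseDataRDD (every value below is made up for illustration; the real records come with the data files at the end of the article):

import com.alibaba.fastjson.JSON

object SampleRecord extends App {
  // Every field value here is illustrative, not taken from the real data files.
  val line =
    """{"serviceName":"reChargeNotifyReq","bussinessRst":"0000","chargefee":30,
      |"requestId":"20170412030013441123456789012345","receiveNotifyTime":"20170412030017332",
      |"provinceCode":"280"}""".stripMargin.replaceAll("\\s", "")

  val obj = JSON.parseObject(line)
  println(obj.getString("serviceName"))        // reChargeNotifyReq: a recharge-notification log, the only type we keep
  println(obj.getString("bussinessRst"))       // "0000" means the recharge succeeded
  println(obj.getDouble("chargefee"))          // recharge amount
  println(obj.getString("requestId").take(17)) // request timestamp, yyyyMMddHHmmssSSS
  println(obj.getString("receiveNotifyTime"))  // time the result notification arrived, same format
  println(obj.getString("provinceCode"))       // 280, mapped to 四川 via pcode2pname
}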

(1) Below is the core Spark Streaming processing code (the KPI computations):


import com.alibaba.fastjson.JSON
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

object KpiTools {

  /**
    * Business overview (total orders, successful orders, total amount, total processing time)
    *
    * @param baseData
    */
  def kpi_general(baseData: RDD[(String, String, List[Double], String, String)]): Unit = {
    baseData.map(tp => (tp._1, tp._3)).reduceByKey((list1, list2) => {
      // zip the two lists element-wise and add the corresponding entries
      list1.zip(list2).map(tp => tp._1 + tp._2)
    })
      .foreachPartition(partition => {
        val jedis = Jpools.getJedis
        partition.foreach(tp => {
          // accumulate the per-day totals into one Redis hash
          jedis.hincrBy("A-" + tp._1, "total", tp._2(0).toLong)
          jedis.hincrBy("A-" + tp._1, "succ", tp._2(1).toLong)
          jedis.hincrByFloat("A-" + tp._1, "money", tp._2(2))
          jedis.hincrBy("A-" + tp._1, "cost", tp._2(3).toLong)
          // expire the key after 48 hours
          jedis.expire("A-" + tp._1, 48 * 60 * 60)
        })
        jedis.close()
      })
  }

  /**
    * Business detail: total recharge orders per hour and successful orders per hour
    * (input tuples: date, hour, List(total orders, successful orders, success amount, duration), ...)
    *
    * @param baseData
    */
  def kpi_general_hour(baseData: RDD[(String, String, List[Double], String, String)]): Unit = {
    baseData.map(tp => ((tp._1, tp._2), List(tp._3(0), tp._3(1)))).reduceByKey((list1, list2) => {
      // zip the two lists element-wise and add the corresponding entries
      list1.zip(list2).map(tp => tp._1 + tp._2)
    })
      .foreachPartition(partition => {
        val jedis = Jpools.getJedis
        partition.foreach(tp => {
          // accumulate the per-hour counts into the per-day hash
          jedis.hincrBy("B-" + tp._1._1, "T:" + tp._1._2, tp._2(0).toLong)
          jedis.hincrBy("B-" + tp._1._1, "S" + tp._1._2, tp._2(1).toLong)
          // expire the key after 48 hours (the key must match the one written above)
          jedis.expire("B-" + tp._1._1, 48 * 60 * 60)
        })
        jedis.close()
      })
  }

  /**
    * Business quality:
    * successful recharge orders per province
    */

  def kpi_quality(baseData: RDD[(String, String, List[Double], String, String)], p2p: Broadcast[Map[String, AnyRef]]) = {
    baseData.map(tp => ((tp._1,tp._4),tp._3(1))).reduceByKey(_+_).foreachPartition(partition => {
      val jedis = Jpools.getJedis
      partition.foreach(tp => {
        // successful recharge orders, keyed by province name
        jedis.hincrBy("C-" + tp._1._1,p2p.value.getOrElse(tp._1._2,tp._1._2).toString,tp._2.toLong)
        jedis.expire("C-" + tp._1._1, 48 * 60 * 60)
      })
      jedis.close()
    })
  }

  /**
    * Real-time statistics: recharge amount and successful order count per minute
    * (input tuples: date, hour, List(orders, successful orders, amount, duration), province code, minute)
    */
  def kpi_realtime_minute(baseData: RDD[(String, String, List[Double], String, String)]) = {
    baseData.map(tp => ((tp._1,tp._2,tp._5),List(tp._3(1),tp._3(2)))).reduceByKey((list1,list2)=>{
      list1.zip(list2).map(tp => tp._1+tp._2)
    }).foreachPartition(partition => {
      val jedis = Jpools.getJedis
      partition.foreach(tp => {
        // successful recharge count and recharge amount per minute
        jedis.hincrBy("D-" + tp._1._1,"C:"+ tp._1._2+tp._1._3,tp._2(0).toLong)
        jedis.hincrByFloat("D-" + tp._1._1,"M"+tp._1._2+tp._1._3,tp._2(1))
        jedis.expire("D-" + tp._1._1, 48 * 60 * 60)
      })
      jedis.close()
    })
  }

  /**
    * Build the base data: parse each record and extract the fields the KPIs need
    */
  def baseDataRDD(rdd: RDD[ConsumerRecord[String, String]]): RDD[(String, String, List[Double], String, String)] = {
    rdd // ConsumerRecord => JSONObject
      .map(cr => JSON.parseObject(cr.value()))
      // keep only the recharge-notification logs
      .filter(obj => obj.getString("serviceName").equalsIgnoreCase("reChargeNotifyReq")).map(obj => {
      // business result: "0000" means the recharge succeeded
      val result = obj.getString("bussinessRst")
      val fee = obj.getDouble("chargefee")

      // requestId starts with the request timestamp; receiveNotifyTime is when the result came back
      val requestId = obj.getString("requestId")
      // the date, hour and minute come from that timestamp prefix
      val day = requestId.substring(0, 8)
      val hour = requestId.substring(8, 10)
      val minute = requestId.substring(10, 12)
      val receiveTime = obj.getString("receiveNotifyTime")

      // province code
      val provinceCode = obj.getString("provinceCode")
      val costTime = CaculateTools.caculateTime(requestId, receiveTime)
      val succAndFeeAndTime: (Double, Double, Double) = if (result.equals("0000")) (1, fee, costTime) else (0, 0, 0)

      // (date, hour, List(orders, successful orders, amount, duration), province code, minute)
      (day, hour, List[Double](1, succAndFeeAndTime._1, succAndFeeAndTime._2, succAndFeeAndTime._3), provinceCode, minute)
    }).cache()
  }
}
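
To make the Redis layout above concrete, here is a small read-back sketch (the date 20170412 is only an example; the key and field names follow the code above):

import cn.sheep.utils.Jpools // assuming the same package as the other helper objects

object InspectRedis extends App {
  val jedis = Jpools.getJedis
  println(jedis.hgetAll("A-20170412")) // business overview: total / succ / money / cost
  println(jedis.hgetAll("B-20170412")) // per-hour detail: T:<hour> and S:<hour> fields
  println(jedis.hgetAll("C-20170412")) // successful orders keyed by province name
  println(jedis.hgetAll("D-20170412")) // per-minute success count (C:...) and amount (M:...)
  jedis.close()
}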

(2) Wire the encapsulated methods together in the driver:



import cn.sheep.utils.{AppParams, KpiTools, OffsetManager}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * China Mobile real-time monitoring platform (optimized version)
  * Created by zhangjingcun on 2018/10/16 16:34.
  */
object BootStarpAppV2 {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf()
    sparkConf.setAppName("中國移動運營實時監控平臺-Monitor")
    // when running on a cluster, remove the setMaster("local[*]") line below
    sparkConf.setMaster("local[*]")
    // store RDDs in serialized form to reduce memory usage; the default is
    // org.apache.spark.serializer.JavaSerializer, so switching to Kryo is a basic optimization
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // compress serialized RDDs
    sparkConf.set("spark.rdd.compress", "true")
    // cap the ingest rate: records per batch ≈ maxRatePerPartition * number of partitions * batch interval
    // (for example, 10000 * 3 partitions * 2 s ≈ 60000 records per batch at most)
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "10000")
    // stop gracefully, letting the in-flight batch finish
    sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    /**
      * Read the offsets previously stored in the database
      */
    val currOffset = OffsetManager.getMydbCurrentOffset

    /**
      * Broadcast the province code -> province name mapping
      */
    val pcode2PName: Broadcast[Map[String, AnyRef]] = ssc.sparkContext.broadcast(AppParams.pCode2PName)

    /** Pull the data from Kafka.
      * LocationStrategies: the location strategy to use depends on whether the Kafka brokers run on
      * the same machines as the executors; once set, data is fetched with the optimal placement.
      * In practice the Kafka brokers and the executors are usually NOT co-located: Kafka is storage
      * (disk-heavy) while the executors do the computation (memory/CPU-heavy), so storage and compute
      * are kept apart. Use PreferBrokers when executors and brokers share machines, otherwise use
      * PreferConsistent, which spreads the fetched partitions evenly across all executors. */
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,
      LocationStrategies.PreferConsistent,
      // resume from the offsets read back from MySQL (empty on the first run)
      ConsumerStrategies.Subscribe[String, String](AppParams.topic, AppParams.kafkaParams, currOffset)
    )

    /**
      * Process each batch
      */
    stream.foreachRDD(rdd=>{
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      val baseData = KpiTools.baseDataRDD(rdd)

      /**
        * Business overview and per-hour detail
        */
      KpiTools.kpi_general(baseData)
      KpiTools.kpi_general_hour(baseData)

      /**
        * Business quality (per-province success counts)
        */
      KpiTools.kpi_quality(baseData, pcode2PName)

      /**
        * Real-time per-minute recharge analysis
        */
      KpiTools.kpi_realtime_minute(baseData)

      /**
        * Persist this batch's offsets
        */
      OffsetManager.saveCurrentOffset(offsetRanges)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
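
One small optional tweak (my own suggestion, not part of the original code): instead of deleting the setMaster line by hand when deploying, only set a master when spark-submit has not already supplied one, so the same jar runs both locally and on the cluster.

import org.apache.spark.SparkConf

object ConfDemo extends App {
  val sparkConf = new SparkConf().setAppName("中國移動運營實時監控平臺-Monitor")
  // spark-submit --master yarn (for example) sets spark.master, so fall back to local[*] only when it is absent
  if (!sparkConf.contains("spark.master")) sparkConf.setMaster("local[*]")
  println(sparkConf.get("spark.master"))
}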

The pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<groupId>cn.sheep</groupId>
<artifactId>cmcc_monitor</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
    <spark.version>2.2.1</spark.version>
    <mysql.version>5.1.40</mysql.version>
    <jedis.version>2.9.0</jedis.version>
    <config.version>1.3.3</config.version>
    <fastjson.version>1.2.51</fastjson.version>
    <scalikejdbc.version>3.3.1</scalikejdbc.version>
</properties>

<dependencies>
    <!-- Spark Streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <!-- Spark Streaming + Kafka 0-10 integration -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <!-- MySQL JDBC driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>

    <!-- Redis client (Jedis) -->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>${jedis.version}</version>
    </dependency>

    <!-- Typesafe Config, for application.conf -->
    <dependency>
        <groupId>com.typesafe</groupId>
        <artifactId>config</artifactId>
        <version>${config.version}</version>
    </dependency>

    <!-- fastjson, for JSON parsing -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>

    <!-- scalikejdbc, for storing offsets in MySQL -->
    <dependency>
        <groupId>org.scalikejdbc</groupId>
        <artifactId>scalikejdbc_2.11</artifactId>
        <version>${scalikejdbc.version}</version>
    </dependency>
    <dependency>
        <groupId>org.scalikejdbc</groupId>
        <artifactId>scalikejdbc-core_2.11</artifactId>
        <version>${scalikejdbc.version}</version>
    </dependency>
    <dependency>
        <groupId>org.scalikejdbc</groupId>
        <artifactId>scalikejdbc-config_2.11</artifactId>
        <version>${scalikejdbc.version}</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>compile</scope>
    </dependency>
</dependencies>

</project>