Big Data Hands-On Project: China Mobile Operations Analysis Real-Time Monitoring Platform || Implementing the Project Requirements (data files at the end of the article)
Posted by 阿新 on 2018-12-16
1. Business overview (total orders, successful orders, total amount, total processing time)
2. Business detail (recharge orders per hour, successful recharge orders per hour)
3. Business quality (successful recharge orders per province)
4. Real-time recharge amount and order count per minute
Overall flow:
Read the stored offsets from MySQL -> broadcast the province code-to-name mapping -> pull the data from Kafka -> parse each JSON record (province, time, result, fee)
-> compute the business overview (total orders, successful orders, total amount, processing time) -> business detail (total and successful recharge orders per hour)
-> business quality (successful orders per province) -> real-time per-minute recharge amount and order count
The code is organised into a handful of utility objects (AppParams, CaculateTools, Jpools, OffsetManager, KpiTools) plus the streaming driver, described below.
Implementing the project requirements:
1) Collect the data with Flume and push it into Kafka. The full agent configuration is shown below.
# Name the components of this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source r1: a spooling directory
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/datas/flume
a1.sources.r1.fileHeader = true

# Describe/configure the sink k1: Kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = flumeLogs
a1.sinks.k1.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

# Describe/configure the channel c1: in-memory buffer
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
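If the file is saved as, say, flume-kafka.conf, the agent can then be started with the standard launcher, for example: flume-ng agent --name a1 --conf $FLUME_HOME/conf --conf-file flume-kafka.conf -Dflume.root.logger=INFO,console (the file name and paths here are placeholders; adjust them to your installation).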
2) Before consuming the Kafka data with Spark Streaming, we set up the Kafka parameters and the configuration needed to write into Redis.
(1) Configure the Kafka and Redis parameters in the project, so the job can read from Kafka and store the results in Redis.
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.kafka.common.serialization.StringDeserializer

object AppParams {

  /**
    * Parse the application.conf configuration file.
    * Files under resources are loaded in the default order: application.conf -> application.json -> application.properties
    */
  private lazy val config: Config = ConfigFactory.load()

  /** Topics to subscribe to */
  val topic = config.getString("kafka.topic").split(",")

  /** Kafka brokers (host:port list) */
  val brokers = config.getString("kafka.broker.list")

  /** Consumer group id */
  val groupId = config.getString("kafka.group.id")

  /** Kafka consumer parameters */
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> brokers,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> groupId,
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> "false"
  )

  /** Redis server address */
  val redisHost = config.getString("redis.host")

  /** Which Redis database to write into */
  val selectDBIndex = config.getInt("redis.db.index")

  /** Mapping from province code to province name */
  import scala.collection.JavaConversions._
  val pCode2PName = config.getObject("pcode2pname").unwrapped().toMap
}
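Note that config.getObject("pcode2pname").unwrapped() returns java.lang.Object values, which is why pCode2PName ends up as a Map[String, AnyRef] and kpi_quality later calls .toString on the looked-up province name. A minimal usage sketch (the code "200" is just one entry from the mapping shown further below):

object AppParamsDemo {
  def main(args: Array[String]): Unit = {
    println(AppParams.topic.mkString(","))                 // topics read from kafka.topic
    println(AppParams.pCode2PName.getOrElse("200", "200")) // -> 廣東 (falls back to the raw code when unknown)
  }
}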
(2) A helper object for computing how long an order took to complete.
import org.apache.commons.lang3.time.FastDateFormat
object CaculateTools {
// SimpleDateFormat is not thread-safe:
//private val format = new SimpleDateFormat("yyyyMMddHHmmssSSS")
// FastDateFormat is a thread-safe DateFormat
private val format = FastDateFormat.getInstance("yyyyMMddHHmmssSSS")
/**
 * Compute the time difference (in milliseconds) between the recharge request and the result notification
 */
def caculateTime(startTime:String,endTime:String):Long = {
// the first 17 characters of requestId are a yyyyMMddHHmmssSSS timestamp
val start = startTime.substring(0,17)
format.parse(endTime).getTime - format.parse(start).getTime
}
}
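A quick sanity check of the helper (a minimal sketch; the timestamps below are invented and only illustrate the yyyyMMddHHmmssSSS prefix that requestId and receiveNotifyTime are expected to carry):

object CaculateToolsDemo {
  def main(args: Array[String]): Unit = {
    // invented requestId: a yyyyMMddHHmmssSSS timestamp followed by a sequence number
    val requestId = "20181216103059123" + "0000001"
    val receiveNotifyTime = "20181216103101456"
    // 10:31:01.456 - 10:30:59.123 = 2333 ms
    println(CaculateTools.caculateTime(requestId, receiveNotifyTime))
  }
}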
(3) A Redis connection pool for reading and writing the KPI data.
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.JedisPool
object Jpools {
private val poolConfig = new GenericObjectPoolConfig()
poolConfig.setMaxIdle(5) // maximum number of idle connections in the pool (default 8)
poolConfig.setMaxTotal(2000) // maximum number of connections in the pool (default 8)
// the pool is private; it is not exposed outside this object
private lazy val jedisPool = new JedisPool(poolConfig, AppParams.redisHost)
// borrow a connection and switch to the configured Redis database
def getJedis = {
val jedis = jedisPool.getResource
jedis.select(AppParams.selectDBIndex)
jedis
}
}
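A minimal usage sketch of the pool (the key and field names are made up, just to show the borrow/select/close cycle; close() on a pooled Jedis returns the connection to the pool rather than closing it):

object JpoolsDemo {
  def main(args: Array[String]): Unit = {
    val jedis = Jpools.getJedis // already switched to AppParams.selectDBIndex
    try {
      jedis.hincrBy("demo-key", "counter", 1L) // hypothetical key and field, for illustration only
      println(jedis.hgetAll("demo-key"))
    } finally {
      jedis.close() // return the connection to the pool
    }
  }
}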
(4) Offsets are checked and persisted around every write to Redis, so restarts neither reprocess data nor waste resources.
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import scalikejdbc.{DB, SQL}
import scalikejdbc.config.DBs
object OffsetManager {
DBs.setup() // load the db.default.* settings from application.conf
/**
 * Read the offsets this consumer group has stored in MySQL
 */
def getMydbCurrentOffset: Map[TopicPartition, Long] = {
DB.readOnly(implicit session =>
SQL("select * from streaming_offset where groupId=?").bind(AppParams.groupId)
.map(rs =>
(
new TopicPartition(rs.string("topicName"),rs.int("partitionId")),
rs.long("offset")
)
).list().apply().toMap
)
}
/**
 * Persist the offsets of the current batch
 */
def saveCurrentOffset(offsetRanges: Array[OffsetRange]) = {
DB.localTx(implicit session =>{
offsetRanges.foreach(or =>{
SQL("replace into streaming_offset values (?,?,?,?)")
.bind(or.topic,or.partition,or.untilOffset,AppParams.groupId)
.update()
.apply()
})
})
}
}
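The article does not show the schema of the streaming_offset table, so here is a minimal sketch that matches the column order used by the replace into above (topicName, partitionId, offset, groupId); the column types and primary key are assumptions:

import scalikejdbc._
import scalikejdbc.config.DBs

object CreateOffsetTable {
  def main(args: Array[String]): Unit = {
    DBs.setup() // reads the db.default.* settings from application.conf
    DB.autoCommit { implicit session =>
      // column order matches the positional binding of "replace into streaming_offset values (?,?,?,?)"
      SQL("""create table if not exists streaming_offset (
            |  topicName   varchar(100) not null,
            |  partitionId int          not null,
            |  `offset`    bigint       not null,
            |  groupId     varchar(100) not null,
            |  primary key (topicName, partitionId, groupId)
            |)""".stripMargin).execute().apply()
    }
  }
}

The composite primary key is what makes replace into behave as an upsert for each (topic, partition, group) combination.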
(5) application.conf with the Kafka, MySQL (offset storage) and Redis settings.
kafka.topic = "flumeLogs"
kafka.broker.list = "hadoop01:9092,hadoop02:9092,hadoop03:9092"
kafka.group.id = "day2_001"
# MySQL example
db.default.driver="com.mysql.jdbc.Driver"
db.default.url="jdbc:mysql://localhost/bigdata?characterEncoding=utf-8"
db.default.user="root"
db.default.password="926718"
# redis
redis.host="hadoop02"
redis.db.index=10
# province code -> province name mapping
pcode2pname {
100="北京"
200="廣東"
210="上海"
220="天津"
230="重慶"
240="遼寧"
250="江蘇"
270="湖北"
280="四川"
290="陝西"
311="河北"
351="山西"
371="河南"
431="吉林"
451="黑龍江"
471="內蒙古"
531="山東"
551="安徽"
571="浙江"
591="福建"
731="湖南"
771="廣西"
791="江西"
851="貴州"
871="雲南"
891="西藏"
898="海南"
931="甘肅"
951="寧夏"
971="青海"
991="新疆"
}
3) With all the configuration in place, we get to the core of the Spark Streaming processing.
First, the log fields the job actually uses: serviceName (only reChargeNotifyReq, the recharge notification, is processed), bussinessRst (result code, "0000" means the recharge succeeded), chargefee (recharge amount), requestId (its first 17 characters are a yyyyMMddHHmmssSSS timestamp of when the recharge was initiated), receiveNotifyTime (when the result notification was received), and provinceCode (numeric province code, resolved to a name through the pcode2pname mapping).
(1) Below is the core Spark Streaming KPI code.
import com.alibaba.fastjson.JSON
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
object KpiTools {
/**
 * Business overview: total orders, successful orders, total amount, total processing time
 *
 * @param baseData (day, hour, List(total, success, fee, costTime), provinceCode, minute)
 */
def kpi_general(baseData: RDD[(String, String, List[Double], String, String)]): Unit = {
baseData.map(tp => (tp._1, tp._3)).reduceByKey((list1, list2) => {
//zip the two lists and add them element-wise
list1.zip(list2).map(tp => tp._1 + tp._2)
})
.foreachPartition(partition => {
val jedis = Jpools.getJedis
partition.foreach(tp => {
//accumulate this batch's totals into the Redis hash for the day
jedis.hincrBy("A-" + tp._1, "total", tp._2(0).toLong)
jedis.hincrBy("A-" + tp._1, "succ", tp._2(1).toLong)
jedis.hincrByFloat("A-" + tp._1, "money", tp._2(2))
jedis.hincrBy("A-" + tp._1, "cost", tp._2(3).toLong)
// key expires after 48 hours
jedis.expire("A-" + tp._1, 48 * 60 * 60)
})
jedis.close()
})
}
/**
 * Business detail: total recharge orders and successful recharge orders per hour
 * key: (day, hour); value: List(total orders, successful orders)
 *
 * @param baseData (day, hour, List(total, success, fee, costTime), provinceCode, minute)
 */
def kpi_general_hour(baseData: RDD[(String, String, List[Double], String, String)]): Unit = {
baseData.map(tp => ((tp._1, tp._2), List(tp._3(0), tp._3(1)))).reduceByKey((list1, list2) => {
//zip the two lists and add them element-wise
list1.zip(list2).map(tp => tp._1 + tp._2)
})
.foreachPartition(partition => {
val jedis = Jpools.getJedis
partition.foreach(tp => {
//accumulate this batch's per-hour totals into the Redis hash for the day
jedis.hincrBy("B-" + tp._1._1, "T:" + tp._1._2, tp._2(0).toLong)
jedis.hincrBy("B-" + tp._1._1, "S" + tp._1._2, tp._2(1).toLong)
// key的有效期
jedis.expire("B-" + tp._1, 48 * 60 * 60)
})
jedis.close()
})
}
/**
 * Business quality:
 * successful recharge orders per province
 */
def kpi_quality(baseData: RDD[(String, String, List[Double], String, String)], p2p: Broadcast[Map[String, AnyRef]]) = {
baseData.map(tp => ((tp._1,tp._4),tp._3(1))).reduceByKey(_+_).foreachPartition(partition => {
val jedis = Jpools.getJedis
partition.foreach(tp => {
//successful recharge orders per province (field = province name)
jedis.hincrBy("C-" + tp._1._1,p2p.value.getOrElse(tp._1._2,tp._1._2).toString,tp._2.toLong)
jedis.expire("C-" + tp._1._1, 48 * 60 * 60)
})
jedis.close()
})
}
/**
 * Real-time recharge amount and order count per minute
 * baseData: (day, hour, List(total, success, fee, costTime), provinceCode, minute)
 */
def kpi_realtime_minute(baseData: RDD[(String, String, List[Double], String, String)]) = {
baseData.map(tp => ((tp._1,tp._2,tp._5),List(tp._3(1),tp._3(2)))).reduceByKey((list1,list2)=>{
list1.zip(list2).map(tp => tp._1+tp._2)
}).foreachPartition(partition => {
val jedis = Jpools.getJedis
partition.foreach(tp => {
//successful recharge count and recharge amount for this minute
jedis.hincrBy("D-" + tp._1._1,"C:"+ tp._1._2+tp._1._3,tp._2(0).toLong)
jedis.hincrByFloat("D-" + tp._1._1,"M"+tp._1._2+tp._1._3,tp._2(1))
jedis.expire("D-" + tp._1._1, 48 * 60 * 60)
})
jedis.close()
})
}
/**
 * Build the base tuples used by all KPI computations
 */
def baseDataRDD(rdd: RDD[ConsumerRecord[String, String]]): RDD[(String, String, List[Double], String, String)] = {
rdd // ConsumerRecord => JSONObject
.map(cr => JSON.parseObject(cr.value()))
// keep only the recharge notification records
.filter(obj => obj.getString("serviceName").equalsIgnoreCase("reChargeNotifyReq")).map(obj => {
// result code of the recharge: "0000" means success
val result = obj.getString("bussinessRst")
val fee = obj.getDouble("chargefee")
// requestId starts with the time the recharge was initiated (yyyyMMddHHmmssSSS)
val requestId = obj.getString("requestId")
// date, hour and minute of this record
val day = requestId.substring(0, 8)
val hour = requestId.substring(8, 10)
val minute = requestId.substring(10, 12)
// time the notification was received (end of the recharge)
val receiveTime = obj.getString("receiveNotifyTime")
// province code
val provinceCode = obj.getString("provinceCode")
val costTime = CaculateTools.caculateTime(requestId, receiveTime)
val succAndFeeAndTime: (Double, Double, Double) = if (result.equals("0000")) (1, fee, costTime) else (0, 0, 0)
// (day, hour, List(order, successful order, fee, cost time), provinceCode, minute)
(day, hour, List[Double](1, succAndFeeAndTime._1, succAndFeeAndTime._2, succAndFeeAndTime._3), provinceCode, minute)
}).cache()
}
}
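All KPIs end up in four families of Redis hashes: A-<day> for the overview, B-<day> for the per-hour detail, C-<day> for the per-province quality numbers and D-<day> for the per-minute statistics. A minimal sketch of how a dashboard could read them back (the date is made up):

object KpiReaderDemo {
  def main(args: Array[String]): Unit = {
    val day = "20181216" // made-up date, for illustration only
    val jedis = Jpools.getJedis
    try {
      println(jedis.hgetAll("A-" + day)) // total, succ, money, cost for the whole day
      println(jedis.hgetAll("B-" + day)) // per-hour total and successful order counts
      println(jedis.hgetAll("C-" + day)) // successful orders keyed by province name
      println(jedis.hgetAll("D-" + day)) // per-minute successful recharge count and amount
    } finally {
      jedis.close()
    }
  }
}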
(2) The driver that calls the helper methods above.
import cn.sheep.utils.{AppParams, KpiTools, OffsetManager}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * China Mobile real-time monitoring platform (optimised version)
 * Created by zhangjingcun on 2018/10/16 16:34.
 */
object BootStarpAppV2 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setAppName("中國移動運營實時監控平臺-Monitor")
// remove setMaster("local[*]") when running on a cluster
sparkConf.setMaster("local[*]")
// store RDDs in serialized form to reduce memory usage; the default is org.apache.spark.serializer.JavaSerializer,
// switching to Kryo is the most basic optimisation
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// compress RDDs
sparkConf.set("spark.rdd.compress", "true")
// batch size = maxRatePerPartition * number of partitions * batch interval
sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "10000")
// graceful shutdown
sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
val ssc = new StreamingContext(sparkConf, Seconds(2))
/**
 * Read the offsets previously stored in MySQL
 */
val currOffset = OffsetManager.getMydbCurrentOffset
/**
 * Broadcast the province code -> province name mapping
 */
val pcode2PName: Broadcast[Map[String, AnyRef]] = ssc.sparkContext.broadcast(AppParams.pCode2PName)
/** Pull the data from Kafka.
 * LocationStrategies: placement strategy. One strategy is used when the Kafka brokers run on the same machines
 * as the executors, another when they do not; Spark then fetches the data in the optimal way.
 * In production, Kafka brokers and executors are usually not co-located: Kafka stores the messages (disk-heavy),
 * while the executors do the computation (memory- and CPU-heavy), so storage and compute are kept separate.
 * Use PreferBrokers when executors and brokers share machines, otherwise use PreferConsistent,
 * which spreads the fetched partitions evenly across all executors. */
val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,
  LocationStrategies.PreferConsistent,
  // resume from the offsets read from MySQL instead of relying on auto.offset.reset
  ConsumerStrategies.Subscribe[String, String](AppParams.topic, AppParams.kafkaParams, currOffset)
)
/**
 * Per-batch processing
 */
stream.foreachRDD(rdd => {
  // record the offset ranges of this batch so they can be persisted after processing
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val baseData = KpiTools.baseDataRDD(rdd)
  // business overview and per-hour detail
  KpiTools.kpi_general(baseData)
  KpiTools.kpi_general_hour(baseData)
  // business quality: successful orders per province
  KpiTools.kpi_quality(baseData, pcode2PName)
  // real-time per-minute recharge analysis
  KpiTools.kpi_realtime_minute(baseData)
  // persist the offsets of this batch
  OffsetManager.saveCurrentOffset(offsetRanges)
})
ssc.start()
ssc.awaitTermination()
}
}
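For local testing it can be handy to push a few synthetic records straight into the topic instead of going through Flume. The sketch below is an assumption about the record layout: only the field names follow what baseDataRDD parses, the values are invented, and the broker list and topic are the ones configured above.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object TestDataProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    // one invented recharge notification; bussinessRst "0000" marks a successful recharge
    val record = "{\"serviceName\":\"reChargeNotifyReq\",\"bussinessRst\":\"0000\",\"chargefee\":100," +
      "\"requestId\":\"201812161030591230000001\",\"receiveNotifyTime\":\"20181216103101456\"," +
      "\"provinceCode\":\"200\"}"
    producer.send(new ProducerRecord[String, String]("flumeLogs", record))
    producer.flush()
    producer.close()
  }
}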
The pom.xml file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.sheep</groupId>
<artifactId>cmcc_monitor</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<spark.version>2.2.1</spark.version>
<mysql.version>5.1.40</mysql.version>
<jedis.version>2.9.0</jedis.version>
<config.version>1.3.3</config.version>
<fastjson.version>1.2.51</fastjson.version>
<scalikejdbc.version>3.3.1</scalikejdbc.version>
</properties>
<dependencies>
<!-- Spark Streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark Streaming Kafka 0.10 integration -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- MySQL JDBC driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<!-- Redis client (Jedis) -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${jedis.version}</version>
</dependency>
<!-- Typesafe config -->
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>${config.version}</version>
</dependency>
<!-- fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<dependency>
<groupId>org.scalikejdbc</groupId>
<artifactId>scalikejdbc_2.11</artifactId>
<version>${scalikejdbc.version}</version>
</dependency>
<dependency>
<groupId>org.scalikejdbc</groupId>
<artifactId>scalikejdbc-core_2.11</artifactId>
<version>${scalikejdbc.version}</version>
</dependency>
<dependency>
<groupId>org.scalikejdbc</groupId>
<artifactId>scalikejdbc-config_2.11</artifactId>
<version>${scalikejdbc.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>