Spark Streaming 之 consumer offsets 儲存到 Zookeeper 以實現資料零丟失
在 Spark Streaming 中消費 Kafka 資料的時候,有兩種方式:
1)基於 Receiver-based 的 createStream 方法
2)Direct Approach (No Receivers) 方式的 createDirectStream 方法
就效能而言,第二種方式比第一種方式高效得多。但是第二種使用方式中 kafka 的 offset 是儲存在 checkpoint 中的,Spark Streaming 並沒有將消費的偏移量傳送到 Zookeeper 中,這將導致那些基於偏移量的 Kafka 叢集監控軟體(比如:Apache Kafka 監控之 Kafka Web Console 等)無法正常運作。
官方只是蜻蜓點水地描述了可以用以下方法修改zookeeper中的consumer offsets(可以檢視http://spark.apache.org/docs/latest/streaming-kafka-integration.html)
// Hold a reference to the current offset ranges, so it can be used downstream
var offsetRanges = Array.empty[OffsetRange]

// The offset ranges are read in transform, the first operation applied to the direct stream.
directKafkaStream.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.map {
  ...
}.foreachRDD { rdd =>
  // Print the topic, partition and offset range of every entry captured above.
  for (o <- offsetRanges) {
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
  ...
}
所以更新zookeeper中的consumer offsets還需要自己去實現,並且官方提供的 createDirectStream過載 並不能很好的滿足需求,需要進一步封裝。具體看以下KafkaManager類的程式碼:
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}
import scala.reflect.ClassTag

/**
 * Wraps `KafkaUtils.createDirectStream` so that a direct Kafka stream starts from the consumer
 * offsets recorded for the consumer group through `KafkaCluster`, and lets the caller write the
 * processed offsets back after each batch.
 *
 * Originally created by YZX on 2017/5/20 13:14 in Beijing.
 *
 * @param kafkaParams Kafka parameters; must contain "group.id". "auto.offset.reset" is read
 *                    from these parameters when a group has no recorded offsets yet.
 */
class KafkaManager(val kafkaParams: Map[String, String]) {

  private val kc = new KafkaCluster(kafkaParams)

  /**
   * Creates a direct stream that starts at the consumer group's recorded offsets.
   *
   * The recorded offsets are first created or repaired by [[setOrUpdateOffsets]], then read
   * back and handed to `KafkaUtils.createDirectStream` as the starting offsets.
   *
   * @param ssc         streaming context the stream is attached to
   * @param kafkaParams Kafka parameters for the stream; must contain "group.id". Note that this
   *                    parameter shadows the constructor field of the same name.
   * @param topics      topics to consume
   * @tparam K  message key type
   * @tparam V  message value type
   * @tparam KD decoder for the key
   * @tparam VD decoder for the value
   * @return a stream of (key, message) pairs
   * @throws NoSuchElementException if "group.id" is missing from `kafkaParams`
   * @throws SparkException if partitions or offsets cannot be read or written
   */
  def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Set[String]): InputDStream[(K, V)] = {
    val groupId = requireGroupId(kafkaParams)

    // Bring the recorded offsets into a usable state before reading them.
    setOrUpdateOffsets(topics, groupId)

    // Start consuming from the recorded offsets.
    val partitions = orFail("get kafka partition failed")(kc.getPartitions(topics))
    val consumerOffsets =
      orFail("get kafka consumer offsets failed")(kc.getConsumerOffsets(groupId, partitions))

    KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
      ssc,
      kafkaParams,
      consumerOffsets,
      (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
    )
  }

  /**
   * Creates or repairs the recorded consumer offsets of every topic before a stream is built.
   *
   *  - If offsets can be read for the group, every partition whose recorded offset is smaller
   *    than the earliest offset still available on the leader is moved up to that earliest
   *    offset (the data it pointed at is gone, e.g. removed by Kafka's retention policy).
   *  - If offsets cannot be read, the group is treated as never having consumed the topic and
   *    is seeded with the earliest offsets when "auto.offset.reset" is "smallest", otherwise
   *    with the latest offsets.
   *
   * NOTE(review): any failure to read the offsets is treated as "never consumed", exactly as
   * before; a transient lookup error therefore re-seeds the group's offsets.
   *
   * @param topics  topics to inspect
   * @param groupId consumer group whose offsets are maintained
   * @throws SparkException if partitions or leader offsets cannot be read, or if the corrected
   *                        offsets cannot be written
   */
  private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit =
    topics.foreach { topic =>
      val partitions = orFail("get kafka partition failed")(kc.getPartitions(Set(topic)))

      kc.getConsumerOffsets(groupId, partitions) match {
        case Right(consumerOffsets) => // the group has consumed this topic before
          val earliest =
            orFail("get earliest leader offsets failed")(kc.getEarliestLeaderOffsets(partitions))
          // Only some partitions may be stale, so only those are rewritten.
          val staleOffsets = consumerOffsets.collect {
            case (tp, recorded) if recorded < earliest(tp).offset => tp -> earliest(tp).offset
          }
          staleOffsets.foreach { case (tp, earliestLeaderOffset) =>
            println("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
              " offsets已經過時,更新為" + earliestLeaderOffset)
          }
          if (staleOffsets.nonEmpty) {
            orFail("set kafka consumer offsets failed")(kc.setConsumerOffsets(groupId, staleOffsets))
          }

        case Left(_) => // the group has never consumed this topic
          val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
          val leaderOffsets: Map[TopicAndPartition, LeaderOffset] =
            if (reset.exists(_ == "smallest")) {
              orFail("get earliest leader offsets failed")(kc.getEarliestLeaderOffsets(partitions))
            } else {
              orFail("get latest leader offsets failed")(kc.getLatestLeaderOffsets(partitions))
            }
          val offsets = leaderOffsets.map { case (tp, leaderOffset) => (tp, leaderOffset.offset) }
          orFail("set kafka consumer offsets failed")(kc.setConsumerOffsets(groupId, offsets))
      }
    }

  /**
   * Records `untilOffset` of every offset range as the consumer group's new offset.
   *
   * A failure for one partition is printed and does not stop the remaining partitions.
   *
   * @param rdd          unused; kept so existing callers keep compiling
   * @param offsetRanges offset ranges of the batch that has just been processed
   * @throws NoSuchElementException if "group.id" is missing from the constructor parameters
   */
  def updateZKOffsets(rdd: RDD[(String, String)], offsetRanges: Array[OffsetRange]): Unit = {
    val groupId = requireGroupId(kafkaParams)
    offsetRanges.foreach { range =>
      val topicAndPartition = TopicAndPartition(range.topic, range.partition)
      kc.setConsumerOffsets(groupId, Map((topicAndPartition, range.untilOffset))) match {
        case Left(err) => println(s"Error updating the offset to Kafka cluster: $err")
        case Right(_)  => ()
      }
    }
  }

  /** Returns the "group.id" entry of `params`, failing with a descriptive message if absent. */
  private def requireGroupId(params: Map[String, String]): String =
    params.getOrElse(
      "group.id",
      throw new NoSuchElementException("kafkaParams must contain \"group.id\""))

  /** Unwraps a `KafkaCluster` result, turning a failure into a SparkException that keeps the cause. */
  private def orFail[A](what: String)(result: Either[Any, A]): A = result match {
    case Right(value) => value
    case Left(err)    => throw new SparkException(s"$what: $err")
  }
}
其中,org.apache.spark.streaming.kafka.KafkaCluster 的存取權限為私有,無法在自己的程式中直接引用,所以需要把這部分原始碼拷貝到自己的專案中。
最後,來一個完整的例子。
import java.sql.DriverManager
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Streaming job that reads tab-separated report records from Kafka through [[KafkaManager]],
 * aggregates their metrics per dimension key within each batch, upserts the sums into MySQL
 * and then records the consumed offsets.
 *
 * Delivery is at-least-once, not exactly-once: the MySQL update and the offset update are not
 * one transaction, so a failure between the two makes the batch be processed again.
 *
 * Originally created by YZX on 2017/5/20 13:14 in Beijing.
 */
object DirectKafkaReportStreaming {

  import java.sql.PreparedStatement
  import scala.util.Try

  /** One report row: the raw dimension strings (in column order) and the 14 metric values. */
  private type ReportRow = (Vector[String], Array[Double])

  /** Records with any other number of tab-separated fields are dropped. */
  private val ExpectedFieldCount = 97

  /** Value used for day/hour when the timestamp field is too short to contain them. */
  private val MissingTimePart = "-1"

  // Column name -> index of the field in the tab-separated record.
  // Dimensions that are written as numbers (-1 when the field is not a number).
  private val NumericDimensions = Vector(
    "partner_id" -> 1, "advertiser_company_id" -> 2, "advertiser_id" -> 3, "order_id" -> 4,
    "campaign_id" -> 5, "sub_campaign_id" -> 6, "exe_campaign_id" -> 7, "vertical_tag_id" -> 8,
    "conversion_pixel" -> 9)
  // Dimensions that are written as text.
  private val TextDimensions = Vector(
    "creative_size" -> 10, "creative_id" -> 11, "creative_type" -> 12,
    "inventory_type" -> 16, "ad_slot_type" -> 17, "platform" -> 24)
  // Metrics that are summed and written as doubles.
  private val CostMetrics = Vector(
    "raw_media_cost" -> 53, "media_cost" -> 54, "service_fee" -> 55, "media_tax" -> 56,
    "service_tax" -> 57, "total_cost" -> 58, "system_loss" -> 59)
  // Metrics that are summed and written as integers.
  private val CountMetrics = Vector(
    "bid" -> 61, "imp" -> 62, "click" -> 63, "reach" -> 64, "two_jump" -> 65,
    "click_conversion" -> 66, "imp_conversion" -> 67)

  // SECURITY(review): connection settings and credentials are hard-coded in the source;
  // consider loading them from configuration instead.
  private val DbDriver = "com.mysql.jdbc.Driver"
  private val DbUrl = "jdbc:mysql://192.168.144.237:3306/"
  private val DbUser = "data"
  private val DbPassword = "[email protected]#$%^&8"

  /**
   * Insert-or-accumulate statement: inserts the row, or adds the metrics to the existing row
   * when the key already exists. This relies on a unique index over the dimension columns.
   * Parameters: 17 dimensions, 14 metrics for VALUES, then the same 14 metrics for the update.
   */
  private val UpsertSql: String = {
    val keyColumns = Vector("day", "hour") ++ (NumericDimensions ++ TextDimensions).map(_._1)
    val metricColumns = (CostMetrics ++ CountMetrics).map(_._1)
    val columns = keyColumns ++ metricColumns
    val placeholders = columns.map(_ => "?").mkString(", ")
    val increments = metricColumns.map(c => s"$c=$c+?").mkString(", ")
    s"INSERT INTO test.rpt_effect_newday (${columns.mkString(", ")}) " +
      s"VALUES ($placeholders) ON DUPLICATE KEY UPDATE $increments"
  }

  /**
   * Entry point.
   *
   * @param args `args(0)` is the batch interval in seconds
   * @throws IllegalArgumentException if no batch interval is given
   */
  def main(args: Array[String]): Unit = {
    require(args.nonEmpty, "usage: DirectKafkaReportStreaming <batch interval in seconds>")

    // Keep unnecessary log output off the terminal.
    Logger.getLogger("org").setLevel(Level.WARN)

    val conf = new SparkConf().setAppName("DirectKafkaReportStreaming") //.setMaster("local[*]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(args(0).toLong))

    // Create direct kafka stream with brokers and topics
    val topics = Set("report.pv_account", "report.base_reach_click", "report.base_second_jump", "report.base_conversion_click", "report.base_conversion_imp")
    // Joined without spaces: the previous literal had a space after every comma, which ends up
    // as a leading space in the host name of every broker but the first.
    val brokers = Seq(216, 217, 218, 221, 222, 223, 224, 225, 226, 227)
      .map(host => s"192.168.145.$host:9092")
      .mkString(",")
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "consumer.timeout.ms" -> "30000", "group.id" -> "YZXDirectKafkaReportStreaming")

    val km = new KafkaManager(kafkaParams)
    val directKafkaStream = km.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    directKafkaStream.foreachRDD { rdd =>
      // Read the offset ranges straight from the stream's own RDD, before any transformation
      // is applied to it, so no mutable variable has to be shared between closures.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      if (!rdd.isEmpty()) {
        // Process the messages first ...
        processRDD(rdd.map(_._2))
        // ... then record the offsets, so a failed batch is read again.
        km.updateZKOffsets(rdd, offsetRanges)
      }
      // Exactly-once is not guaranteed: updating MySQL and updating the offsets are not one
      // transaction.
    }

    ssc.start()
    ssc.awaitTermination()
    ssc.stop(stopSparkContext = true, stopGracefully = true) // shut down gracefully
  }

  /** Parses `input` as a Double; returns 0.0 when it cannot be parsed (including null). */
  def getDouble(input: String): Double = Try(input.toDouble).getOrElse(0.0)

  /** Parses `input` as a Long; returns 0L when it cannot be parsed (including null). */
  def getLong(input: String): Long = Try(input.toLong).getOrElse(0L)

  /**
   * Aggregates one batch of raw records and upserts the result into MySQL.
   *
   * Each message is split on tabs and trimmed; records that do not have exactly 97 fields are
   * dropped. Metrics are summed per dimension key, then every partition of the result is
   * written as one JDBC batch.
   *
   * @param messages raw tab-separated records
   */
  def processRDD(messages: RDD[String]): Unit = {
    val rows = messages
      .map(_.split("\t", -1).map(_.trim))
      .filter(_.length == ExpectedFieldCount)
      .map(parseRecord)

    // Group by dimensions and add up the metrics position by position.
    val summed = rows.reduceByKey { (left, right) =>
      left.zip(right).map { case (a, b) => a + b }
    }

    summed.foreachPartition(writePartition)
  }

  /** Extracts the dimension key and the metric values from one split record. */
  private def parseRecord(fields: Array[String]): ReportRow = {
    // Business timestamp of the record; the first 8 characters are the day, the next 2 the hour.
    val kafkaTime = fields(0)
    val day = if (kafkaTime.length >= 8) kafkaTime.substring(0, 8) else MissingTimePart
    val hour = if (kafkaTime.length >= 10) kafkaTime.substring(8, 10) else MissingTimePart

    val key = Vector(day, hour) ++ (NumericDimensions ++ TextDimensions).map { case (_, i) => fields(i) }
    val metrics =
      CostMetrics.map { case (_, i) => getDouble(fields(i)) } ++
        CountMetrics.map { case (_, i) => getLong(fields(i)).toDouble }
    (key, metrics.toArray)
  }

  /**
   * Writes the rows of one partition as a single JDBC batch.
   *
   * No connection is opened for an empty partition, and the statement and connection are
   * closed even when the batch fails. Auto-commit is left at the driver default, as before.
   */
  private def writePartition(rows: Iterator[ReportRow]): Unit =
    if (rows.nonEmpty) {
      Class.forName(DbDriver)
      val connection = DriverManager.getConnection(DbUrl, DbUser, DbPassword)
      try {
        val statement = connection.prepareStatement(UpsertSql)
        try {
          rows.foreach { case (key, metrics) =>
            bindRow(statement, key, metrics)
            statement.addBatch()
          }
          statement.executeBatch()
        } finally {
          statement.close()
        }
      } finally {
        connection.close()
      }
    }

  /** Binds one row to the parameters of [[UpsertSql]], in the statement's parameter order. */
  private def bindRow(statement: PreparedStatement, key: Vector[String], metrics: Array[Double]): Unit = {
    // JDBC parameters are 1-based and strictly positional, so a running counter is used.
    var position = 0
    def next(): Int = { position += 1; position }

    // day and hour come first and are written as numbers, like the numeric dimensions.
    val (numericKey, textKey) = key.splitAt(2 + NumericDimensions.size)
    numericKey.foreach(raw => statement.setLong(next(), Try(raw.toLong).getOrElse(-1L)))
    textKey.foreach(raw => statement.setString(next(), Option(raw).getOrElse("")))

    val (costs, counts) = metrics.splitAt(CostMetrics.size)
    def bindMetrics(): Unit = {
      costs.foreach(value => statement.setDouble(next(), value))
      counts.foreach(value => statement.setLong(next(), value.toLong))
    }
    bindMetrics() // VALUES (...)
    bindMetrics() // ON DUPLICATE KEY UPDATE ...
  }
}
友情連結如下:
Spark+Kafka的Direct方式將偏移量傳送到Zookeeper實現
將 Spark Streaming + Kafka direct 的 offset 存入Zookeeper並重用
spark streaming kafka1.4.1中的低階api createDirectStream使用總結,directstream
Exactly-once Spark Streaming from Apache Kafka
https://github.com/koeninger/kafka-exactly-once/tree/spark-1.6.0