Spark Streaming消費Kafka Direct方式資料零丟失實現
一、概述
上次寫這篇文章文章的時候,Spark還是1.x,kafka還是0.8x版本,轉眼間spark到了2.x,kafka也到了2.x,儲存offset的方式也發生了改變,筆者根據上篇文章和網上文章,將offset儲存到Redis,既保證了併發也保證了資料不丟失,經過測試,有效。
二、使用場景
Spark Streaming實時消費kafka資料的時候,程式停止或者Kafka節點掛掉會導致資料丟失,Spark Streaming也沒有設定CheckPoint(據說比較雞肋,雖然可以儲存Direct方式的offset,但是可能會導致頻繁寫HDFS佔用IO),所以每次出現問題的時候,重啟程式,而程式的消費方式是Direct,所以在程式down掉的這段時間Kafka上的資料是消費不到的,雖然可以設定offset為smallest,但是會導致重複消費,重新overwrite hive上的資料,但是不允許重複消費的場景就不能這樣做。
三、原理闡述
在Spark Streaming中消費 Kafka 資料的時候,有兩種方式分別是 :
1.基於 Receiver-based 的 createStream 方法。receiver從Kafka中獲取的資料都是儲存在Spark Executor的記憶體中的,然後Spark Streaming啟動的job會去處理那些資料。然而,在預設的配置下,這種方式可能會因為底層的失敗而丟失資料。如果要啟用高可靠機制,讓資料零丟失,就必須啟用Spark Streaming的預寫日誌機制(Write Ahead Log,WAL)。該機制會同步地將接收到的Kafka資料寫入分散式檔案系統(比如HDFS)上的預寫日誌中。所以,即使底層節點出現了失敗,也可以使用預寫日誌中的資料進行恢復。本文對此方式不研究,有興趣的可以自己實現,個人不喜歡這個方式。KafkaUtils.createStream
2.Direct Approach (No Receivers) 方式的 createDirectStream 方法,但是第二種使用方式中 kafka 的 offset 是儲存在 checkpoint 中的,如果程式重啟的話,會丟失一部分資料,我使用的是這種方式。KafkaUtils.createDirectStream。本文將用程式碼說明如何將 kafka 中的 offset 儲存到 Redis 中,以及如何從 Redis 中讀取已存在的 offset。引數auto.offset.reset為latest的時候程式才會讀取redis的offset。
四、實現程式碼
import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.streaming.kafka010._ import scala.collection.JavaConverters._ import scala.util.Try /** * Created by chouyarn of BI on 2018/8/21 */ object KafkaUtilsRedis { /** * 根據groupId儲存offset * @param ranges * @param groupId */ def storeOffset(ranges: Array[OffsetRange], groupId: String): Unit = { for (o <- ranges) { val key = s"bi_kafka_offset_${groupId}_${o.topic}_${o.partition}" val value = o.untilOffset JedisUtil.set(key, value.toString) } } /** * 根據topic,groupid獲取offset * @param topics * @param groupId * @return */ def getOffset(topics: Array[String], groupId: String): (Map[TopicPartition, Long], Int) = { val fromOffSets = scala.collection.mutable.Map[TopicPartition, Long]() topics.foreach(topic => { val keys = JedisUtil.getKeys(s"bi_kafka_offset_${groupId}_${topic}*") if (!keys.isEmpty) { keys.asScala.foreach(key => { val offset = JedisUtil.get(key) val partition = Try(key.split(s"bi_kafka_offset_${groupId}_${topic}_").apply(1)).getOrElse("0") fromOffSets.put(new TopicPartition(topic, partition.toInt), offset.toLong) }) } }) if (fromOffSets.isEmpty) { (fromOffSets.toMap, 0) } else { (fromOffSets.toMap, 1) } } /** * 建立InputDStream,如果auto.offset.reset為latest則從redis讀取 * @param ssc * @param topic * @param kafkaParams * @return */ def createStreamingContextRedis(ssc: StreamingContext, topic: Array[String], kafkaParams: Map[String, Object]): InputDStream[ConsumerRecord[String, String]] = { var kafkaStreams: InputDStream[ConsumerRecord[String, String]] = null val groupId = kafkaParams.get("group.id").get val (fromOffSet, flag) = getOffset(topic, groupId.toString) val offsetReset = kafkaParams.get("auto.offset.reset").get if (flag == 1 && offsetReset.equals("latest")) { kafkaStreams = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe(topic, kafkaParams, fromOffSet)) } else { kafkaStreams = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe(topic, kafkaParams)) } kafkaStreams } def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("offSet Redis").setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(60)) val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "localhost:9092", "group.id" -> "binlog.test.rpt_test_1min", "auto.offset.reset" -> "latest", "enable.auto.commit" -> (false: java.lang.Boolean), "session.timeout.ms" -> "20000", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer] ) val topic = Array("binlog.test.rpt_test", "binlog.test.hbase_test", "binlog.test.offset_test") val groupId = "binlog.test.rpt_test_1min" val lines = createStreamingContextRedis(ssc, topic, kafkaParams) lines.foreachRDD(rdds => { if (!rdds.isEmpty()) { println("##################:" + rdds.count()) } storeOffset(rdds.asInstanceOf[HasOffsetRanges].offsetRanges, groupId) }) ssc.start() ssc.awaitTermination() } }
五、JedisUtil程式碼
import java.util
import com.typesafe.config.ConfigFactory
import org.apache.kafka.common.serialization.StringDeserializer
import redis.clients.jedis.{HostAndPort, JedisCluster, JedisPool, JedisPoolConfig}
object JedisUtil {
private val config = ConfigFactory.load("realtime-etl.conf")
private val redisHosts: String = config.getString("redis.server")
private val port: Int = config.getInt("redis.port")
private val hostAndPortsSet: java.util.Set[HostAndPort] = new util.HashSet[HostAndPort]()
redisHosts.split(",").foreach(host => {
hostAndPortsSet.add(new HostAndPort(host, port))
})
private val jedisConf: JedisPoolConfig = new JedisPoolConfig()
jedisConf.setMaxTotal(5000)
jedisConf.setMaxWaitMillis(50000)
jedisConf.setMaxIdle(300)
jedisConf.setTestOnBorrow(true)
jedisConf.setTestOnReturn(true)
jedisConf.setTestWhileIdle(true)
jedisConf.setMinEvictableIdleTimeMillis(60000l)
jedisConf.setTimeBetweenEvictionRunsMillis(3000l)
jedisConf.setNumTestsPerEvictionRun(-1)
lazy val redis = new JedisCluster(hostAndPortsSet, jedisConf)
def get(key: String): String = {
try {
redis.get(key)
} catch {
case e: Exception => e.printStackTrace()
null
}
}
def set(key: String, value: String) = {
try {
redis.set(key, value)
} catch {
case e: Exception => {
e.printStackTrace()
}
}
}
def hmset(key: String, map: java.util.Map[String, String]): Unit = {
// val redis=pool.getResource
try {
redis.hmset(key, map)
}catch {
case e:Exception => e.printStackTrace()
}
}
def hset(key: String, field: String, value: String): Unit = {
// val redis=pool.getResource
try {
redis.hset(key, field, value)
} catch {
case e: Exception => {
e.printStackTrace()
}
}
}
def hget(key: String, field: String): String = {
try {
redis.hget(key, field)
}catch {
case e:Exception => e.printStackTrace()
null
}
}
def hgetAll(key: String): java.util.Map[String, String] = {
try {
redis.hgetAll(key)
} catch {
case e: Exception => e.printStackTrace()
null
}
}
}
六、總結
根據不同的groupid來儲存不同的offset,支援多個topic