Maintaining Kafka offsets in external storage with Spark Streaming
阿新 • Published: 2019-04-03
This post uses Redis as the example target for persisting Kafka offsets.
Redis storage format
The data structure used is a Redis string, where the key is topic:partition and the value is the offset.
For example, if the topic bobo has 3 partitions, the key-value structure looks like this:

- bobo:0 holds offset x
- bobo:1 holds offset y
- bobo:2 holds offset z
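As a minimal sketch of this layout (assuming a local Redis instance and the Jedis client; the key names and offset values are illustrative, not from the original post), the keys can be written and read like this:

```scala
import redis.clients.jedis.Jedis

object OffsetKeyDemo {
  def main(args: Array[String]): Unit = {
    // Assumption: Redis runs on localhost:6379
    val jedis = new Jedis("localhost", 6379)

    // One string key per topic:partition, holding the offset as text
    jedis.set("bobo:0", "42")
    jedis.set("bobo:1", "17")
    jedis.set("bobo:2", "99")

    // Reading back: key is topic:partition, value is the offset
    for (partition <- 0 until 3) {
      val key = "bobo:" + partition
      println(key + " -> offset " + jedis.get(key))
    }

    jedis.close()
  }
}
```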
Specifying offsets when consuming
Two methods do the main work: createKafkaStream() creates the Kafka stream, and getOffsets() fetches the offsets from Redis.
```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.slf4j.LoggerFactory

import scala.collection.mutable

private val log = LoggerFactory.getLogger(this.getClass)

/**
 * Kafka parameters
 */
private val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "crpprdap25:6667,crpprdap26:6667,crpprdap27:6667",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  // Note that this is "none": the consumer fails fast unless we supply offsets ourselves
  "auto.offset.reset" -> "none",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// The `bobo` topic has 3 partitions
private val topicPartitions = Map[String, Int]("bobo" -> 3)

// Fetch the offsets from Redis
def getOffsets: Map[TopicPartition, Long] = {
  val jedis = InternalRedisClient.getResource

  // Starting offset for each partition
  val offsets = mutable.Map[TopicPartition, Long]()

  topicPartitions.foreach { it =>
    val topic = it._1
    val partitions = it._2
    // Walk the partitions and set the offset of each partition under this topic
    for (partition <- 0 until partitions) {
      val topicPartitionKey = topic + ":" + partition
      var lastOffset = 0L
      val lastSavedOffset = jedis.get(topicPartitionKey)

      if (null != lastSavedOffset) {
        try {
          lastOffset = lastSavedOffset.toLong
        } catch {
          case e: Exception =>
            log.error("get lastSavedOffset error", e)
            System.exit(1)
        }
      }

      log.info("from redis topic: {}, partition: {}, lastOffset: {}", topic, partition, lastOffset)

      // Record the starting offset
      offsets += (new TopicPartition(topic, partition) -> lastOffset)
    }
  }

  InternalRedisClient.returnResource(jedis)

  offsets.toMap
}

/**
 * Create the Kafka stream
 *
 * @param ssc StreamingContext
 * @return InputDStream
 */
def createKafkaStream(ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
  val offsets = getOffsets

  // Create the Kafka direct stream
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Assign[String, String](offsets.keys.toList, kafkaParams, offsets)
  )

  stream
}
```
The core of this is the ConsumerStrategies.Assign strategy, which lets us specify the starting offset for each partition of the topic.
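To make the wiring concrete, here is a sketch of a driver that ties getOffsets, createKafkaStream, and the consume method from the next section together; the app name and the 10-second batch interval are assumptions, not from the original post:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaOffsetsInRedisApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-offsets-in-redis")
    // Batch interval is an assumption; tune it for your workload
    val ssc = new StreamingContext(conf, Seconds(10))

    // Resume from the offsets stored in Redis
    val stream = createKafkaStream(ssc)
    // Process each batch and write the new offsets back to Redis
    consume(stream)

    ssc.start()
    ssc.awaitTermination()
  }
}
```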
Updating offsets in Redis
Finally, write the offset information back to Redis.
```scala
import org.apache.spark.streaming.kafka010.HasOffsetRanges

/**
 * Consume the stream
 *
 * @param stream InputDStream
 */
def consume(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
  stream.foreachRDD { rdd =>
    // Grab the offset ranges for this batch
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    // Compute whatever metrics you need; here we just count the records
    val total = rdd.count()

    val jedis = InternalRedisClient.getResource
    val pipeline = jedis.pipelined()
    // Starts a transaction, which blocks Redis
    pipeline.multi()

    // Update the metrics
    pipeline.incrBy("totalRecords", total)

    // Update the offsets
    offsetRanges.foreach { offsetRange =>
      log.info("save offsets, topic: {}, partition: {}, offset: {}",
        offsetRange.topic, offsetRange.partition, offsetRange.untilOffset)
      val topicPartitionKey = offsetRange.topic + ":" + offsetRange.partition
      pipeline.set(topicPartitionKey, offsetRange.untilOffset + "")
    }

    // Execute and release
    pipeline.exec()
    pipeline.sync()
    pipeline.close()
    InternalRedisClient.returnResource(jedis)
  }
}
```
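The snippets above call an InternalRedisClient helper whose implementation the post does not show. A minimal sketch built on JedisPool might look like the following; the host, port, and pool sizes here are assumptions:

```scala
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object InternalRedisClient {
  // Pool settings are assumptions; tune them for your deployment
  private val config = new JedisPoolConfig()
  config.setMaxTotal(32)
  config.setMaxIdle(8)

  // Assumption: Redis runs on localhost:6379
  private lazy val pool = new JedisPool(config, "localhost", 6379)

  // Borrow a connection from the pool
  def getResource: Jedis = pool.getResource

  // Return the connection; close() on a pooled Jedis returns it to the pool
  def returnResource(jedis: Jedis): Unit = if (jedis != null) jedis.close()
}
```

One design note on the consume method: wrapping the metric increment and the offset writes in MULTI/EXEC makes them atomic in Redis, so readers never see a batch whose offsets were only partially saved.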
References
- Real-time stream computing, Spark Streaming, Kafka, Redis, exactly-once, real-time deduplication
Spark code
As an aside, here is the Spark-related code I have put together.
GitHub repo: spark-programming
It mainly covers:
- Basic RDD usage
- SQL
  - JDBC (read, write)
  - Hive (read, write, dynamic partitions)
- Streaming
  - consuming Kafka (manual commit, manually maintained offsets)
  - writing to HBase
  - writing to Hive