
Maintaining Kafka offsets in external storage with Spark Streaming


This post takes maintaining Kafka offsets in Redis as the example.

Redis storage format

The data structure used is a Redis string, where the key is topic:partition and the value is the offset.

For example, if the topic bobo has 3 partitions, the key-value structure is as follows:

  • bobo:0 holds offset x
  • bobo:1 holds offset y
  • bobo:2 holds offset z
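
For illustration, writing and reading one of these keys with a plain Jedis client might look like the sketch below; the host, port and offset value are placeholders, not values from this post.

// A minimal sketch of the key/value layout, assuming the Jedis client library;
// host, port and the offset value are placeholders.
import redis.clients.jedis.Jedis

val jedis = new Jedis("localhost", 6379)
jedis.set("bobo:0", "1024")              // partition 0 of topic bobo is at offset 1024
val offset = jedis.get("bobo:0").toLong  // read the offset back as a Long
jedis.close()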

Specifying offsets when consuming

The two key methods are:

  • createKafkaStream() creates the Kafka stream
  • getOffsets() fetches the offsets from Redis
/**
  * Kafka parameters
  */
private val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "crpprdap25:6667,crpprdap26:6667,crpprdap27:6667",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  // Note: this is set to none here.
  "auto.offset.reset" -> "none",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// The `bobo` topic has 3 partitions
private val topicPartitions = Map[String, Int]("bobo" -> 3)

// Fetch offsets from Redis
def getOffsets: Map[TopicPartition, Long] = {
  val jedis = InternalRedisClient.getResource

  // Starting offset for each partition
  val offsets = mutable.Map[TopicPartition, Long]()

  topicPartitions.foreach { it =>
    val topic = it._1
    val partitions = it._2
    // Iterate over the partitions and set the offset for each partition of the topic
    for (partition <- 0 until partitions) {
      val topicPartitionKey = topic + ":" + partition
      var lastOffset = 0L
      val lastSavedOffset = jedis.get(topicPartitionKey)

      if (null != lastSavedOffset) {
        try {
          lastOffset = lastSavedOffset.toLong
        } catch {
          case e: Exception =>
            log.error("get lastSavedOffset error", e)
            System.exit(1)
        }
      }
      log.info("from redis topic: {}, partition: {}, lastOffset: {}", topic, partition, lastOffset)

      // Add the TopicPartition -> offset entry
      offsets += (new TopicPartition(topic, partition) -> lastOffset)
    }
  }

  InternalRedisClient.returnResource(jedis)

  offsets.toMap
}

/**
  * Create the Kafka stream
  *
  * @param ssc StreamingContext
  * @return InputDStream
  */
def createKafkaStream(ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
  val offsets = getOffsets

  // Create the Kafka stream
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Assign[String, String](offsets.keys.toList, kafkaParams, offsets)
  )
  stream
}

The key point is that ConsumerStrategies.Assign is used to specify the starting offset for each partition of the topic.
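
The InternalRedisClient helper used above is assumed to be a simple connection-pool wrapper; a minimal sketch based on the Jedis client library (host, port and pool sizes are placeholders) might look like this:

// A minimal sketch of a Jedis connection-pool helper; host, port and pool sizes are placeholders.
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object InternalRedisClient {
  private lazy val pool: JedisPool = {
    val config = new JedisPoolConfig()
    config.setMaxTotal(32)   // maximum number of pooled connections
    config.setMaxIdle(8)     // maximum number of idle connections
    new JedisPool(config, "localhost", 6379)
  }

  // Borrow a connection from the pool
  def getResource: Jedis = pool.getResource

  // Return a connection to the pool (close() returns a pooled connection)
  def returnResource(jedis: Jedis): Unit = if (jedis != null) jedis.close()
}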

Updating offsets in Redis

Finally, write the offset information back to Redis.

/**
  * Consume the stream
  *
  * @param stream InputDStream
  */
def consume(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
  stream.foreachRDD { rdd =>
    // Get the offset ranges for this batch
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    // Compute metrics; here we simply count the records
    val total = rdd.count()

    val jedis = InternalRedisClient.getResource
    val pipeline = jedis.pipelined()
    // MULTI: queue the following commands so they execute atomically on EXEC
    pipeline.multi()

    // Update the metrics
    pipeline.incrBy("totalRecords", total)

    // Update the offsets
    offsetRanges.foreach { offsetRange =>
      log.info("save offsets, topic: {}, partition: {}, offset: {}", offsetRange.topic, offsetRange.partition, offsetRange.untilOffset)
      val topicPartitionKey = offsetRange.topic + ":" + offsetRange.partition
      pipeline.set(topicPartitionKey, offsetRange.untilOffset + "")
    }

    // Execute and release the connection
    pipeline.exec()
    pipeline.sync()
    pipeline.close()
    InternalRedisClient.returnResource(jedis)
  }
}
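
A minimal driver sketch tying the two methods together might look like the following; the application name, master and batch interval are illustrative values, not taken from this post.

// A minimal sketch of the driver program; app name, master and batch interval are placeholders.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("kafka-offsets-in-redis").setMaster("local[2]")
  val ssc = new StreamingContext(conf, Seconds(10))

  val stream = createKafkaStream(ssc)  // start from the offsets stored in Redis
  consume(stream)                      // process each batch and write offsets back

  ssc.start()
  ssc.awaitTermination()
}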

References

  • 實時流計算、Spark Streaming、Kafka、Redis、Exactly-once、實時去重 (Real-time stream computing, Spark Streaming, Kafka, Redis, exactly-once, real-time deduplication)

Spark code

As an aside, here is the Spark-related code I have put together.

GitHub repository: spark-programming

It mainly covers:

  • Basic RDD usage
  • SQL
    • JDBC (read, write)
    • Hive (read, write, dynamic partitioning)
  • Streaming
    • Consuming from Kafka (manual commits, manually maintained offsets)
    • Writing to HBase
    • Writing to Hive
