
Spark Streaming reading Kafka data with zero data loss (Part 2)

Approach 2:
Approach 2 is to write the consumed Kafka offsets back to ZooKeeper after each Spark Streaming batch has consumed its data. When the application crashes or is upgraded, it can pick up from the last committed offsets, giving zero data loss with at-least-once semantics. The checkpoint approach, by contrast, can lead to duplicate consumption, and the offsets Spark Streaming tracks internally can drift out of sync with those kept in ZooKeeper, causing data loss or reprocessing. The idea is to trigger the offset update when an action runs on the DStream, in particular at output time, so that offsets are committed to ZooKeeper only for data that has actually been consumed. Without further ado, here is the code.

import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo, TopicMetadataRequest}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkMarshallingError
import org.I0Itec.zkclient.serialize.ZkSerializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

// `logger` is assumed to be a logger (e.g. slf4j) defined on the enclosing object.
def start(ssc: StreamingContext,
          brokerList: String,
          zkConnect: String,
          groupId: String,
          topic: String): InputDStream[(String, String)] = {

  // ZkClient with a plain UTF-8 string serializer, so offsets are stored as readable strings.
  val zkClient = new ZkClient(zkConnect, 60000, 60000, new ZkSerializer {
    override def serialize(data: Object): Array[Byte] = {
      try data.toString.getBytes("UTF-8")
      catch { case _: ZkMarshallingError => null }
    }
    override def deserialize(bytes: Array[Byte]): Object = {
      try new String(bytes, "UTF-8")
      catch { case _: ZkMarshallingError => null }
    }
  })

  val kafkaParams = Map(
    "metadata.broker.list" -> brokerList,
    "group.id"             -> groupId,
    "zookeeper.connect"    -> zkConnect,
    "auto.offset.reset"    -> kafka.api.OffsetRequest.SmallestTimeString)

  val topics = topic.split(",").toSet
  // The broker host is taken from the first entry of brokerList and port 9092 is assumed;
  // the offset-recovery path below also assumes a single topic.
  val broker = brokerList.split(",")(0).split(":")(0)
  val topicDirs = new ZKGroupTopicDirs(groupId, topic)
  val zkTopicPath = s"${topicDirs.consumerOffsetDir}"
  val children = zkClient.countChildren(zkTopicPath)

  var kafkaStream: InputDStream[(String, String)] = null
  var fromOffsets: Map[TopicAndPartition, Long] = Map()

  if (children > 0) {
    // Offsets already exist in ZooKeeper: find each partition's leader so the saved offset
    // can be checked against the earliest offset Kafka still retains.
    val topicList = List(topic)
    val req = new TopicMetadataRequest(topicList, 0)
    val getLeaderConsumer = new SimpleConsumer(broker, 9092, 10000, 10000, "OffsetLookup")
    val res = getLeaderConsumer.send(req)
    val topicMetaOption = res.topicsMetadata.headOption
    val partitions = topicMetaOption match {
      case Some(tm) => tm.partitionsMetadata.map(pm => (pm.partitionId, pm.leader.get.host)).toMap[Int, String]
      case None     => Map[Int, String]()
    }

    for (i <- 0 until children) {
      val partitionOffset = zkClient.readData[String](s"$zkTopicPath/$i")
      val tp = TopicAndPartition(topic, i)
      val requestMin = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
      val consumerMin = new SimpleConsumer(partitions(i), 9092, 10000, 10000, "getMinOffset")
      val curOffsets = consumerMin.getOffsetsBefore(requestMin).partitionErrorAndOffsets(tp).offsets
      var nextOffset = partitionOffset.toLong
      // If the offset saved in ZooKeeper has already been removed by Kafka's log retention,
      // fall back to the earliest available offset to avoid an OffsetOutOfRange error.
      if (curOffsets.nonEmpty && nextOffset < curOffsets.head) {
        nextOffset = curOffsets.head
      }
      fromOffsets += (tp -> nextOffset)
      logger.info(tp.topic + ":" + tp.partition + ";partitionOffset:" + partitionOffset + "**********" + "nextOffset:" + nextOffset)
    }

    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
    kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets, messageHandler)
  } else {
    // First run: no offsets in ZooKeeper yet, start from the position given by "auto.offset.reset".
    kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
  }

  var offsetRanges = Array[OffsetRange]()
  kafkaStream.transform { rdd =>
    // Capture the offset ranges of each batch before any other transformation.
    offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd
  }.foreachRDD { rdd =>
    // Commit offsets to ZooKeeper at output time. Writing o.fromOffset (the end of the
    // previously completed batch) keeps the commit conservative, so a restart can never
    // skip data that has not yet been output.
    for (o <- offsetRanges) {
      ZkUtils.updatePersistentPath(zkClient, s"${topicDirs.consumerOffsetDir}/${o.partition}", o.fromOffset.toString)
    }
  }

  kafkaStream
}
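
For reference, here is a minimal sketch of how the helper above might be wired into a streaming job. It assumes start is in scope (for example, defined on the same object); the broker list, ZooKeeper quorum, group id, topic name, batch interval, and the count-and-print processing are all placeholders for illustration, not part of the original article.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaZeroLossApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-zk-offset-demo")
    val ssc  = new StreamingContext(conf, Seconds(10))

    // Placeholder connection settings; replace with real cluster addresses.
    val stream = start(ssc,
      brokerList = "broker1:9092,broker2:9092",
      zkConnect  = "zk1:2181,zk2:2181,zk3:2181",
      groupId    = "demo-consumer-group",
      topic      = "demo-topic")

    // Illustrative output operation: count the messages in each batch and print the result.
    // Any output on the returned stream triggers the batch, and the foreachRDD registered
    // inside start() commits the consumed offsets to ZooKeeper in the same batch.
    stream.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Because the offset-committing foreachRDD is registered inside start(), before the caller attaches its own output operation, it only ever records the end of the previous, fully processed batch; the cost is that the most recent batch may be re-consumed after a restart, which is exactly the at-least-once trade-off described above.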