Fixing Spark Streaming jobs that can no longer update offsets through ZkUtils because the cluster's Kafka version differs
阿新 · Published 2019-01-04
Problem:
Because of the CDH cluster environment, the dependencies of my Spark Streaming job had to follow the cluster's versions. That raised a problem: the Spark 2 shipped with the cluster supports Kafka 0.9.0, and the ZkUtils class our program uses to read and write offsets in ZooKeeper is no longer compatible with that client.
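Concretely, the mismatch is in the shape of the kafka.utils.ZkUtils API: in Kafka 0.8.x it is an object whose methods take an explicit ZkClient, while in 0.9.x it becomes a class obtained through its companion apply, with the same operations as instance methods. The sketch below illustrates the two shapes; the 0.9.x signatures are written from memory of that release and the ZooKeeper address and path are made up, so treat this as an illustration of the incompatibility rather than a drop-in fix:

import kafka.utils.ZkUtils

object ZkUtils09Sketch {
  def main(args: Array[String]): Unit = {
    // Kafka 0.8.x style (what the job below uses) no longer compiles against a 0.9.x client:
    //   ZkUtils.readDataMaybeNull(zkClient, "/kafka/offsets")
    //   ZkUtils.updatePersistentPath(zkClient, "/kafka/offsets", "0:100,1:200")

    // Kafka 0.9.x style: build a ZkUtils instance, then call the same operations on it.
    val zkUtils = ZkUtils("zkhost:2181", 30000, 30000, false)        // hypothetical ZK address
    val (offsetsOpt, _) = zkUtils.readDataMaybeNull("/kafka/offsets") // hypothetical znode path
    zkUtils.updatePersistentPath("/kafka/offsets", "0:100,1:200")
    println(offsetsOpt)
  }
}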
Solution:
Rewrite the KafkaCluster class so that it is compatible with the cluster's Kafka version.
The original program's way of updating offsets in ZooKeeper for a single topic:
// Imports assumed for this snippet (Spark 1.x with the spark-streaming-kafka 0.8 connector);
// an org.slf4j Logger field named `logger` is also assumed to be in scope on the enclosing class.
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val stream = createCustomDirectKafkaStream(ssc, kafkaParams, "advertidshadoop161v14taiji.cdn.ifengidc.com", "/kafka", topics)

/* createDirectStream() method overloaded */
def createCustomDirectKafkaStream(ssc: StreamingContext, kafkaParams: Map[String, String], zkHosts: String,
                                  zkPath: String, topics: Set[String]): InputDStream[(String, String)] = {
  val topic = topics.last // TODO only for single kafka topic right now
  val zkClient = new ZkClient(zkHosts, 30000, 30000)
  val storedOffsets = readOffsets(zkClient, zkHosts, zkPath, topic)
  val kafkaStream = storedOffsets match {
    case None =>
      // start from the latest offsets
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    case Some(fromOffsets) =>
      // start from previously saved offsets
      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
  }
  // save the offsets after each batch
  kafkaStream.foreachRDD(rdd => saveOffsets(zkClient, zkHosts, zkPath, rdd))
  kafkaStream
}

/* Read the previously saved offsets from Zookeeper */
private def readOffsets(zkClient: ZkClient, zkHosts: String, zkPath: String, topic: String): Option[Map[TopicAndPartition, Long]] = {
  logger.info("Reading offsets from Zookeeper")
  val stopwatch = new Stopwatch()
  val (offsetsRangesStrOpt, _) = ZkUtils.readDataMaybeNull(zkClient, zkPath)
  offsetsRangesStrOpt match {
    case Some(offsetsRangesStr) =>
      logger.info(s"Read offset ranges: ${offsetsRangesStr}")
      val offsets = offsetsRangesStr.split(",")
        .map(s => s.split(":"))
        .map { case Array(partitionStr, offsetStr) => TopicAndPartition(topic, partitionStr.toInt) -> offsetStr.toLong }
        .toMap
      logger.info("Done reading offsets from Zookeeper. Took " + stopwatch)
      Some(offsets)
    case None =>
      logger.info("No offsets found in Zookeeper. Took " + stopwatch)
      None
  }
}

/* Save each batch's offsets back to Zookeeper */
private def saveOffsets(zkClient: ZkClient, zkHosts: String, zkPath: String, rdd: RDD[_]): Unit = {
  logger.info("Saving offsets to Zookeeper")
  val stopwatch = new Stopwatch()
  val offsetsRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetsRanges.foreach(offsetRange => logger.debug(s"Using ${offsetRange}"))
  val offsetsRangesStr = offsetsRanges.map(offsetRange => s"${offsetRange.partition}:${offsetRange.fromOffset}")
    .mkString(",")
  logger.info("Writing offsets to Zookeeper zkClient=" + zkClient + " zkHosts=" + zkHosts + " zkPath=" + zkPath + " offsetsRangesStr:" + offsetsRangesStr)
  ZkUtils.updatePersistentPath(zkClient, zkPath, offsetsRangesStr)
  logger.info("Done updating offsets in Zookeeper. Took " + stopwatch)
}

/* Simple elapsed-time helper used in the log messages above */
class Stopwatch {
  private val start = System.currentTimeMillis()
  override def toString() = (System.currentTimeMillis() - start) + " ms"
}
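As the code shows, all partition offsets for the topic are packed into a single znode as a comma-separated string of partition:fromOffset pairs; and because it is fromOffset (the start of the batch being processed) that gets persisted, a restart re-reads the most recent batch, i.e. at-least-once processing. A small self-contained sketch of that string round trip, with made-up topic name and offsets:

import kafka.common.TopicAndPartition

object OffsetStringFormatExample {
  def main(args: Array[String]): Unit = {
    val topic = "advert_clicks"       // hypothetical topic name
    val saved = "0:1200,1:980,2:1510" // what saveOffsets would write into the znode

    // readOffsets-style parsing: "partition:offset" pairs -> Map[TopicAndPartition, Long]
    val offsets: Map[TopicAndPartition, Long] = saved.split(",")
      .map(_.split(":"))
      .map { case Array(p, o) => TopicAndPartition(topic, p.toInt) -> o.toLong }
      .toMap

    // saveOffsets-style rendering back into the stored string
    val rendered = offsets.toSeq.sortBy(_._1.partition)
      .map { case (tp, off) => s"${tp.partition}:$off" }
      .mkString(",")

    println(offsets)
    println(rendered) // "0:1200,1:980,2:1510"
  }
}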
The rewritten code for operating on ZooKeeper:
Reference GitHub project: https://github.com/xlturing/spark-journey/tree/master/SparkStreamingKafka
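The actual compatible rewrite lives in the project linked above. Purely as an illustration of the general idea (not the linked KafkaCluster code), the ZkUtils dependency can also be sidestepped entirely by reading and writing the offset znode through ZkClient with a plain string serializer, which behaves the same whether a 0.8.x or 0.9.x Kafka client jar is on the classpath. The names StringZkSerializer and PlainZkOffsetStore are made up for this sketch:

import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.serialize.ZkSerializer

// Store znode payloads as UTF-8 strings (ZkClient's default serializer uses Java serialization).
class StringZkSerializer extends ZkSerializer {
  override def serialize(data: AnyRef): Array[Byte] = data.toString.getBytes("UTF-8")
  override def deserialize(bytes: Array[Byte]): AnyRef =
    if (bytes == null) null else new String(bytes, "UTF-8")
}

object PlainZkOffsetStore {
  def readOffsetString(zkHosts: String, zkPath: String): Option[String] = {
    val zkClient = new ZkClient(zkHosts, 30000, 30000, new StringZkSerializer)
    try {
      // readData with returnNullIfPathNotExists=true returns null instead of throwing
      // when the path has not been created yet.
      Option(zkClient.readData[String](zkPath, true))
    } finally zkClient.close()
  }

  def writeOffsetString(zkHosts: String, zkPath: String, offsets: String): Unit = {
    val zkClient = new ZkClient(zkHosts, 30000, 30000, new StringZkSerializer)
    try {
      if (!zkClient.exists(zkPath)) zkClient.createPersistent(zkPath, true) // create parents too
      zkClient.writeData(zkPath, offsets)
    } finally zkClient.close()
  }
}

With such helpers, readOffsets and saveOffsets above would call readOffsetString/writeOffsetString instead of ZkUtils, leaving the rest of the streaming code unchanged.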