1. 程式人生 > kafka直連方式消費多個topic

kafka直連方式消費多個topic

一個消費者組可以消費多個topic,以前寫過一篇一個消費者消費一個topic的,這次的是一個消費者組通過直連方式消費多個topic,做了小測試,結果是正確的,通過檢視zookeeper的客戶端,zookeeper記錄了偏移量

package day04

/*
消費多個topic
 */
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import scala.collection.mutable.ListBuffer import org.I0Itec.zkclient.ZkClient import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange} import org.apache.spark.streaming.{Duration, StreamingContext}
/**
 * Consumes three Kafka topics with the Spark Streaming direct API (Kafka 0.8 /
 * spark-streaming-kafka-0-8), managing per-partition offsets manually in
 * ZooKeeper under the consumer group's /consumers node.
 *
 * On startup: if ZooKeeper already holds offsets for any of the topics, the
 * stream is created from those offsets; otherwise it starts from the earliest
 * available offset ("auto.offset.reset" -> smallest). After each batch the
 * until-offsets are written back to ZooKeeper so the next run can resume.
 */
object OrderDemoYY1 {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("yy").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Duration(5000))

    // The three topics consumed by this job.
    val topic1 = "wc"
    val topic2 = "wc1"
    val topic3 = "wc2"
    // Consumer group name; also the node name used under /consumers in ZooKeeper.
    val groupid = "GPMMVV"

    // ZooKeeper quorum — used for both offset reads and offset commits.
    val zkQuorum = "hadoop01:2181,hadoop02:2181,hadoop03:2181"
    // Kafka broker list for the direct stream.
    val brokerList = "hadoop01:9092,hadoop02:9092,hadoop03:9092"

    // Set form is what createDirectStream needs on a fresh (no-offsets) start.
    val topics = Set(topic1, topic2, topic3)
    // Ordered list: a topic's index here maps to the matching entry in
    // childrens and zkGTList below, so ordering must stay stable.
    val topicsList = ListBuffer[String](topic1, topic2, topic3)

    val kafkaParams = Map(
      "metadata.broker.list" -> brokerList,
      // FIX: the Kafka consumer property key is "group.id", not "groupid";
      // with the wrong key the configured group id was silently ignored.
      "group.id" -> groupid,
      // Start from the earliest offset when no committed offset exists.
      "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString
    )

    // One ZKGroupTopicDirs per topic — each exposes the ZooKeeper directory
    // that holds that topic's per-partition offsets for this group.
    val zkGTList: ListBuffer[ZKGroupTopicDirs] = ListBuffer[ZKGroupTopicDirs]()
    for (tp <- topicsList) {
      zkGTList += new ZKGroupTopicDirs(groupid, tp)
    }

    // Client used both to read previously committed offsets and to write new ones.
    val zkClient = new ZkClient(zkQuorum)

    // Assigned one of two ways below depending on whether offsets already exist.
    var kafkaDStream: InputDStream[(String, String)] = null
    // (topic, partition) -> committed offset; filled only when resuming.
    var fromOffset = Map[TopicAndPartition, Long]()

    // childrens(i) = number of partition-offset nodes stored for topicsList(i).
    val childrens: ListBuffer[Int] = ListBuffer[Int]()
    var flag = false // becomes true when at least one topic has committed offsets
    for (topicDir <- zkGTList) {
      val child: Int = zkClient.countChildren(topicDir.consumerOffsetDir)
      childrens += child
      if (child > 0) {
        flag = true
      }
    }

    if (flag) {
      // Resume: rebuild the offset map from ZooKeeper.
      // FIX: iterate over the ordered topicsList's indices rather than
      // `0 until topics.size` — indexing the ordered buffers with the size of
      // an unordered Set only worked because the sizes happened to match.
      for (z <- topicsList.indices) {
        val child = childrens(z)
        val gpDirs = zkGTList(z)
        val topicn = topicsList(z)
        for (i <- 0 until child) {
          // Each child node i is a partition number whose data is the offset.
          val offset = zkClient.readData[String](gpDirs.consumerOffsetDir + "/" + i)
          fromOffset += new TopicAndPartition(topicn, i) -> offset.toLong
        }
      }
      // Emit (key, value); the Kafka message key defaults to null.
      val messageHandler = (mmd: MessageAndMetadata[String, String]) => {
        (mmd.key(), mmd.message())
      }
      kafkaDStream =
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
          ssc, kafkaParams, fromOffset, messageHandler
        )
    } else {
      // First run: no stored offsets, let auto.offset.reset pick the start.
      kafkaDStream =
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topics
        )
    }

    // Latest batch's offset ranges, captured inside foreachRDD.
    var offsetRanges = Array[OffsetRange]()
    kafkaDStream.foreachRDD { kafkaRDD =>
      // Each batch RDD is a KafkaRDD; HasOffsetRanges exposes its offsets.
      offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges
      kafkaRDD.foreach(println) // debug: print every (key, value) record
      for (o <- offsetRanges) {
        val topicNN: String = o.topic
        val offset: Long = o.untilOffset
        val partition: Int = o.partition
        // Map the topic name back to its ZKGroupTopicDirs via the ordered list.
        val i = topicsList.indexOf(topicNN)
        val gpDir = zkGTList(i)
        // Persist the new offset so the next run resumes from here.
        ZkUtils.updatePersistentPath(zkClient, gpDir.consumerOffsetDir + "/" + partition, offset.toString)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

可以通過zookeeper的客戶端,在/consumers中檢視偏移量,
我的3個topic中,其中wc和wc1只有1個分割槽,可以通過下圖看出wc1的0分割槽偏移量是13
在這裡插入圖片描述
wc的分割槽數只有一個,對應的分割槽號是0,偏移量是7
在這裡插入圖片描述
wc2中一共有3個分割槽,下圖可以看出分割槽1中的偏移量是7
在這裡插入圖片描述