Consuming multiple Kafka topics with the direct approach
阿新 • Published: 2018-12-21
A consumer group can consume multiple topics. I previously wrote a post where one consumer consumed a single topic; this time, a consumer group consumes multiple topics via the direct approach. I ran a small test and the results were correct: inspecting ZooKeeper with its client shows that the offsets were recorded there.
package day04
/*
 Consumes multiple topics with the Kafka direct approach.
*/
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import scala.collection.mutable.ListBuffer
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Duration, StreamingContext}
object OrderDemoYY1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("yy").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Duration(5000))
    // the three topics to consume
    val topic1 = "wc"
    val topic2 = "wc1"
    val topic3 = "wc2"
    // consumer group name
    val groupid = "GPMMVV"
    // ZooKeeper quorum
    val zkQuorum = "hadoop01:2181,hadoop02:2181,hadoop03:2181"
    // Kafka broker list
    val brokerList = "hadoop01:9092,hadoop02:9092,hadoop03:9092"
    // put the topics into a Set; it is passed in on the first-ever read
    val topics = Set(topic1, topic2, topic3)
    // a ListBuffer keeps insertion order, so topics can be looked up by index
    val topicsList = ListBuffer[String](topic1, topic2, topic3)
    // Kafka parameters
    val kafkaParams = Map(
      "metadata.broker.list" -> brokerList,
      "group.id" -> groupid,
      // start from the earliest offset when there is no stored offset
      "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString
    )
    // one ZKGroupTopicDirs per topic: each holds the ZooKeeper path where
    // this group's offsets for that topic are stored
    var zkGTList: ListBuffer[ZKGroupTopicDirs] = new ListBuffer[ZKGroupTopicDirs]()
    for (tp <- topicsList) {
      val topicDirs = new ZKGroupTopicDirs(groupid, tp)
      zkGTList += topicDirs
    }
    // ZkClient used to read and update offsets
    val zkClient = new ZkClient(zkQuorum)
    // must be a var: it is assigned differently depending on whether the
    // group has consumed before or not
    var kafkaDStream: InputDStream[(String, String)] = null
    // map of (topic, partition) -> offset used to seed the direct stream
    var fromOffset = Map[TopicAndPartition, Long]()
    // check whether each topic has been consumed before
    var childrens: ListBuffer[Int] = new ListBuffer[Int]()
    var flag = false // true if any topic has been consumed before
    for (topicDir <- zkGTList) {
      // countChildren returns the number of partition znodes under the
      // topic's consumer-offset directory
      val child: Int = zkClient.countChildren(topicDir.consumerOffsetDir)
      childrens += child
      if (child > 0) {
        flag = true
      }
    }
    if (flag) { // consumed before: seed the offsets from ZooKeeper
      for (z <- 0 until topicsList.size) {
        // look up the child count and ZKGroupTopicDirs by the topic's index
        val child = childrens(z)
        val gpDirs = zkGTList(z)
        val topicn = topicsList(z)
        for (i <- 0 until child) {
          // read each partition's stored offset with zkClient.readData
          val offset = zkClient.readData[String](gpDirs.consumerOffsetDir + "/" + i)
          val tp = new TopicAndPartition(topicn, i)
          fromOffset += tp -> offset.toLong
        }
      }
      // the handler returns (key, message); the Kafka key is null by default
      val messageHandler = (mmd: MessageAndMetadata[String, String]) => {
        (mmd.key(), mmd.message())
      }
      // create the direct stream starting from the stored offsets
      kafkaDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
        ssc, kafkaParams, fromOffset, messageHandler
      )
    } else { // never consumed before: subscribe by topic set
      kafkaDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topics
      )
    }
    /* Earlier hard-coded version for two topics, kept for reference:
    val children1 = zkClient.countChildren(zKGroupTopicDirs1.consumerOffsetDir)
    val children2 = zkClient.countChildren(zKGroupTopicDirs2.consumerOffsetDir)
    if (children1 > 0 || children2 > 0) {
      if (children1 > 0) {
        for (i <- 0 until children1) {
          val offset = zkClient.readData[String](zKGroupTopicDirs1.consumerOffsetDir + "/" + i)
          val tp = new TopicAndPartition(topic1, i)
          fromOffset += tp -> offset.toLong
        }
      }
      if (children2 > 0) {
        for (i <- 0 until children2) {
          val offset = zkClient.readData[String](zKGroupTopicDirs2.consumerOffsetDir + "/" + i)
          val tp = new TopicAndPartition(topic2, i)
          fromOffset += tp -> offset.toLong
        }
      }
      val messageHandler = (mmd: MessageAndMetadata[String, String]) => {
        (mmd.key(), mmd.message())
      }
      kafkaDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc,
        kafkaParams, fromOffset, messageHandler)
    } else {
      kafkaDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    }*/
    var offsetRanges = Array[OffsetRange]() // records the updated offsets of each topic's partitions
    kafkaDStream.foreachRDD(kafkaRDD => {
      // the underlying KafkaRDD implements HasOffsetRanges, which exposes
      // the offset ranges consumed in this batch
      offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges
      kafkaRDD.foreach(println) // print the records
      for (o <- offsetRanges) {
        val topicNN: String = o.topic    // topic name
        val offset: Long = o.untilOffset // offset after this batch
        val partition: Int = o.partition // partition id
        // find the topic's index in topicsList to get the matching ZKGroupTopicDirs
        val i = topicsList.indexOf(topicNN)
        val gpDir = zkGTList(i)
        // persist the new offset back to ZooKeeper via ZkUtils
        ZkUtils.updatePersistentPath(zkClient, gpDir.consumerOffsetDir + "/" + partition, offset.toString)
        /*if (topicNN.equals(topic1)) {
          ZkUtils.updatePersistentPath(zkClient, zKGroupTopicDirs1.consumerOffsetDir + "/" + partition, offset.toString)
        } else if (topicNN.equals(topic2)) {
          ZkUtils.updatePersistentPath(zkClient, zKGroupTopicDirs2.consumerOffsetDir + "/" + partition, offset.toString)
        }*/
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
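The recovery branch above boils down to a small pure function: given the per-topic partition counts found in ZooKeeper and a way to read a stored offset, it builds the `(topic, partition) -> offset` map that seeds `createDirectStream`. Here is a minimal sketch of that logic, where plain tuples stand in for `TopicAndPartition` and `readOffset` is a hypothetical stand-in for `zkClient.readData` on the offset znode:

```scala
// Builds the seed-offset map for the "consumed before" branch.
// topics         : topic names in a fixed order
// partitionCounts: number of partition znodes found in ZK, same order
// readOffset     : stand-in for reading one partition's stored offset
def buildFromOffsets(
    topics: Seq[String],
    partitionCounts: Seq[Int],
    readOffset: (String, Int) => Long): Map[(String, Int), Long] = {
  val pairs = for {
    (topic, count) <- topics.zip(partitionCounts)
    partition     <- 0 until count
  } yield (topic, partition) -> readOffset(topic, partition)
  pairs.toMap
}
```

If every count is zero the group has never consumed, and the stream should instead be created from the topic set, falling back to `auto.offset.reset`.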
The offsets can be inspected with the zookeeper client under /consumers. Of my three topics, wc and wc1 each have a single partition: partition 0 of wc1 is at offset 13, and partition 0 of wc is at offset 7. wc2 has three partitions; its partition 1 is at offset 7.
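For reference, `ZKGroupTopicDirs(group, topic).consumerOffsetDir` resolves to the conventional consumer-offset path, so the znodes read above follow a fixed layout. A sketch of that convention (the group and topic names are the ones from this post):

```scala
// Conventional ZooKeeper path of a consumer group's offset znode:
//   /consumers/<group>/offsets/<topic>/<partition>
def offsetPath(group: String, topic: String, partition: Int): String =
  s"/consumers/$group/offsets/$topic/$partition"
```

For example, `offsetPath("GPMMVV", "wc1", 0)` gives `/consumers/GPMMVV/offsets/wc1/0`, the znode whose value was 13 in my test.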