1. 程式人生 > >sparkStreaming 與fafka直接方式 進行消費者偏移量的保存如redis 裏面 避免代碼改變與節點重啟後的數據丟失與序列化問題

sparkStreaming 與fafka直接方式 進行消費者偏移量的保存如redis 裏面 避免代碼改變與節點重啟後的數據丟失與序列化問題

create term tex ria streaming 保存 else config cal

import java.util

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org
.apache.spark.streaming.{Duration, StreamingContext} import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig} object KafkaDricteRedis { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("redis").setMaster("local[*]") val ssc = new StreamingContext(conf,new
Duration(5000)) val groupid = "GB01" //組名 val topic = "topic_bc"//topic 名 //在redis中以 groupid/topic作為唯一標識 ,存儲分區偏移量 //在Reids 使用的時hash類型來存儲 val gtKey = groupid+"/"+topic //topic val topics = Set(topic) //zk地址 val zkQuorum = "hadoop01:2181,hadoop02:2181,hadoop03:2181" //brokerList
val brokerList = "hadoop04:9092,hadoop05:9092,hadoop06:9092" val kafkaParams = Map( // metadata.broker.list "metadata.broker.list"->brokerList, "group.id"->groupid, "auto.offset.reset"->kafka.api.OffsetRequest.SmallestTimeString //從頭開始消費 ) //記錄topic 、分區對應的偏移量偏移量,在創建InputDStream時作為參數傳如 //從這個偏移量開始讀取 var fromOffset = Map[TopicAndPartition,Long]() var kafkaDStream :InputDStream[(String,String)] = null // 獲取一個jedis連接 val conn = getConnection() // conn.flushDB() //jd.hget(groupid+topic,"") //獲取全部的keys val values: util.Set[String] = conn.keys("*") //println(values) // [GB01/wordcount3] 分區數 偏移量 //如果keys中包含 GB01/wordcount3這樣的key,則表示以前讀取過 if(values.contains(gtKey)){ //獲取key 為GB01/wordcount3 下面所對應的(k,v) /** conn.hgetAll(gtKey) GB01/wordcount3: * 1 888 * 2 888 * 3 888 * 4 888 */ var allKey: util.Map[String, String] = conn.hgetAll(gtKey) //導入後,可以把Java中的集合轉換為Scala中的集合 import scala.collection.JavaConversions._ var list: List[(String, String)] = allKey.toList //循環得到的(k,v) //這裏面的 k 對應的是分區, v對應的是偏移量 for (key <- list){ //這裏的key是一個tuple類型 //new一個TopicAndPartition 把 topic 和分區數傳入 val tp = new TopicAndPartition(topic,key._1.toInt) //把每個topic 分區 對應的偏移量傳入 fromOffset += tp -> key._2.toLong println("分區"+key._1+"偏移量為"+key._2) } //這裏的是把數據(key ,value)是kafka 的key默認是null, //value 是kafka中的value val messageHandler =(mmd:MessageAndMetadata[String,String])=>{ ( mmd.key(),mmd.message()) } //創建一個InputDStream kafkaDStream= KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,(String,String)](ssc, kafkaParams,fromOffset,messageHandler) }else{ //如果以前沒有讀取過,創建一個新的InputDStream kafkaDStream= KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder]( ssc,kafkaParams,topics ) } //用來更新偏移量,OffsetRange中可以獲取分區及偏移量 var OffsetRangs = Array[OffsetRange]() // kafkaDStream.foreachRDD(kafkaRDD=> { //這裏面的RDD是kafkaRDD ,可以轉換為HasOffsetRange val ranges = kafkaRDD.asInstanceOf[HasOffsetRanges] // 獲取分區信息的集合 OffsetRangs = ranges.offsetRanges //獲取value,(key 默認是null,沒有用) val map: RDD[String] = kafkaRDD.map(_._2) map.foreach(x=>print("")) //更新偏移量 for (o <- OffsetRangs){ //取出偏移量 val offset = o.untilOffset //取出分區 val partition = o.partition println("partition: "+partition) println("offset: "+offset) //把通過hset,把對應的partition和offset寫入到redis中 conn.hset(gtKey,partition.toString,offset.toString) } }) ssc.start() ssc.awaitTermination() } //Jedis連接池 def getConnection(): Jedis ={ //new 一個JedisPoolConfig,用來設定參數 val conf = new JedisPoolConfig() val pool = new JedisPool(conf,"192.168.121.12",6379) //最大連接數 conf.setMaxTotal(20) //最大空閑數 conf.setMaxIdle(20) val jedis = pool.getResource() //密碼 jedis.auth("test123") jedis }

sparkStreaming 與fafka直接方式 進行消費者偏移量的保存如redis 裏面 避免代碼改變與節點重啟後的數據丟失與序列化問題