1. 程式人生 > >【Big Data 每日一題20180922】sparkstreaming同時消費多個topic的資料實現exactly-once的語義

【Big Data 每日一題20180922】sparkstreaming同時消費多個topic的資料實現exactly-once的語義

最近很多人問我,sparkstreaming怎麼消費多個topic的資料,自己維護offest,其實這個跟消費一個topic是一樣的,但還是有很多問我,今天就簡單的寫一個demo,供大家參考,直接上程式碼吧,已經測試過了.我把offest存到redis裡了,當然也可以儲存在zk,kafka,mysql,hbase中都可以,看自己的選擇.(用了3個topic,每個topic5個partition.)

package spark
 
import java.io.File
import kafka.{PropertiesScalaUtils, RedisKeysListUtils}
import kafka.streamingRedisHive.{dbIndex}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import redis.RedisPool
 
object moreTopic {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.INFO)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.INFO)
    Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.INFO)
    val warehouseLocation = new File("hdfs://cluster/hive/warehouse").getAbsolutePath
    val spark = SparkSession.builder().appName("Spark Jason").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate()
    spark.conf.set("spark.streaming.concurrentJobs", 10)
    spark.conf.set("spark.streaming.kafka.maxRetries", 50)
    spark.conf.set("spark.streaming.stopGracefullyOnShutdown",true)
    spark.conf.set("spark.streaming.backpressure.enabled",true)
    spark.conf.set("spark.streaming.backpressure.initialRate",5000)
    spark.conf.set("spark.streaming.kafka.maxRatePerPartition", 3000)
    @transient
    val sc = spark.sparkContext
    val scc = new StreamingContext(sc, Seconds(2))
    val kafkaParams = Map[String, Object](
      "auto.offset.reset" -> "latest",
      "value.deserializer" -> classOf[StringDeserializer]
      , "key.deserializer" -> classOf[StringDeserializer]
      , "bootstrap.servers" -> PropertiesScalaUtils.loadProperties("broker")
      , "group.id" -> PropertiesScalaUtils.loadProperties("groupId")
      , "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    var stream: InputDStream[ConsumerRecord[String, String]] = null
    val topics = Array("jason_20180519", "jason_0606","jason_test")
    val maxTotal = 200
    val maxIdle = 100
    val minIdle = 10
    val testOnBorrow = false
    val testOnReturn = false
    val maxWaitMillis = 5000
    RedisPool.makePool(PropertiesScalaUtils.loadProperties("redisHost"), PropertiesScalaUtils.loadProperties("redisPort").toInt, maxTotal, maxIdle, minIdle, testOnBorrow, testOnReturn, maxWaitMillis)
    val jedis = RedisPool.getPool.getResource
    jedis.select(dbIndex)
    val keys = jedis.keys(topics(0) + "*")
    val keys_2 = jedis.keys(topics(1) +"*")
    val keys_3 = jedis.keys(topics(2) +"*")
    if(keys.size() == 0 && keys_2.size() == 0 && keys_3.size() == 0){
      println("第一次啟動,從頭開始消費資料-----------------------------------------------------------")
      stream = KafkaUtils.createDirectStream[String, String](
        scc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
      )
    }else{
      println("不是第一次啟動,從上次的offest開始消費資料-----------------------------------------------")
      stream = KafkaUtils.createDirectStream[String, String](
        scc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, RedisKeysListUtils.getRedisOffest(topics,jedis)))
    }
    jedis.close()
    stream.foreachRDD(rdd=>{
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition(partition=>{
        val o = offsetRanges(TaskContext.get.partitionId)
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
        val jedis_jason = RedisPool.getPool.getResource
        jedis_jason.select(dbIndex)
        partition.foreach(pair=>{
          //自己的計算邏輯;
        })
        offsetRanges.foreach { offsetRange =>
          println("partition : " + offsetRange.partition + " fromOffset:  " + offsetRange.fromOffset + " untilOffset: " + offsetRange.untilOffset)
          val topic_partition_key_new = offsetRange.topic + "_" + offsetRange.partition
          jedis_jason.set(topic_partition_key_new, offsetRange.untilOffset + "")
        }
        jedis_jason.close()
      })
    })
    scc.start()
    scc.awaitTermination()
  }
}

轉:https://blog.csdn.net/xianpanjia4616/article/details/81709075 --------------------- 本文來自 JasonLee_coding 的CSDN 部落格 ,全文地址請點選:https://blog.csdn.net/xianpanjia4616/article/details/81709075?utm_source=copy

相關推薦

Big Data 每日20180922sparkstreaming同時消費topic資料實現exactly-once語義

最近很多人問我,sparkstreaming怎麼消費多個topic的資料,自己維護offest,其實這個跟消費一個topic是一樣的,但還是有很多問我,今天就簡單的寫一個demo,供大家參考,直接上程式碼吧,已經測試過了.我把offest存到redis裡了,當然也可以儲存在z

Big Data 每日20180916Spark累加器(Accumulator)陷阱及解決辦法

Accumulator簡介 Accumulator是spark提供的累加器,顧名思義,該變數只能夠增加。  只有driver能獲取到Accumulator的值(使用value方法),Task只能對其做增加操作(使用 +=)。你也可以在為Accumulator命名(不支援Py

Big Data 每日20180927Structured Streaming 之 Event Time 解析

Structured Streaming 之 Event Time 解析 [酷玩 Spark] Structured Streaming 原始碼解析系列 ,返回目錄請 猛戳這裡 本文內容適用範圍: * 2017.07.11 update, Spark 2.2 全系列

Big Data 每日20180926Structured Streaming 之狀態儲存解析

Structured Streaming 之狀態儲存解析 [酷玩 Spark] Structured Streaming 原始碼解析系列 ,返回目錄請 猛戳這裡 本文內容適用範圍: * 2017.07.11 update, Spark 2.2 全系列 √ (已釋出:2

Big Data 每日20181104Minor GC、Major GC和Full GC之間的區別

在 Plumbr 從事 GC 暫停檢測相關功能的工作時,我被迫用自己的方式,通過大量文章、書籍和演講來介紹我所做的工作。在整個過程中,經常對 Minor、Major、和 Full GC 事件的使用感到困惑。這也是我寫這篇部落格的原因,我希望能清楚地解釋這其中的一些疑惑。 文

Big Data 每日20181028Alluxio簡介

 一、Alluxio是什麼?         Alluxio是一個基於記憶體的分散式檔案系統,它是架構在底層分散式檔案系統和上層分散式計算框架之間的一箇中間件,主要職責是以檔案形式在記憶體或其它儲存設施中提供資料的存取服務。         Alluxio的前身為Tach

Big Data 每日20180821Spark中ml和mllib的區別

Spark中ml和mllib的主要區別和聯絡如下: ml和mllib都是Spark中的機器學習庫,目前常用的機器學習功能2個庫都能滿足需求。 spark官方推薦使用ml, 因為ml功能更全面更靈活,未來會主要支援ml,mllib很有可能會被廢棄(據說可能是在spark3.

Big Data 每日20181116塊儲存、檔案儲存、物件儲存意義及差異

關於塊儲存、檔案儲存、物件儲存方面的知識在知乎上看到了個很好的解答:https://www.zhihu.com/question/21536660 通俗易懂,查了些資料做了詳細的補充。  塊儲存     典型裝置:磁碟陣列、硬碟     塊儲存主要是將裸磁碟空間整個

Big Data 每日20180831Spark 的 task 資料 locality?

在Spark Application Web UI的 Stages tag 上,tasks 的一些資訊,其中 Locality Level 一欄的值可以有 PROCESS_LOCAL: 資料在同一個 JVM 中,即同一個 executor 上。這是最佳資料 locali

Big Data 每日20180921Spark 序列化問題

在Spark應用開發中,很容易出現如下報錯: org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializab

Big Data 每日-20181115linux 命令技巧 chmod & Set-User-ID & Set-Group-ID

本篇文章討論的是Linux/Unix的許可權問題,包括檔案的訪問許可權、可執行檔案的許可權以及修改這些許可權的命令chmod。 Linux許可權模式介紹   用ls -l命令列出檔案的詳細資訊,關於ls -l參見這裡。 1 2 3 4 5 6

Big Data 每日20181031深入分析volatile的實現原理

通過前面一章我們瞭解了synchronized是一個重量級的鎖,雖然JVM對它做了很多優化,而下面介紹的volatile則是輕量級的synchronized。如果一個變數使用volatile,則它比使用synchronized的成本更加低,因為它不會引起執行緒上下文的切換

Big Data 每日20181101如何用訊息系統避免分散式事務?

前陣子從支付寶轉賬1萬塊錢到餘額寶,這是日常生活的一件普通小事,但作為網際網路研發人員的職業病,我就思考支付寶扣除1萬之後,如果系統掛掉怎麼辦,這時餘額寶賬戶並沒有增加1萬,資料就會出現不一致狀況了。   上述場景在各個型別的系統中都能找到相似影子,比如在電商系統中,當

Big Data 每日20180822Java動態編譯優化——URLClassLoader 記憶體洩漏問題解決

一、動態編譯案例 要說動態編譯記憶體洩漏,首先我們先看一個案例(網上搜動態編譯的資料是千篇一律,只管實現功能,不管記憶體洩漏,並且都恬不知恥的標識為原創!!) 這篇文章和我google搜的其他文章、資料一樣,屬於JDK1.6以後的版本。確實能實現動態編譯並載入,但

Big Data 每日20181103你應該知道的RPC原理

你應該知道的RPC原理 在學校期間大家都寫過不少程式,比如寫個hello world服務類,然後本地呼叫下,如下所示。這些程式的特點是服務消費方和服務提供方是本地呼叫關係。   而一旦踏入公司尤其是大型網際網路公司就會發現,公司的系統都由成千上萬大大小小的服務組成,各

Big Data 每日20181111為什麼有棧記憶體和堆記憶體之分

為什麼有棧記憶體和堆記憶體之分?         陣列引用變數只是一個引用,這個引用變數可以指向任何有效的記憶體,只有當該引用指向有效記憶體,才可以通過該陣列變數來訪問陣列。        實際的陣列物件被儲存在堆(heap)記憶體中;如果引用該陣列物件引用變數是一個區域

Big Data 每日Spark開發效能調優總結

1. 分配資源調優 Spark效能調優的王道就是分配資源,即增加和分配更多的資源對效能速度的提升是顯而易見的,基本上,在一定範圍之內,增加資源與效能的提升是成正比的,當公司資源有限,能分配的資源達到頂峰之後,那麼才去考慮做其他的調優 如何分配及分配哪些資源 在生產環境中,提交spark作

Big Data 每日目錄

Spark 【Big Data 每日一題】Spark開發效能調優總結 【Big Data 每日一題20180821】Spark中ml和mllib的區別? 【Big Data 每日一題20180828】Maven 中 jar 包的 Snapshot 和 Release 版本區別? 【B

CTF 每日20160630PYTHON 位元組碼

python逆向基礎資源: 本題分析: 1.話說用python中的dis模組可以自己編個反編譯程式,但是我目前不會,就不羅嗦了。下載uncompyle2後,可以在終端進入uncompyle2目錄下,找到setup.py,就可執行下列命令執行安

CTF 每日20160607

啥? 2014-12-03 19:51:14 作者:admin 1765 86     誰能告訴我這是啥?答案又是啥。。 答案形式:wctf{你的字串} 答案:wctf{mianw