1. 程式人生 > >sparkstreaming整合kafka引數設定,message偏移量寫入redis

sparkstreaming整合kafka引數設定,message偏移量寫入redis

kafka高階資料來源拉取到spark,偏移量自我維護寫入到redis,建立redis連線池。

需要匯入

<groupId>org.apache.spark</groupId>

<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.2.1</version>
</dependency>
<!--匯入redis的客戶端的java的依賴包,就可以實現和redis的連線。-->
<dependency>
<groupId>redis.clients</

groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>


import java.{lang, util}

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis

object WCKafkaRedisApp {

  //  Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("xx")
      //每秒鐘每個分割槽kafka拉取訊息的速率
      .set("spark.streaming.kafka.maxRatePerPartition", "100")
      // 序列化
      .set("spark.serilizer", "org.apache.spark.serializer.KryoSerializer")
      // 建議開啟rdd的壓縮
      .set("spark.rdd.compress", "true")
    val ssc = new StreamingContext(conf, Seconds(2))

    //啟動一引數設定
    val groupId = "test002"
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hdp01:9092,hdp02:9092,hdp03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: lang.Boolean)
    )
    val topics = Array("test")

    //啟動二引數設定
    var formdbOffset: Map[TopicPartition, Long] = JedisOffset(groupId)

    //拉取kafka資料
    val stream: InputDStream[ConsumerRecord[String, String]] = if (formdbOffset.size == 0) {
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
      )
    } else {
      KafkaUtils.createDirectStream(
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Assign[String, String](formdbOffset.keys, kafkaParams, formdbOffset)
      )
    }


    //資料偏移量處理。
    stream.foreachRDD({
      rdd =>
        //獲得偏移量物件陣列
        val offsetRange: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        //邏輯處理
        rdd.flatMap(_.value().split(" ")).map((_, 1)).reduceByKey(_ + _).foreachPartition({
          it =>
            val jedis = RedisUtils.getJedis
            it.foreach({
              v =>
                jedis.hincrBy("wordcount", v._1, v._2.toLong)
            })
            jedis.close()
        })

        //偏移量存入redis
        val jedis: Jedis = RedisUtils.getJedis
        for (or <- offsetRange) {
          jedis.hset(groupId, or.topic + "-" + or.partition, or.untilOffset.toString)
        }
        jedis.close()

    })


    ssc.start()
    ssc.awaitTermination()
  }
}

...

import java.util

import org.apache.kafka.common.TopicPartition

object JedisOffset {


  def apply(groupId: String) = {
    var formdbOffset = Map[TopicPartition, Long]()
    val jedis1 = RedisUtils.getJedis
    val topicPartitionOffset: util.Map[String, String] = jedis1.hgetAll(groupId)
    import scala.collection.JavaConversions._
    val topicPartitionOffsetlist: List[(String, String)] = topicPartitionOffset.toList
    for (topicPL <- topicPartitionOffsetlist) {
      val split: Array[String] = topicPL._1.split("[-]")
      formdbOffset += (new TopicPartition(split(0), split(1).toInt) -> topicPL._2.toLong)
    }
    formdbOffset
  }
}
....還有redis的連線池建立


相關推薦

sparkstreaming整合kafka引數設定message偏移寫入redis

kafka高階資料來源拉取到spark,偏移量自我維護寫入到redis,建立redis連線池。需要匯入<groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-

sparkstreaming整合kafka引數設定message偏移寫入mysql

 kafka高階資料來源拉取到spark,偏移量自我維護,藉助scalikejdbc寫入到mysql。 需要匯入 <dependency><groupId>org.scalikejdbc</groupId><artifactId&

SparkStreaming整合Kafka--Direct方式

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.

大資料學習之路96-SparkStreaming整合Kafka

我們前面SparkStreaming獲取資料的來源是TCP,但是平常是不會這麼用的,我們通常用的是Kafka。 SparkStreamingContext是不直接提供對Kafka的訪問的。 這個時候就有KafkaUtils 這裡有兩個方法 1.createDirectStream

SparkStreaming整合kafka直連模式direct方式

org.apache.spark spark-streaming_2.10 1.6.2 org.apache.spark spark-streaming-kafka_2.10 1.

SparkStreaming整合Kafka-0.8的官方文件要點翻譯

Spark Streaming + Kafka Integration Guide (Kafka broker version 0.8.2.1 or higher) Note: Kafka 0.8 support is deprecated as of Spark 2.3.0

SparkStreaming整合kafka入門

package kafka import com.typesafe.config.ConfigFactory import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.seriali

sparkStreaming整合Kafka

這幾天看了spark整合Kafka,消費Kafka資料並向Kafka傳送資料,仿照官方樣例寫了兩個小例子。在此分享一下。 1.新增Kafka的repository 2.DirectKafkaWordCountDemo程式碼展示 3.kafkaProduc

SparkStreaming整合kafka的補充

clas metrics clu head zookeepe 大量 topic 自動重啟 備份 (1)SparkStreaming 整合 kafka 兩種方式對比 Direct 方式的優缺點分析 : 優點: 簡化並行(Simplified Parallelism)。不現

Flume1.4 相關引數設定將收集的資料彙總到hdfs解決許多小檔案問題

參照官方文件,將 flume client 收集的日誌檔案 彙總到  flume sink收集端,然後儲存到hdfs 中,預設會按生成許多小檔案,如圖所示 實際中可能只需要生成一個檔案即可,這就涉及到了幾個相關引數設定如下即可 需要修改的檔案位於 flume/con

Linux LVS高併發測試程式核心引數設定連線數查詢指令

最近在公司參與了一個LVS系統測試的專案,學習到了一些關於高併發測試相關的知識,寫到部落格裡記錄下 Linux核心引數設定 在伺服器端需要調整系統最大檔案控制代碼數 ulimit -n 1000000 在伺服器硬體支援,以及服務較輕量的情況下,最大

C++Builder6.0 啟動引數設定不開啟預設工程不顯示啟動畫面

This topic lists and describes all of the options that you can use to start the IDE from the command line. You must precede all options (u

SparkStreamingKafka 引數 auto.offset.reset 的說明

作用:如果Kafka中沒有初始偏移或者當前偏移在伺服器上不再存在時(例如,因為該資料已被刪除)的處理方法。 Kafka單獨寫consumer時 可選引數: earliest:自動將偏移重置為最早的偏移量 latest:自動將偏移量重置為最新的偏移量(

Kafka(七)消費者偏移

sof () 取模 失敗 data 兩種方法 保存 庫存 num 在Kafka0.9版本之前消費者保存的偏移量是在zookeeper中/consumers/GROUP.ID/offsets/TOPIC.NAME/PARTITION.ID。新版消費者不在保存偏移量到zooke

如何手動更新Kafka中某個Topic的偏移

轉載: 我們在使用consumer消費資料時,有些情況下我們需要對已經消費過的資料進行重新消費,這裡介紹kafka中兩種重新消費資料的方法。 1. 修改offset 我們在使用consumer消費的時候,每個topic會產生一個偏移量,這個偏移量保證我們消費的訊息順

報錯資訊MySQL server has gone away!引數設定過小導致記錄寫入失敗!(解決辦法)

報錯資訊: Warning: PDO::exec(): MySQL server has gone away 原因是:mysql max_allowed_packet 設定過小導致記錄寫入失敗!   mysql根據配置檔案會限制server接受的資料包大小。 有時

Spark+Kafka的Direct方式將偏移傳送到Zookeeper的實現

Apache Spark 1.3.0引入了Direct API,利用Kafka的低層次API從Kafka叢集中讀取資料,並且在Spark Streaming系統裡面維護偏移量相關的資訊,並且通過這種方式去實現零資料丟失(zero data loss)相比使用基於Receiver的方法要高效。但是因為是Spar

Spark+Kafka的Direct方式將偏移傳送到Zookeeper實現

Apache Spark 1.3.0引入了Direct API,利用Kafka的低層次API從Kafka叢集中讀取資料,並且在Spark Streaming系統裡面維護偏移量相關的資訊,並且通過這種方式去實現零資料丟失(zero data loss)相比使用基於Rece

Python習題:給定一個字符串和一個偏移根據偏移旋轉字符串(從左向右旋轉)。例:輸入: str="abcdefg", offset = 3 輸出: "efgabcd"

直接 spa ret code abcdefg int color bcd 旋轉字符串 方法一:用切片直接按照偏移量切割 1 def str_rotate(strings,num): 2 return strings[-(num%len(strings)):] +

設定座標軸刻度偏移和刻度文字(tics text)加粗--gnuplot

//設定刻度文字遠離座標軸 set xtics offset -3.5,0 font 'Times,19,Bold' //設定刻度文字靠近座標軸 set xtics offset 2.5,0 font