sparkstreaming整合kafka引數設定,message偏移量寫入redis
kafka高階資料來源拉取到spark,偏移量自我維護寫入到redis,建立redis連線池。
需要匯入
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.2.1</version>
</dependency>
<!--匯入redis的客戶端的java的依賴包,就可以實現和redis的連線。-->
<dependency>
<groupId>redis.clients</
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
import java.{lang, util} import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.StringDeserializer import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.{Seconds, StreamingContext} import redis.clients.jedis.Jedis object WCKafkaRedisApp { // Logger.getLogger("org").setLevel(Level.WARN) def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName("xx") //每秒鐘每個分割槽kafka拉取訊息的速率 .set("spark.streaming.kafka.maxRatePerPartition", "100") // 序列化 .set("spark.serilizer", "org.apache.spark.serializer.KryoSerializer") // 建議開啟rdd的壓縮 .set("spark.rdd.compress", "true") val ssc = new StreamingContext(conf, Seconds(2)) //啟動一引數設定 val groupId = "test002" val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "hdp01:9092,hdp02:9092,hdp03:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> groupId, "auto.offset.reset" -> "earliest", "enable.auto.commit" -> (false: lang.Boolean) ) val topics = Array("test") //啟動二引數設定 var formdbOffset: Map[TopicPartition, Long] = JedisOffset(groupId) //拉取kafka資料 val stream: InputDStream[ConsumerRecord[String, String]] = if (formdbOffset.size == 0) { KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) ) } else { KafkaUtils.createDirectStream( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Assign[String, String](formdbOffset.keys, kafkaParams, formdbOffset)
) } //資料偏移量處理。 stream.foreachRDD({ rdd => //獲得偏移量物件陣列 val offsetRange: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //邏輯處理 rdd.flatMap(_.value().split(" ")).map((_, 1)).reduceByKey(_ + _).foreachPartition({ it => val jedis = RedisUtils.getJedis it.foreach({ v => jedis.hincrBy("wordcount", v._1, v._2.toLong) }) jedis.close() }) //偏移量存入redis val jedis: Jedis = RedisUtils.getJedis for (or <- offsetRange) { jedis.hset(groupId, or.topic + "-" + or.partition, or.untilOffset.toString) } jedis.close() }) ssc.start() ssc.awaitTermination() } }
...
import java.util
import org.apache.kafka.common.TopicPartition
object JedisOffset {
def apply(groupId: String) = {
var formdbOffset = Map[TopicPartition, Long]()
val jedis1 = RedisUtils.getJedis
val topicPartitionOffset: util.Map[String, String] = jedis1.hgetAll(groupId)
import scala.collection.JavaConversions._
val topicPartitionOffsetlist: List[(String, String)] = topicPartitionOffset.toList
for (topicPL <- topicPartitionOffsetlist) {
val split: Array[String] = topicPL._1.split("[-]")
formdbOffset += (new TopicPartition(split(0), split(1).toInt) -> topicPL._2.toLong)
}
formdbOffset
}
}
....還有redis的連線池建立相關推薦
sparkstreaming整合kafka引數設定,message偏移量寫入redis
kafka高階資料來源拉取到spark,偏移量自我維護寫入到redis,建立redis連線池。需要匯入<groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-
sparkstreaming整合kafka引數設定,message偏移量寫入mysql
kafka高階資料來源拉取到spark,偏移量自我維護,藉助scalikejdbc寫入到mysql。 需要匯入 <dependency><groupId>org.scalikejdbc</groupId><artifactId&
SparkStreaming整合Kafka--Direct方式
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.
大資料學習之路96-SparkStreaming整合Kafka
我們前面SparkStreaming獲取資料的來源是TCP,但是平常是不會這麼用的,我們通常用的是Kafka。 SparkStreamingContext是不直接提供對Kafka的訪問的。 這個時候就有KafkaUtils 這裡有兩個方法 1.createDirectStream
SparkStreaming整合kafka直連模式direct方式
org.apache.spark spark-streaming_2.10 1.6.2 org.apache.spark spark-streaming-kafka_2.10 1.
SparkStreaming整合Kafka-0.8的官方文件要點翻譯
Spark Streaming + Kafka Integration Guide (Kafka broker version 0.8.2.1 or higher) Note: Kafka 0.8 support is deprecated as of Spark 2.3.0
SparkStreaming整合kafka入門
package kafka import com.typesafe.config.ConfigFactory import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.seriali
sparkStreaming整合Kafka
這幾天看了spark整合Kafka,消費Kafka資料並向Kafka傳送資料,仿照官方樣例寫了兩個小例子。在此分享一下。 1.新增Kafka的repository 2.DirectKafkaWordCountDemo程式碼展示 3.kafkaProduc
SparkStreaming整合kafka的補充
clas metrics clu head zookeepe 大量 topic 自動重啟 備份 (1)SparkStreaming 整合 kafka 兩種方式對比 Direct 方式的優缺點分析 : 優點: 簡化並行(Simplified Parallelism)。不現
Flume1.4 相關引數設定,將收集的資料彙總到hdfs,解決許多小檔案問題
參照官方文件,將 flume client 收集的日誌檔案 彙總到 flume sink收集端,然後儲存到hdfs 中,預設會按生成許多小檔案,如圖所示 實際中可能只需要生成一個檔案即可,這就涉及到了幾個相關引數設定如下即可 需要修改的檔案位於 flume/con
Linux LVS高併發測試程式,核心引數設定,連線數查詢指令
最近在公司參與了一個LVS系統測試的專案,學習到了一些關於高併發測試相關的知識,寫到部落格裡記錄下 Linux核心引數設定 在伺服器端需要調整系統最大檔案控制代碼數 ulimit -n 1000000 在伺服器硬體支援,以及服務較輕量的情況下,最大
C++Builder6.0 啟動引數設定,不開啟預設工程,不顯示啟動畫面
This topic lists and describes all of the options that you can use to start the IDE from the command line. You must precede all options (u
SparkStreaming 中 Kafka 引數 auto.offset.reset 的說明
作用:如果Kafka中沒有初始偏移或者當前偏移在伺服器上不再存在時(例如,因為該資料已被刪除)的處理方法。 Kafka單獨寫consumer時 可選引數: earliest:自動將偏移重置為最早的偏移量 latest:自動將偏移量重置為最新的偏移量(
Kafka(七)消費者偏移量
sof () 取模 失敗 data 兩種方法 保存 庫存 num 在Kafka0.9版本之前消費者保存的偏移量是在zookeeper中/consumers/GROUP.ID/offsets/TOPIC.NAME/PARTITION.ID。新版消費者不在保存偏移量到zooke
如何手動更新Kafka中某個Topic的偏移量
轉載: 我們在使用consumer消費資料時,有些情況下我們需要對已經消費過的資料進行重新消費,這裡介紹kafka中兩種重新消費資料的方法。 1. 修改offset 我們在使用consumer消費的時候,每個topic會產生一個偏移量,這個偏移量保證我們消費的訊息順
報錯資訊MySQL server has gone away!引數設定過小導致記錄寫入失敗!(解決辦法)
報錯資訊: Warning: PDO::exec(): MySQL server has gone away 原因是:mysql max_allowed_packet 設定過小導致記錄寫入失敗! mysql根據配置檔案會限制server接受的資料包大小。 有時
Spark+Kafka的Direct方式將偏移量傳送到Zookeeper的實現
Apache Spark 1.3.0引入了Direct API,利用Kafka的低層次API從Kafka叢集中讀取資料,並且在Spark Streaming系統裡面維護偏移量相關的資訊,並且通過這種方式去實現零資料丟失(zero data loss)相比使用基於Receiver的方法要高效。但是因為是Spar
Spark+Kafka的Direct方式將偏移量傳送到Zookeeper實現
Apache Spark 1.3.0引入了Direct API,利用Kafka的低層次API從Kafka叢集中讀取資料,並且在Spark Streaming系統裡面維護偏移量相關的資訊,並且通過這種方式去實現零資料丟失(zero data loss)相比使用基於Rece
Python習題:給定一個字符串和一個偏移量,根據偏移量旋轉字符串(從左向右旋轉)。例:輸入: str="abcdefg", offset = 3 輸出: "efgabcd"
直接 spa ret code abcdefg int color bcd 旋轉字符串 方法一:用切片直接按照偏移量切割 1 def str_rotate(strings,num): 2 return strings[-(num%len(strings)):] +
設定座標軸刻度偏移量和刻度文字(tics text)加粗--gnuplot
//設定刻度文字遠離座標軸 set xtics offset -3.5,0 font 'Times,19,Bold' //設定刻度文字靠近座標軸 set xtics offset 2.5,0 font