1. 程式人生 > >Spark+Kafka的Direct方式將偏移量傳送到Zookeeper實現

Spark+Kafka的Direct方式將偏移量傳送到Zookeeper實現

Apache Spark 1.3.0引入了Direct API,利用Kafka的低層次API從Kafka叢集中讀取資料,並且在Spark Streaming系統裡面維護偏移量相關的資訊,並且通過這種方式去實現零資料丟失(zero data loss)相比使用基於Receiver的方法要高效。但是因為是Spark Streaming系統自己維護Kafka的讀偏移量,而Spark Streaming系統並沒有將這個消費的偏移量傳送到Zookeeper中,這將導致那些基於偏移量的Kafka叢集監控軟體(比如:Apache Kafka監控之Kafka Web ConsoleApache Kafka監控之KafkaOffsetMonitor

等)失效。本文就是基於為了解決這個問題,使得我們編寫的Spark Streaming程式能夠在每次接收到資料之後自動地更新Zookeeper中Kafka的偏移量。

  我們從Spark的官方文件可以知道,維護Spark內部維護Kafka便宜了資訊是儲存在HasOffsetRanges類的offsetRanges中,我們可以在Spark Streaming程式裡面獲取這些資訊:

val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

這樣我們就可以獲取所以分割槽消費資訊,只需要遍歷offsetsList,然後將這些資訊傳送到Zookeeper即可更新Kafka消費的偏移量。完整的程式碼片段如下:

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
      messages.foreachRDD(rdd => {
        val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        val kc = new KafkaCluster(kafkaParams)
        for (offsets < - offsetsList) {
          val topicAndPartition = TopicAndPartition("iteblog", offsets.partition)
          val o = kc.setConsumerOffsets(args(0), Map((topicAndPartition, offsets.untilOffset)))
          if (o.isLeft) {
            println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
          }
        }
})

  KafkaCluster類用於建立和Kafka叢集的連結相關的操作工具類,我們可以對Kafka中Topic的每個分割槽設定其相應的偏移量Map((topicAndPartition, offsets.untilOffset)),然後呼叫KafkaCluster類的setConsumerOffsets方法去更新Zookeeper裡面的資訊,這樣我們就可以更新Kafka的偏移量,最後我們就可以通過KafkaOffsetMonitor之類軟體去監控Kafka中相應Topic的消費資訊,下圖是KafkaOffsetMonitor的監控情況:



如果想及時瞭解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號:iteblog_hadoop

  從圖中我們可以看到KafkaOffsetMonitor監控軟體已經可以監控到Kafka相關分割槽的消費情況,這對監控我們整個Spark Streaming程式來非常重要,因為我們可以任意時刻了解Spark讀取速度。另外,KafkaCluster工具類的完整程式碼如下:

package org.apache.spark.streaming.kafka

import kafka.api.OffsetCommitRequest
import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
import kafka.consumer.SimpleConsumer
import org.apache.spark.SparkException
import org.apache.spark.streaming.kafka.KafkaCluster.SimpleConsumerConfig

import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import scala.util.control.NonFatal

/**
 * User: 過往記憶
 * Date: 2015-06-02
 * Time: 下午23:46
 * bolg: https://www.iteblog.com
 * 本文地址:https://www.iteblog.com/archives/1381
 * 過往記憶部落格,專注於hadoop、hive、spark、shark、flume的技術部落格,大量的乾貨
 * 過往記憶部落格微信公共帳號:iteblog_hadoop
 */

class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
  type Err = ArrayBuffer[Throwable]

  @transient private var _config: SimpleConsumerConfig = null

  def config: SimpleConsumerConfig = this.synchronized {
    if (_config == null) {
      _config = SimpleConsumerConfig(kafkaParams)
    }
    _config
  }

  def setConsumerOffsets(groupId: String,
                         offsets: Map[TopicAndPartition, Long]
                          ): Either[Err, Map[TopicAndPartition, Short]] = {
    setConsumerOffsetMetadata(groupId, offsets.map { kv =>
      kv._1 -> OffsetMetadataAndError(kv._2)
    })
  }

  def setConsumerOffsetMetadata(groupId: String,
                                metadata: Map[TopicAndPartition, OffsetMetadataAndError]
                                 ): Either[Err, Map[TopicAndPartition, Short]] = {
    var result = Map[TopicAndPartition, Short]()
    val req = OffsetCommitRequest(groupId, metadata)
    val errs = new Err
    val topicAndPartitions = metadata.keySet
    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
      val resp = consumer.commitOffsets(req)
      val respMap = resp.requestInfo
      val needed = topicAndPartitions.diff(result.keySet)
      needed.foreach { tp: TopicAndPartition =>
        respMap.get(tp).foreach { err: Short =>
          if (err == ErrorMapping.NoError) {
            result += tp -> err
          } else {
            errs.append(ErrorMapping.exceptionFor(err))
          }
        }
      }
      if (result.keys.size == topicAndPartitions.size) {
        return Right(result)
      }
    }
    val missing = topicAndPartitions.diff(result.keySet)
    errs.append(new SparkException(s"Couldn't set offsets for ${missing}"))
    Left(errs)
  }

  private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)
                         (fn: SimpleConsumer => Any): Unit = {
    brokers.foreach { hp =>
      var consumer: SimpleConsumer = null
      try {
        consumer = connect(hp._1, hp._2)
        fn(consumer)
      } catch {
        case NonFatal(e) =>
          errs.append(e)
      } finally {
        if (consumer != null) {
          consumer.close()
        }
      }
    }
  }

  def connect(host: String, port: Int): SimpleConsumer =
    new SimpleConsumer(host, port, config.socketTimeoutMs,
      config.socketReceiveBufferBytes, config.clientId)
}

相關推薦

Spark+Kafka的Direct方式偏移傳送Zookeeper實現

Apache Spark 1.3.0引入了Direct API,利用Kafka的低層次API從Kafka叢集中讀取資料,並且在Spark Streaming系統裡面維護偏移量相關的資訊,並且通過這種方式去實現零資料丟失(zero data loss)相比使用基於Receiver的方法要高效。但是因為是Spar

Spark+Kafka的Direct方式偏移傳送Zookeeper實現

Apache Spark 1.3.0引入了Direct API,利用Kafka的低層次API從Kafka叢集中讀取資料,並且在Spark Streaming系統裡面維護偏移量相關的資訊,並且通過這種方式去實現零資料丟失(zero data loss)相比使用基於Rece

請問如何偏移轉換為地址?

請問如何將偏移量轉換為地址?  我在使用軟體定位木馬特徵碼的時候,只得到偏移量,為什麼有的人用OD轉到偏移量,但我用OD的時候,顯示沒有這個偏移量的,請問如何將偏移量轉換為地址?    在DFCG回答過,再轉過來吧。+---------+---------+----

Spark Streaming管理Kafka偏移

前言 為了讓Spark Streaming消費kafka的資料不丟資料,可以建立Kafka Direct DStream,由Spark Streaming自己管理offset,並不是存到zookeeper。啟用S​​park Streaming的 checkpoints是儲存偏移量的最簡單方法,因為它可以

Spark Streaming 之 Kafka 偏移管理

本文主要介紹 Spark Streaming 應用開發中消費 Kafka 訊息的相關內容,文章著重突出了開發環境的配置以及手動管理 Kafka 偏移量的實現。 一、開發環境 1、元件版本 CDH 叢集版本:6.0.1 Spark 版本:2.2.0 Kafka 版本:1.0.1 2、M

SparkStreaming消費Kafka中的資料 使用zookeeper和MySQL儲存偏移的兩種方式

Spark讀取Kafka資料的方式有兩種,一種是receiver方式,另一種是直連方式。今天分享的SparkStreaming消費Kafka中的資料儲存偏移量的兩種方式都是基於直連方式上的 話不多說 直接上程式碼 ! 第一種是使用zookeeper儲存偏移量 object Kafka

spark維護偏移

may exe 初始 進程 map message created reac 廣播 import kafka.common.TopicAndPartition import kafka.utils.ZkUtils import org.I0Itec.zkclient.Zk

計算C結構體成員偏移兩種方式本質上是一樣的

BE main print tdd of函數 pan color c結構體 計算 #include <stdio.h> #include <stddef.h> typedef struct test_st { char a[3];

采用位異或方式兩個變數值調換

clas 兩個 rgs package 異或 dem int span [] 1 package debug; 2 3 class Demo2 { 4 public static void main(String[] args){ 5

PB 通過http協議以POST方式XML傳送到協議伺服器

------解決方案-------------------------------------------------------- Blob lblb_args String ls_header String ls_url String ls_args long ll_length integer li_r

如何管理Spark Streaming消費Kafka的偏移(二)

上篇文章,討論了在spark streaming中管理消費kafka的偏移量的方式,本篇就接著聊聊上次說升級失敗的案例。 事情發生一個月前,由於當時我們想提高spark streaming程式的並行處理效能,於是需要增加kafka分割槽個數,,這裡需要說下,在新版本sp

如何管理Spark Streaming消費Kafka的偏移(三)

前面的文章已經介紹了在spark streaming整合kafka時,如何處理其偏移量的問題,由於spark streaming自帶的checkpoint弊端非常明顯,所以一些對資料一致性要求比較高的專案裡面,不建議採用其自帶的checkpoint來做故障恢復。 在sp

kafka直連方式,使用redis儲存偏移

使用Redis來記錄偏移量,以前用receive方式時,使用zookeeper儲存偏移量,不用自己儲存偏移量,使用直連方式可以自己儲存偏移量,更加靈活。在直連方式中,儲存偏移量可以使用zookeeper,也可以使用mysql、redis等來儲存偏移量,下面使用一

zookeeper上修改kafka消費組的偏移

[[email protected] bin]$ zookeeper-shell.sh 192.168.0.1:2181 Connecting to 192.168.0.1:2181 Wel

PB9.0 通過http協議以POST方式XML傳送到協議伺服器

最近接觸了手機支付系統,現在需要將生成的xml傳送至伺服器。 我查看了pb的幫助檔案,posturl可以實現該功能 servicereference.PostURL ( urlname, urldata, headers, {serverport, } data )

sparkStreaming 與fafka直接方式 進行消費者偏移的保存如redis 裏面 避免代碼改變與節點重啟後的數據丟失與序列化問題

create term tex ria streaming 保存 else config cal import java.util import kafka.common.TopicAndPartition import kafka.message.Messag

spark streaming中維護kafka偏移到外部介質

.exe topic _each keys off exec lose eat comm spark streaming中維護kafka偏移量到外部介質 以kafka偏移量維護到redis為例。 redis存儲格式 使用的數據結構為string,其中key為topic:

結構體偏移(sizeof長度)的簡單研究

long long size 一個 eof sig stdio.h 輸出結果 答案 cnblogs 總能夠網上搜到這樣的,關於結構體sizeof的答案,然而,經過這個簡單的實驗以後,發現gcc5.3編譯的結果並非如此。 字節對齊的細節和具體編譯器實現相關,但一般而言,滿足

什麽是二維數組偏移

[1] 地址 結束 它的 方式 所在地 連續存儲 若有 二維 比如:A[][]={{1,2,3},{4,5,6},{7,8,9}};4的偏移量就是3,8的偏移量就是7。對一個數組 A[M][N]中任一元素A[i][j]的偏移量的計算方法就是:i*N+j;比如:上面的4位置

ES6學習筆記二 新的聲明方式和變的解構賦值!

是什麽 一句話 數組 name ont 簡單 cee 問題 二次 新的聲明方式 在ES5的時候,我們只有一個聲明方式,var!但是在es6中,聲明進行了擴展,我們加上ES5的var的申明方式,我們有了三種聲明方式: var:它是variable的簡寫,可以理解成變量的意思