1. 程式人生 > >flume+kafka+spark streaming(持續更新)

flume+kafka+spark streaming(持續更新)

kafka

Kafka是一種高吞吐量的分散式釋出訂閱訊息系統,它可以處理消費者規模的網站中的所有動作流資料。 kafka的設計初衷是希望作為一個統一的資訊收集平臺,能夠實時的收集反饋資訊,並需要能夠支撐較大的資料量,且具備良好的容錯能力.
Apache kafka是訊息中介軟體的一種。
一 、術語介紹
Broker
Kafka叢集包含一個或多個伺服器,這種伺服器被稱為broker。
Topic
每條釋出到Kafka叢集的訊息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的訊息分開儲存,邏輯上一個Topic的訊息雖然保存於一個或多個broker上但使用者只需指定訊息的Topic即可生產或消費資料而不必關心資料存於何處)。每個topic都具有這兩種模式:(佇列:消費者組(consumer group)允許同名的消費者組成員瓜分處理;釋出訂閱:允許你廣播訊息給多個消費者組(不同名))。
Partition


Partition是物理上的概念,每個Topic包含一個或多個Partition.
Producer
負責釋出訊息到Kafka broker,比如flume採集機就是Producer。
Consumer
訊息消費者,向Kafka broker讀取訊息的客戶端。比如Hadoop機器就是Consumer。
Consumer Group
每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬於預設的group)。

二、使用場景
1、Messaging
對於一些常規的訊息系統,kafka是個不錯的選擇;partitons/replication和容錯,可以使kafka具有良好的擴充套件性和效能優勢.不過到目前為止,我們應該很清楚認識到,kafka並沒有提供JMS中的”事務性”“訊息傳輸擔保(訊息確認機制)”“訊息分組”等企業級特性;kafka只能使用作為”常規”的訊息系統,在一定程度上,尚未確保訊息的傳送與接收絕對可靠(比如,訊息重發,訊息傳送丟失等)
2、Websit activity tracking
kafka可以作為”網站活性跟蹤”的最佳工具;可以將網頁/使用者操作等資訊傳送到kafka中.並實時監控,或者離線統計分析等
3、Log Aggregation
kafka的特性決定它非常適合作為”日誌收集中心”;application可以將操作日誌”批量”“非同步”的傳送到kafka叢集中,而不是儲存在本地或者DB中;kafka可以批量提交訊息/壓縮訊息等,這對producer端而言,幾乎感覺不到效能的開支.此時consumer端可以使hadoop等其他系統化的儲存和分析系統.
4、它應用於2大類應用
構建實時的流資料管道,可靠地獲取系統和應用程式之間的資料。
構建實時流的應用程式,對資料流進行轉換或反應。

三、分散式
Log的分割槽被分佈到叢集中的多個伺服器上。每個伺服器處理它分到的分割槽。 根據配置每個分割槽還可以複製到其它伺服器作為備份容錯。 每個分割槽有一個leader,零或多個follower。Leader處理此分割槽的所有的讀寫請求,而follower被動的複製資料。如果leader宕機,其它的一個follower會被推舉為新的leader。 一臺伺服器可能同時是一個分割槽的leader,另一個分割槽的follower。 這樣可以平衡負載,避免所有的請求都只讓一臺或者某幾臺伺服器處理。

四、訊息處理順序
Kafka保證訊息的順序不變。 在這一點上Kafka做的更好,儘管並沒有完全解決上述問題。 Kafka採用了一種分而治之的策略:分割槽。 因為Topic分割槽中訊息只能由消費者組中的唯一一個消費者處理,所以訊息肯定是按照先後順序進行處理的。但是它也僅僅是保證Topic的一個分割槽順序處理,不能保證跨分割槽的訊息先後處理順序。 所以,如果你想要順序的處理Topic的所有訊息,那就只提供一個分割槽。

六、Key和Value
Kafka是一個分散式訊息系統,Producer生產訊息並推送(Push)給Broker,然後Consumer再從Broker那裡取走(Pull)訊息。Producer生產的訊息就是由Message來表示的,對使用者來講,它就是鍵-值對。

Message => Crc MagicByte Attributes Key Value

kafka會根據傳進來的key計算其分割槽,但key可以不傳,可以為null,空的話,producer會把這條訊息隨機的傳送給一個partition。

這裡寫圖片描述

MessageSet用來組合多條Message,它在每條Message的基礎上加上了Offset和MessageSize,其結構是:

MessageSet => [Offset MessageSize Message]

它的含義是MessageSet是個陣列,陣列的每個元素由三部分組成,分別是Offset,MessageSize和Message,它們的含義分別是:
這裡寫圖片描述

七、小例子
1.啟動ZooKeeper
進入kafka目錄,加上daemon表示在後臺啟動,不佔用當前的命令列視窗。
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
如果要關閉,下面這個
bin/zookeeper-server-stop.sh
ZooKeeper 的埠號是2181,輸入jps檢視程序號是QuorumPeerMain
2.啟動kafka
在server.properties中加入,第一個是保證你刪topic可以刪掉,第二個不然的話就報topic找不到的錯誤:
delete.topic.enable=true
listeners=PLAINTEXT://localhost:9092
然後:
bin/kafka-server-start.sh -daemon config/server.properties
如果要關閉,下面這個
bin/kafka-server-stop.sh
Kafka的埠號是9092,輸入jps檢視程序號是Kafka
3.建立一個主題(topic)
建立一個名為“test”的Topic,只有一個分割槽和一個備份:
bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic test
建立好之後,可以通過執行以下命令,檢視已建立了哪些topic:
bin/kafka-topics.sh –list –zookeeper localhost:2181
檢視具體topic的資訊:
bin/kafka-topics.sh –describe –zookeeper localhost:2181 –topic test
4.傳送訊息
啟動kafka生產者:
bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test
5.接收訊息
新開一個命令列視窗,啟動kafka消費者:
bin/kafka-console-consumer.sh –zookeeper localhost:2181 –topic test –from-beginning
6.最後
在producer視窗中輸入訊息,可以在consumer視窗中顯示:
這裡寫圖片描述
這裡寫圖片描述

spark streaming

Spark Streaming是一種構建在Spark上的實時計算框架,它擴充套件了Spark處理大規模流式資料的能力。

Spark Streaming的優勢在於:
能執行在100+的結點上,並達到秒級延遲。
使用基於記憶體的Spark作為執行引擎,具有高效和容錯的特性。
能整合Spark的批處理和互動查詢。
為實現複雜的演算法提供和批處理類似的簡單介面。

首先,Spark Streaming把實時輸入資料流以時間片Δt (如1秒)為單位切分成塊。Spark Streaming會把每塊資料作為一個RDD,並使用RDD操作處理每一小塊資料。每個塊都會生成一個Spark Job處理,最終結果也返回多塊。
在Spark Streaming中,則通過操作DStream(表示資料流的RDD序列)提供的介面,這些介面和RDD提供的介面類似。
正如Spark Streaming最初的目標一樣,它通過豐富的API和基於記憶體的高速計算引擎讓使用者可以結合流式處理,批處理和互動查詢等應用。因此Spark Streaming適合一些需要歷史資料和實時資料結合分析的應用場合。當然,對於實時性要求不是特別高的應用也能完全勝任。另外通過RDD的資料重用機制可以得到更高效的容錯處理。

當一個上下文(context)定義之後,你必須按照以下幾步進行操作:
定義輸入源;
準備好流計算指令;
利用streamingContext.start()方法接收和處理資料;
處理過程將一直持續,直到streamingContext.stop()方法被呼叫。

可以利用已經存在的SparkContext物件建立StreamingContext物件:

val sc = ...                // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))

視窗函式
對於spark streaming中的視窗函式,參見:
視窗函式解釋

對非(K,V)形式的RDD 視窗化reduce:
1.reduceByWindow(reduceFunc, windowDuration, slideDuration)
2.reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)

對(K,V)形式RDD 按Key視窗化reduce:
1.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration)
2.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, numPartitions, filterFunc)
從效率來說,應選擇帶有invReduceFunc的方法。

可以通過在多個RDD或者批資料間重用連線物件做更進一步的優化。開發者可以保有一個靜態的連線物件池,重複使用池中的物件將多批次的RDD推送到外部系統,以進一步節省開支:

 dstream.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
          // ConnectionPool is a static, lazily initialized pool of connections
          val connection = ConnectionPool.getConnection()
          partitionOfRecords.foreach(record => connection.send(record))
          ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
      })
  })

spark執行時間是少了,但資料庫壓力比較大,會一直佔資源。

小例子:

package SparkStreaming

import org.apache.spark._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._

object Spark_streaming_Test {
  def main(args: Array[String]): Unit = {
    //local[2]表示在本地建立2個working執行緒
    //當執行在本地,如果你的master URL被設定成了“local”,這樣就只有一個核執行任務。這對程式來說是不足的,因為作為receiver的輸入DStream將會佔用這個核,這樣就沒有剩餘的核來處理資料了。
    //所以至少得2個核,也就是local[2]
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    //時間間隔是1秒
    val ssc = new StreamingContext(conf, Seconds(1))
    //有滑動視窗時,必須有checkpoint
    ssc.checkpoint("F:\\checkpoint")
    //DStream是一個基類
    //ssc.socketTextStream() 將建立一個 SocketInputDStream;這個 InputDStream 的 SocketReceiver 將監聽伺服器 9999 埠
    //ssc.socketTextStream()將 new 出來一個 DStream 具體子類 SocketInputDStream 的例項。
    val lines = ssc.socketTextStream("192.168.1.66", 9999, StorageLevel.MEMORY_AND_DISK_SER)
    //    val lines = ssc.textFileStream("F:\\scv")
    val words = lines.flatMap(_.split(" ")) // DStream transformation
    val pairs = words.map(word => (word, 1)) // DStream transformation
    //    val wordCounts = pairs.reduceByKey(_ + _) // DStream transformation
    //每隔3秒鐘,計算過去5秒的詞頻,顯然一次計算的內容與上次是有重複的。如果不想重複,把2個時間設為一樣就行了。
    //    val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(5), Seconds(3))
    val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, _ - _, Seconds(5), Seconds(3))
    windowedWordCounts.filter(x => x._2 != 0).print()
    //    wordCounts.print() // DStream output,列印每秒計算的詞頻
    //需要注意的是,當以上這些程式碼被執行時,Spark Streaming僅僅準備好了它要執行的計算,實際上並沒有真正開始執行。在這些轉換操作準備好之後,要真正執行計算,需要呼叫如下的方法
    ssc.start() // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
    //在StreamingContext上呼叫stop()方法,也會關閉SparkContext物件。如果只想僅關閉StreamingContext物件,設定stop()的可選引數為false
    //一個SparkContext物件可以重複利用去建立多個StreamingContext物件,前提條件是前面的StreamingContext在後面StreamingContext建立之前關閉(不關閉SparkContext)
    ssc.stop()
  }
}

1.啟動
start-dfs.sh
start-yarn.sh
這裡寫圖片描述
2.終端輸入:
nc -lk 9999
然後在IEDA中執行spark程式。由於9999埠中還沒有寫東西,所以執行是下圖:

這裡寫圖片描述
只有時間,沒有打印出東西。然後在終端輸入下面的東西,也可以從其他地方複製進來。
hello world
hello hadoop
hadoop love
love cat
cat love rabbit
這時,IDEA的控制檯就輸出下面的東西。
這裡寫圖片描述

3.下面執行帶時間視窗的,注意如果加了時間視窗就必須有checkpoint
輸入下面的,不要一次全輸入,一次輸個幾行。
checkpoint
hello world
hello hadoop
hadoop love
love cat
cat love rabbit
ni hao a
hello world
hello hadoop
hadoop love
love cat
cat love rabbit
hello world
hello hadoop
hadoop love
love cat
cat love rabbit

先是++–的那種:
這裡寫圖片描述
這裡寫圖片描述
這裡寫圖片描述
這裡寫圖片描述
再然後是不++–的那種:
這裡寫圖片描述
這裡寫圖片描述
++–的那種是因為把過去的RDD也帶進來計算了,所以出現了0這個情況,為了避免這種情況只能在列印前過濾掉0的再列印。而沒有++–的那種情況是不需要這樣做的。

Checkpointing
在容錯、可靠的檔案系統(HDFS、s3等)中設定一個目錄用於儲存checkpoint資訊。就可以通過streamingContext.checkpoint(checkpointDirectory)方法來做。
預設的間隔時間是批間隔時間的倍數,最少10秒。它可以通過dstream.checkpoint來設定。需要注意的是,隨著 streaming application 的持續執行,checkpoint 資料佔用的儲存空間會不斷變大。因此,需要小心設定checkpoint 的時間間隔。設定得越小,checkpoint 次數會越多,佔用空間會越大;如果設定越大,會導致恢復時丟失的資料和進度越多。一般推薦設定為 batch duration 的5~10倍。

package streaming

import java.io.File
import java.nio.charset.Charset

import com.google.common.io.Files

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by Administrator on 2017/3/12.
  */

object RecoverableNetworkWordCount {

  def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String): StreamingContext = {
    println("Creating new context") //如果沒有出現這句話,說明StreamingContext是從checkpoint裡面載入的
    val outputFile = new File(outputPath) //輸出檔案的目錄
    if (outputFile.exists()) outputFile.delete()
    val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1)) //時間間隔是1秒
    ssc.checkpoint(checkpointDirectory) //設定一個目錄用於儲存checkpoint資訊

    val lines = ssc.socketTextStream(ip, port)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    val windowedWordCounts = wordCounts.reduceByKeyAndWindow(_ + _, _ - _, Seconds(30), Seconds(10))
    windowedWordCounts.checkpoint(Seconds(10))//一般推薦設定為 batch duration 的5~10倍,即StreamingContext的第二個引數的5~10倍
    windowedWordCounts.print()
    Files.append(windowedWordCounts + "\n", outputFile, Charset.defaultCharset())
    ssc
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.exit(1)
    }
    val ip = args(0)
    val port = args(1).toInt
    val checkpointDirectory = args(2)
    val outputPath = args(3)
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => createContext(ip, port, outputPath, checkpointDirectory))
    ssc.start()
    ssc.awaitTermination()
  }
}

優化
1.資料接收的並行水平
建立多個輸入DStream並配置它們可以從源中接收不同分割槽的資料流,從而實現多資料流接收。因此允許資料並行接收,提高整體的吞吐量。

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()

多輸入流或者多receiver的可選的方法是明確地重新分配輸入資料流(利用inputStream.repartition()),在進一步操作之前,通過叢集的機器數分配接收的批資料。
2.任務序列化
執行kyro序列化任何可以減小任務的大小,從而減小任務傳送到slave的時間。

    val conf = new SparkConf().setAppName("analyse_domain_day").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

3.設定合適的批間隔時間(即批資料的容量)
批處理時間應該小於批間隔時間。如果時間間隔是1秒,但處理需要2秒,則處理趕不上接收,待處理的資料會越來越多,最後就嘣了。
找出正確的批容量的一個好的辦法是用一個保守的批間隔時間(5-10,秒)和低資料速率來測試你的應用程式。為了驗證你的系統是否能滿足資料處理速率,你可以通過檢查端到端的延遲值來判斷(可以在Spark驅動程式的log4j日誌中檢視”Total delay”或者利用StreamingListener介面)。如果延遲維持穩定,那麼系統是穩定的。如果延遲持續增長,那麼系統無法跟上資料處理速率,是不穩定的。你能夠嘗試著增加資料處理速率或者減少批容量來作進一步的測試。

DEMO

spark流操作kafka有兩種方式:
一種是利用接收器(receiver)和kafaka的高層API實現。
一種是不利用接收器,直接用kafka底層的API來實現(spark1.3以後引入)。

相比基於Receiver方式有幾個優點:
1、不需要建立多個kafka輸入流,然後Union他們,而使用DirectStream,spark Streaming將會建立和kafka分割槽一樣的RDD的分割槽數,而且會從kafka並行讀取資料,Spark的分割槽數和Kafka的分割槽數是一一對應的關係。
2、第一種實現資料的零丟失是將資料預先儲存在WAL中,會複製一遍資料,會導致資料被拷貝兩次:一次是被Kafka複製;另一次是寫入到WAL中。
Direct的方式是會直接操作kafka底層的元資料資訊,這樣如果計算失敗了,可以把資料重新讀一下,重新處理。即資料一定會被處理。拉資料,是RDD在執行的時候直接去拉資料。
3、Receiver方式讀取kafka,使用的是高層API將偏移量寫入ZK中,雖然這種方法可以通過資料儲存在WAL中保證資料的不對,但是可能會因為sparkStreaming和ZK中儲存的偏移量不一致而導致資料被消費了多次。
第二種方式不採用ZK儲存偏移量,消除了兩者的不一致,保證每個記錄只被Spark Streaming操作一次,即使是在處理失敗的情況下。如果想更新ZK中的偏移量資料,需要自己寫程式碼來實現。
由於直接操作的是kafka,kafka就相當於你底層的檔案系統。這個時候能保證嚴格的事務一致性,即一定會被處理,而且只會被處理一次。

首先去maven的官網上下載jar包
spark-streaming_2.10-1.6.2.jar
spark-streaming-kafka_2.10-1.6.2.jar
我的Scala是2.10的,spark是1.6.0的,下載的spark.streaming和kafka版本要與之對應,spark-streaming_2.10-1.6.2.jar中2.10是Scala版本號,1.6.2是spark版本號。當然下載1.6.1也行。
需要新增 kafka-clients-0.8.2.1.jar以及kafka_2.10-0.8.2.1.jar
這裡的2.10是Scala版本號,0.8.2.1是kafka的版本號。就下這個版本,別的版本不對應,會出錯。

在kafka的配置檔案裡面:
delete.topic.enable=true
host.name=192.168.1.66
zookeeper.connect=192.168.1.66:2181
我這裡寫主機名的話,各種報錯,所以乾脆就寫IP地址了。
啟動kafka以及ZK的步驟和kafka 1-2是一樣的。
進入/kafka_2.10-0.8.2.1 新建一個主題:
bin/kafka-topics.sh –create –zookeeper 192.168.1.66:2181 –replication-factor 1 –partitions 1 –topic test
啟動一個生產者:
bin/kafka-console-producer.sh –broker-list 192.168.1.66:9092 –topic test
在自己的電腦上執行spark程式後,在命令列輸入:
這裡寫圖片描述
在控制檯會顯示:
這裡寫圖片描述

package SparkStreaming

//TopicAndPartition是對 topic和partition的id的封裝的一個樣例類
import kafka.common.TopicAndPartition
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import kafka.serializer.StringDecoder

object SparkStreaming_Kafka_Test {

  val kafkaParams = Map(
    //kafka broker的IP加埠號,這個是必須的
    "metadata.broker.list" -> "192.168.1.66:9092",
    // "group.id" -> "group1",
    /*此配置引數表示當此groupId下的消費者,
     在ZK中沒有offset值時(比如新的groupId,或者是zk資料被清空),
     consumer應該從哪個offset開始消費.largest表示接受接收最大的offset(即最新訊息),
     smallest表示最小offset,即從topic的開始位置消費所有訊息.*/
    "auto.offset.reset" -> "smallest"
  )

  val topicsSet = Set("test")

  //  val zkClient = new ZkClient("xxx:2181,xxx:2181,xxx:2181",Integer.MAX_VALUE,100000,ZKStringSerializer)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("SparkStreaming_Kafka_Test")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(2))
    ssc.checkpoint("F:\\checkpoint")
    /*
    KafkaUtils.createDirectStream[
       [key的資料型別], [value的資料型別], [key解碼的類], [value解碼的類] ](
       streamingContext, [Kafka配置的引數,是一個map], [topics的集合,是一個set])
       */
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    val lines = messages.map(_._2) //取value
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}

偏移量僅僅被ssc儲存在checkpoint中,消除了zk和ssc偏移量不一致的問題。所以說checkpoint就已經可以保證容錯性了。
如果需要把偏移量寫入ZK,首先在工程中新建一個包:org.apache.spark.streaming.kafka,然後建一個KafkaCluster類:

package org.apache.spark.streaming.kafka

import kafka.api.OffsetCommitRequest
import kafka.common.{ErrorMapping, OffsetAndMetadata, TopicAndPartition}
import kafka.consumer.SimpleConsumer
import org.apache.spark.SparkException
import org.apache.spark.streaming.kafka.KafkaCluster.SimpleConsumerConfig

import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import scala.util.control.NonFatal

class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
  type Err = ArrayBuffer[Throwable]

  @transient private var _config: SimpleConsumerConfig = null

  def config: SimpleConsumerConfig = this.synchronized {
    if (_config == null) {
      _config = SimpleConsumerConfig(kafkaParams)
    }
    _config
  }

  def setConsumerOffsets(groupId: String, offsets: Map[TopicAndPartition, Long], consumerApiVersion: Short): Either[Err, Map[TopicAndPartition, Short]] = {
    val meta = offsets.map {
      kv => kv._1 -> OffsetAndMetadata(kv._2)
    }
    setConsumerOffsetMetadata(groupId, meta, consumerApiVersion)
  }

  def setConsumerOffsetMetadata(groupId: String, metadata: Map[TopicAndPartition, OffsetAndMetadata], consumerApiVersion: Short): Either[Err, Map[TopicAndPartition, Short]] = {
    var result = Map[TopicAndPartition, Short]()
    val req = OffsetCommitRequest(groupId, metadata, consumerApiVersion)
    val errs = new Err
    val topicAndPartitions = metadata.keySet
    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
      val resp = consumer.commitOffsets(req)
      val respMap = resp.commitStatus
      val needed = topicAndPartitions.diff(result.keySet)
      needed.foreach { tp: TopicAndPartition =>
        respMap.get(tp).foreach { err: Short =>
          if (err == ErrorMapping.NoError) {
            result += tp -> err
          } else {
            errs.append(ErrorMapping.exceptionFor(err))
          }
        }
      }
      if (result.keys.size == topicAndPartitions.size) {
        return Right(result)
      }
    }
    val missing = topicAndPartitions.diff(result.keySet)
    errs.append(new SparkException(s"Couldn't set offsets for ${missing}"))
    Left(errs)
  }

  private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)(fn: SimpleConsumer => Any): Unit = {
    brokers.foreach { hp =>
      var consumer: SimpleConsumer = null
      try {
        consumer = connect(hp._1, hp._2)
        fn(consumer)
      } catch {
        case NonFatal(e) =>
          errs.append(e)
      } finally {
        if (consumer != null) {
          consumer.close()
        }
      }
    }
  }

  def connect(host: String, port: Int): SimpleConsumer =
    new SimpleConsumer(host, port, config.socketTimeoutMs,
      config.socketReceiveBufferBytes, config.clientId)
}

然後在主函式中:

// 手動更新ZK偏移量,使得基於ZK偏移量的kafka監控工具可以使用
    messages.foreachRDD(rdd => {
      // 先處理訊息
      val lines = rdd.map(_._2) //取value
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
      wordCounts.foreach(println)
      // 再更新offsets
      //spark內部維護kafka偏移量資訊是儲存在HasOffsetRanges類的offsetRanges中
      //OffsetRange 包含資訊有:topic名字,分割槽Id,開始偏移,結束偏移。
      val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //得到該 rdd 對應 kafka 的訊息的 offset
      val kc = new KafkaCluster(kafkaParams)
      for (offsets <- offsetsList) {
        val topicAndPartition = TopicAndPartition("test", offsets.partition)
        val o = kc.setConsumerOffsets("group1", Map((topicAndPartition, offsets.untilOffset)),8)
        if (o.isLeft) {
          println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
        }
      }
    })

下面是用kafka的API自己寫一個程式讀取檔案,作為kafka的生產者,需要將Scala和kafka的所有的jar包都匯入,lib資料夾下面的都匯入進去。
如果沒有2臺電腦,可以開2個開發環境,IDEA作為消費者,eclipse作為生產者。
生產者程式碼如下:

package spark_streaming_kafka_test;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MakeRealtimeDate extends Thread {

    private Producer<Integer, String> producer;

    public MakeRealtimeDate() {
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("zk.connect", "192.168.1.66:2181");
        props.put("metadata.broker.list", "192.168.1.66:9092");
        ProducerConfig pc = new ProducerConfig(props);
        producer = new Producer<Integer, String>(pc);
    }

    public void run() {
        while (true) {
            File file = new File("C:\\Users\\Administrator\\Desktop\\wordcount.txt");
            BufferedReader reader = null;
            try {
                reader = new BufferedReader(new FileReader(file));
            } catch (FileNotFoundException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
            String lineTxt = null;
            try {
                while ((lineTxt = reader.readLine()) != null) {
                    System.out.println(lineTxt);
                    producer.send(new KeyedMessage<Integer, String>("test", lineTxt));
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }
    }

    public static void main(String[] args) {
        new MakeRealtimeDate().start();
    }

}

先啟動之前寫的sparkstreaming消費者統計單詞個數的程式,然後再啟動我們現在寫的這個生產者程式,最後就會在IDEA的控制檯中看到實時結果。

相關推薦

flume+kafka+spark streaming(持續更新)

kafka Kafka是一種高吞吐量的分散式釋出訂閱訊息系統,它可以處理消費者規模的網站中的所有動作流資料。 kafka的設計初衷是希望作為一個統一的資訊收集平臺,能夠實時的收集反饋資訊,並需要能夠支撐較大的資料量,且具備良好的容錯能力. Apache

基於Flume+Kafka+Spark Streaming打造實時流處理項目實戰課程

大數據本課程從實時數據產生和流向的各個環節出發,通過集成主流的分布式日誌收集框架Flume、分布式消息隊列Kafka、分布式列式數據庫HBase、及當前最火爆的Spark Streaming打造實時流處理項目實戰,讓你掌握實時處理的整套處理流程,達到大數據中級研發工程師的水平!下載地址:百度網盤下載

大資料求索(9): log4j + flume + kafka + spark streaming實時日誌流處理實戰

大資料求索(9): log4j + flume + kafka + spark streaming實時日誌流處理實戰 一、實時流處理 1.1 實時計算 跟實時系統類似(能在嚴格的時間限制內響應請求的系統),例如在股票交易中,市場資料瞬息萬變,決策通常需要秒級甚至毫秒級。通俗來

基於 Flume+Kafka+Spark Streaming 實現實時監控輸出日誌的報警系統

運用場景:我們機器上每天或者定期都要跑很多工,很多時候任務出現錯誤不能及時發現,導致發現的時候任務已經掛了很久了。  解決方法:基於 Flume+Kafka+Spark Streaming 的框架對這些任務的輸出日誌進行實時監控,當檢測到日誌出現Error的資訊就傳送郵件給

Flume+Kafka+Spark Streaming實現大資料實時流式資料採集

大資料實時流式資料處理是大資料應用中最為常見的場景,與我們的生活也息息相關,以手機流量實時統計來說,它總是能夠實時的統計出使用者的使用的流量,在第一時間通知使用者流量的使用情況,並且最為人性化的為使用者提供各種優惠的方案,如果採用離線處理,那麼等到使用者流量超標

flume+zookeeper+kafka+spark streaming

1.flume安裝部署 1.1、下載安裝介質,並解壓: cd /usr/local/wget http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.7.0.tar.gztar -zxvf flume-ng-1.6.0-cdh

Kafka:ZK+Kafka+Spark Streaming集群環境搭建(三)安裝spark2.2.1

node word clas 執行 選擇 dir clust 用戶名 uil 如何配置centos虛擬機請參考《Kafka:ZK+Kafka+Spark Streaming集群環境搭建(一)VMW安裝四臺CentOS,並實現本機與它們能交互,虛擬機內部實現可以上網。》 如

Kafka:ZK+Kafka+Spark Streaming集群環境搭建(九)安裝kafka_2.11-1.1.0

itl CA blog tor line cat pre PE atan 如何搭建配置centos虛擬機請參考《Kafka:ZK+Kafka+Spark Streaming集群環境搭建(一)VMW安裝四臺CentOS,並實現本機與它們能交互,虛擬機內部實現可以上網。》 如

Kafka:ZK+Kafka+Spark Streaming集群環境搭建(二)VMW安裝四臺CentOS,並實現本機與它們能交互,虛擬機內部實現可以上網。

centos 失敗 sco pan html top n 而且 div href Centos7出現異常:Failed to start LSB: Bring up/down networking. 按照《Kafka:ZK+Kafka+Spark Streaming集群環

Kafka:ZK+Kafka+Spark Streaming集群環境搭建(十三)定義一個avro schema使用comsumer發送avro字符流,producer接受avro字符流並解析

finall ges records ring ack i++ 一個 lan cde 參考《在Kafka中使用Avro編碼消息:Consumer篇》、《在Kafka中使用Avro編碼消息:Producter篇》 pom.xml <depende

Kafka:ZK+Kafka+Spark Streaming集群環境搭建(十七)待整理

lan post -a 客戶端 客戶 struct bsp www get redis按照正則批量刪除key redis客戶端--jedis 在Spark結構化流readStream、writeStream 輸入輸出,及過程ETL Spark Structur

Kafka:ZK+Kafka+Spark Streaming集群環境搭建(十九)待整理

set dstream 搭建 details 編程指南 .com .cn csdn read redis按照正則批量刪除key redis客戶端--jedis 在Spark結構化流readStream、writeStream 輸入輸出,及過程ETL Spark St

Kafka:ZK+Kafka+Spark Streaming集群環境搭建(二十三)Structured Streaming遇到問題:Set(TopicName-0) are gone. Some data may have been missed

ack loss set div top 過程 pan check use 事情經過:之前該topic(M_A)已經存在,而且正常消費了一段時間,後來刪除了topic(M_A),重新創建了topic(M-B),程序使用新創建的topic(M-B)進行實時統計操作,執行過程中

Kafka:ZK+Kafka+Spark Streaming集群環境搭建(二十五)Structured Streaming:同一個topic中包含一組數據的多個部分,按照key它們拼接為一條記錄(以及遇到的問題)。

eas array 記錄 splay span ack timestamp b- each 需求: 目前kafka的topic上有一批數據,這些數據被分配到9個不同的partition中(就是發布時key:{m1,m2,m3,m4...m9},value:{records

計算成交量例子,kafka/spark streaming/zk

package com.ws.streaming import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.StringDecoder impor

學習筆記 --- Kafka Spark Streaming獲取Kafka資料 Receiver與Direct的區別

Receiver 使用Kafka的高層次Consumer API來實現 receiver從Kafka中獲取的資料都儲存在Spark Executor的記憶體中,然後Spark Streaming啟動的job會去處理那些資料 要啟用高可靠機制,讓資料零丟失,就必須啟用Spark

kafka+spark streaming程式碼例項(pyspark+python)

一、系統準備1.啟動zookeeper:bin/zkServer.cmd start2.啟動kafka:bin/kafka-server-start.sh -daemon config/server.properties3.啟動spark:sbin/start-all.sh資

Kafka+Spark Streaming+Redis實時系統實踐

基於Spark通用計算平臺,可以很好地擴充套件各種計算型別的應用,尤其是Spark提供了內建的計算庫支援,像Spark Streaming、Spark SQL、MLlib、GraphX,這些內建庫都提供了高階抽象,可以用非常簡潔的程式碼實現複雜的計算邏輯、這也得益於S

Flume+Kakfa+Spark Streaming整合(執行WordCount小例子)

環境版本:Scala 2.10.5; Spark 1.6.0; Kafka 0.10.0.1; Flume 1.6.0 Flume/Kafka的安裝配置請看我之前的部落格: http://blog.c

ZK+Kafka+Spark Streaming叢集環境搭建(九)安裝kafka_2.11-1.1.0

安裝kafka的伺服器:192.168.0.120 master 192.168.0.121 slave1 192.168.0.122 slave2 192.168.0.123 slave3備註:只在slave1,slave2,slave3三個節店上安裝zookeepe