
<Spark Streaming><Flume><Integration>


Overview

  • Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large volumes of log data.
  • We set up a Flume + Spark Streaming pipeline to pull data out of Flume and process it.
  • There are two ways to integrate them: the Flume-style push-based approach, or a pull-based approach built on a custom sink (see the sketch after this list).
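
As a quick orientation, the two approaches correspond to two different FlumeUtils entry points. Below is a minimal sketch; the object name, hostnames, and ports are placeholders, not from the original post:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume._

object FlumeApproaches {
  def main(args: Array[String]) {
    val ssc = new StreamingContext(new SparkConf().setAppName("FlumeApproaches"), Seconds(2))

    // Approach 1 (push): Spark Streaming hosts an Avro receiver on receiver-host:4545,
    // and Flume's avro sink pushes events to it.
    val pushStream = FlumeUtils.createStream(ssc, "receiver-host", 4545)

    // Approach 2 (pull): Flume buffers events in a custom Spark sink on flume-host:4545,
    // and Spark Streaming pulls from it with transactional semantics.
    val pullStream = FlumeUtils.createPollingStream(ssc, "flume-host", 4545)
  }
}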

Approach 1: Flume-style Push-based Approach

  • Flume is designed to push data between Flume agents. In this approach, Spark Streaming installs a receiver that acts like an Avro agent for Flume, to which Flume can push the data.

General Requirement

  • When you launch the Flume + Spark Streaming application, one of the Spark workers must be running on the chosen machine.
  • Flume can then push data to a port on that machine.
  • Because of this push mechanism, the streaming application must have a receiver scheduled and listening on the chosen port.

Configuring Flume

  • Configure the Flume agent to send data to an Avro sink:
agent.sinks = avroSink
agent.sinks.avroSink.type = avro
agent.sinks.avroSink.channel = memoryChannel
agent.sinks.avroSink.hostname = <chosen machine's hostname>
agent.sinks.avroSink.port = <chosen port on the machine>
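The snippet above defines only the sink. For context, a complete agent also needs a source and a channel; below is a minimal sketch in which everything except the avroSink lines is an illustrative assumption (here, a netcat source feeding a memory channel):

agent.sources = netcatSrc
agent.channels = memoryChannel
agent.sinks = avroSink

# Illustrative source (assumption): reads lines from a local TCP port
agent.sources.netcatSrc.type = netcat
agent.sources.netcatSrc.bind = localhost
agent.sources.netcatSrc.port = 44444
agent.sources.netcatSrc.channels = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 1000

agent.sinks.avroSink.type = avro
agent.sinks.avroSink.channel = memoryChannel
agent.sinks.avroSink.hostname = <chosen machine's hostname>
agent.sinks.avroSink.port = <chosen port on the machine>

The agent can then be started with the standard flume-ng launcher (the config file name agent.conf is a placeholder):

flume-ng agent --conf conf --conf-file agent.conf --name agent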

Configuring Spark Streaming Application

  1. Linking: add the following dependency to your Maven project:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.10</artifactId>
    <version>2.1.0</version>
</dependency>
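
If the project uses sbt rather than Maven, the equivalent dependency line (same artifact and version as above) would be:

libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.10" % "2.1.0"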

  2. Programming: import FlumeUtils and create the input DStream:

 import org.apache.spark.streaming.flume._

 val flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])
  • Note: the hostname should be the same as the one used by the resource manager in the cluster, so that resource allocation can match the names and the receiver is launched on the correct machine.
  • A simple Spark Streaming demo that counts the number of Flume events in each batch:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.flume._

object FlumeEventCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: FlumeEventCount <host> <port>")
      System.exit(1)
    }

    // Quiet noisy INFO logs (the original example used the StreamingExamples
    // helper from the Spark examples package, which is not available to user code).
    Logger.getRootLogger.setLevel(Level.WARN)

    val Array(host, port) = args

    val batchInterval = Milliseconds(2000)

    // Create the context and set the batch interval
    val sparkConf = new SparkConf().setAppName("FlumeEventCount")
    val ssc = new StreamingContext(sparkConf, batchInterval)

    // Create a flume stream: the receiver listens on host:port for pushed Avro events
    val stream = FlumeUtils.createStream(ssc, host, port.toInt, StorageLevel.MEMORY_ONLY_SER_2)

    // Print out the count of events received from this server in each batch
    stream.count().map(cnt => "Received " + cnt + " flume events.").print()

    ssc.start()
    ssc.awaitTermination()
  }
}
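One way to run the demo end to end (the jar and config file names are placeholders): start the Spark application first, so that its receiver is already scheduled and listening on the chosen port, then start the Flume agent, which begins pushing events to it.

# 1. Launch the streaming app; its receiver binds to <host>:<port>
spark-submit --class FlumeEventCount \
  --packages org.apache.spark:spark-streaming-flume_2.10:2.1.0 \
  your-app.jar <host> <port>

# 2. In another shell, start the Flume agent with the avro-sink config from above
flume-ng agent --conf conf --conf-file agent.conf --name agent

Each 2-second batch should then print a line like "Received N flume events."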
