
Spark Streaming learning notes (integration with Flume + integration with Kafka)

Setting the log level in Spark 2.1 is simple; the few lines below are all it takes, and the key line is sc.setLogLevel("WARN"):

val conf = new SparkConf().setAppName("FlumePollWordCount").setMaster("local[2]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc, Seconds(5))

Spark Streaming does (near) real-time computation.

Spark Core and the other RDD-based APIs do offline (batch) computation.

So Spark supports both real-time and offline computation; for contrast, a minimal batch word count is sketched right below.
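A minimal sketch (my addition; the input path is just a placeholder) of the same word count written as an offline Spark Core job over an RDD:

import org.apache.spark.{SparkConf, SparkContext}

object BatchWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("BatchWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // classic offline word count over a static file
    val counts = sc.textFile("/path/to/input.txt")   // placeholder input path
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
    counts.collect().foreach(println)
    sc.stop()
  }
}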

The first Spark Streaming example

Use nc -lk 8888 on 192.168.235.128 to open a simple application on port 8888 where messages can be typed in.

Create a Spark Streaming program in IDEA:

package com.wxa.spark.four

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object StreamingWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc =new StreamingContext(sc ,Seconds(5))

    // A DStream is a special, time-ordered abstraction: one RDD per batch interval
    val ds =ssc.socketTextStream("192.168.235.128",8888)
    val result = ds.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    result.print()  // use result.print() here, not print(result); print() is an output operation on the DStream
    ssc.start()
    ssc.awaitTermination()
  }

}
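As the comment in the code says, a DStream is best thought of as a time-ordered sequence of RDDs, one per batch interval. A small sketch (my addition; it would be placed before ssc.start() and reuses the result DStream from the example above) that makes this concrete with foreachRDD:

// foreachRDD exposes the RDD produced for each 5-second batch,
// e.g. to inspect or save the per-batch word counts
result.foreachRDD { rdd =>
  println(s"this batch produced ${rdd.count()} distinct words")
  // rdd.saveAsTextFile(...) could go here instead
}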

Spark Streaming combined with Flume

Writing the flume-poll.conf file + the FlumePollWordCount code on the IDEA side

First download the three jars required by the poll-based approach (typically the spark-streaming-flume-sink jar plus the scala-library and commons-lang3 jars; the image listing them is omitted) and put them into Flume's lib directory.


# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/data/flume
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = master
a1.sinks.k1.port = 8888

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
On the IDEA side:

package com.wxa.spark.five

import java.net.InetSocketAddress

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FlumePollWordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("FlumePollWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // pull data from Flume (the address of the Flume SparkSink)
    val address = Seq(new InetSocketAddress("192.168.235.128", 8888))
    val flumeStream = FlumeUtils.createPollingStream(ssc, address, StorageLevel.MEMORY_AND_DISK)
    val words = flumeStream.flatMap(x => new String(x.event.getBody().array()).split(" ")).map((_,1))
    val results = words.reduceByKey(_+_)
    results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Writing the flume-push.conf file + the FlumePushWordCount code on the IDEA side

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/data/flume
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = avro
# this is the receiver: the Windows host running the Spark Streaming application
a1.sinks.k1.hostname = 192.168.235.1
a1.sinks.k1.port = 8888

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
On the IDEA side:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FlumePushWordCount {

  def main(args: Array[String]) {
//    val host = args(0)
//    val port = args(1).toInt
    val conf = new SparkConf().setAppName("FlumeWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // push mode: Flume pushes data to Spark
    val flumeStream = FlumeUtils.createStream(ssc, "192.168.235.1", 8888)   // the address of the Windows host where this receiver runs

    // the real content of a Flume record is obtained via event.getBody()
    val words = flumeStream.flatMap(x => new String(x.event.getBody().array()).split(" ")).map((_, 1))

    val results = words.reduceByKey(_ + _)
    results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Graphical overview of the Spark Streaming + Flume integration (diagram omitted)


Integrating Spark with Flume alone has a drawback: Spark starts the receiver inside an executor on a worker, not in the driver, so when the job runs on a cluster you have to point Flume at that worker's address. And because everything goes through a single receiver, the receiver may not be able to keep up once the data volume grows. A common way to spread the load is sketched right below.
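The sketch (my own, not part of the original setup; it assumes a second Flume agent with a SparkSink running on a hypothetical host 192.168.235.129): create one polling stream per sink and union them, so that several receivers on different executors share the work.

// one polling stream per Flume SparkSink, then union them so that
// several receivers (running on different executors) share the load
val addresses1 = Seq(new InetSocketAddress("192.168.235.128", 8888))
val addresses2 = Seq(new InetSocketAddress("192.168.235.129", 8888))  // hypothetical second sink
val stream1 = FlumeUtils.createPollingStream(ssc, addresses1, StorageLevel.MEMORY_AND_DISK)
val stream2 = FlumeUtils.createPollingStream(ssc, addresses2, StorageLevel.MEMORY_AND_DISK)
val unioned = ssc.union(Seq(stream1, stream2))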

Spark Streaming combined with Kafka


The producer connects to the broker list, while the consumer connects to ZooKeeper. The point of consumer groups is that several machines in the same group behave like one consumer: if the producer produces 5000 messages and two machines belong to the same group, machine 1 might end up consuming 3000 of them and machine 2 the other 2000, 5000 in total.

Add the dependency to the pom:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.11</artifactId>
  <version>1.6.3</version>
</dependency>

Without further ado, the code:

package com.wxa.spark.five

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by root on 2016/5/21.
  */
object KafkaWordCount {

  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    //iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x)))
    iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(i => (x, i)) }
  }


  def main(args: Array[String]) {

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint("f://ck2")  //等同於sc的setCheckpointDir
    //"alog-2016-04-16,alog-2016-04-17,alog-2016-04-18"
    //"Array((alog-2016-04-16, 2), (alog-2016-04-17, 2), (alog-2016-04-18, 2))"
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
    val words = data.map(_._2).flatMap(_.split(" "))
    val wordCounts = words.map((_, 1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
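The updateFunc above is the per-partition iterator variant: for every key it receives the new values of the current batch together with the previous state. A simpler per-key variant of updateStateByKey does the same running total and may be easier to read (a sketch of my own; checkpointing is still required, and words is the DStream from the code above):

// newValues: the 1s emitted for this key in the current batch; runningCount: the total so far
val wordCounts = words.map((_, 1)).updateStateByKey[Int] {
  (newValues: Seq[Int], runningCount: Option[Int]) =>
    Some(newValues.sum + runningCount.getOrElse(0))
}
wordCounts.print()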

The program arguments are shown in the (omitted) screenshot; they correspond, in order, to <zkQuorum> <group> <topics> <numThreads> in the code above.


After upgrading Spark to 2.0+, running the streaming-kafka test throws java.lang.NoClassDefFoundError: org/apache/spark/Logging

Solution:

Download address of the spark-core_2.11-1.5.2.logging.jar package

Detailed fix: download the jar above and put it into the jars directory of Spark 2.1.0 (the version I use).

In IDEA, open the project settings and import that jar under the module's Dependencies.

That resolves the java.lang.NoClassDefFoundError: org/apache/spark/Logging error.

Next, we need a producer:

kafka-console-producer.sh --broker-list hadoop01:9092,hadoop02:9092 --topic ssckfk

Type data into it and the Spark Streaming job on the IDEA side will receive it.

Window operations (the key function is reduceByKeyAndWindow)

Example (diagram omitted): the two windows do not double-count the data at 10:15.

Each window computes over the data that falls inside its time range.

For example: the window from 10:10:00 to 10:15 gives 30, and the window from 10:15 to 10:25 gives 70.


package com.wxa.spark.sixday

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}

/**
  * Created by ZX on 2016/4/19.
  */
object WindowOpts {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("WindowOpts").setMaster("local[2]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Milliseconds(5000))
    val lines = ssc.socketTextStream("192.168.235.128", 9999)
    val pairs = lines.flatMap(_.split(" ")).map((_, 1))
    val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(15), Seconds(10)) // the window length and the slide interval must both be multiples of the batch interval
    //Map((hello, 5), (jerry, 2), (kitty, 3))
    windowedWordCounts.print()
//    val a = windowedWordCounts.map(_._2).reduce(_+_)
//    a.foreachRDD(rdd => {
//      println(rdd.take(0))
//    })
//    a.print()
//    //windowedWordCounts.map(t => (t._1, t._2.toDouble / a.toD))
//    windowedWordCounts.print()
//    //result.print()
    ssc.start()
    ssc.awaitTermination()
  }

}
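When the window is long and overlaps a lot between slides, the incremental form of reduceByKeyAndWindow with an inverse function is usually cheaper: instead of recomputing the whole window it adds the batches that slide in and subtracts the ones that slide out. A sketch of my own (it needs a checkpoint directory, the path here is just a placeholder; pairs is the DStream from the code above):

// incremental window reduce: add counts entering the window, subtract counts leaving it;
// requires checkpointing because intermediate window state is kept across batches
ssc.checkpoint("f://ck-window")   // placeholder checkpoint path
val incrementalCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,   // reduce the data entering the window
  (a: Int, b: Int) => a - b,   // "inverse reduce" the data leaving the window
  Seconds(15),                 // window length
  Seconds(10)                  // slide interval
)
incrementalCounts.print()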

The two ways to connect Spark Streaming with Kafka

Approach 1 (with a receiver): connect through ZooKeeper


Approach 2 (without a receiver, the direct approach): connect through the broker list

Advantages compared with the first approach:

1 More efficient

2 You have to manage the offsets yourself (in the first approach ZooKeeper manages them), but that also makes it more flexible; a minimal sketch of reading the offsets in the direct approach follows right below.
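The sketch (my addition; it assumes the ssc, broker list and topic used in DirectKafkaWordCount further below): KafkaUtils.createDirectStream plus the HasOffsetRanges cast is what exposes the offsets that you then have to store yourself.

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

// receiver-less (direct) stream: one Kafka partition maps to one RDD partition
val kafkaParams = Map("metadata.broker.list" -> "hadoop01:9092,hadoop02:9092")
val topics = Set("ssckfk")
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

directStream.foreachRDD { rdd =>
  // the offsets of this batch, per topic/partition; persisting them (e.g. to ZooKeeper) is up to you
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach(o => println(s"${o.topic} ${o.partition} ${o.fromOffset} -> ${o.untilOffset}"))
}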



You must create a package named org.apache.spark.streaming.kafka in your own project; only inside that package can the KafkaManager you write use KafkaCluster normally (it is not accessible from outside Spark's own packages).

The KafkaManager code:

package org.apache.spark.streaming.kafka

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset

import scala.reflect.ClassTag

/**
  * Manage the offsets yourself
  */
class KafkaManager(val kafkaParams: Map[String, String]) extends Serializable {

  private val kc = new KafkaCluster(kafkaParams)

  /**
    * Create the data stream
    */
  def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
                                                                                                            ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String]): InputDStream[(K, V)] =  {
    val groupId = kafkaParams.get("group.id").get
    // before reading offsets from ZooKeeper, first bring them up to date based on the actual situation
    setOrUpdateOffsets(topics, groupId)

    // read the offsets from ZooKeeper and start consuming messages from there
    val messages = {
      val partitionsE = kc.getPartitions(topics)
      if (partitionsE.isLeft)
        throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
      val partitions = partitionsE.right.get
      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
      if (consumerOffsetsE.isLeft)
        throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")
      val consumerOffsets = consumerOffsetsE.right.get
      KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
        ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
    }
    messages
  }

  /**
    * Before creating the stream, update the consumer offsets according to what has actually been consumed
    * @param topics
    * @param groupId
    */
  private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
    topics.foreach(topic => {
      var hasConsumed = true
      val partitionsE = kc.getPartitions(Set(topic))
      if (partitionsE.isLeft)
        throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
      val partitions = partitionsE.right.get
      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
      if (consumerOffsetsE.isLeft) hasConsumed = false
      if (hasConsumed) { // has been consumed before
        /**
          * If the streaming job throws kafka.common.OffsetOutOfRangeException at runtime,
          * the offsets stored in ZooKeeper are stale: Kafka's retention policy has already
          * deleted the log segments that contained those offsets.
          * In that case, compare the consumerOffsets in ZooKeeper with earliestLeaderOffsets:
          * if consumerOffsets are smaller than earliestLeaderOffsets, they are out of date,
          * so update consumerOffsets to earliestLeaderOffsets.
          */
        val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
        if (earliestLeaderOffsetsE.isLeft)
          throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
        val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get
        val consumerOffsets = consumerOffsetsE.right.get

        // possibly only some partitions have stale consumerOffsets, so update just those partitions' consumerOffsets to earliestLeaderOffsets
        var offsets: Map[TopicAndPartition, Long] = Map()
        consumerOffsets.foreach({ case(tp, n) =>
          val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
          if (n < earliestLeaderOffset) {
            println("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
              " offsets已經過時,更新為" + earliestLeaderOffset)
            offsets += (tp -> earliestLeaderOffset)
          }
        })
        if (!offsets.isEmpty) {
          kc.setConsumerOffsets(groupId, offsets)
        }
      } else { // never consumed before
        val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
        var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
        if (reset == Some("smallest")) {
          val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
          if (leaderOffsetsE.isLeft)
            throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
          leaderOffsets = leaderOffsetsE.right.get
        } else {
          val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
          if (leaderOffsetsE.isLeft)
            throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
          leaderOffsets = leaderOffsetsE.right.get
        }
        val offsets = leaderOffsets.map {
          case (tp, offset) => (tp, offset.offset)
        }
        kc.setConsumerOffsets(groupId, offsets)
      }
    })
  }

  /**
    * Update the consumer offsets stored in ZooKeeper
    * @param rdd
    */
  def updateZKOffsets(rdd: RDD[(String, String)]) : Unit = {
    val groupId = kafkaParams.get("group.id").get
    val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    for (offsets <- offsetsList) {
      val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
      val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
      if (o.isLeft) {
        println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
      }
    }
  }
}

The DirectKafkaWordCount code:

package com.wxa.spark.five

import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.{KafkaManager, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}


object DirectKafkaWordCount {

  /*  def dealLine(line: String): String = {
      val list = line.split(',').toList
  //    val list = AnalysisUtil.dealString(line, ',', '"')// just treat the dealString function as a split
      list.get(0).substring(0, 10) + "-" + list.get(26)
    }*/

  def processRdd(rdd: RDD[(String, String)]): Unit = {
    val lines = rdd.map(_._2)
    val words = lines.flatMap(_.split(" "))   // flatMap (not map), so we count individual words rather than whole arrays
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.foreach(println)
  }

  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println(
        s"""
           |Usage: DirectKafkaWordCount <brokers> <topics> <groupid>
           |  <brokers> is a list of one or more Kafka brokers
           |  <topics> is a list of one or more kafka topics to consume from
           |  <groupid> is a consume group
           |
        """.stripMargin)
      System.exit(1)
    }

    Logger.getLogger("org").setLevel(Level.WARN)

    val Array(brokers, topics, groupId) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    sparkConf.setMaster("local[*]")
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "5")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "group.id" -> groupId,
      "auto.offset.reset" -> "smallest"
    )

    val km = new KafkaManager(kafkaParams)    // used in basically the same way as KafkaUtils.createDirectStream()


    val messages = km.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    messages.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // process the messages first
        processRdd(rdd)
        // then update the offsets
        km.updateZKOffsets(rdd)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}