
Kafka (6): Integrating with Spark Streaming, using Kafka as a Spark Streaming data source

1. Functionality

Spark Streaming can consume data from Kafka in two ways: the receiver-based approach and the direct approach. The receiver-based approach uses Kafka's high-level consumer API, with offsets tracked in ZooKeeper, while the direct approach queries the brokers itself and computes the offset range to process in each batch.

2. POM dependencies

For Kafka 0.8.2.1 (the kafka_2.11 artifacts below are the Scala 2.11 builds):

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.8.2.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
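
These two dependencies assume that spark-core and spark-streaming are already on the classpath. If they are not, a matching spark-streaming dependency (version chosen here to match the 2.1.0 connector above) would look like:

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>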

3. Scala code

(1) Receiver-based approach, API 1 (simple createStream)

package stream

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Receiver-based consumption via the simple createStream API:
  * offsets are tracked in ZooKeeper under the given consumer group.
  */
object UseReceiveKafkaStreaming08 extends App{
  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("UseReceiveKafkaStreaming")
    // interval at which received data is chunked into blocks before being stored
    .set("spark.streaming.blockInterval", "1s")
  val sc = SparkContext.getOrCreate(conf)

  val ssc = new StreamingContext(sc, Seconds(10))

  // Data source configuration: ZooKeeper quorum (with the Kafka chroot),
  // topic -> number of receiver threads, and the consumer group id.
  val zkQuorum = "bigdata.ibeifeng.com:2181/kafka08"
  val topics = Map[String, Int]("beifeng1" -> 5)
  val groupId = "sparkstreaming"


  // API 1: createStream with default String decoders. Each record is a
  // (key, message) pair; here we count how often each complete message appears.
  val kafkaDStream = KafkaUtils
    .createStream(ssc, zkQuorum, groupId, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
    .map(record => (record._2, 1))
    .reduceByKey(_ + _)

  //=============================== API 1 above has been verified =============================

  

  kafkaDStream.print()
  ssc.start()
  ssc.awaitTermination()


}
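
With the receiver approach, ingestion throughput is often raised by starting several receivers and unioning their streams. A minimal sketch that would go inside the object body above, reusing the ssc, zkQuorum, groupId, and topics already defined (numReceivers is an assumed tuning knob):

  // a sketch: each createStream call starts one receiver; union merges them
  val numReceivers = 3 // assumed value; tune to the available cores
  val streams = (1 to numReceivers).map { _ =>
    KafkaUtils.createStream(ssc, zkQuorum, groupId, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
  }
  val unioned = ssc.union(streams)

Note that each receiver occupies one core, so the master must provide at least numReceivers + 1 threads for processing to make progress.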

(2) Receiver-based approach, API 2 (createStream with explicit decoders)

This variant still uses a receiver; it differs from API 1 only in passing kafkaParams and the key/value decoder types explicitly. The true direct approach (createDirectStream) is sketched in (3) below.

package stream

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Receiver-based consumption via the typed createStream API, passing
  * kafkaParams and explicit String decoders.
  */
object UseReceiveKafkaStreaming08API2 extends App{
  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("UseReceiveKafkaStreaming")
    // interval at which received data is chunked into blocks before being stored
    .set("spark.streaming.blockInterval", "1s")
  val sc = SparkContext.getOrCreate(conf)

  val ssc = new StreamingContext(sc, Seconds(10))

  // Data source configuration: ZooKeeper quorum (with the Kafka chroot),
  // topic -> number of receiver threads, and the consumer group id.
  val zkQuorum = "bigdata.ibeifeng.com:2181/kafka08"
  val topics = Map[String, Int]("beifeng1" -> 5)
  val groupId = "sparkstreaming"

  //=============================== API 2 below =============================
  /**
    * Signature of the typed createStream overload used here:
    *
    *   def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
    *       ssc: StreamingContext,
    *       kafkaParams: Map[String, String],
    *       topics: Map[String, Int],
    *       storageLevel: StorageLevel
    *     ): ReceiverInputDStream[(K, V)] = {
    *     val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
    *     new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
    *   }
    */
  val kafkaParams: Map[String, String] = Map[String, String](
    "zookeeper.connect" -> zkQuorum,
    "group.id" -> groupId,
    "zookeeper.connection.timeout.ms" -> "10000",
    // "smallest" starts from the earliest available offset; "largest" from the latest
    "auto.offset.reset" -> "smallest")
  // API 2: createStream with explicit key/value types and decoders,
  // followed by a classic word count over the message values.
  val kafkaDStream = KafkaUtils
    .createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK)
    .flatMap(record => record._2.split(" "))
    .map(word => (word, 1))
    .reduceByKey(_ + _)

  kafkaDStream.print()
  ssc.start()
  ssc.awaitTermination()
  //=============================== API 2 above has been verified =============================
}
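
(3) Direct approach (createDirectStream)

The code in (2) is still receiver-based; the direct approach reads from the brokers without any receiver and decides per batch which offset ranges to process, so it needs the broker list rather than a ZooKeeper quorum. A minimal sketch, assuming a broker listening at bigdata.ibeifeng.com:9092 (the broker port is an assumption; only the ZooKeeper address appears in this article):

package stream

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object UseDirectKafkaStreaming08 extends App {
  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("UseDirectKafkaStreaming")
  val sc = SparkContext.getOrCreate(conf)
  val ssc = new StreamingContext(sc, Seconds(10))

  // The direct API talks to the brokers directly instead of ZooKeeper,
  // so it takes metadata.broker.list rather than zookeeper.connect.
  val kafkaParams = Map[String, String](
    "metadata.broker.list" -> "bigdata.ibeifeng.com:9092", // assumed broker address
    "auto.offset.reset" -> "smallest")
  // Topics are given as a Set; partition counts are discovered from the brokers.
  val topics = Set("beifeng1")

  val wordCounts = KafkaUtils
    .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    .flatMap(record => record._2.split(" "))
    .map(word => (word, 1))
    .reduceByKey(_ + _)

  wordCounts.print()
  ssc.start()
  ssc.awaitTermination()
}

Because there is no receiver, no storage level or blockInterval setting is involved, and each Kafka partition maps one-to-one to a Spark partition.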