SparkStreaming (12): Kafka as an Advanced Data Source, Receiver Mode (Production)

1. Prepare the Environment

(1) Start ZooKeeper

    bin/zkServer.sh start

(2) Start Kafka

    bin/kafka-server-start.sh -daemon config/server.properties 

(3) Create the topic

bin/kafka-topics.sh --create --topic kafka_streaming_topic --zookeeper bigdata.ibeifeng.com:2181/kafka08 --partitions 1 --replication-factor 1

List topics to verify:

bin/kafka-topics.sh --list --zookeeper bigdata.ibeifeng.com:2181/kafka08

(4) Verify that Kafka can receive produced messages and that they can be consumed

  Producer:

bin/kafka-console-producer.sh --broker-list bigdata.ibeifeng.com:9092 --topic kafka_streaming_topic

  Consumer:

bin/kafka-console-consumer.sh --topic kafka_streaming_topic --zookeeper bigdata.ibeifeng.com:2181/kafka08    

(Tested successfully.)

2. Write the Code

(1) Maven (pom) dependency

[Reference: http://spark.apache.org/docs/2.1.0/streaming-kafka-0-8-integration.html]

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>

(2) Code

package Spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}


/**
  * Receiver-based Kafka word count. The master and app name are supplied
  * via spark-submit, so they are not set in code.
  */
object KafkaReceiverWordCount_product {
  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.println("Usage: KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    val Array(zkQuorum, group, topics, numThreads) = args
    // Production setting: master and appName come from spark-submit, so they are not hardcoded here
    val sparkConf = new SparkConf()

    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Map each topic to the number of receiver threads consuming it
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // TODO: how Spark Streaming connects to Kafka
    // See the createStream source for details
    val messages: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
    // Keep only the message body (the second element of the (key, message) tuple)
    messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
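
To make the transformation chain concrete, here is a small sketch in plain Scala (no Spark required) of what the pipeline above computes for a single batch of (key, message) pairs. `WordCountSketch` and `countWords` are illustrative names, not part of the original program; Scala's `groupBy` stands in for `reduceByKey` on a local collection.

```scala
// Sketch only: mirrors the DStream pipeline on a plain Scala collection.
object WordCountSketch {
  def countWords(batch: Seq[(String, String)]): Map[String, Int] =
    batch.map(_._2)                          // keep only the message body
      .flatMap(_.split(" "))                 // split each message into words
      .groupBy(identity)                     // group equal words together
      .map { case (w, ws) => (w, ws.size) }  // count each group

  def main(args: Array[String]): Unit = {
    val batch = Seq(("k1", "hello spark"), ("k2", "hello kafka"))
    println(countWords(batch))
  }
}
```

In the real job the same counting happens per 5-second batch via `reduceByKey(_ + _)`, distributed across partitions instead of on a local `Seq`.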

3. Test

(1) Place the jar on the server:

/opt/datas/lib/scalaProjectMaven.jar

(2) Start HDFS

(3) Submit the Spark job

bin/spark-submit \
--class Spark.KafkaReceiverWordCount_product \
--master local[2] \
--name KafkaReceiverWordCount_product \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 \
/opt/datas/lib/scalaProjectMaven.jar  bigdata.ibeifeng.com:2181/kafka08 test kafka_streaming_topic 1
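
For clarity, the four trailing arguments of the spark-submit command above map positionally to the program's parameters. The variable names below are purely illustrative, not part of the original command:

```shell
# Illustrative mapping of the trailing spark-submit arguments to the
# program's positional parameters (names are for clarity only).
ZK_QUORUM="bigdata.ibeifeng.com:2181/kafka08"  # zkQuorum: ZooKeeper quorum + chroot
GROUP="test"                                   # group: Kafka consumer group id
TOPICS="kafka_streaming_topic"                 # topics: comma-separated topic list
NUM_THREADS=1                                  # numThreads: receiver threads per topic
echo "$ZK_QUORUM $GROUP $TOPICS $NUM_THREADS"
```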

(Tested successfully.)