SparkStreaming (13): Advanced data source, Kafka Direct approach (production)
[The Direct approach reads data straight from the Kafka brokers, while the Receiver approach goes through ZooKeeper for offset information and performs somewhat worse.]
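For contrast, a minimal sketch of the Receiver-based API from the same spark-streaming-kafka-0-8 package; the consumer group id here is a placeholder, not a value used later in this article:

// Receiver-based stream: the consumer group's offsets live in ZooKeeper.
val receiverStream = KafkaUtils.createStream(
  ssc,                                   // StreamingContext
  "bigdata.ibeifeng.com:2181/kafka08",   // ZooKeeper quorum
  "wordcount-group",                     // consumer group id (placeholder)
  Map("kafka_streaming_topic" -> 1)      // topic -> receiver thread count
)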
1. Test environment
(1) Start ZooKeeper
bin/zkServer.sh start
(2) Start Kafka
bin/kafka-server-start.sh -daemon config/server.properties
(3) Create the topic
bin/kafka-topics.sh --create --topic kafka_streaming_topic --zookeeper bigdata.ibeifeng.com:2181/kafka08 --partitions 1 --replication-factor 1
Verify:
bin/kafka-topics.sh --list --zookeeper bigdata.ibeifeng.com:2181/kafka08
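The partition and replication settings can also be confirmed with the standard describe command:
bin/kafka-topics.sh --describe --topic kafka_streaming_topic --zookeeper bigdata.ibeifeng.com:2181/kafka08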
2. Code development
(1) Add the pom dependency
[Reference: http://spark.apache.org/docs/2.1.0/streaming-kafka-0-8-integration.html]
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
(2) Code
package Spark

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/** Word count over a Kafka topic using the Direct (receiver-less) API. */
object KafkaDirectWordCount_product {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: KafkaDirectWordCount <brokers> <topics>")
      System.exit(1)
    }
    val Array(brokers, topics) = args

    val sparkConf = new SparkConf()
      .setAppName("KafkaDirectWordCount")
      .setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Direct mode takes a Set of topics and a broker list; no ZooKeeper
    // quorum or per-topic receiver thread count is needed here.
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

    // Connect Spark Streaming to Kafka (see the createDirectStream source).
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet
    )

    // Each record is a (key, value) pair; the message body is the second element.
    messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
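The Direct approach does not store consumed offsets in ZooKeeper, so a production job usually tracks them itself. Below is a minimal sketch (not part of the original program) using the HasOffsetRanges interface described in the integration guide linked above; where the offsets get persisted is left open:

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// Read each batch's per-partition offset range. The cast only works on the
// RDDs produced directly by createDirectStream, before any shuffle.
messages.foreachRDD { rdd =>
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"topic=${o.topic} partition=${o.partition} " +
      s"from=${o.fromOffset} until=${o.untilOffset}")
  }
  // A real job would persist these offsets (e.g. to ZooKeeper or a database)
  // so it can resume from the last processed record after a restart.
}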
3. Testing
(1) Submit the Spark job
bin/spark-submit \
--class Spark.KafkaDirectWordCount_product \
--master local[2] \
--name KafkaDirectWordCount_product \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 \
/opt/datas/lib/scalaProjectMaven.jar bigdata.ibeifeng.com:9092 kafka_streaming_topic
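(2) Send test data
With the job running, type a few lines of words into the Kafka console producer; the driver should print (word, count) pairs every 5 seconds:
bin/kafka-console-producer.sh --broker-list bigdata.ibeifeng.com:9092 --topic kafka_streaming_topic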