Spark Study Notes (16): Integrating Spark Streaming with Kafka
阿新 • Published: 2018-11-06
1 Start ZooKeeper (zookeeper-3.4.8)
Run on all three nodes:
zkServer.sh start
2 Start Kafka
Run on all three nodes:
kafka-server-start.sh /home/hadoop/apps/kafka_2.10-0.8.2.1/config/server.properties
To start the broker in the background instead:
kafka-server-start.sh /home/hadoop/apps/kafka_2.10-0.8.2.1/config/server.properties > /dev/null 2>&1 &
2.1 Create a topic
$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test
Created topic "test".
View the topic details
$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:3 ReplicationFactor:3 Configs:
    Topic: test Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
    Topic: test Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
    Topic: test Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
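In the describe output, each partition line gives the broker id of the leader, the full replica list, and the in-sync replica set (Isr). When Isr contains the same broker ids as Replicas, every replica of that partition is caught up. As a rough sketch of how to read those lines programmatically, the snippet below parses them with a regular expression. PartitionInfo, DescribeOutput and DescribeDemo are hypothetical names introduced only for this illustration, and the pattern assumes the line layout shown above.

```scala
// Hypothetical helper types for illustration; not part of Kafka or Spark.
final case class PartitionInfo(partition: Int, leader: Int, replicas: Set[Int], isr: Set[Int]) {
  // A partition is fully replicated when every replica is also in the ISR.
  def fullyInSync: Boolean = replicas == isr
}

object DescribeOutput {
  // Matches lines such as "Topic: test Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1".
  private val Line =
    """\s*Topic:\s*(\S+)\s+Partition:\s*(\d+)\s+Leader:\s*(-?\d+)\s+Replicas:\s*([\d,]+)\s+Isr:\s*([\d,]*)\s*""".r

  // Turns "2,0,1" into Set(2, 0, 1); an empty string becomes an empty set.
  private def ids(csv: String): Set[Int] =
    csv.split(",").filter(_.nonEmpty).map(_.toInt).toSet

  // Returns None for anything that is not a partition line (e.g. the summary line).
  def parse(line: String): Option[PartitionInfo] = line match {
    case Line(_, p, l, r, i) => Some(PartitionInfo(p.toInt, l.toInt, ids(r), ids(i)))
    case _                   => None
  }
}

object DescribeDemo {
  def main(args: Array[String]): Unit = {
    val output = Seq(
      "Topic:test PartitionCount:3 ReplicationFactor:3 Configs:",
      "Topic: test Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1",
      "Topic: test Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2",
      "Topic: test Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0"
    )
    output.flatMap(DescribeOutput.parse).foreach { p =>
      println(s"partition ${p.partition}: leader=${p.leader}, fully in sync=${p.fullyInSync}")
    }
  }
}
```

In the output above, all three partitions have an Isr equal to their Replicas, so each one would report as fully in sync.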
3 Source code
3.1 pom file
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.3</version>
</dependency>
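For readers building with sbt rather than Maven, the same coordinates could be declared roughly as follows. This is a sketch under assumptions: the Scala patch version 2.10.6 and the explicit spark-streaming dependency are not stated in the post (only the Kafka integration artifact is shown), so adjust them to match your project.

```scala
// build.sbt (sketch; the Kafka artifact and version mirror the pom snippet)
// Assumption: any Scala 2.10.x release, so that %% resolves to the _2.10 artifacts.
scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  // Assumption: spark-streaming is declared explicitly; the post does not show it.
  "org.apache.spark" %% "spark-streaming"       % "1.6.3",
  // Same artifact as the pom: spark-streaming-kafka_2.10, version 1.6.3.
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
)
```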
3.2 Reading from Kafka with Spark Streaming
package streamingAndKafka

// LoggerLevels comes from the author's own mystreaming package (not shown here)
import mystreaming.LoggerLevels
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaWordcount {

  // For each word: add the counts from the current batch (y) to the previous total (z)
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    iter.map { case (x, y, z) => (x, y.sum + z.getOrElse(0)) }
  }

  def main(args: Array[String]): Unit = {
    LoggerLevels.setStreamingLogLevels()
    // Expected arguments: <zkQuorum> <group> <topics, comma-separated> <numThreads>
    val Array(zkQuorum, group, topics, numThreads) = args
    val conf = new SparkConf().setAppName("KafkaWordcount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // updateStateByKey requires a checkpoint directory
    ssc.checkpoint("d://ck1")
    // Map each topic name to the number of consumer threads
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Receiver-based stream of (key, message) pairs
    val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
    data.print()
    val words = data.map(_._2).flatMap(_.split(" "))
    val wordCounts = words.map((_, 1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    wordCounts.print()
    // Without these two calls the streaming job is defined but never runs
    ssc.start()
    ssc.awaitTermination()
  }
}
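To see what the update function does without a cluster, the following plain-Scala sketch replays it over two in-memory "batches". It is only an approximation of what updateStateByKey does: UpdateStateSketch and step are hypothetical names for this illustration, and step stands in for Spark's grouping of each batch by key together with the previous state.

```scala
object UpdateStateSketch {
  // Same shape as the function passed to updateStateByKey:
  // (word, counts seen in this batch, previous running total) => (word, new total)
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) =>
    iter.map { case (word, counts, prev) => (word, counts.sum + prev.getOrElse(0)) }

  // Hypothetical stand-in for one micro-batch: split lines into words, group the
  // 1s per word, then run every known key through updateFunc with its old state.
  def step(state: Map[String, Int], lines: Seq[String]): Map[String, Int] = {
    val batch: Map[String, Seq[Int]] =
      lines.flatMap(_.split(" ")).groupBy(identity).map { case (w, ws) => (w, ws.map(_ => 1)) }
    // Keys that already have state are updated even when the batch has no new values.
    val keys = state.keySet ++ batch.keySet
    val input = keys.iterator.map(k => (k, batch.getOrElse(k, Seq.empty[Int]), state.get(k)))
    updateFunc(input).toMap
  }

  def main(args: Array[String]): Unit = {
    val batches = Seq(Seq("hello spark", "hello kafka"), Seq("hello"))
    val finalState = batches.foldLeft(Map.empty[String, Int])(step)
    println(finalState)
  }
}
```

After the first batch the count for "hello" is 2; the second batch raises it to 3 while "spark" and "kafka" keep their earlier totals. That carry-over between batches is the state the checkpoint directory persists.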
3.3 Produce messages
$ kafka-console-producer.sh --broker-list 192.168.30.131:9092 --topic test