
Spark Study Notes (16): Spark Streaming Integration with Kafka

1 Start ZooKeeper (zookeeper-3.4.8)

Run on all three nodes: zkServer.sh start

2 Start Kafka

Run on all three nodes:

kafka-server-start.sh /home/hadoop/apps/kafka_2.10-0.8.2.1/config/server.properties

To start in the background instead (discarding console output):

kafka-server-start.sh /home/hadoop/apps/kafka_2.10-0.8.2.1/config/server.properties > /dev/null 2>&1 &

2.1 Create a topic

$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test
Created topic "test".

View the topic details. For each partition, Leader is the broker currently serving reads and writes, Replicas is the full replica list, and Isr is the in-sync replica set:

$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test	PartitionCount:3	ReplicationFactor:3	Configs:
	Topic: test	Partition: 0	Leader: 2	Replicas: 2,0,1	Isr: 2,0,1
	Topic: test	Partition: 1	Leader: 0	Replicas: 0,1,2	Isr: 0,1,2
	Topic: test	Partition: 2	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0

3 Source Code

3.1 pom file

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.3</version>
</dependency>
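
The streaming job below also needs Spark itself on the classpath. A minimal sketch of the companion dependencies, assuming the project uses the same Scala 2.10 / Spark 1.6.3 versions as the artifact above:

<!-- assumed companion dependencies; versions must match spark-streaming-kafka above -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.3</version>
</dependency>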

3.2 Spark Streaming reads from Kafka

package streamingAndKafka

import mystreaming.LoggerLevels
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}


object KafkaWordcount {

  // For each key: sum this batch's counts and add them to the previous running total (if any)
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    iter.flatMap {
      case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(i => (x, i))
    }
  }

  def main(args: Array[String]): Unit = {

    LoggerLevels.setStreamingLogLevels()
    // zkQuorum: ZooKeeper connect string; group: consumer group id;
    // topics: comma-separated topic list; numThreads: receiver threads per topic
    val Array(zkQuorum, group, topics, numThreads) = args

    val conf = new SparkConf().setAppName("KafkaWordcount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // updateStateByKey requires a checkpoint directory (a Windows path here)
    ssc.checkpoint("d://ck1")
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Receiver-based stream: each record arrives as a (key, message) pair
    val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
    data.print()

    val words = data.map(_._2).flatMap(_.split(" "))
    val wordCounts = words.map((_, 1))
      .updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    wordCounts.print()

    // Without these two calls the streaming job never actually runs
    ssc.start()
    ssc.awaitTermination()
  }
}
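
To see what updateFunc computes, here is a standalone sketch that applies the same function to a hand-built batch (the words and counts are invented for illustration):

object UpdateFuncDemo extends App {
  // Same shape as updateFunc above: (key, counts in this batch, previous total)
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    iter.flatMap {
      case (word, newCounts, prevTotal) =>
        Some(newCounts.sum + prevTotal.getOrElse(0)).map(total => (word, total))
    }
  }

  // "spark" appeared twice this batch with a previous total of 3; "kafka" is new
  val batch = Iterator(("spark", Seq(1, 1), Some(3)), ("kafka", Seq(1), None))
  println(updateFunc(batch).toList) // List((spark,5), (kafka,1))
}

To run the job itself, submit it with four arguments, for example (hypothetical hosts): node1:2181,node2:2181,node3:2181 group1 test 2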


3.3 Produce messages

$ kafka-console-producer.sh --broker-list 192.168.30.131:9092 --topic test

Each line typed into the producer console is sent as one message to topic test, and the running word counts printed by the streaming job grow accordingly.

4 Spark Streaming Direct Approach
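
The receiver-based createStream above consumes through ZooKeeper; with the Direct approach Spark queries the brokers for offsets itself, with no receiver and no ZooKeeper dependency for offset tracking. A minimal sketch using the same spark-streaming-kafka 1.6.x API (the broker address and topic are assumptions carried over from the setup above):

package streamingAndKafka

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectKafkaWordcount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DirectKafkaWordcount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Direct stream: offsets are read straight from the brokers, one RDD partition per Kafka partition
    val kafkaParams = Map("metadata.broker.list" -> "192.168.30.131:9092") // assumed broker
    val topics = Set("test")
    val data = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // Per-batch word count over the message values
    val wordCounts = data.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}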