1. 程式人生 > >Spark2.x 快速入門教程 7

Spark2.x 快速入門教程 7

Spark Streaming 整合 Kafka

一、實驗介紹

1.1 實驗內容

Kafka是一個分散式的釋出-訂閱式的訊息系統,可以作為 DStream 的高階資料來源,本部分以單擊統計為例介紹 Spark Streaming 程式從 kafka 中消費資料,包括兩部分(基於 Kafka Receiver 方式,基於Kafka Direct方式)。

1.2 先學課程

1.2 先學課程

1.3 實驗知識點

  • Kafka Receiver
  • Kafka Direct
  • Spark Streaming
  • Maven

1.4 實驗環境

  • Hadoop-2.6.1
  • kafka_2.10-0.10.0.0
  • Xfce 終端

    1.5 適合人群

    本課程屬於初級難度級別,適合具有 Kafka 基礎的使用者,如果對 Streaming 瞭解能夠更好的上手本課程。

二、實驗步驟

2.1 Spark Streaming設計設計思想

Spark Streaming 是 Spark 的核心元件之一,為 Spark 提供了可拓展、高吞吐、容錯的流計算能力。如下圖所示,Spark Streaming 可整合多種輸入資料來源,如 Kafka、Flume、HDFS等,經處理後的資料可儲存至檔案系統、資料庫,或顯示在儀表盤裡。

此處輸入圖片的描述

Spark Streaming 最主要的抽象是 DStream(Discretized Stream,離散化資料流),表示連續不斷的資料流。在內部實現上,Spark Streaming 的輸入資料按照時間片(如1秒)分成一段一段的 DStream,每一段資料轉換為 Spark 中的 RDD,並且對 DStream 的操作都最終轉變為對相應的 RDD 的操作。例如,下圖展示了進行單詞統計時,每個時間片的資料(儲存句子的 RDD)經 flatMap 操作,生成了儲存單詞的 RDD。整個流式計算可根據業務的需求對這些中間的結果進一步處理,或者儲存到外部裝置中。

此處輸入圖片的描述

2.2 準備工作

我們已經在實驗樓環境裡下載並配置啟動 hadoop-2.6.1 所需的檔案,免除您配置檔案的麻煩,您可以在 /opt 找到,只需格式化並啟動 hadoop 程序即可。

雙擊開啟桌面上的 Xfce 終端,用 sudo 命令切換到 hadoop 使用者,hadoop 使用者密碼為 hadoop,用 cd 命令進入 /opt目錄。

$ su hadoop
$ cd /opt/

此處輸入圖片的描述

在 /opt 目錄下格式化 hadoop。

$ hadoop-2.6.1/bin/hdfs namenode -format

此處輸入圖片的描述

在 /opt 目錄下啟動 hadoop 程序。

$ hadoop-2.6
.1/sbin/start-all.sh

此處輸入圖片的描述

用 jps 檢視 hadoop 程序是否啟動。

此處輸入圖片的描述

2.3 下載配置 Kafka

在 /opt 目錄下,用 hadoop 使用者通過 wget 命令下載,並用 tar 解壓。


$ sudo wget http://labfile.oss.aliyuncs.com/courses/785/kafka_2.10-0.10.0.0.tgz
$ sudo tar -zxf kafka_2.10-0.10.0.0.tgz

此處輸入圖片的描述

分別啟動 zookeeper,kafka。

#許可權不足,授權
$ sudo chmod 777 -R kafka_2.10-0.10.0.0
$ cd  kafka_2.10-0.10.0.0
#啟動zookeeper
$ bin/zookeeper-server-start.sh  config//zookeeper.properties &

#啟動kafka
$ bin/kafka-server-start.sh  config/server.properties  &

此處輸入圖片的描述

此處輸入圖片的描述

用 jps命令檢視程序。

此處輸入圖片的描述

用 kafka-topics.sh 指令碼建立主題。

bin/kafka-topics.sh  --create --zookeeper localhost:2181  --replication-factor 1 --partitions 1 --topic wc1

此處輸入圖片的描述

三、程式碼實現及測試

注意:本節課實驗是沿用上節課的 scala IDE 環境,pom.xml 不需要修改,需要的 spark-streaming-kafka_2.10 jar 依賴已經新增在裡面。

1). 基於Kafka Receiver方式

選中 cn.com.syl.spark 包 -> 用快捷鍵 Ctrl+N ->搜尋 class -> 選中 java class -> Next

此處輸入圖片的描述

輸入類名 -> Finish

此處輸入圖片的描述

KafkaReceiverSpark.java 程式碼如下:

package cn.com.syl.spark;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;

import scala.Tuple2;

public class KafkaReceiverSpark {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("KafkaReceiverSpark");  
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(6));

        // 使用KafkaUtils.createStream()方法,建立 Kafka 的輸入資料流
        Map<String, Integer> topicThreadMap = new HashMap<String, Integer>();
        topicThreadMap.put("wc1", 1);

        JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(
                jsc, 
                "localhost:2181", 
                "DefaultConsumerGroup", 
                topicThreadMap);



        // wordcount code
        JavaDStream<String> words = lines.flatMap(

                new FlatMapFunction<Tuple2<String,String>, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Iterable<String> call(Tuple2<String, String> tuple)
                            throws Exception {
                        return Arrays.asList(tuple._2.split(" "));  
                    }

                });

        JavaPairDStream<String, Integer> pairs = words.mapToPair(

                new PairFunction<String, String, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(String word)
                            throws Exception {
                        return new Tuple2<String, Integer>(word, 1);
                    }

                });

        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(

                new Function2<Integer, Integer, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }

                });

        wordCounts.print();  

        jsc.start();
        jsc.awaitTermination();
        jsc.close();
    }

}

啟動 Spark Streaming。

此處輸入圖片的描述

開啟 Xfce 終端啟動 kafka Producer。

$ bin/kafka-console-producer.sh  --broker-list localhost:9092 --topic wc1
#輸入任意

此處輸入圖片的描述

快速切換到scala IDE Console 控制檯,螢幕上會顯示程式執行的相關資訊,並會每隔6秒鐘重新整理一次資訊,大量資訊中會包含如下重要資訊,預設只顯示前十條:

此處輸入圖片的描述

同樣地,您也可以再另外開啟 consume 終端。

$ bin/kafka-console-consumer.sh  --zookeeper localhost:2181 --from-beginning --topic wc1

此處輸入圖片的描述

至此基於 Kafka Receiver 方式整哈Spark Streaming 順利完成。實驗結束後,要關閉各個終端,只要切換到該終端視窗,然後按鍵盤的 Ctrl+C 組合鍵,就可以結束程式執行。

2). 基於Kafka Direct 方式

關於 基於 Kafka Direct 方式,只需要新建一個類 KafkaDirectSpark,具體程式碼如下:

package cn.com.syl.spark;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;


public class KafkaDirectSpark{

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("KafkaDirectSpark");  
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(6));

        // 建立map,新增引數
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", 
                "localhost:9092");

        // 建立一個集合set,新增讀取的topic

        Set<String> topics = new HashSet<String>();
        topics.add("wc1");

        // 建立輸入DStream
        JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(
                jssc, 
                String.class, 
                String.class, 
                StringDecoder.class, 
                StringDecoder.class, 
                kafkaParams, 
                topics);

        // 單詞統計
        JavaDStream<String> words = lines.flatMap(

                new FlatMapFunction<Tuple2<String,String>, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Iterable<String> call(Tuple2<String, String> tuple)
                            throws Exception {
                        return Arrays.asList(tuple._2.split(" "));  
                    }

                });

        JavaPairDStream<String, Integer> pairs = words.mapToPair(

                new PairFunction<String, String, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(String word) throws Exception {
                        return new Tuple2<String, Integer>(word, 1);
                    }

                });

        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(

                new Function2<Integer, Integer, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }

                });

        wordCounts.print();

        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }

}

執行方式和上述基於 Kafka Receiver 方式一模一樣,在此就不演示了,請您完成。

補充知識:

假定您是在 windows 平臺寫的程式碼 ,對於上面的程式碼,您完全可以用打 jar 包的方式執行,具體參考上節 Streaming 整合 Flume

四、實驗總結

本節課主要介紹了 Spark Streaming 與 Kafka 的整合的兩種方式,並就 Windows 平臺如何打 jar 包提交到遠端伺服器進行講解,希望學完本節課,能幫助您理解 Spark Streaming,並能很快上手。

五、參考閱讀