1. 程式人生 > >基於Python的Spark Streaming+Kafka程式設計實踐及調優總結

基於Python的Spark Streaming+Kafka程式設計實踐及調優總結

說明

Spark Streaming的原理說明的文章很多,這裡不做介紹。本文主要介紹使用Kafka作為資料來源的程式設計模型,編碼實踐,以及一些優化說明

演示環境

  1. Spark:1.6
  2. Kafka:kafka_2.11-0.9.0.1
  3. 實現語言:Python

程式設計模型

目前Spark Streaming 的kafka程式設計主要包括兩種模型 
1. 基於Receiver 
2. Direct(無Receiver)

基於Receiver

這種方式利用接收器(Receiver)來接收kafka中的資料,其最基本是使用Kafka高階使用者API介面。對於所有的接收器,從kafka接收來的資料會儲存在spark的executor中,之後spark streaming提交的job會處理這些資料

原理圖

這裡寫圖片描述

說明

  1. 需要藉助Write Ahead Logs 來保證資料的不丟失,如果我們啟用了Write Ahead Logs複製到檔案系統如HDFS,那麼storage level需要設定成 StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(…, StorageLevel.MEMORY_AND_DISK_SER)
  2. 在Receiver的方式中,Spark中的partition和kafka中的partition並不是相關的,所以如果我們加大每個topic的partition數量,僅僅是增加執行緒來處理由單一Receiver消費的主題。但是這並沒有增加Spark在處理資料上的並行度。
  3. 對於不同的Group和topic我們可以使用多個Receiver建立不同的Dstream來並行接收資料,之後可以利用union來統一成一個Dstream。

Direct(無Receiver)

在spark1.3之後,引入了Direct方式。不同於Receiver的方式,Direct方式沒有receiver這一層,其會週期性的獲取Kafka中每個topic的每個partition中的最新offsets,之後根據設定的maxRatePerPartition來處理每個batch

不同於Receiver的方式(是從Zookeeper中讀取offset值,那麼自然zookeeper就儲存了當前消費的offset值,那麼如果重新啟動開始消費就會接著上一次offset值繼續消費)。而在Direct的方式中,是直接從kafka來讀資料,那麼offset需要自己記錄,可以利用checkpoint、資料庫或檔案記錄或者回寫到zookeeper中進行記錄

原理圖

這裡寫圖片描述

說明

  1. 簡化的並行:在Receiver的方式中我們提到建立多個Receiver之後利用union來合併成一個Dstream的方式提高資料傳輸並行度。而在Direct方式中,Kafka中的partition與RDD中的partition是一一對應的並行讀取Kafka資料,這種對映關係也更利於理解和優化。
  2. 高效:在Receiver的方式中,為了達到0資料丟失需要將資料存入Write Ahead Log中,這樣在Kafka和日誌中就儲存了兩份資料,浪費!而第二種方式不存在這個問題,只要我們Kafka的資料保留時間足夠長,我們都能夠從Kafka進行資料恢復。
  3. 精確一次:在Receiver的方式中,使用的是Kafka的高階API介面從Zookeeper中獲取offset值,這也是傳統的從Kafka中讀取資料的方式,但由於Spark Streaming消費的資料和Zookeeper中記錄的offset不同步,這種方式偶爾會造成資料重複消費。而第二種方式,直接使用了簡單的低階Kafka API,Offsets則利用Spark Streaming的checkpoints進行記錄,消除了這種不一致性。

程式碼實踐

Kafka生產者

package com.eric.kafka.producer;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Hello world!
 */
public class ProcuderSample {
    private final Producer<String, String> producer;
    public final static String TOPIC = "spark_streaming_test_topic";
    public final static Integer BATCH_SIZE = 2000;

    private ProcuderSample() {
        Properties props = new Properties();
        // 此處配置的是kafka的埠
        props.put("metadata.broker.list", "server1-2-5-24-138:9092,server1-3-5-24-139:9092,server1:9092");
        // 配置value的序列化類
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // 配置key的序列化類
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "-1");
        producer = new Producer<String, String>(new ProducerConfig(props));
    }

    public void deadLoopSendMessage(){
        int recordCount=0;
        List<KeyedMessage<String, String>> tmpList=new ArrayList<KeyedMessage<String, String>>();
        while(true){
            Random rand=new Random();
            // 批量傳送資料
//          String randResult=recordCount+":"+rand.nextInt(100);
            String randResult=rand.nextInt(10)+"";
            tmpList.add(new KeyedMessage<String, String>(TOPIC, randResult , randResult));
            if (tmpList.size()%BATCH_SIZE==0){
                producer.send(tmpList);
                tmpList.clear();
            }
//          producer.send(new KeyedMessage<String, String>(TOPIC, randResult , randResult));
            recordCount+=1;
        }
    }



    public static void main(String[] args) {
        new ProcuderSample().deadLoopSendMessage();
    }
}

Receiver方式收取資料

# encoding:utf-8
__author__ = 'eric.sun'

"""演示如何使用Spark Streaming 通過Kafka Streaming實現WordCount
   執行命令:./spark-submit --master spark://server1-1-5-24-137:7077 --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0 ../examples/kafka_streaming.py > log
   Kafka資料來源程式:https://github.com/Eric-aihua/practise.git/java_cookbook/cookbook/src/main/java/com/eric/kafka/producer/ProcuderSample.java
"""

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def start():
    sconf=SparkConf()
    # sconf.set('spark.streaming.blockInterval','100')
    sconf.set('spark.cores.max' , 8)
    sc=SparkContext(appName='KafkaWordCount',conf=sconf)
    ssc=StreamingContext(sc,2)

    numStreams = 3
    kafkaStreams = [KafkaUtils.createStream(ssc,"server1-2-5-24-138:2181,server1-3-5-24-139:2181,server1-4-5-24-140:2181","streaming_test_group",{"spark_streaming_test_topic":1}) for _ in range (numStreams)]
    unifiedStream = ssc.union(*kafkaStreams)
    print unifiedStream
    #統計生成的隨機數的分佈情況
    result=unifiedStream.map(lambda x:(x[0],1)).reduceByKey(lambda x, y: x + y)
    result.pprint()
    ssc.start()             # Start the computation
    ssc.awaitTermination()  # Wait for the computation to terminate

if __name__ == '__main__':
    start()

Direct方式收取資料

# encoding:utf-8
__author__ = 'eric.sun'

"""演示如何使用Spark Streaming 通過Kafka Direct Streaming實現WordCount
   執行命令:./spark-submit --master spark://server1-1-5-24-137:7077 --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0 ../examples/kafka_streaming.py > log
   Kafka資料來源程式:https://github.com/Eric-aihua/practise.git/java_cookbook/cookbook/src/main/java/com/eric/kafka/producer/ProcuderSample.java

   使用Direct的好處
   1:根據topic的分割槽數預設的建立對應數量的rdd分割槽數
   2:Receiver的方式需要通過Write AHead Log的方式確保資料不丟失,Direct的方式不需要
   3:一次處理:使用Kafka Simple API對資料進行讀取,使用checkpoint對offset進行記錄

   問題:
   基於Zookeeper的Kafka監控工具不能獲取offset的值了,需要在每次Batch處理後,可以對Zookeeper的值進行設定

"""

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def start():
    sconf=SparkConf()
    sconf.set('spark.cores.max' , 8)
    sc=SparkContext(appName='KafkaDirectWordCount',conf=sconf)
    ssc=StreamingContext(sc,2)

    brokers="server1-2-5-24-138:9092,server1-3-5-24-139:9092,server1-4-5-24-140:9092"
    topic='spark_streaming_test_topic'
    kafkaStreams = KafkaUtils.createDirectStream(ssc,[topic],kafkaParams={"metadata.broker.list": brokers})
    #統計生成的隨機數的分佈情況
    result=kafkaStreams.map(lambda x:(x[0],1)).reduceByKey(lambda x, y: x + y)
    #列印offset的情況,此處也可以寫到Zookeeper中
    #You can use transform() instead of foreachRDD() as your
    # first method call in order to access offsets, then call further Spark methods.
    kafkaStreams.transform(storeOffsetRanges).foreachRDD(printOffsetRanges)
    result.pprint()
    ssc.start()             # Start the computation
    ssc.awaitTermination()  # Wait for the computation to terminate

offsetRanges = []

def storeOffsetRanges(rdd):
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    return rdd

def printOffsetRanges(rdd):
    for o in offsetRanges:
        print "%s %s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset,o.untilOffset-o.fromOffset)

if __name__ == '__main__':
    start()

調優總結

Spark streaming+Kafka的使用中,當資料量較小,很多時候預設配置和使用便能夠滿足情況,但是當資料量大的時候,就需要進行一定的調整和優化,而這種調整和優化本身也是不同的場景需要不同的配置。

  1. 合理的批處理時間(batchDuration):幾乎所有的Spark Streaming調優文件都會提及批處理時間的調整,在StreamingContext初始化的時候,有一個引數便是批處理時間的設定。如果這個值設定的過短,即個batchDuration所產生的Job並不能在這期間完成處理,那麼就會造成資料不斷堆積,最終導致Spark Streaming發生阻塞。而且,一般對於batchDuration的設定不會小於500ms,因為過小會導致SparkStreaming頻繁的提交作業,對整個streaming造成額外的負擔。在平時的應用中,根據不同的應用場景和硬體配置,我設在1~10s之間,我們可以根據SparkStreaming的視覺化監控介面,觀察Total Delay來進行batchDuration的調整
  2. 合理的Kafka拉取量(maxRatePerPartition重要):對於Spark Streaming消費kafka中資料的應用場景,這個配置是非常關鍵的,配置引數為:spark.streaming.kafka.maxRatePerPartition。這個引數預設是沒有上線的,即kafka當中有多少資料它就會直接全部拉出。而根據生產者寫入Kafka的速率以及消費者本身處理資料的速度,同時這個引數需要結合上面的batchDuration,使得每個partition拉取在每個batchDuration期間拉取的資料能夠順利的處理完畢,做到儘可能高的吞吐量,而這個引數的調整可以參考視覺化監控介面中的Input Rate和Processing Time
  3. 快取反覆使用的Dstream(RDD):Spark中的RDD和SparkStreaming中的Dstream,如果被反覆的使用,最好利用cache,將該資料流快取起來,防止過度的排程資源造成的網路開銷。可以參考觀察Scheduling Delay引數
  4. 設定合理的GC:長期使用Java的小夥伴都知道,JVM中的垃圾回收機制,可以讓我們不過多的關注與記憶體的分配回收,更加專注於業務邏輯,JVM都會為我們搞定。對JVM有些瞭解的小夥伴應該知道,在Java虛擬機器中,將記憶體分為了初生代(edengeneration)、年輕代young generation)、老年代(oldgeneration)以及永久代(permanentgeneration),其中每次GC都是需要耗費一定時間的,尤其是老年代的GC回收,需要對記憶體碎片進行整理,通常採用標記-清楚的做法。同樣的在Spark程式中,JVMGC的頻率和時間也是影響整個Spark效率的關鍵因素。在通常的使用中建議:–conf “spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC”
  5. 設定合理的CPU資源數:CPU的core數量,每個executor可以佔用一個或多個core,可以通過觀察CPU的使用率變化來了解計算資源的使用情況,例如,很常見的一種浪費是一個executor佔用了多個core,但是總的CPU使用率卻不高(因為一個executor並不總能充分利用多核的能力),這個時候可以考慮讓麼個executor佔用更少的core,同時worker下面增加更多的executor,或者一臺host上面增加更多的worker來增加並行執行的executor的數量,從而增加CPU利用率。但是增加executor的時候需要考慮好記憶體消耗,因為一臺機器的記憶體分配給越多的executor,每個executor的記憶體就越小,以致出現過多的資料spill over甚至out of memory的情況
  6. 設定合理的parallelism:partition和parallelism,partition指的就是資料分片的數量,每一次task只能處理一個partition的資料,這個值太小了會導致每片資料量太大,導致記憶體壓力,或者諸多executor的計算能力無法利用充分;但是如果太大了則會導致分片太多,執行效率降低。在執行action型別操作的時候(比如各種reduce操作),partition的數量會選擇parent RDD中最大的那一個。而parallelism則指的是在RDD進行reduce類操作的時候,預設返回資料的paritition數量(而在進行map類操作的時候,partition數量通常取自parent RDD中較大的一個,而且也不會涉及shuffle,因此這個parallelism的引數沒有影響)。所以說,這兩個概念密切相關,都是涉及到資料分片的,作用方式其實是統一的。通過spark.default.parallelism可以設定預設的分片數量,而很多RDD的操作都可以指定一個partition引數來顯式控制具體的分片數量。 在SparkStreaming+kafka的使用中,我們採用了Direct連線方式,前文闡述過Spark中的partition和Kafka中的Partition是一一對應的,我們一般預設設定為Kafka中Partition的數量。
  7. 使用高效能的運算元:(1)使用reduceByKey/aggregateByKey替代groupByKe(2)使用mapPartitions替代普通map(3) 使用foreachPartitions替代foreach(4) 使用filter之後進行coalesce操作5 使用repartitionAndSortWithinPartitions替代repartition與sort類操作
  8. 使用Kryo優化序列化效能 
    主要有三個地方涉及到了序列化 
    • 在運算元函式中使用到外部變數時,該變數會被序列化後進行網路傳輸(見“原則七:廣播大變數”中的講解)。
    • 將自定義的型別作為RDD的泛型型別時(比如JavaRDD,Student是自定義型別),所有自定義型別物件,都會進行序列化。因此這種情況下,也要求自定義的類必須實現Serializable介面。
    • 使用可序列化的持久化策略時(比如MEMORY_ONLY_SER),Spark會將RDD中的每個partition都序列化成一個大的位元組陣列。

對於這三種出現序列化的地方,我們都可以通過使用Kryo序列化類庫,來優化序列化和反序列化的效能。Spark預設使用的是Java的序列化機制,也就是ObjectOutputStream/ObjectInputStream API來進行序列化和反序列化。但是Spark同時支援使用Kryo序列化庫,Kryo序列化類庫的效能比Java序列化類庫的效能要高很多。官方介紹,Kryo序列化機制比Java序列化機制,效能高10倍左右。Spark之所以預設沒有使用Kryo作為序列化類庫,是因為Kryo要求最好要註冊所有需要進行序列化的自定義型別,因此對於開發者來說,這種方式比較麻煩。 
以下是使用Kryo的程式碼示例,我們只要設定序列化類,再註冊要序列化的自定義型別即可(比如運算元函式中使用到的外部變數型別、作為RDD泛型型別的自定義型別等):

// 建立SparkConf物件。 val conf = new SparkConf.setMaster(...).setAppName(...) //設定序列化器為KryoSerializer。
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer") //註冊要序列化的自定義型別。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))