1. 程式人生 > >java8下spark-streaming結合kafka程式設計(spark 2.3 kafka 0.10)

java8下spark-streaming結合kafka程式設計(spark 2.3 kafka 0.10)

前面有說道spark-streaming的簡單demo,也有說到kafka成功跑通的例子,這裡就結合二者,也是常用的使用之一。

1.相關元件版本 
首先確認版本,因為跟之前的版本有些不一樣,所以才有必要記錄下,另外仍然沒有使用scala,使用java8,spark 2.0.0,kafka 0.10。

2.引入maven包 
網上找了一些結合的例子,但是跟我當前版本不一樣,所以根本就成功不了,所以探究了下,列出引入包。

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.3.1</version>
</dependency>


網上能找到的不帶kafka版本號的包最新是1.6.3,我試過,已經無法在spark2下成功運行了,所以找到的是對應kafka0.10的版本,注意spark2.0的scala版本已經是2.11,所以包括之前必須後面跟2.11,表示scala版本。

3.SparkSteamingKafka類 
需要注意的是引入的包路徑是org.apache.spark.streaming.kafka010.xxx,所以這裡把import也放進來了。其他直接看註釋。

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import scala.Tuple2;

public class SparkSteamingKafka {
    public static void main(String[] args) throws InterruptedException {
        String brokers = "master2:6667";
        String topics = "topic1";
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("streaming word count");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));

        Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        //kafka相關引數,必要!缺了會報錯
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", brokers) ;
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("group.id", "group1");
        kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //Topic分割槽  也可以通過配置項實現 
        //如果沒有初始化偏移量或者當前的偏移量不存在任何伺服器上,可以使用這個配置屬性
        //earliest 當各分割槽下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
        //latest 當各分割槽下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分割槽下的資料
        //none topic各分割槽都存在已提交的offset時,從offset後開始消費;只要有一個分割槽不存在已提交的offset,則丟擲異常
        //kafkaParams.put("auto.offset.reset", "latest");
        //kafkaParams.put("enable.auto.commit",false); 

        new HashMap<>();
        offsets.put(new TopicPartition("topic1", 0), 2L); 
        //通過KafkaUtils.createDirectStream(...)獲得kafka資料,kafka相關引數由kafkaParams指定
        JavaInputDStream<ConsumerRecord<Object,Object>> lines = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams, offsets)
            );
        //這裡就跟之前的demo一樣了,只是需要注意這邊的lines裡的引數本身是個ConsumerRecord物件
        JavaPairDStream<String, Integer> counts = 
                lines.flatMap(x -> Arrays.asList(x.value().toString().split(" ")).iterator())
                .mapToPair(x -> new Tuple2<String, Integer>(x, 1))
                .reduceByKey((x, y) -> x + y);  
        counts.print();
//  可以列印所有資訊,看下ConsumerRecord的結構
//      lines.foreachRDD(rdd -> {
//          rdd.foreach(x -> {
//            System.out.println(x);
//          });
//        });
        ssc.start();
        ssc.awaitTermination();
        ssc.close();
    }
}


4.執行測試 
這裡使用上一篇kafka初探裡寫的producer類,put資料到kafka服務端,我這是master2節點上部署的kafka,本地測試跑spark2。

UserKafkaProducer producerThread = new UserKafkaProducer(KafkaProperties.topic);
producerThread.start();

再執行3裡的SparkSteamingKafka類,可以看到已經成功。 
執行生產者類

執行spark充當消費者

SparkStreaming 資料處理

根據需要,將流式資料與Hive中的靜態資料關聯,結果通過Elasticsearch For Hadoop匯出到ES叢集中。

如果靜態資料需要定時更新,可以在建立資料流後,在foreachRDD邏輯中,根據實際情況定期更新靜態資料。

調優

由於個人經驗較少,處理的資料量不大,以下內容大多是紙上談兵,僅供參考。

合理的批處理時間(batchDuration)

  • 幾乎所有的Spark Streaming調優文件都會提及批處理時間的調整,在StreamingContext初始化的時候,有一個引數便是批處理時間的設定。
  • 如果這個值設定的過短,即個batchDuration所產生的Job並不能在這期間完成處理,那麼就會造成資料不斷堆積,最終導致Spark Streaming發生阻塞。
  • 一般對於batchDuration的設定不會小於500ms,因為過小會導致SparkStreaming頻繁的提交作業,對整個streaming造成額外的負擔。
  • 在平時的應用中,根據不同的應用場景和硬體配置,我設在1~10s之間,我們可以根據SparkStreaming的視覺化監控介面,觀察Total Delay來進行batchDuration的調整,直達SparkStreaming剛剛能及時處理完上一個批處理的資料,這樣就是目前情況的最優值。

 

合理的Kafka拉取量(maxRatePerPartition重要)

spark.streaming.kafka.maxRatePerPartition引數配置指定了每秒每一個topic的每一個分割槽獲取的最大訊息數。

對於Spark Streaming消費kafka中資料的應用場景,這個配置是非常關鍵的。這個引數預設是沒有上限的,即kafka當中有多少資料它就會直接全部拉出。而根據生產者寫入Kafka的速率以及消費者本身處理資料的速度,同時這個引數需要結合上面的batchDuration,使得每個partition拉取在每個batchDuration期間拉取的資料能夠順利的處理完畢,做到儘可能高的吞吐量,而這個引數的調整可以參考視覺化監控介面中的Input Rate和Processing Time。

 

快取反覆使用的Dstream(RDD)

Spark中的RDD和SparkStreaming中的Dstream,如果被反覆的使用,最好利用cache(),將該資料流快取起來,防止過度的排程資源造成的網路開銷。可以參考觀察Scheduling Delay引數。


設定合理的GC

長期使用Java的小夥伴都知道,JVM中的垃圾回收機制,可以讓我們不過多的關注與記憶體的分配回收,更加專注於業務邏輯,JVM都會為我們搞定。對JVM有些瞭解的小夥伴應該知道,在Java虛擬機器中,將記憶體分為了初生代(eden generation)、年輕代(young generation)、老年代(old generation)以及永久代(permanent generation),其中每次GC都是需要耗費一定時間的,尤其是老年代的GC回收,需要對記憶體碎片進行整理,通常採用標記-清楚的做法。同樣的在Spark程式中,JVM GC的頻率和時間也是影響整個Spark效率的關鍵因素。在通常的使用中建議:

設定年老代為併發收集。
--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"

設定合理的CPU資源數

CPU的core數量,每個executor可以佔用一個或多個core,可以通過觀察CPU的使用率變化來了解計算資源的使用情況,例如,很常見的一種浪費是一個executor佔用了多個core,但是總的CPU使用率卻不高(因為一個executor並不總能充分利用多核的能力),這個時候可以考慮讓麼個executor佔用更少的core,同時worker下面增加更多的executor,或者一臺host上面增加更多的worker來增加並行執行的executor的數量,從而增加CPU利用率。

但是增加executor的時候需要考慮好記憶體消耗,因為一臺機器的記憶體分配給越多的executor,每個executor的記憶體就越小,以致出現過多的資料spill over甚至out of memory的情況。

設定合理的parallelism

partition和parallelism,partition指的就是資料分片的數量,每一次task只能處理一個partition的資料,這個值太小了會導致每片資料量太大,導致記憶體壓力,或者諸多executor的計算能力無法利用充分;但是如果太大了則會導致分片太多,執行效率降低。在執行action型別操作的時候(比如各種reduce操作),partition的數量會選擇parent RDD中最大的那一個。而parallelism則指的是在RDD進行reduce類操作的時候,預設返回資料的paritition數量(而在進行map類操作的時候,partition數量通常取自parent RDD中較大的一個,而且也不會涉及shuffle,因此這個parallelism的引數沒有影響)。所以說,這兩個概念密切相關,都是涉及到資料分片的,作用方式其實是統一的。通過spark.default.parallelism可以設定預設的分片數量,而很多RDD的操作都可以指定一個partition引數來顯式控制具體的分片數量。 在SparkStreaming+kafka的使用中,我們採用了Direct連線方式,前文闡述過Spark中的partition和Kafka中的Partition是一一對應的,我們一般預設設定為Kafka中Partition的數量。

使用高效能的運算元

這裡參考了美團技術團隊的博文,並沒有做過具體的效能測試,其建議如下:

  • 使用reduceByKey/aggregateByKey替代groupByKey

  • 使用mapPartitions替代普通map

  • 使用foreachPartitions替代foreach

  • 使用filter之後進行coalesce操作

  • 使用repartitionAndSortWithinPartitions替代repartition與sort類操作

  • 使用Kryo優化序列化效能 這個優化原則我本身也沒有經過測試,但是好多優化文件有提到,這裡也記錄下來。 在Spark中,主要有三個地方涉及到了序列化:

  • 在運算元函式中使用到外部變數時,該變數會被序列化後進行網路傳輸。

  • 將自定義的型別作為RDD的泛型型別時(比如JavaRDD,Student是自定義型別),所有自定義型別物件,都會進行序列化。因此這種情況下,也要求自定義的類必須實現Serializable介面。

  • 使用可序列化的持久化策略時(比如MEMORY_ONLY_SER),Spark會將RDD中的每個partition都序列化成一個大的位元組陣列。

對於這三種出現序列化的地方,我們都可以通過使用Kryo序列化類庫,來優化序列化和反序列化的效能。Spark預設使用的是Java的序列化機制,也就是ObjectOutputStream/ObjectInputStream API來進行序列化和反序列化。但是Spark同時支援使用Kryo序列化庫,Kryo序列化類庫的效能比Java序列化類庫的效能要高很多。
官方介紹,Kryo序列化機制比Java序列化機制,效能高10倍左右。Spark之所以預設沒有使用Kryo作為序列化類庫,是因為Kryo要求最好要註冊所有需要進行序列化的自定義型別,因此對於開發者來說,這種方式比較麻煩。