Spark Streaming調優引數及最佳實踐深入剖析-Spark商業調優實戰
本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。版權宣告:禁止轉載,歡迎學習。QQ郵箱地址:[email protected],如有任何商業交流,可隨時聯絡。
1 開門見山
1.1 優化1:進行HA機制處理-針對updateStateByKey與window等有狀態的操作
-
HA高可用性:High Availability,如果有些資料丟失,或者節點掛掉;如果不想讓你的實時計算程式掛了,就必須做一些資料上的冗餘副本,保證你的實時計算程式可以7 * 24小時的運轉。
-
針對updateStateByKey與window等有狀態的操作,自動進行checkpoint,必須設定checkpoint目錄。
HDFS:SparkStreaming.checkpoint("hdfs://192.168.1.105:9090/checkpoint"), 複製程式碼
-
checkpoint 會把資料保留一份在容錯的檔案系統中,一旦記憶體中的資料丟失掉;就可以直接從文 件系統中讀取資料,不需要重新進行計算。
JavaStreamingContext jssc = new JavaStreamingContext( conf, Durations.seconds(5)); jssc.checkpoint("hdfs://192.168.1.105:9000/streaming_checkpoint"); 複製程式碼
1.2 優化2:進行HA機制處理-針對Driver高可用性
-
在建立和啟動StreamingContext的時候,將元資料寫入容錯的檔案系統(比如hdfs)。保證在driver掛掉之後,spark叢集可以自己將driver重新啟動起來;而且driver在啟動的時候,不會重新建立一個streaming context,而是從容錯檔案系統(比如hdfs)中讀取之前的元資料資訊,包括job的執行進度,繼續接著之前的進度,繼續執行。
-
使用這種機制,就必須使用cluster模式提交,確保driver執行在某個worker上面;
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() { @Override public JavaStreamingContext create() { JavaStreamingContext jssc = new JavaStreamingContext(...); JavaDStream<String> lines = jssc.socketTextStream(...); jssc.checkpoint(checkpointDirectory); return jssc; } }; JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory); context.start(); context.awaitTermination(); 複製程式碼
-
提交方式
spark-submit --deploy-mode cluster --supervise 複製程式碼
1.3 優化3:實現RDD高可用性:啟動WAL預寫日誌機制
-
spark streaming,從原理上來說,是通過receiver來進行資料接收的;接收到的資料,會被劃分成一個一個的block;block會被組合成一個batch;針對一個batch,會建立一個rdd;
-
receiver接收到資料後,就會立即將資料寫入一份到容錯檔案系統(比如hdfs)上的checkpoint目錄中的,另一份寫入到磁碟檔案中去;作為資料的冗餘副本。無論你的程式怎麼掛掉,或者是資料丟失,那麼資料都不肯能會永久性的丟失;因為肯定有副本。
SparkConf conf = new SparkConf() .setMaster("local[2]") .setAppName("StreamingSpark"); .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); .set("spark.default.parallelism", "1000"); .set("spark.streaming.blockInterval", "50"); .set("spark.streaming.receiver.writeAheadLog.enable", "true"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5)); jssc.checkpoint("hdfs://192.168.1.164:9000/checkpoint"); 複製程式碼
1.4 優化4:InputDStream並行化資料接收
-
建立多個InputDStream來接收同一資料來源
-
把多個topic資料細化為單一的kafkaStream來接收
1:建立kafkaStream Map<String, String> kafkaParams = new HashMap<String, String>(); kafkaParams.put("metadata.broker.list", "192.168.1.164:9092,192.168.1.165:9092,192.168.1.166:9092"); kafkaParams.put("zookeeper.connect","master:2181,data1:2181,data2:2181"); 構建topic set String kafkaTopics = ConfigurationManager.getProperty(Constants.KAFKA_TOPICS); String[] kafkaTopicsSplited = kafkaTopics.split(","); Set<String> topics = new HashSet<String>(); for(String kafkaTopic : kafkaTopicsSplited) { topics.add(kafkaTopic); JavaPairInputDStream<String, String> kafkaStream = KafkaUtils.createDirectStream( jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics); 2:InputDStream並行化資料接收 int numStreams = 5; List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String,String>>(numStreams); for (int i = 0; i < numStreams; i++) { kafkaStreams.add(KafkaUtils.createStream(...)); } JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size())); unifiedStream.print(); 複製程式碼
1.5 優化5:增加block數量,增加每個batch rdd的partition數量,增加處理並行度
- 第一步:receiver從資料來源源源不斷地獲取到資料,首先是會按照block interval,將指定時間間隔的資料,收集為一個block;預設時間是200ms,官方推薦不要小於50ms;
- 第二步:根據指定batch interval時間間隔合併為一個batch,建立為一個rdd,
- 第三步:啟動一個job,去處理這個batch rdd中的資料。
- 第四步:batch rdd 的partition數量是多少呢?一個batch有多少個block,就有多少個partition;就意味著並行度是多少;就意味著每個batch rdd有多少個task會平行計算和處理。
- 調優:如果希望可以比預設的task數量和並行度再多一些,可以手動調節blockinterval,減少block interval。每個batch可以包含更多的block。因此也就有更多的partition,因此就會有更多的task並行處理每個batch rdd。
1.6 優化6:重分割槽,增加每個batch rdd的partition數量
- inputStream.repartition():重分割槽,增加每個batch rdd的partition數量 對dstream中的rdd進行重分割槽為指定數量的分割槽,就可以提高指定dstream的rdd的計算並行度
- 調節並行度具體細節:
1.7 優化7:提升並行度
方法1: spark.default.parallelism 方法2: reduceByKey(numPartitions) 複製程式碼

JavaPairDStream<String, Long> dailyUserAdClickCountDStream = dailyUserAdClickDStream.reduceByKey( new Function2<Long, Long, Long>() { private static final long serialVersionUID = 1L; @Override public Long call(Long v1, Long v2) throws Exception { return v1 + v2; } }); }, 1000); 複製程式碼
1.8 優化8:使用Kryo序列化機制
-
spark streaming:提高序列化task傳送到executor上執行的效能,比如:task很多的時候,task序列化和反序列化的效能開銷也比較可觀
-
預設輸入資料的儲存級別是StorageLevel.MEMORY_AND_DISK_SER_2,receiver接收到資料,預設就會進行持久化操作;首先序列化資料,儲存到記憶體中;如果記憶體資源不夠大,那麼就寫入磁碟;而且,還會寫一份冗餘副本到其他executor的block manager中,進行資料冗餘。
SparkConf conf = new SparkConf() .setMaster("local[2]") .setAppName("StreamingSpark"); .set("spark.serializer", "org.apache.spar.serializer.KryoSerializer"); <= 優化點 .set("spark.default.parallelism", "1000"); .set("spark.streaming.blockInterval", "50"); .set("spark.streaming.receiver.writeAheadLog.enable", "true"); 複製程式碼
1.9 優化9:batch interval:流式處理時間必須小於batch interval
-
batch interval,就是指每隔多少時間收集一次資料來源中的資料,然後進行處理,因此切忌處理時間過長導致的batch interval。
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5)); 複製程式碼
1.10 優化10:快取需要經常使用的資料
呼叫rdd.cache()來快取資料,這樣也可以加快資料的處理,但是我們需要更多的記憶體資源
1.11 優化11:定時清除不需要的資料
- 通過配置spark.cleaner.ttl為一個合理的值,但是這個值不能過小,因為如果後面計算需要用的資料被清除會帶來不必要的麻煩。
- 另外通過配置spark.streaming.unpersist為true(預設就是true)來更智慧地去持久化(unpersist)RDD。這個配置使系統找出那些不需要經常保有的RDD,然後去持久化它們。這可以減少Spark RDD的記憶體使用,也可能改善垃圾回收的行為。
1.12 優化12:GC優化策略(暫時不確定)
建議用並行Mark-Sweep垃圾回收機制,雖然它消耗更多的資源,但是我們還是建議開啟。 在spark-submit中使用
--driver-java-options "-XX:+UseConcMarkSweepGC" --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC" 複製程式碼
1.13 優化13:去除壓縮
在記憶體充足的情況下,可以設定spark.rdd.compress 設定為false.
1.14 優化14:Executors和cpu核心數設定和Spark On Yarn 動態資源分配
-
首先需要對YARN的NodeManager進行配置,使其支援Spark的Shuffle Service。
##修改 <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle,spark_shuffle</value> </property> ##增加 <property> <name>yarn.nodemanager.aux-services.spark_shuffle.class</name> <value>org.apache.spark.network.yarn.YarnShuffleService</value> </property> <property> <name>spark.shuffle.service.port</name> <value>7337</value> </property> 複製程式碼
-
將spark中對應jar包拷貝到hadoop的目錄下:
首先找到spark版本的spark-<version>-yarn-shuffle.jar shuffle包,並將該包放到叢集所有NodeManager的classpath下, 比如放到HADOOP_HOME/share/hadoop/yarn/lib 複製程式碼
-
重啟所有NodeManager。
-
配置示例如下:
spark.shuffle.service.enabled=true spark.dynamicAllocation.enabled=true spark.dynamicAllocation.executorIdleTimeout=60s spark.dynamicAllocation.initialExecutors=1 spark.dynamicAllocation.maxExecutors=5 spark.dynamicAllocation.minExecutors=0 spark-submit \ --master yarn \ --deploy-mode cluster \ --executor-cores 3 \ --executor-memory 10G \ --driver-memory 4G \ --conf spark.dynamicAllocation.enabled=true \ --conf spark.shuffle.service.enabled=true \ --conf spark.dynamicAllocation.initialExecutors=5 \ --conf spark.dynamicAllocation.maxExecutors=40 \ --conf spark.dynamicAllocation.minExecutors=0 \ --conf spark.dynamicAllocation.executorIdleTimeout=30s \ --conf spark.dynamicAllocation.schedulerBacklogTimeout=10s \ 複製程式碼
1.15 優化15:使用高效能運算元
使用reduceByKey/aggregateByKey替代groupByKey 使用mapPartitions替代普通map 使用foreachPartitions替代foreach 使用filter之後進行coalesce操作 使用repartitionAndSortWithinPartitions替代repartition與sort類操作