1. 程式人生 > >Spark Streaming調優引數及最佳實踐深入剖析-Spark商業調優實戰

Spark Streaming調優引數及最佳實踐深入剖析-Spark商業調優實戰

本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。版權宣告:禁止轉載,歡迎學習。QQ郵箱地址:[email protected],如有任何商業交流,可隨時聯絡。

1 開門見山

1.1 優化1:進行HA機制處理-針對updateStateByKey與window等有狀態的操作

  • HA高可用性:High Availability,如果有些資料丟失,或者節點掛掉;如果不想讓你的實時計算程式掛了,就必須做一些資料上的冗餘副本,保證你的實時計算程式可以7 * 24小時的運轉。

  • 針對updateStateByKey與window等有狀態的操作,自動進行checkpoint,必須設定checkpoint目錄。

      HDFS:SparkStreaming.checkpoint("hdfs://192.168.1.105:9090/checkpoint"),
    
  • checkpoint 會把資料保留一份在容錯的檔案系統中,一旦記憶體中的資料丟失掉;就可以直接從文
    件系統中讀取資料,不需要重新進行計算。

      JavaStreamingContext jssc = new JavaStreamingContext(
            conf, Durations.seconds(5));  
      jssc.checkpoint("hdfs://192.168.1.105:9000/streaming_checkpoint");
    

1.2 優化2:進行HA機制處理-針對Driver高可用性

  • 在建立和啟動StreamingContext的時候,將元資料寫入容錯的檔案系統(比如hdfs)。保證在driver掛掉之後,spark叢集可以自己將driver重新啟動起來;而且driver在啟動的時候,不會重新建立一個streaming context,而是從容錯檔案系統(比如hdfs)中讀取之前的元資料資訊,包括job的執行進度,繼續接著之前的進度,繼續執行。

  • 使用這種機制,就必須使用cluster模式提交,確保driver執行在某個worker上面;

      JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
          @Override
         public JavaStreamingContext create() {
           	JavaStreamingContext jssc = new JavaStreamingContext(...);
          	JavaDStream<String> lines = jssc.socketTextStream(...);
          	jssc.checkpoint(checkpointDirectory);
          	return jssc;
            }
          };
     JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
     context.start();
     context.awaitTermination();
    
  • 提交方式

      spark-submit
              --deploy-mode cluster
              --supervise
    

1.3 優化3:實現RDD高可用性:啟動WAL預寫日誌機制

  • spark streaming,從原理上來說,是通過receiver來進行資料接收的;接收到的資料,會被劃分成一個一個的block;block會被組合成一個batch;針對一個batch,會建立一個rdd;

  • receiver接收到資料後,就會立即將資料寫入一份到容錯檔案系統(比如hdfs)上的checkpoint目錄中的,另一份寫入到磁碟檔案中去;作為資料的冗餘副本。無論你的程式怎麼掛掉,或者是資料丟失,那麼資料都不肯能會永久性的丟失;因為肯定有副本。

      SparkConf conf = new SparkConf()       
          			.setMaster("local[2]")
          			.setAppName("StreamingSpark");
          			.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
          			.set("spark.default.parallelism", "1000");
          			.set("spark.streaming.blockInterval", "50");    
          			.set("spark.streaming.receiver.writeAheadLog.enable", "true");   
      JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5)); 
      jssc.checkpoint("hdfs://192.168.1.164:9000/checkpoint");
    

1.4 優化4:InputDStream並行化資料接收

  • 建立多個InputDStream來接收同一資料來源

  • 把多個topic資料細化為單一的kafkaStream來接收

       1:建立kafkaStream
       Map<String, String> kafkaParams = new HashMap<String, String>();
       kafkaParams.put("metadata.broker.list", "192.168.1.164:9092,192.168.1.165:9092,192.168.1.166:9092");
       kafkaParams.put("zookeeper.connect","master:2181,data1:2181,data2:2181");
       
       構建topic set
       String kafkaTopics = ConfigurationManager.getProperty(Constants.KAFKA_TOPICS);
       String[] kafkaTopicsSplited = kafkaTopics.split(",");
    
       Set<String> topics = new HashSet<String>();
       for(String kafkaTopic : kafkaTopicsSplited) {
       	topics.add(kafkaTopic);
       	
       JavaPairInputDStream<String, String> kafkaStream = KafkaUtils.createDirectStream(
       	jssc, 
       	String.class, 
       	String.class, 
       	StringDecoder.class, 
       	StringDecoder.class, 
       	kafkaParams, 
       	topics);
    
       2:InputDStream並行化資料接收
           int numStreams = 5;
           List<JavaPairDStream<String, String>> kafkaStreams = new
           ArrayList<JavaPairDStream<String,String>>(numStreams);
           
           for (int i = 0; i < numStreams; i++) {
           	kafkaStreams.add(KafkaUtils.createStream(...));
          	 }
           JavaPairDStream<String, String> unifiedStream = 
       	streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
           unifiedStream.print();
    

1.5 優化5:增加block數量,增加每個batch rdd的partition數量,增加處理並行度

  • 第一步:receiver從資料來源源源不斷地獲取到資料,首先是會按照block interval,將指定時間間隔的資料,收集為一個block;預設時間是200ms,官方推薦不要小於50ms;
  • 第二步:根據指定batch interval時間間隔合併為一個batch,建立為一個rdd,
  • 第三步:啟動一個job,去處理這個batch rdd中的資料。
  • 第四步:batch rdd 的partition數量是多少呢?一個batch有多少個block,就有多少個partition;就意味著並行度是多少;就意味著每個batch rdd有多少個task會平行計算和處理。
  • 調優:如果希望可以比預設的task數量和並行度再多一些,可以手動調節blockinterval,減少block interval。每個batch可以包含更多的block。因此也就有更多的partition,因此就會有更多的task並行處理每個batch rdd。

1.6 優化6:重分割槽,增加每個batch rdd的partition數量

  • inputStream.repartition():重分割槽,增加每個batch rdd的partition數量
    對dstream中的rdd進行重分割槽為指定數量的分割槽,就可以提高指定dstream的rdd的計算並行度
  • 調節並行度具體細節:

1.7 優化7:提升並行度

      方法1: spark.default.parallelism
      方法2: reduceByKey(numPartitions)

	JavaPairDStream<String, Long> dailyUserAdClickCountDStream = dailyUserAdClickDStream.reduceByKey(
			new Function2<Long, Long, Long>() {
				private static final long serialVersionUID = 1L;
				@Override
				public Long call(Long v1, Long v2) throws Exception {
					return v1 + v2;
				}
			});
			}, 1000);

1.8 優化8:使用Kryo序列化機制

  • spark streaming:提高序列化task傳送到executor上執行的效能,比如:task很多的時候,task序列化和反序列化的效能開銷也比較可觀

  • 預設輸入資料的儲存級別是StorageLevel.MEMORY_AND_DISK_SER_2,receiver接收到資料,預設就會進行持久化操作;首先序列化資料,儲存到記憶體中;如果記憶體資源不夠大,那麼就寫入磁碟;而且,還會寫一份冗餘副本到其他executor的block manager中,進行資料冗餘。

      SparkConf conf = new SparkConf()       
      		.setMaster("local[2]")
      		.setAppName("StreamingSpark");
      		
      		.set("spark.serializer", "org.apache.spar.serializer.KryoSerializer"); <= 優化點
      		
      		.set("spark.default.parallelism", "1000");
      		.set("spark.streaming.blockInterval", "50");    
      		.set("spark.streaming.receiver.writeAheadLog.enable", "true");   
    

1.9 優化9:batch interval:流式處理時間必須小於batch interval

  • batch interval,就是指每隔多少時間收集一次資料來源中的資料,然後進行處理,因此切忌處理時間過長導致的batch interval。

      JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));  
    

1.10 優化10:快取需要經常使用的資料

呼叫rdd.cache()來快取資料,這樣也可以加快資料的處理,但是我們需要更多的記憶體資源

1.11 優化11:定時清除不需要的資料

  • 通過配置spark.cleaner.ttl為一個合理的值,但是這個值不能過小,因為如果後面計算需要用的資料被清除會帶來不必要的麻煩。
  • 另外通過配置spark.streaming.unpersist為true(預設就是true)來更智慧地去持久化(unpersist)RDD。這個配置使系統找出那些不需要經常保有的RDD,然後去持久化它們。這可以減少Spark RDD的記憶體使用,也可能改善垃圾回收的行為。

1.12 優化12:GC優化策略(暫時不確定)

建議用並行Mark-Sweep垃圾回收機制,雖然它消耗更多的資源,但是我們還是建議開啟。
在spark-submit中使用

--driver-java-options "-XX:+UseConcMarkSweepGC"
--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"

1.13 優化13:去除壓縮

在記憶體充足的情況下,可以設定spark.rdd.compress 設定為false.

1.14 優化14:Executors和cpu核心數設定和Spark On Yarn 動態資源分配

  • 首先需要對YARN的NodeManager進行配置,使其支援Spark的Shuffle Service。

       ##修改
      <property>
      <name>yarn.nodemanager.aux-services</name>
      <value>mapreduce_shuffle,spark_shuffle</value>
      </property>
      ##增加
      <property>
      <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
      <value>org.apache.spark.network.yarn.YarnShuffleService</value>
      </property>
      <property>
      <name>spark.shuffle.service.port</name>
      <value>7337</value>
      </property>
    
  • 將spark中對應jar包拷貝到hadoop的目錄下:

      首先找到spark版本的spark-<version>-yarn-shuffle.jar
      shuffle包,並將該包放到叢集所有NodeManager的classpath下,
      比如放到HADOOP_HOME/share/hadoop/yarn/lib
    
  • 重啟所有NodeManager。

  • 配置示例如下:

      spark.shuffle.service.enabled=true
      spark.dynamicAllocation.enabled=true
      spark.dynamicAllocation.executorIdleTimeout=60s
      spark.dynamicAllocation.initialExecutors=1
      spark.dynamicAllocation.maxExecutors=5
      spark.dynamicAllocation.minExecutors=0
      
      spark-submit \
      --master yarn \
      --deploy-mode cluster \
      --executor-cores 3 \
      --executor-memory 10G \
      --driver-memory 4G \
      --conf spark.dynamicAllocation.enabled=true \
      --conf spark.shuffle.service.enabled=true \
      --conf spark.dynamicAllocation.initialExecutors=5 \
      --conf spark.dynamicAllocation.maxExecutors=40 \
      --conf spark.dynamicAllocation.minExecutors=0 \
      --conf spark.dynamicAllocation.executorIdleTimeout=30s \
      --conf spark.dynamicAllocation.schedulerBacklogTimeout=10s \
    

1.15 優化15:使用高效能運算元

使用reduceByKey/aggregateByKey替代groupByKey
使用mapPartitions替代普通map
使用foreachPartitions替代foreach
使用filter之後進行coalesce操作
使用repartitionAndSortWithinPartitions替代repartition與sort類操作

2 結語

Spark Streaming是一個非常優秀的批處理程式,我已經綜合了我所有的調優經驗,當然後續有可優化的點,我會及時補充的。辛苦成文,各自珍惜。

秦凱新 20181119 0:23