
SparkStreaming HA (High Availability)


1. Stateful operations such as updateStateByKey and window transformations require checkpointing, so you must set a checkpoint directory. A copy of the state is kept on a fault-tolerant file system (such as HDFS); if the data in memory is lost, it can be read back from the file system instead of being recomputed.

jssc.checkpoint("hdfs://ip:port/checkpoint");
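To see why the checkpoint matters, consider the update function you pass to updateStateByKey: it folds each batch's new values for a key into that key's previous state, and Spark checkpoints the resulting state so it survives a failure. The core logic can be sketched in isolation with plain java.util types (a simplification: in Spark's Java API the function is a Function2<List<Long>, Optional<Long>, Optional<Long>> that returns an Optional, using Spark's own Optional class):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class RunningCountSketch {
    // Mirrors the update function passed to updateStateByKey:
    // fold this batch's values for one key into the previous state
    // (0 when the key has no state yet). Spark checkpoints the
    // returned state so a restarted driver can resume from it.
    static long updateCount(List<Long> newValues, Optional<Long> state) {
        long sum = state.orElse(0L);
        for (Long v : newValues) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        // A key seen 3 times in this batch, with a prior count of 10.
        System.out.println(updateCount(Arrays.asList(1L, 1L, 1L), Optional.of(10L)));
    }
}
```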

2. Driver high availability (Java version)

The first time the StreamingContext is created and started, it continuously writes the metadata of the real-time computation to the checkpoint directory. If the driver node goes down, the cluster can restart the driver automatically and resume the computation from the checkpoint with no data loss. You must submit in cluster deploy mode; on Spark Standalone this means spark-submit --deploy-mode cluster --supervise ..., while on YARN the ApplicationMaster retry mechanism handles the restart.

private static void testDriverHA() {

  final String checkpointDir = "hdfs://ip:port/checkpoint";

  JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {

  @Override
  public JavaStreamingContext create() {
    SparkConf conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("AdClickRealTimeStatSpark");

    JavaStreamingContext jssc = new JavaStreamingContext(
          conf, Durations.seconds(5));
    jssc.checkpoint(checkpointDir);

    Map<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put(Constants.KAFKA_METADATA_BROKER_LIST,
      ConfigurationManager.getProperty(Constants.KAFKA_METADATA_BROKER_LIST));
    String kafkaTopics = ConfigurationManager.getProperty(Constants.KAFKA_TOPICS);
    String[] kafkaTopicsSplited = kafkaTopics.split(",");
    Set<String> topics = new HashSet<String>();
    for(String kafkaTopic : kafkaTopicsSplited) {
      topics.add(kafkaTopic);
    }

    JavaPairInputDStream<String, String> adRealTimeLogDStream = KafkaUtils.createDirectStream(
      jssc,
      String.class,
      String.class,
      StringDecoder.class,
      StringDecoder.class,
      kafkaParams,
      topics);

    JavaPairDStream<String, String> filteredAdRealTimeLogDStream =
      filterByBlacklist(adRealTimeLogDStream);
    generateDynamicBlacklist(filteredAdRealTimeLogDStream);
    JavaPairDStream<String, Long> adRealTimeStatDStream = calculateRealTimeStat(
      filteredAdRealTimeLogDStream);
    calculateProvinceTop3Ad(adRealTimeStatDStream);
    calculateAdClickCountByWindow(adRealTimeLogDStream);
    return jssc;
  }
  };

  JavaStreamingContext context = JavaStreamingContext.getOrCreate(
    checkpointDir, contextFactory);
  context.start();
  context.awaitTermination();

}

3. RDD high availability: enable the WAL (write-ahead log) mechanism

In principle, Spark Streaming receives data through a receiver. Received data is split into blocks, blocks are grouped into batches, and for each batch an RDD is created and a job is launched to run the operators you defined. With the write-ahead log enabled, as soon as the receiver gets data it also writes a copy to a log file under the checkpoint directory on a fault-tolerant file system (such as HDFS), as a redundant replica of the data: if the in-memory copy is lost, it can be recovered from the log.

  SparkConf conf = new SparkConf()
    .setMaster("local[2]")
    .setAppName("AdClickRealTimeStatSpark")
    .set("spark.streaming.receiver.writeAheadLog.enable","true");
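When the write-ahead log is enabled, every received block is already persisted durably, so the usual recommendation is to drop in-memory replication and keep a single serialized copy via StorageLevel.MEMORY_AND_DISK_SER. A configuration fragment illustrating this (the socketTextStream source, host, and port are placeholders, not part of the original job):

```java
// With the WAL enabled, a replicated storage level is redundant:
// durability comes from the log, so store one serialized copy.
JavaReceiverInputDStream<String> lines = jssc.socketTextStream(
  "localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER());
```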
