1. 程式人生 > >Flink 狀態管理與checkPoint資料容錯機制深入剖析-Flink牛刀小試

Flink 狀態管理與checkPoint資料容錯機制深入剖析-Flink牛刀小試

版權宣告:本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。版權宣告:禁止轉載,歡迎學習。QQ郵箱地址:[email protected],如有任何問題,可隨時聯絡。

1 何為狀態

  • 在批處理過程中,資料是劃分為塊分片去完成的,然後每一個Task去處理一個分片。當分片執行完成後,把輸出聚合起來就是最終的結果。在這個過程當中,對於state的需求還是比較小的。

  • 在流計算過程中,對State有非常高的要求,因為在流系統中輸入是一個無限制的流,會持續執行從不間斷。在這個過程當中,就需要將狀態資料很好的管理起來。

  • Flink的失敗恢復依賴於“檢查點機制+可部分重發的資料來源”。

  • 檢查點機制:檢查點定期觸發,產生快照,快照中記錄了(1)當前檢查點開始時資料來源(例如Kafka)中訊息的offset,(2)記錄了所有有狀態的operator當前的狀態資訊(例如sum中的數值)。

  • 可部分重發的資料來源:Flink選擇最近完成的檢查點K。然後系統重放整個分散式的資料流,然後給予每個operator他們在檢查點k快照中的狀態。資料來源被設定為從位置Sk開始重新讀取流。例如在Apache Kafka中,那意味著告訴消費者從偏移量Sk開始重新消費。

  • Flink中有兩種基本型別的State,即Keyed State和Operator State。

  • State可以被記錄,在失敗的情況下資料還可以恢復

一句話的事兒:state一般指一個具體的task/operator的狀態【state資料預設儲存在java的堆記憶體中】

2 檢查點Checkpoint 與Barrier

一句話的事兒: checkpoint【可以理解為checkpoint是把state資料持久化儲存了】,則表示了一個Flink Job在一個特定時刻的一份全域性狀態快照,即包含了所有task/operator的狀態

為了保證state的容錯性,Flink需要對state進行checkpoint。

Checkpoint是Flink實現容錯機制最核心的功能,它能夠根據配置週期性地基於Stream中各個Operator/task的狀態來生成快照,從而將這些狀態資料定期持久化儲存下來,當Flink程式一旦意外崩潰時,重新執行程式時可以有選擇地從這些快照進行恢復,從而修正因為故障帶來的程式資料異常

Flink的checkpoint機制可以與(stream和state)的持久化儲存互動的前提是: 持久化的source(如kafka),它需要支援在一定時間內重放事件。這種sources的典型例子是持久化的訊息佇列(比如Apache Kafka,RabbitMQ等)或檔案系統(比如HDFS,S3,GFS等) 用於state的持久化儲存,例如分散式檔案系統(比如HDFS,S3,GFS等)

Flink的檢查點機制實現了標準的Chandy-Lamport演算法,並用來實現分散式快照。在分散式快照當中,有一個核心的元素:Barrier。

  • 單流的barrier:

    1: 屏障作為資料流的一部分隨著記錄被注入到資料流中。屏障永遠不會趕超通常的流記錄,它會嚴格遵循順序。

    2: 屏障將資料流中的記錄隔離成一系列的記錄集合,並將一些集合中的資料加入到當前的快照中,而另一些資料加入到下一個快照中。

    3: 每一個屏障攜帶著快照的ID,快照記錄著ID並且將其放在快照資料的前面。

    4: 屏障不會中斷流處理,因此非常輕量級。

  • 並行barrier

    1:不止一個輸入流的時的operator,需要在快照屏障上對齊(align)輸入流,才會發射出去。 2:可以看到1,2,3會一直放在Input buffer,直到另一個輸入流的快照到達Operator。

3 有狀態的Operator工作一覽圖

Stateful Flink applications are optimized for local state access. Task state 
is always maintained in memory or, if the state size exceeds the available memory,
in access-efficient on-disk data structures. Hence, tasks perform all computations 
by accessing local, often in-memory, state yielding very low processing latencies.
Flink guarantees exactly-once state consistency in case of failures by periodically 
and asynchronously checkpointing the local state to durable storage.
複製程式碼

4 狀態管理

4.1 原始狀態與託管狀態

Keyed State和Operator State,可以以兩種形式存在:

  • 原始狀態(raw state)

  • 託管狀態(managed state)

  • 託管狀態是由Flink框架管理的狀態

  • 原始狀態,由使用者自行管理狀態具體的資料結構,框架在做checkpoint的時候,使用byte[]來讀寫狀態內容,對其內部資料結構一無所知。

  • 通常在DataStream上的狀態推薦使用託管的狀態。

  • 當實現一個使用者自定義的operator時,會使用到原始狀態

4.2 State-Keyed State 是什麼?直接上乾貨。(兄弟 State-Operator State)

  • 顧名思義,就是基於KeyedStream上的狀態。這個狀態是跟特定的key繫結的,對KeyedStream流上的每一個key,都對應一個state。 stream.keyBy(…)

  • state的資料結構;

    (1) ValueState:即型別為T的單值狀態。這個狀態與對應的key繫結,是最簡單的狀態了。它可以通過update方法更新狀態值,通過value()方法獲取狀態值

    (2) ListState:即key上的狀態值為一個列表。可以通過add方法往列表中附加值;也可以通過get()方法返回一個Iterable來遍歷狀態值

    (3) ReducingState:這種狀態通過使用者傳入的reduceFunction,每次呼叫add方法新增值的時候,會呼叫reduceFunction,最後合併到一個單一的狀態值

    (4) MapState<UK, UV>:即狀態值為一個map。使用者通過put或putAll方法新增元素

  • 需要注意的是,以上所述的State物件,僅僅用於與狀態進行互動(更新、刪除、清空等),而真正的狀態值,有可能是存在記憶體、磁碟、或者其他分散式儲存系統中。相當於我們只是持有了這個狀態的控制代碼。實際上:這些狀態有三種儲存方式:

      MemoryStateBackend:
      FsStateBackend
      RockDBStateBackend
    複製程式碼

4.3 State-Keyed State 儲存方式?直接上乾貨

  • MemoryStateBackend

    state資料儲存在java堆記憶體中,執行checkpoint的時候,會把state的快照資料儲存到jobmanager的記憶體中 基於記憶體的state backend在生產環境下不建議使用。

  • FsStateBackend

    state資料儲存在taskmanager的記憶體中,執行checkpoint的時候,會把state的快照資料儲存到配置的檔案系統中,可以使用hdfs等分散式檔案系統。

  • RocksDBStateBackend

    RocksDB跟上面的都略有不同,它會在本地檔案系統中維護狀態,state會直接寫入本地rocksdb中。同時RocksDB需要配置一個遠端的filesystem。

    uri(一般是HDFS),在做checkpoint的時候,會把本地的資料直接複製到filesystem中。fail over的時候從filesystem中恢復到本地。

    RocksDB克服了state受記憶體限制的缺點,同時又能夠持久化到遠端檔案系統中,比較適合在生產中使用

4.4 State 生成快照

4.5 State 快照恢復

5 與Key相關的狀態管理案例實戰

5.1 RichFlatMapFunction 核心程式碼奉上

package xuwei.tech.streaming;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/**
 * qinkaixin 2018 11 24 
 */
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {


    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }

    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", 
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), 
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}
複製程式碼

5.2 RichFlatMapFunction 執行操作

public static void main(String[] args) throws Exception{
    //獲取Flink的執行環境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
            .keyBy(0)
            .flatMap(new CountWindowAverage())
            .print();
    env.execute("StafulOperator");
    System.out.println("***********");
}
複製程式碼

5.3 最終結果為什麼是這樣的?

  • if the count reaches 2, emit the average and clear the state
  • 所以Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L)一組
  • 所以Tuple2.of(1L, 4L), Tuple2.of(1L, 2L)一組

6 與Operator相關的State案例實戰

  • 與Key無關的State,與Operator繫結的state,整個operator只對應一個state

  • 儲存Operator state的資料結構為ListState

  • 舉例來說,Flink中的Kafka Connector,就使用了operator state。它會在每個connector例項中,儲存該例項中消費topic的所有(partition, offset)對映

  • 繼承CheckpointedFunction,實現snapshotState和restoreState。

      To use managed operator state, a stateful function can implement either 
      the more general CheckpointedFunction interface, or the 
      ListCheckpointed<T extends Serializable> interface.
    
      Whenever a checkpoint has to be performed, snapshotState() is called. The 
      counterpart,initializeState(), is called every time the user-defined function 
      is initialized, be that when the function is first initialized or be that when the function is actuallyrecovering from an earlier checkpoint. Given this,
      initializeState() is not only the place where different types of state are
      initialized, but also where state recovery
      logic is included.
    複製程式碼

6.1 BufferingSink案例

    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
                   CheckpointedFunction {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}
複製程式碼

6.2 Stateful Source案例

    public static class CounterSource  extends RichParallelSourceFunction<Long>
        implements ListCheckpointed<Long> {

    /**  current offset for exactly once semantics */
    private Long offset;

    /** flag for job cancellation */
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning) {
            // output and state update are atomic
            synchronized (lock) {
                ctx.collect(offset);
                offset += 1;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
        return Collections.singletonList(offset);
    }

    @Override
    public void restoreState(List<Long> state) {
        for (Long s : state)
            offset = s;
    }
}
複製程式碼

7 checkPoint的配置進一步昇華

7.1 checkpoint 開關

  • 預設checkpoint功能是disabled的,想要使用的時候需要先啟用
  • checkpoint開啟之後,預設的checkPointMode是Exactly-once
  • checkpoint的checkPointMode有兩種,Exactly-once和At-least-once
  • Exactly-once對於大多數應用來說是最合適的。At-least-once可能用在某些延遲超低的應用程式(始終延遲為幾毫秒)

7.1 checkpoint 調優配置(Cancel處理很有意思)

  • 預設checkpoint功能是disabled的,想要使用的時候需要先啟用

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
      // 每隔1000 ms進行啟動一個檢查點【設定checkpoint的週期】
      env.enableCheckpointing(1000);
      // 高階選項:
      // 設定模式為exactly-once (這是預設值)
      env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
      // 確保檢查點之間有至少500 ms的間隔【checkpoint最小間隔】
      env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
      // 檢查點必須在一分鐘內完成,或者被丟棄【checkpoint的超時時間】
      env.getCheckpointConfig().setCheckpointTimeout(60000);
      // 同一時間只允許進行一個檢查點
      env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
      // 表示一旦Flink處理程式被cancel後,會保留Checkpoint資料,以便根據實際需要恢復到指定的Checkpoint【詳細解釋見備註】
      env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
      
      cancel處理選項:
      (1)ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:
      表示一旦Flink處理程式被cancel後,會保留Checkpoint資料,以便根據實際需要恢復到指定
      的Checkpoint
      
      (2)ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:
      表示一旦Flink處理程式被cancel後,會刪除Checkpoint資料,只有job執行失敗的時候才會
      儲存checkpoint
    複製程式碼

8 State Backend 狀態的後端儲存(一劍封喉)

8.1 配置說明

修改State Backend的兩種方式

  • 第一種:單任務調整

      修改當前任務程式碼
      env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
      或者new MemoryStateBackend()
      或者new RocksDBStateBackend( hdfs->url, true);【需要新增第三方依賴】
    複製程式碼
  • 第二種:全域性調整

      修改flink-conf.yaml
      state.backend: filesystem
      state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
      注意:state.backend的值可以是下面幾種:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)
    複製程式碼

8.2 精彩案例實戰

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;


public class SocketWindowWordCountJavaCheckPoint {

    public static void main(String[] args) throws Exception{
        //獲取需要的埠號
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        }catch (Exception e){
            System.err.println("No port set. use default port 9000--java");
            port = 9010;
        }

        //獲取flink的執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 每隔1000 ms進行啟動一個檢查點【設定checkpoint的週期】
        env.enableCheckpointing(1000);
        
        // 高階選項:
        // 設定模式為exactly-once (這是預設值)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        
        // 確保檢查點之間有至少500 ms的間隔【checkpoint最小間隔】
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        
        // 檢查點必須在一分鐘內完成,或者被丟棄【checkpoint的超時時間】
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        
        // 同一時間只允許進行一個檢查點
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        
        // 表示一旦Flink處理程式被cancel後,會保留Checkpoint資料,以便根據實際需要恢復到指定的Checkpoint【詳細解釋見備註】
        //ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink處理程式被cancel後,會保留Checkpoint資料,以便根據實際需要恢復到指定的Checkpoint
        //ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink處理程式被cancel後,會刪除Checkpoint資料,只有job執行失敗的時候才會儲存checkpoint
        
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);


        //設定statebackend

        //env.setStateBackend(new MemoryStateBackend());
        //env.setStateBackend(new FsStateBackend("hdfs://hadoop100:9000/flink/checkpoints"));
        //env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));

        String hostname = "SparkMaster";
        String delimiter = "\n";
        //連線socket獲取輸入的資料
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);

        // a a c

        // a 1
        // a 1
        // c 1
        DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word : splits) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        }).keyBy("word")
                .timeWindow(Time.seconds(2), Time.seconds(1))//指定時間視窗大小為2秒,指定時間間隔為1秒
                .sum("count");//在這裡使用sum或者reduce都可以
                /*.reduce(new ReduceFunction<WordWithCount>() {
                                    public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {

                                        return new WordWithCount(a.word,a.count+b.count);
                                    }
                                })*/
        //把資料列印到控制檯並且設定並行度
        windowCounts.print().setParallelism(1);

        //這一行程式碼一定要實現,否則程式不執行
        env.execute("Socket window count");

    }

    public static class WordWithCount{
        public String word;
        public long count;
        public  WordWithCount(){}
        public WordWithCount(String word,long count){
            this.word = word;
            this.count = count;
        }
        @Override
        public String toString() {
            return "作者 : 秦凱新 , 窗大小2秒,滑動1秒       {" +
                    " word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}
複製程式碼

8.3 精彩案例結果

9 華山論劍結束

這裡圍繞狀態管理進行了詳細的說明。一篇好文不容易,請發表你的評論,給予作者以肯定,謝謝。後續更精彩!

10 結語

版權宣告:本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。版權宣告:禁止轉載,歡迎學習。QQ郵箱地址:[email protected],如有任何問題,可隨時聯絡。

秦凱新 於深圳 20182136