A Look at Flink's ListCheckpointed

This article takes a look at Flink's ListCheckpointed interface.

Example

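import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public static class CounterSource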
        extends RichParallelSourceFunction<Long>
        implements ListCheckpointed<Long> {

    /**  current offset for exactly once semantics */
    private Long offset = 0L; // start at 0; an uninitialized Long would throw a NullPointerException in run()

    /** flag for job cancellation */
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning) {
            // output and state update are atomic
            synchronized (lock) {
                ctx.collect(offset);
                offset += 1;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
        return Collections.singletonList(offset);
    }

    @Override
    public void restoreState(List<Long> state) {
        for (Long s : state) {
            offset = s;
        }
    }
}
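  • CounterSource is a stateful RichParallelSourceFunction that implements the ListCheckpointed interface. Its snapshotState method returns the current offset, and its restoreState method restores the local offset from the state passed in. Note that to get exactly-once semantics across failure and recovery, the source must emit the record and update offset while holding the lock returned by SourceContext.getCheckpointLock, so that a checkpoint never observes one without the other.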
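
The snapshot and restore cycle described above can be illustrated without a Flink cluster. The following is a minimal plain-Java sketch, not Flink code: CounterSketch, its emit method, and the emitted list are hypothetical stand-ins for CounterSource, the run loop, and the downstream output. In Flink the runtime holds the checkpoint lock while it calls snapshotState; here the sketch takes the lock itself, which is an assumption made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for CounterSource with no Flink dependency.
class CounterSketch {
    private Long offset = 0L;
    // Plays the role of the object returned by SourceContext.getCheckpointLock().
    private final Object lock = new Object();
    // Stands in for the records sent downstream via ctx.collect(...).
    final List<Long> emitted = new ArrayList<>();

    // Emits n records; output and state update happen under the same lock.
    void emit(int n) {
        for (int i = 0; i < n; i++) {
            synchronized (lock) {
                emitted.add(offset);
                offset += 1;
            }
        }
    }

    // Assumption: the sketch locks here itself; in Flink the caller holds the lock.
    List<Long> snapshotState() {
        synchronized (lock) {
            return Collections.singletonList(offset);
        }
    }

    // Same logic as the article's restoreState: the last entry wins.
    void restoreState(List<Long> state) {
        for (Long s : state) {
            offset = s;
        }
    }

    public static void main(String[] args) {
        CounterSketch first = new CounterSketch();
        first.emit(5);                                   // emits 0..4
        List<Long> checkpoint = first.snapshotState();   // state is [5]
        first.emit(3);                                   // emits 5..7, then the instance "fails"

        CounterSketch recovered = new CounterSketch();
        recovered.restoreState(checkpoint);
        recovered.emit(2);                               // resumes from the checkpointed offset
        System.out.println(checkpoint + " " + recovered.emitted); // prints "[5] [5, 6]"
    }
}
```

The recovered instance resumes at offset 5, the value captured by the checkpoint, so the records emitted after the checkpoint are replayed rather than lost or skipped.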

ListCheckpointed

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java

@PublicEvolving
public interface ListCheckpointed<T extends Serializable> {

	/**
	 * Gets the current state of the function. The state must reflect the result of all prior
	 * invocations to this function.
	 *
	 * <p>The returned list should contain one entry for redistributable unit of state. See
	 * the {@link ListCheckpointed class docs} for an illustration how list-style state
	 * redistribution works.
	 *
	 * <p>As special case, the returned list may be null or empty (if the operator has no state)
	 * or it may contain a single element (if the operator state is indivisible).
	 *
	 * @param checkpointId The ID of the checkpoint - a unique and monotonously increasing value.
	 * @param timestamp The wall clock timestamp when the checkpoint was triggered by the master.
	 *
	 * @return The operator state in a list of redistributable, atomic sub-states.
	 *         Should not return null, but empty list instead.
	 *
	 * @throws Exception Thrown if the creation of the state object failed. This causes the
	 *                   checkpoint to fail. The system may decide to fail the operation (and trigger
	 *                   recovery), or to discard this checkpoint attempt and to continue running
	 *                   and to try again with the next checkpoint attempt.
	 */
	List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

	/**
	 * Restores the state of the function or operator to that of a previous checkpoint.
	 * This method is invoked when the function is executed after a failure recovery.
	 * The state list may be empty if no state is to be recovered by the particular parallel instance
	 * of the function.
	 *
	 * <p>The given state list will contain all the <i>sub states</i> that this parallel
	 * instance of the function needs to handle. Refer to the  {@link ListCheckpointed class docs}
	 * for an illustration how list-style state redistribution works.
	 *
	 * <p><b>Important:</b> When implementing this interface together with {@link RichFunction},
	 * then the {@code restoreState()} method is called before {@link RichFunction#open(Configuration)}.
	 *
	 * @param state The state to be restored as a list of atomic sub-states.
	 *
	 * @throws Exception Throwing an exception in this method causes the recovery to fail.
	 *                   The exact consequence depends on the configured failure handling strategy,
	 *                   but typically the system will re-attempt the recovery, or try recovering
	 *                   from a different checkpoint.
	 */
	void restoreState(List<T> state) throws Exception;
}
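  • ListCheckpointed defines two methods: snapshotState and restoreState
  • snapshotState takes a checkpointId, a unique and monotonically increasing number, and a timestamp, the wall-clock time at which the master triggered the checkpoint; it must return the current state as a List
  • restoreState is called during failure recovery with the state as a List, from which the method can restore its local state; when the function also implements RichFunction, restoreState is called before open()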

Summary

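  • A stateful function can use managed operator state through either the CheckpointedFunction interface or the ListCheckpointed interface. Managed operator state currently supports only the list-style form: the state must be a List of serializable objects, so that it can be redistributed when the job is rescaled. There are currently two redistribution schemes: even-split redistribution (on restore/redistribution each operator instance receives only a sublist of the whole state) and union redistribution (on restore/redistribution each operator instance receives the complete list of the whole state)
  • ListCheckpointed is a more limited variant of CheckpointedFunction: it supports only list-style state with even-split redistribution
  • ListCheckpointed defines two methods, snapshotState and restoreState. snapshotState is called when the master triggers a checkpoint, and the user must return the current state; restoreState is called during failure recovery with the state as a List, from which the method can restore its local state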
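
The difference between the two redistribution schemes can be made concrete with a small plain-Java simulation. This is only a sketch under stated assumptions, not Flink's implementation: RedistributionSketch, evenSplit, and union are hypothetical names, and the round-robin assignment used for the even split is an assumption. The property that matters is that under even-split each element ends up in exactly one instance, while under union every instance sees every element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation of list-style state redistribution on rescale.
class RedistributionSketch {

    // Concatenates the lists snapshotted by the old parallel instances.
    private static <T> List<T> concat(List<List<T>> snapshots) {
        List<T> all = new ArrayList<>();
        for (List<T> s : snapshots) {
            all.addAll(s);
        }
        return all;
    }

    // Even-split: each new instance gets a sublist of the whole state.
    // Assumption: elements are dealt out round-robin.
    static <T> List<List<T>> evenSplit(List<List<T>> snapshots, int newParallelism) {
        List<T> all = concat(snapshots);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < all.size(); i++) {
            result.get(i % newParallelism).add(all.get(i));
        }
        return result;
    }

    // Union: each new instance gets the complete list of state elements.
    static <T> List<List<T>> union(List<List<T>> snapshots, int newParallelism) {
        List<T> all = concat(snapshots);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }

    public static void main(String[] args) {
        // Three old instances snapshotted [1, 2], [3, 4] and [5]; the job is rescaled to 2.
        List<List<Integer>> snapshots = Arrays.asList(
                Arrays.asList(1, 2), Arrays.asList(3, 4), Arrays.asList(5));
        System.out.println(evenSplit(snapshots, 2)); // prints [[1, 3, 5], [2, 4]]
        System.out.println(union(snapshots, 2));     // prints [[1, 2, 3, 4, 5], [1, 2, 3, 4, 5]]
    }
}
```

In this picture, a ListCheckpointed function always receives the even-split result in restoreState, which is why each list entry should be a unit of state that can move to another instance on its own.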

doc