1. 程式人生 > >解讀Flink中輕量級的非同步快照機制--Flink 1.2 原始碼

解讀Flink中輕量級的非同步快照機制--Flink 1.2 原始碼

上一篇文章中,對於ABS演算法,其實現主要通過checkpoint的barrier的阻塞與釋放來實現。

本片重點關注ABS在Flink 1.2中原始碼的實現。

1、CheckpointBarrierHandler

此介面位於org.apache.flink.streaming.runtime.io中,管理從input channel獲取的barrier的資訊。它提供瞭如下幾種方法:

public interface CheckpointBarrierHandler {

    BufferOrEvent getNextNonBlocked() throws Exception;

    void
registerCheckpointEventHandler(StatefulTask task); void cleanup() throws IOException; boolean isEmpty(); long getAlignmentDurationNanos(); }

其中關於barrier的阻塞與釋放,主要在getNextNonBlocked() 中實現。

根據CheckpointingMode的不同,Flink提供了2種不同的檢查點模式:

1、Exactly once
2、At least once

其中預設的模式是EXACTLY_ONCE。

對應這兩種不同的模式,Flink提供了2種不同的實現類:

1、BarrierBuffer類(對應於Exactly Once)
2、BarrierTracker類(對應於At Least Once)

這裡寫圖片描述

由於論文中重點強調input channel的阻塞,即對於Exactly Once的實現,因此我們這裡也重點關注程式碼中BarrierBuffer類的實現。

2、BarrierBuffer類

我們先回顧一下上一篇論文中關於此演算法的偽碼:

這裡寫圖片描述

其核心就是一個input channel收到barrier,立刻阻塞,然後判斷是否收到所有input channel的barrier,如果全部收到,則廣播出barrier,觸發此task的檢查點,並對阻塞的channel釋放鎖。

實際上,為了防止輸入流的背壓(back-pressuring),BarrierBuffer並不是真正的阻塞這個流,而是將此channel中,barrier之後資料通過一個BufferSpiller來buffer起來,當channel的鎖釋放後,再從buffer讀回這些資料,繼續處理。

下面我們看看這個類的具體實現:

public class BarrierBuffer implements CheckpointBarrierHandler {

    private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);

    /** The gate that the buffer draws its input from */
    private final InputGate inputGate; //一個task對應一個InputGate,代表input的資料集合(可能來自不同的input channel)

    /** Flags that indicate whether a channel is currently blocked/buffered */
    private final boolean[] blockedChannels; // 標記每個input channel是否被阻塞(或者叫被buffer)

    /** The total number of channels that this buffer handles data from */
    private final int totalNumberOfInputChannels; // input channel的數量,可通過InputGate獲得

    /** To utility to write blocked data to a file channel */
    private final BufferSpiller bufferSpiller; // 將被阻塞的input channel的資料寫到buffer

    /** The pending blocked buffer/event sequences. Must be consumed before requesting
     * further data from the input gate. */
    private final ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence> queuedBuffered; // barrier到達時,此operator中在之前buffered的資料要消費掉

    /** The maximum number of bytes that may be buffered before an alignment is broken. -1 means unlimited */
    private final long maxBufferedBytes; // 最多允許buffer的位元組數,-1代表無限制

    /** The sequence of buffers/events that has been unblocked and must now be consumed
     * before requesting further data from the input gate */
    private BufferSpiller.SpilledBufferOrEventSequence currentBuffered; // 已經buffer的資料

    /** Handler that receives the checkpoint notifications */
    private StatefulTask toNotifyOnCheckpoint; // 通知檢查點進行

    /** The ID of the checkpoint for which we expect barriers */
    private long currentCheckpointId = -1L; // 當前檢查點ID

    /** The number of received barriers (= number of blocked/buffered channels)
     * IMPORTANT: A canceled checkpoint must always have 0 barriers */
    private int numBarriersReceived; // 接收到的barrier的數量,這個值最終要等於buffered channel的數量。當一個檢查點被cancel時,此值為0

    /** The number of already closed channels */
    private int numClosedChannels; // 已經關閉的channel的數量

    /** The number of bytes in the queued spilled sequences */
    private long numQueuedBytes; // spill到佇列中的資料的位元組數

    /** The timestamp as in {@link System#nanoTime()} at which the last alignment started */
    private long startOfAlignmentTimestamp; // 上一次對齊開始時的時間戳

    /** The time (in nanoseconds) that the latest alignment took */
    private long latestAlignmentDurationNanos; // 最近一次對齊持續的時間

    /** Flag to indicate whether we have drawn all available input */
    private boolean endOfStream; // 標記是否流結束(所有的input已經收到barrier,標記檢查點完成)

    /**
     * Creates a new checkpoint stream aligner.
     * 
     * <p>There is no limit to how much data may be buffered during an alignment.
     * 
     * @param inputGate The input gate to draw the buffers and events from.
     * @param ioManager The I/O manager that gives access to the temp directories.
     *
     * @throws IOException Thrown, when the spilling to temp files cannot be initialized.
     */
    public BarrierBuffer(InputGate inputGate, IOManager ioManager) throws IOException {
        this (inputGate, ioManager, -1);
    }

    /**
     * Creates a new checkpoint stream aligner.
     * 
     * <p>The aligner will allow only alignments that buffer up to the given number of bytes.
     * When that number is exceeded, it will stop the alignment and notify the task that the
     * checkpoint has been cancelled.
     * 
     * @param inputGate The input gate to draw the buffers and events from.
     * @param ioManager The I/O manager that gives access to the temp directories.
     * @param maxBufferedBytes The maximum bytes to be buffered before the checkpoint aborts.
     * 
     * @throws IOException Thrown, when the spilling to temp files cannot be initialized.
     */
    public BarrierBuffer(InputGate inputGate, IOManager ioManager, long maxBufferedBytes) throws IOException {
        checkArgument(maxBufferedBytes == -1 || maxBufferedBytes > 0);

        this.inputGate = inputGate;
        this.maxBufferedBytes = maxBufferedBytes;
        this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
        this.blockedChannels = new boolean[this.totalNumberOfInputChannels];

        this.bufferSpiller = new BufferSpiller(ioManager, inputGate.getPageSize());
        this.queuedBuffered = new ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence>();
    }

其構造方法中傳入InputGate引數,每個task都會對應有一個InputGate,目的是專門處理流入到此task中的所有的輸入資訊,這些輸入可能來自多個partition。

這裡寫圖片描述

我們再看看BarrierBuffer中最重要的方法:getNextNonBlocked。

getNextNonBlocked

// ------------------------------------------------------------------------
//  Buffer and barrier handling
// ------------------------------------------------------------------------

@Override
    public BufferOrEvent getNextNonBlocked() throws Exception {
        while (true) {
            // process buffered BufferOrEvents before grabbing new ones
            BufferOrEvent next; // buffer代表資料,event代表事件,例如barrier就是個事件
            if (currentBuffered == null) {
                next = inputGate.getNextBufferOrEvent();// 如果已經buffer的資料為空,則直接從inputGate中獲取下一個BufferOrEvent
            }
            else {
                next = currentBuffered.getNext(); // 否則,從currentBuffered的佇列中拿到下一個BufferOrEvent
                if (next == null) { // 如果next為空,說明已經buffer的資料被處理完了
                    completeBufferedSequence(); // 清空currentBuffered,然後繼續處理queuedBuffered中的資料
                    return getNextNonBlocked(); // 遞迴呼叫,此時currentBuffered如果為null,則queuedBuffered也為null;否則如果currentBuffered不為null,說明還要繼續處理queuedBuffere中的資料
                }
            }

            if (next != null) {
                if (isBlocked(next.getChannelIndex())) { //如果這個channel還是被阻塞,則繼續把這條record新增到buffer中
                    // if the channel is blocked we, we just store the BufferOrEvent
                    bufferSpiller.add(next);
                    checkSizeLimit();
                }
                else if (next.isBuffer()) {//否則如果這個channel不再被阻塞,且下一條記錄是資料,則返回此資料
                    return next;
                }
                else if (next.getEvent().getClass() == CheckpointBarrier.class) { // 如果下一個是Barrier,且流沒有結束,則說明這個channel收到了barrier了
                    if (!endOfStream) {
                        // process barriers only if there is a chance of the checkpoint completing
                        processBarrier((CheckpointBarrier) next.getEvent(), next.getChannelIndex()); // 此時,進行processBarrier處理
                    }
                }
                else if (next.getEvent().getClass() == CancelCheckpointMarker.class) { // 如果下一個是帶有cancel標記的barrier,則進行processCancellationBarrier處理
                    processCancellationBarrier((CancelCheckpointMarker) next.getEvent());
                }
                else {
                    if (next.getEvent().getClass() == EndOfPartitionEvent.class) { // 如果此partition的資料全部消費完
                        processEndOfPartition(); // 增加numClosedChannels的值,且將此channel解鎖
                    }
                    return next;
                }
            }
            else if (!endOfStream) { // 如果next為null且不是stream的終點,則置為終點,且釋放所有channel的鎖,重置初始值
                // end of input stream. stream continues with the buffered data
                endOfStream = true;
                releaseBlocksAndResetBarriers();
                return getNextNonBlocked();
            }
            else {
                // final end of both input and buffered data
                return null;
            }
        }
    }

這個方法中,當收到barrier後,立刻進行processBarrier()的處理,這也是其核心所在。

processBarrier

private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
        final long barrierId = receivedBarrier.getId();

        // fast path for single channel cases
        if (totalNumberOfInputChannels == 1) { // 如果總共的channel數量只有1,此時說明這個operator只有一個input
            if (barrierId > currentCheckpointId) { //如果這個barrierId大於當前的檢查點ID,則說明這個barrier是一個新的barrier
                // new checkpoint
                currentCheckpointId = barrierId;//將這個barrierId賦給當前的檢查點ID
                notifyCheckpoint(receivedBarrier); //觸發檢查點
            }
            return;
        }

        // -- general code path for multiple input channels --

        if (numBarriersReceived > 0) { //如果已經收到過barrier
            // this is only true if some alignment is already progress and was not canceled

            if (barrierId == currentCheckpointId) { // 判斷此barrierId與當前的檢查點ID是否一致
                // regular case
                onBarrier(channelIndex); // 如果一直,則阻塞此channel
            }
            else if (barrierId > currentCheckpointId) { // 如果barrierId大於當前的檢查點ID,則說明當前的檢查點過期了,跳過當前的檢查點
                // we did not complete the current checkpoint, another started before
                LOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
                        "Skipping current checkpoint.", barrierId, currentCheckpointId);

                // let the task know we are not completing this
                notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));// 通知task終止當前的檢查點

                // abort the current checkpoint
                releaseBlocksAndResetBarriers();// 釋放所有channel的鎖

                // begin a the new checkpoint
                beginNewAlignment(barrierId, channelIndex);// 根據barrierId,開始新的檢查點
            }
            else {
                // ignore trailing barrier from an earlier checkpoint (obsolete now)
                return;
            }
        }
        else if (barrierId > currentCheckpointId) { // 如果第一次收到的barrierID大於當前的檢查點ID,說明是一個新的barrier
            // first barrier of a new checkpoint
            beginNewAlignment(barrierId, channelIndex);// 根據barrierId,開始新的檢查點
        }
        else {
            // either the current checkpoint was canceled (numBarriers == 0) or
            // this barrier is from an old subsumed checkpoint
            return;
        }

        // check if we have all barriers - since canceled checkpoints always have zero barriers
        // this can only happen on a non canceled checkpoint
        if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) { //如果收到所有channel的barrier,說明走到了
            // actually trigger checkpoint
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received all barriers, triggering checkpoint {} at {}",
                        receivedBarrier.getId(), receivedBarrier.getTimestamp());
            }

            releaseBlocksAndResetBarriers(); // 釋放所有channel的鎖
            notifyCheckpoint(receivedBarrier);// 觸發檢查點
        }
    }

Flink 1.2中有個變化就是判斷當前的operator是否只有一個input channel且收到了最新的barrier,如果是,則開通一個綠色通道,直接進行檢查點:notifyCheckpoint。

否則如果有多個input channel(totalNumberOfInputChannels是通過InputGate獲得),則只有當收到所有input channel的最新的barrier後,才開始進行檢查點:notifyCheckpoint,否則就要先阻塞該input channel,實際上是buffer起來後續的資料。

notifyCheckpoint

private void notifyCheckpoint(CheckpointBarrier checkpointBarrier) throws Exception {
        if (toNotifyOnCheckpoint != null) {
            CheckpointMetaData checkpointMetaData =
                    new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());

            long bytesBuffered = currentBuffered != null ? currentBuffered.size() : 0L;

            checkpointMetaData
                    .setBytesBufferedInAlignment(bytesBuffered)
                    .setAlignmentDurationNanos(latestAlignmentDurationNanos);

            toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData);
        }
    }

toNotifyOnCheckpoint是個StatefulTask介面,管理每個task接收檢查點的通知,其triggerCheckpoint方法是真正的實現。

webUI中對checkpoint的部分增加了很多的元資料資訊,包括檢查點的詳細資訊:

這裡寫圖片描述

這裡寫圖片描述

這裡寫圖片描述

包括每個checkpoint中state的大小,檢查點的狀態,完成的時間以及持續的時間。並且對每一個檢查點,可以額看到每一個subtask的詳細資訊。這點對於檢查點的管理、監控以及對state的調整都起到了積極的作用。

4、總結

ABS在Flink中預設是Exactly Once,需要對齊,對齊的演算法就是阻塞+解除。阻塞和解除阻塞都有各自的判斷依據。

相關推薦

解讀Flink輕量級非同步快照機制--Flink 1.2 原始碼

上一篇文章中,對於ABS演算法,其實現主要通過checkpoint的barrier的阻塞與釋放來實現。 本片重點關注ABS在Flink 1.2中原始碼的實現。 1、CheckpointBarrierHandler 此介面位於org.apache.fli

FlinkPeriodic水印和Punctuated水印實現原理(原始碼分析)

在使用者程式碼中,我們設定生成水印和事件時間的方法assignTimestampsAndWatermarks()中這裡有個方法的過載 我們傳入的物件分為兩種 AssignerWithPunctuatedWatermarks(可以理解為每條資料都會產生水印,如果不想產生水印,返回一個null的水印) Assig

Atitit 持久化 Persistence概念的藝術 目錄 1. 持久化是將程式資料在持久狀態和瞬時狀態間轉換的機制1 2. DBC就是一種持久化機制。檔案IO也是一種持久化機制2 3.

Atitit 持久化 Persistence概念的藝術   目錄 1. 持久化是將程式資料在持久狀態和瞬時狀態間轉換的機制。 1 2. DBC就是一種持久化機制。檔案IO也是一種持久化機制。 2 3. 日常持久化的方法 2 4. 理解與分類 3 4.1

Flink接收端反壓以及Credit機制 (原始碼分析)

先上一張圖整體瞭解Flink中的反壓            可以看到每個task都會有自己對應的IG(inputgate)對接上游傳送過來的資料和RS(resultPatation)對接往下游傳送資料, 整個反壓機制通過inputgat

Flink傳送端反壓以及Credit機制(原始碼分析)

上一篇《Flink接收端反壓機制》說到因為Flink每個Task的接收端和傳送端是共享一個bufferPool的,形成了天然的反壓機制,當Task接收資料的時候,接收端會根據積壓的資料量以及可用的buffer數量(可用的memorySegment數)來決定是否向上遊傳送Credit(簡而言之就是當我還有空間的

[Flink基礎]--Apache Flink的廣播狀態實用指南

感謝英文原文作者:https://data-artisans.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink Apache Flink中的廣播狀態實用指南 從版本1.5.0開始,Apache FlinkⓇ具

Flinkscala提示錯誤——could not find implicit value for evidence parameter of type org.apa

Flink第一個簡單的demo ,wordCount 該問題參考引用如下: https://blog.csdn.net/dax1n/article/details/70211035 自身程式碼中問題: package cetc.flink import org.apa

Flink狀態管理和容錯機制介紹

本文主要內容如下: 有狀態的流資料處理; Flink中的狀態介面; 狀態管理和容錯機制實現; 一.有狀態的流資料處理 1.1.什麼是有狀態的計算 計算任務的結果不僅僅依賴於輸入,還依賴於它的當前狀態,其實大多數的計算都是有狀態的計算。 比如wordc

Flink的序列化失敗問題 和transent宣告

最近在Flink的的map運算元中使用了自義定類(實現richMapFunction)來序列化中存在的問題? 一、背景介紹 在編寫Spark程式中,由於在map等運算元內部使用了外部定義的變數和函式,從而引發Task未序列化問題。然而,Spark運算元在計算過程中使用外部變數在許多情

Flink的資料傳輸與背壓

一圖道盡心酸: 大的原理,上游的task產生資料後,會寫在本地的快取中,然後通知JM自己的資料已經好了,JM通知下游的Task去拉取資料,下游的Task然後去上游的Task拉取資料,形成鏈條。 但是在何時通知JM?這裡有一個設定,比如pipeline還是blocking,pipeline意味著上游哪怕

Flink原始碼系列——Flink一個簡單的資料處理功能的實現過程

在Flink中,實現從指定主機名和埠接收字串訊息,對接收到的字串中出現的各個單詞,每隔1秒鐘就輸出最近5秒內出現的各個單詞的統計次數。 程式碼實現如下: public class SocketWindowWordCount {     public static void

Flink的批處理的WordCount轉化為流處理的WordCount

將Flink中的批處理的WordCount轉化為流處理的WordCount 目的:將Flink中批處理的WordCount轉化為流處理的WordCount 作用:感覺毫無用處 如何實現:將批的environmentBatch中的各個運算元,在流的environmentStream中

Akka在Flink的使用剖析

Akka與Actor 模型 Akka是一個用來開發支援併發、容錯、擴充套件性的應用程式框架。它是actor model的實現,因此跟Erlang的併發模型很像。在actor模型的上下文中,所有的活動實體都被認為是互不依賴的actor。actor之間的互相通訊是

Android非同步訊息處理機制

這也是Android中老生常談的一個話題了,它本身並不是很複雜,可是面試官比較喜歡問。本文就從原始碼再簡單的理一下這個機制。也可以說是理一下Handler、Looper、MessageQueue之間的關係。 單執行緒中的訊息處理機制的實現 首先我們以Looper.java原始碼中給出的一個例子來

Apache Flink的廣播狀態實用指南

感謝英文原文作者:https://data-artisans.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink 不過,原文最近好像不能訪問了。應該是https://www.da-platform.com/網站移除了blog板塊了。

Flinkslot的一點理解

slot在flink裡面可以認為是資源組,Flink是通過將任務分成子任務並且將這些子任務分配到slot來並行執行程式。 每個Flink TaskManager在叢集中提供處理槽。 插槽的數量通常與每個TaskManager的可用CPU核心數成比例。一般情況下你的slot數

《從0到1學習Flink》—— Flink 幾種 Time 詳解

前言Flink 在流程式中支援不同的 Time 概念,就比如有 Processing Time、Event Time 和 Ingestion Time。 下面我們一起來看看這幾個 Time: Processing TimeProcessing Time 是指事件被處理時機器的系統時間。 當流程式在 Pr

Flink的多source+event watermark測試

這次需要做一個監控專案,全網日誌的指標計算,上線的話,計算量應該是百億/天 單個source對應的sql如下 最原始的sql select pro,throwable,level,ip,`count`,id,`time`,firstl,lastl from ( select pro,thro

深入理解Flink的狀態

本文是整理自幾個月前的內部flink state分享,flink狀態所包含的東西很多,在下面列舉了一些,還有一些在本文沒有體現,後續會單獨的挑出來再進行講解 state的層次結構 keyedState => windowState OperatorState => kaf

深入理解python3.4Asyncio庫與Node.js的非同步IO機制

譯者前言 如何用yield以及多路複用機制實現一個基於協程的非同步事件框架? 現有的元件中yield from是如何工作的,值又是如何被傳入yield from表示式的? 在這個yield from之上,是如何在一個執行緒內實現一個排程機制去排程協程的? 協程中呼叫協程的呼叫