1. 程式人生 > >Apache Flink原始碼解析之stream-sink

Apache Flink原始碼解析之stream-sink

上一篇我們談論了Flink stream source,它作為流的資料入口是整個DAG(有向無環圖)拓撲的起點。那麼與此對應的,流的資料出口就是跟source對應的Sink。這是我們本篇解讀的內容。

SinkFunction

SourceFunction對應,Flink針對Sink的根介面被稱為SinkFunction。繼承自Function這一標記介面。SinkFunction介面只提供了一個方法:

    void invoke(IN value) throws Exception;

該方法提供基於記錄級別的呼叫(也就是每個被輸出的記錄都會呼叫該介面一次)。上面方法的引數value

即為需要輸出的記錄。

SinkFunction相對來說比較簡潔,下面我們來看一下它的實現者。

內建的SinkFunction

同樣,我們先來看一下完整的型別繼承體系:

flink-stream-sink_all-class-diagram

DiscardingSink

這是最簡單的SinkFunction的實現,它的實現等同於沒有實現(其實現為空方法)。它的作用就是將記錄丟棄掉。它的主要場景應該是那些無需最終處理結果的記錄。

WriteSinkFunction

WriteSinkFunction是一個抽象類。該類的主要作用是將需要輸出的tuples(元組)作為簡單的文字輸出到指定路徑的檔案中去,元組被收集到一個list中去,然後週期性得寫入檔案。

WriteSinkFunction的構造器接收兩個引數:

  • path : 需要寫入的檔案路徑
  • format : WriteFormat的例項,用於指定寫入資料的格式

在構造器中,它呼叫方法cleanFile,該方法用於初始化指定path的檔案。初始化的行為是:如果不存在則建立,如果存在則清空

invoke方法的實現:

    public void invoke(IN tuple) {

        tupleList.add(tuple);
        if (updateCondition()) {
            format.write(path, tupleList);
            resetParameters();
        }

    }

從實現來看,其先將需要sink的元組加入內部集合。然後呼叫updateCondition方法。該方法是WriteSinkFunction定義的抽象方法。用於實現判斷將tupleList寫入檔案以及清空tupleList的條件。接著將集合中的tuple寫入到指定的檔案中。最後又呼叫了resetParameters方法。該方法同樣是一個抽象方法,它的主要用途是當寫入的場景是批量寫入時,可能會有一些狀態引數,該方法就是用於對狀態進行reset。

WriteSinkFunctionByMillis

該類是WriteSinkFunction的實現類。它支援以指定的毫秒數間隔將tuple批量寫入檔案。間隔由構造器引數millis指定。在內部,WriteSinkFunctionlastTime維護上一次寫入的時間狀態。它主要涉及上面提到的兩個抽象方法的實現:

    protected boolean updateCondition() {
        return System.currentTimeMillis() - lastTime >= millis;
    }

updateCondition的實現很簡單,拿當前主機的當前時間戳跟上一次的執行時間戳狀態作對比:如果大於指定的間隔,則條件為真,觸發寫入。

    protected void resetParameters() {
        tupleList.clear();
        lastTime = System.currentTimeMillis();
    }

resetParameters實現是先清空tupleList,然後將lastTime老的時間戳狀態覆蓋為最新時間戳。

WriteFormat

一個寫入格式的抽象類,提供了兩種實現:

  • WriteFormatAsText : 以原樣文字的形式寫入指定路徑的檔案
  • WriteFormatAsCsv : 以csv格式寫入指定檔案

RichSinkFunction

RichSinkFunction通過繼承AbstractRichFunction為實現一個rich SinkFunction提供基礎(AbstractRichFunction提供了一個open/close方法對,以及獲取執行時上下文物件手段)。RichSinkFunction也是抽象類,它有三個具體實現。

SocketClientSink

支援以socket的方式將資料傳送到特定目標主機所在的伺服器作為flink stream的sink。資料被序列化為byte array然後寫入到socket。該sink支援失敗重試模式的訊息傳送。該sink 可啟用autoFlush,如果啟用,那麼會導致吞吐量顯著下降,但延遲也會降低。該方法的構造器,提供的引數:

  • hostName : 待連線的server的host name
  • port : server的埠
  • schema :SerializationSchema的例項,用於序列化物件。
  • maxNumRetries : 最大重試次數(-1為無限重試)
  • autoflush : 是否自動flush

重試的策略在invoke方法中,當傳送失敗時進入到異常捕捉塊中進行。

OutputFormatSinkFunction

一個將記錄寫入OutputFormat的SinkFunction的實現。

OutputFormat :定義被消費記錄的輸出介面。指定了最終的記錄如何被儲存,比如檔案就是一種儲存實現。

PrintSinkFunction

該實現用於將每條記錄輸出到標準輸出流(stdOut)或標準錯誤流(stdErr)。在輸出時,如果當前task的並行subtask例項個數大於1,也就是說當前task是並行執行的(同時存在多個例項),那麼在輸出每條記錄之前會輸出一個prefix字首。prefix為在全域性上下文中當前subtask的位置。

常見聯結器中的Sink

Flink自身提供了一些針對第三方主流開源系統的聯結器支援,它們有:

  • elasticsearch
  • flume
  • kafka(0.8/0.9版本)
  • nifi
  • rabbitmq
  • twitter

這些第三方系統(除了twitter)的sink,無一例外都是繼承自RichSinkFunction

小結

這篇文章我們主要談及了Flink的stream sink相關的設計、實現。當然這個主題還沒有完全談完,還會有後續篇幅繼續解讀。

微信掃碼關注公眾號:Apache_Flink

apache_flink_weichat