1. 程式人生 > >如何正確使用Flink Connector?

如何正確使用Flink Connector?

本文主要分享 Flink connector 相關內容,分為以下三個部分的內容:第一部分會首先介紹一下 Flink Connector 有哪些。第二部分會重點介紹在生產環境中經常使用的 kafka connector 的基本的原理以及使用方法。第三部分答疑,對社群反饋的問題進行答疑。

Flink Streaming Connector

Flink 是新一代流批統一的計算引擎,它需要從不同的第三方儲存引擎中把資料讀過來,進行處理,然後再寫出到另外的儲存引擎中。Connector 的作用就相當於一個聯結器,連線 Flink 計算引擎跟外界儲存系統。Flink 裡有以下幾種方式,當然也不限於這幾種方式可以跟外界進行資料交換:

  • 第一種 Flink 裡面預定義了一些 source 和 sink。
  • 第二種 Flink 內部也提供了一些 Boundled connectors。
  • 第三種可以使用第三方 Apache Bahir 專案中提供的聯結器。
  • 第四種是通過非同步 IO 方式。

下面分別簡單介紹一下這四種資料讀寫的方式。

1.預定義的 source 和 sink

Flink 裡預定義了一部分 source 和 sink。在這裡分了幾類。

  • 基於檔案的 source 和 sink。

如果要從文字檔案中讀取資料,可以直接使用:

env.readTextFile(path)

就可以以文字的形式讀取該檔案中的內容。當然也可以使用:

env.readFile(fileInputFormat, path)

根據指定的 fileInputFormat 格式讀取檔案中的內容。

如果資料在 Flink 內進行了一系列的計算,想把結果寫出到檔案裡,也可以直接使用內部預定義的一些 sink,比如將結果已文字或 csv 格式寫出到檔案中,可以使用 DataStream 的 writeAsText(path) 和 writeAsCsv(path)。

  • 基於 Socket 的 Source 和 Sink

提供 Socket 的 host name 及 port,可以直接用 StreamExecutionEnvironment 預定的介面 socketTextStream 建立基於 Socket 的 source,從該 socket 中以文字的形式讀取資料。當然如果想把結果寫出到另外一個 Socket,也可以直接呼叫 DataStream writeToSocket。

  • 基於記憶體 Collections、Iterators 的 Source

可以直接基於記憶體中的集合或者迭代器,呼叫 StreamExecutionEnvironment fromCollection、fromElements 構建相應的 source。結果資料也可以直接 print、printToError 的方式寫出到標準輸出或標準錯誤。

詳細也可以參考 Flink 原始碼中提供的一些相對應的 Examples 來檢視異常預定義 source 和 sink 的使用方法,例如 WordCount、SocketWindowWordCount。

2.Bundled Connectors

Flink 裡已經提供了一些繫結的 Connector,例如 kafka source 和 sink,Es sink等。讀寫 kafka、es、rabbitMQ 時可以直接使用相應 connector 的 api 即可。第二部分會詳細介紹生產環境中最常用的 kafka connector。

雖然該部分是 Flink 專案原始碼裡的一部分,但是真正意義上不算作 Flink 引擎相關邏輯,並且該部分沒有打包在二進位制的釋出包裡面。所以在提交 Job 時候需要注意, job 程式碼 jar 包中一定要將相應的 connetor 相關類打包進去,否則在提交作業時就會失敗,提示找不到相應的類,或初始化某些類異常。

3.Apache Bahir 中的聯結器

Apache Bahir 最初是從 Apache Spark 中獨立出來專案提供,以提供不限於 Spark 相關的擴充套件/外掛、聯結器和其他可插入元件的實現。通過提供多樣化的流聯結器(streaming connectors)和 SQL 資料來源擴充套件分析平臺的覆蓋面。如有需要寫到 flume、redis 的需求的話,可以使用該專案提供的 connector。

4.Async I/O

流計算中經常需要與外部儲存系統互動,比如需要關聯 MySQL 中的某個表。一般來說,如果用同步 I/O 的方式,會造成系統中出現大的等待時間,影響吞吐和延遲。為了解決這個問題,非同步 I/O 可以併發處理多個請求,提高吞吐,減少延遲。

Tips:Async 的原理可參考官方文件

Flink Kafka Connector

本章重點介紹生產環境中最常用到的 Flink kafka connector。使用 Flink 的同學,一定會很熟悉 kafka,它是一個分散式的、分割槽的、多副本的、 支援高吞吐的、釋出訂閱訊息系統。生產環境環境中也經常會跟 kafka 進行一些資料的交換,比如利用 kafka consumer 讀取資料,然後進行一系列的處理之後,再將結果寫出到 kafka 中。這裡會主要分兩個部分進行介紹,一是 Flink kafka Consumer,一個是 Flink kafka Producer。

首先看一個例子來串聯下 Flink kafka connector。程式碼邏輯裡主要是從 kafka 裡讀資料,然後做簡單的處理,再寫回到 kafka 中。

分別用紅框框出如何構造一個 Source sink Function。Flink 提供了現成的構造FlinkKafkaConsumer、Producer 的介面,可以直接使用。這裡需要注意,因為 kafka 有多個版本,多個版本之間的介面協議會不同。Flink 針對不同版本的 kafka 有相應的版本的 Consumer 和 Producer。例如:針對 08、09、10、11 版本,Flink 對應的 consumer 分別是 FlinkKafkaConsumer 08、09、010、011,producer 也是。

1.Flink kafka Consumer

  • 反序列化資料

因為 kafka 中資料都是以二進位制 byte 形式儲存的。讀到 Flink 系統中之後,需要將二進位制資料轉化為具體的 java、scala 物件。具體需要實現一個 schema 類,定義如何序列化和反序列資料。反序列化時需要實現 DeserializationSchema 介面,並重寫 deserialize(byte[] message) 函式,如果是反序列化 kafka 中 kv 的資料時,需要實現 KeyedDeserializationSchema 介面,並重寫 deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) 函式。

另外 Flink 中也提供了一些常用的序列化反序列化的 schema 類。例如,SimpleStringSchema,按字串方式進行序列化、反序列化。TypeInformationSerializationSchema,它可根據 Flink 的 TypeInformation 資訊來推斷出需要選擇的 schema。JsonDeserializationSchema 使用 jackson 反序列化 json 格式訊息,並返回 ObjectNode,可以使用 .get(“property”) 方法來訪問相應欄位。

  • 消費起始位置設定

如何設定作業從 kafka 消費資料最開始的起始位置,這一部分 Flink 也提供了非常好的封裝。在構造好的 FlinkKafkaConsumer 類後面呼叫如下相應函式,設定合適的起始位置。

  • setStartFromGroupOffsets,也是預設的策略,從 group offset 位置讀取資料,group offset 指的是 kafka broker 端記錄的某個 group 的最後一次的消費位置。但是 kafka broker 端沒有該 group 資訊,會根據 kafka 的引數"auto.offset.reset"的設定來決定從哪個位置開始消費。
  • setStartFromEarliest,從 kafka 最早的位置開始讀取。
  • setStartFromLatest,從 kafka 最新的位置開始讀取。
  • setStartFromTimestamp(long),從時間戳大於或等於指定時間戳的位置開始讀取。Kafka 時戳,是指 kafka 為每條訊息增加另一個時戳。該時戳可以表示訊息在 proudcer 端生成時的時間、或進入到 kafka broker 時的時間。
  • setStartFromSpecificOffsets,從指定分割槽的 offset 位置開始讀取,如指定的 offsets 中不存某個分割槽,該分割槽從 group offset 位置開始讀取。此時需要使用者給定一個具體的分割槽、offset 的集合。

一些具體的使用方法可以參考下圖。需要注意的是,因為 Flink 框架有容錯機制,如果作業故障,如果作業開啟 checkpoint,會從上一次 checkpoint 狀態開始恢復。或者在停止作業的時候主動做 savepoint,啟動作業時從 savepoint 開始恢復。這兩種情況下恢復作業時,作業消費起始位置是從之前儲存的狀態中恢復,與上面提到跟 kafka 這些單獨的配置無關。

  • topic 和 partition 動態發現

實際的生產環境中可能有這樣一些需求,比如場景一,有一個 Flink 作業需要將五份資料聚合到一起,五份資料對應五個 kafka topic,隨著業務增長,新增一類資料,同時新增了一個 kafka topic,如何在不重啟作業的情況下作業自動感知新的 topic。場景二,作業從一個固定的 kafka topic 讀資料,開始該 topic 有 10 個 partition,但隨著業務的增長資料量變大,需要對 kafka partition 個數進行擴容,由 10 個擴容到 20。該情況下如何在不重啟作業情況下動態感知新擴容的 partition?

針對上面的兩種場景,首先需要在構建 FlinkKafkaConsumer 時的 properties 中設定 flink.partition-discovery.interval-millis 引數為非負值,表示開啟動態發現的開關,以及設定的時間間隔。此時 FlinkKafkaConsumer 內部會啟動一個單獨的執行緒定期去 kafka 獲取最新的 meta 資訊。針對場景一,還需在構建 FlinkKafkaConsumer 時,topic 的描述可以傳一個正則表示式描述的 pattern。每次獲取最新 kafka meta 時獲取正則匹配的最新 topic 列表。針對場景二,設定前面的動態發現引數,在定期獲取 kafka 最新 meta 資訊時會匹配新的 partition。為了保證資料的正確性,新發現的 partition 從最早的位置開始讀取。

  • commit offset 方式

Flink kafka consumer commit offset 方式需要區分是否開啟了 checkpoint。

如果 checkpoint 關閉,commit offset 要依賴於 kafka 客戶端的 auto commit。需設定 enable.auto.commit,auto.commit.interval.ms 引數到 consumer properties,就會按固定的時間間隔定期 auto commit offset 到 kafka。

如果開啟 checkpoint,這個時候作業消費的 offset 是 Flink 在 state 中自己管理和容錯。此時提交 offset 到 kafka,一般都是作為外部進度的監控,想實時知道作業消費的位置和 lag 情況。此時需要 setCommitOffsetsOnCheckpoints 為 true 來設定當 checkpoint 成功時提交 offset 到 kafka。此時 commit offset 的間隔就取決於 checkpoint 的間隔,所以此時從 kafka 一側看到的 lag 可能並非完全實時,如果 checkpoint 間隔比較長 lag 曲線可能會是一個鋸齒狀。

  • Timestamp Extraction/Watermark 生成

我們知道當 Flink 作業內使用 EventTime 屬性時,需要指定從訊息中提取時戳和生成水位的函式。FlinkKakfaConsumer 構造的 source 後直接呼叫 assignTimestampsAndWatermarks 函式設定水位生成器的好處是此時是每個 partition 一個 watermark assigner,如下圖。source 生成的時戳為多個 partition 時戳對齊後的最小時戳。此時在一個 source 讀取多個 partition,並且 partition 之間資料時戳有一定差距的情況下,因為在 source 端 watermark 在 partition 級別有對齊,不會導致資料讀取較慢 partition 資料丟失。

2.Flink kafka Producer

  • Producer 分割槽

使用 FlinkKafkaProducer 往 kafka 中寫資料時,如果不單獨設定 partition 策略,會預設使用 FlinkFixedPartitioner,該 partitioner 分割槽的方式是 task 所在的併發 id 對 topic 總 partition 數取餘:parallelInstanceId % partitions.length。

  • 此時如果 sink 為 4,paritition 為 1,則 4 個 task 往同一個 partition 中寫資料。但當 sink task < partition 個數時會有部分 partition 沒有資料寫入,例如 sink task 為2,partition 總數為 4,則後面兩個 partition 將沒有資料寫入。
  • 如果構建 FlinkKafkaProducer 時,partition 設定為 null,此時會使用 kafka producer 預設分割槽方式,非 key 寫入的情況下,使用 round-robin 的方式進行分割槽,每個 task 都會輪循的寫下游的所有 partition。該方式下游的 partition 資料會比較均衡,但是缺點是 partition 個數過多的情況下需要維持過多的網路連線,即每個 task 都會維持跟所有 partition 所在 broker 的連線。

  • 容錯

Flink kafka 09、010 版本下,通過 setLogFailuresOnly 為 false,setFlushOnCheckpoint 為 true,能達到 at-least-once 語義。setLogFailuresOnly,預設為 false,是控制寫 kafka 失敗時,是否只打印失敗的 log 不拋異常讓作業停止。setFlushOnCheckpoint,預設為 true,是控制是否在 checkpoint 時 fluse 資料到 kafka,保證資料已經寫到 kafka。否則資料有可能還快取在 kafka 客戶端的 buffer 中,並沒有真正寫出到 kafka,此時作業掛掉資料即丟失,不能做到至少一次的語義。

Flink kafka 011 版本下,通過兩階段提交的 sink 結合 kafka 事務的功能,可以保證端到端精準一次。

一些疑問與解答

Q:在 Flink consumer 的並行度的設定:是對應 topic 的 partitions 個數嗎?要是有多個主題資料來源,並行度是設定成總體的 partitions 數嗎?

A:這個並不是絕對的,跟 topic 的資料量也有關,如果資料量不大,也可以設定小於 partitions 個數的併發數。但不要設定併發數大於 partitions 總數,因為這種情況下某些併發因為分配不到 partition 導致沒有資料處理。

Q:如果 partitioner 傳 null 的時候是 round-robin 發到每一個 partition?如果有 key 的時候行為是 kafka 那種按照 key 分佈到具體分割槽的行為嗎?

A:如果在構造 FlinkKafkaProducer 時,如果沒有設定單獨的 partitioner,則預設使用 FlinkFixedPartitioner,此時無論是帶 key 的資料,還是不帶 key。如果主動設定 partitioner 為 null 時,不帶 key 的資料會 round-robin 的方式寫出,帶 key 的資料會根據 key,相同 key 資料分割槽的相同的 partition,如果 key 為 null,再輪詢寫。不帶 key 的資料會輪詢寫各 partition。

Q:如果 checkpoint 時間過長,offset 未提交到 kafka,此時節點宕機了,重啟之後的重複消費如何保證呢?

A:首先開啟 checkpoint 時 offset 是 Flink 通過狀態 state 管理和恢復的,並不是從 kafka 的 offset 位置恢復。在 checkpoint 機制下,作業從最近一次 checkpoint 恢復,本身是會回放部分歷史資料,導致部分資料重複消費,Flink 引擎僅保證計算狀態的精準一次,要想做到端到端精準一次需要依賴一些冪等的儲存系統或者事務操作。


原文連結
本文為雲棲社群原創內容,未經