1. 程式人生 > >flink開發實戰之flink原理解析

flink開發實戰之flink原理解析

 

目錄

Flink出現的背景

Flink 簡介

Flink 的生態圈(技術棧)

執行配置

設定並行性

操作級別

執行環境級別

 客戶級別

基本API(流處理和批處理)

DataSet和DataStream

Flink計劃的剖析

讀取資料

資料輸出

flink程式設計模型

案例一:基於檔案(本地,hdfs)的wordcount

案例二:讀取kafak中的資料儲存到hdfs中

檢查點 checkpoint

檢查點介紹

故障緊跟檢查點的情況

啟用和配置檢查點

視窗

時間視窗

時間視窗程式碼

計數視窗

會話視窗

水印

水印如何生成

有狀態的計算

廣播變數


Flink出現的背景

    我們知道目前流處理的主要流行的計算引擎有,Storm,SparkStreaming。但是這個兩個計算引擎都有自己的侷限性。Storm實現了低延遲,但是目前還沒有實現高吞吐,也不能在故障發生的時候準確的處理計算狀態(將資料從一個事件儲存到另一個事件的,這些保留下來的是資料較計算狀態),同時也不能實現exactly-once。SparkStreaming通過微批處理方法實現了高吞吐和容錯性,但是犧牲了低延遲和實時處理的能力,也不能使用視窗與自然時間相匹配。Flink的出現完美的解決了以上問題,這也是flink出現的原因,flink不僅能提供同時支援高吞吐和exactly-once語義的實時計算,還能夠提供批量資料的處理,並且和其他的計算引擎相比,flink能夠區分出不同的型別的時間。

Flink 簡介

     Flink 的前身已經是柏林理工大學一個研究性專案, 在 2014 被 Apache 孵化器所接受,然後迅速地成為了 ASF(Apache Software Foundation)的頂級專案之一。Flink 是一個針對流資料和批資料的分散式處理引擎。主要是由 Java 程式碼實現。其所要處理的主要場景就是流資料,批資料只是流資料的一個極限特例而已。Flink 可以支援本地的快速迭代,以及一些環形的迭代任務。並且 Flink 可以定製化記憶體管理。在這點,如果要對比 Flink 和 Spark 的話,Flink 並沒有將記憶體完全交給應用層。這也是為什麼 Spark 相對於 Flink,更容易出現 OOM 的原因(out of memory)。就框架本身與應用場景來說,Flink 更相似與 Storm。下面讓我們先來看下 Flink 的架構圖。

如圖 所示,我們可以瞭解到 Flink 幾個最基礎的概

Client、JobManager 和 TaskManager

Client 用來提交任務給 JobManager,JobManager 分發任務給 TaskManager 去執行,然後 TaskManager 會心跳的彙報任務狀態。從架構圖去看,JobManager 很像當年的 JobTracker,TaskManager 也很像當年的 TaskTracker。然而有一個最重要的區別就是 TaskManager 之間是是流(Stream)。其次,Hadoop 一代中,只有 Map 和 Reduce 之間的 Shuffle,而對 Flink 而言,可能是很多級而不像 Hadoop,是固定的 Map 到 Reduce。

Flink 的生態圈(技術棧)

Flink 首先支援了 Scala 和 Java 的 API,Python 也正在測試中。Flink 通過 Gelly 支援了圖操作,還有機器學習的 FlinkML。Table 是一種介面化的 SQL 支援,也就是 API 支援,而不是文字化的 SQL 解析和執行。值的一提的是flink分別提供了面向流處理介面(DataStream API)和麵向批處理的介面(DataSet  API),同時flink支援拓展庫設計機器學習,FlinkML,複雜時間處理(CEP)以及圖計算,還有分別針對流處理和批處理的Table API

執行配置

flink執行環境包括批處理和流出,所以要分兩種情況進行執行配置

Flink 批處理環境

val env = ExecutionEnvironment.getExecutionEnvironment

Flink 流處理環境

val env = StreamExecutionEnvironment.getExecutionEnvironment

接下來我可以在env進行相關的設定

StreamExecutionEnvironment包含ExecutionConfig允許為執行時設定工作的具體配置值。要更改影響所有作業的預設值。

val env = StreamExecutionEnvironment.getExecutionEnvironment

var executionConfig = env.getConfig

可以使用以下配置選項:

enableClosureCleaner()/ disableClosureCleaner()。

預設情況下啟用閉包清理器。閉包清理器刪除Flink程式中對周圍類匿名函式的不需要的引用。禁用閉包清除程式後,可能會發生匿名使用者函式引用周圍的類(通常不是Serializable)。這將導致序列化程式出現異常。

getParallelism()/ setParallelism(int parallelism)

設定作業的預設並行度。
getMaxParallelism()/ setMaxParallelism(int parallelism)
設定作業的預設最大並行度。此設定確定最大並行度並指定動態縮放的上限
還有其他的配置項可以配置,就不一一列舉,可以參考flink官方網站

https://flink.apache.org/flink-architecture.html

設定並行性

Flink程式由多個任務(轉換/運算子,資料來源和接收器)組成。任務被分成幾個並行例項以供執行,每個並行例項處理任務輸入資料的子集。任務的並行例項數稱為並行性。如果要使用儲存點,還應考慮設定最大並行度(或max parallelism)。從儲存點恢復時,您可以更改特定運算子或整個程式的並行度,此設定指定並行度的上限。這是必需的,因為Flink在內部將狀態劃分為金鑰組,並且我們不能擁有+Inf多個金鑰組,因為這會對效能產生不利影響。

操作級別

可以通過呼叫其setParallelism()方法來定義單個運算子,資料來源或資料接收器的並行性 。例如

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> text = [...]DataStream<Tuple2<String, Integer>> wordCounts = text
    .flatMap(new LineSplitter())
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1).setParallelism(5);wordCounts.print();env.execute("Word Count Example");

執行環境級別

 Flink程式在執行環境的上下文中執行。執行環境為其執行的所有操作符,資料來源和資料接收器定義預設並行性。可以通過顯式配置運算子的並行性來覆蓋執行環境並行性。可以通過呼叫setParallelism()方法來指定執行環境的預設並行性 。要以並行方式執行所有運算子,資料來源和資料接收器,請3按如下方式設定執行環境的預設並行度:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
DataStream<String> text = [...]DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();
env.execute("Word Count Example");

 客戶級別

在向Flink提交作業時,可以在客戶端設定並行性。客戶端可以是Java或Scala程式。這種客戶端的一個例子是Flink的命令列介面(CLI)。

對於CLI客戶端,可以使用指定parallelism引數-p。例如:

./bin/flink run -p 10 ../examples/*WordCount-java*.jar

基本API(流處理和批處理)

批處理是流處理的一種非常特殊的情況。Flink的特殊之處就在於既可以把資料當做流進行處理也可以把資料當作有限流進行批處理。可以理解為:

DataSet PI用於批處理:相當於spark core

DataStream API用於流式處理:相當於 spark streaming

DataSet和DataStream

Flink具有特殊類DataSet和DataStream在程式中表示資料。您可以將它們視為可以包含重複項的不可變資料集合。在DataSet資料有限,對於一個DataStream元素的數量可以是無界的。這些集合在某些關鍵方面與常規Java集合不同。首先,它們是不可變的,這意味著一旦建立它們就無法新增或刪除元素。你也不能簡單地檢查裡面的元素。集合最初通過在弗林克程式新增源建立和新的集合從這些通過將它們使用API方法如衍生map,filter等等。

Flink計劃的剖析

Flink程式看起來像是轉換資料集合的常規程式。每個程式包含相同的基本部分:

  1. 獲得一個execution environment
  2. 載入/建立初始資料,
  3. 指定此資料的轉換,
  4. 指定放置計算結果的位置,
  5. 觸發程式執行

我們現在將概述每個步驟,請參閱相應部分以獲取更多詳細資訊。請注意,Scala DataSet API的所有核心類都可以在org.apache.flink.api.scala包中找到, 而Scala DataStream API的類可以在org.apache.flink.streaming.api.scala中找到 。

StreamExecutionEnvironment是所有Flink計劃的基礎。您可以使用以下靜態方法獲取一個StreamExecutionEnvironment:

getExecutionEnvironment()

createLocalEnvironment()

createRemoteEnvironment(host: String, port: Int, jarFiles: String*)

通常,您只需要使用getExecutionEnvironment(),因為這將根據上下文做正確的事情:如果您在IDE中執行程式或作為常規Java程式,它將建立一個本地環境,將在本地計算機上執行您的程式。如果您從程式中建立了一個JAR檔案,並通過命令列呼叫它 ,則Flink叢集管理器將執行您的main方法並getExecutionEnvironment()返回一個執行環境,以便在叢集上執行您的程式。

讀取資料

對於指定資料來源,執行環境有幾種方法可以使用各種方法從檔案中讀取:您可以逐行讀取它們,CSV檔案或使用完全自定義資料輸入格式。要將文字檔案作為一系列行讀取,您可以使用:

val env = StreamExecutionEnvironment.getExecutionEnvironment()

val text: DataStream[String] = env.readTextFile("file:///path/to/file")

這將為您提供一個DataStream,然後您可以在其上應用轉換來建立新的派生DataStream。

您可以通過使用轉換函式呼叫DataSet上的方法來應用轉換。

例如,map轉換如下所示:

val input: DataSet[String] = ...val mapped = input.map { x => x.toInt }

這將通過將原始集合中的每個String轉換為Integer來建立新的DataStream。

資料輸出

一旦有了包含最終結果的DataStream,就可以通過建立接收器將其寫入外部系統。這些只是建立接收器的一些示例方法:

writeAsText(path: String)print()

一旦您指定的完整程式,你需要觸發執行程式呼叫 execute()StreamExecutionEnvironment。根據執行的型別,ExecutionEnvironment將在本地計算機上觸發執行或提交程式以在群集上執行。

該execute()方法返回一個JobExecutionResult,包含執行時間和累加器結果

flink程式設計模型

DataSet和DataStream相關運算元太多就不一一列舉了,使用時可以參考官方文件。在這舉兩個例子進行展示flink的程式設計模型

案例一:基於檔案(本地,hdfs的wordcount

public class FunctionTest {
    public static void main(String[] args) throws Exception {
        //建立流執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //讀取文字檔案中的資料
        DataStreamSource<String> streamSource = env.readTextFile("C:/flink_data/1.txt");
        //進行邏輯計算
        SingleOutputStreamOperator<Tuple2<String, Integer>> dataStream = streamSource
                .flatMap(new Splitter())
                .keyBy(0)
                .sum(1);
        dataStream.print();
        //設定程式名稱
        env.execute("Window WordCount");
    }
}

實現 FlatMapFunction

public  class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
        for (String word: sentence.split(" ")) {
            out.collect(new Tuple2<String, Integer>(word, 1));
        }
    }
}

案例二:讀取kafak中的資料儲存到hdfs中

新增maven依賴

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
<version>1.1.3</version>
</dependency>

程式程式碼

object DataFkafka {
  def main(args: Array[String]): Unit = {
    //設定kafka連線引數
    val  properties = new Properties()
    properties.setProperty("bootstrap.servers", "10.10.4.11:9092,10.10.49.183:9092,10.10.49.207:9092");
    properties.setProperty("zookeeper.connect", "10.10.4.11:2181,10.10.49.183:2181");
    properties.setProperty("group.id", "res");
    //獲取流執行環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //設定時間型別
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //設定檢查點時間間隔
    env.enableCheckpointing(1000)
    //設定檢查點模式
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    //建立kafak消費者,獲取kafak中的資料
    val myConsumer: FlinkKafkaConsumer010[String] = new FlinkKafkaConsumer010[String]("flink", new SimpleStringSchema(), properties)
    val kafkaData: DataStream[String] = env.addSource(myConsumer)
    kafkaData.print()
    //資料儲存到hdfs
    kafkaData.writeAsText("hdfs://10.10.4.11:9000/output/flink.txt")
    print("kafka")
    //設定程式名稱
    env.execute("data_from_kafak_wangzh")

  }

}

java和scala對比可以看出 還是scala比較簡潔。

檢查點 checkpoint

Flink的檢查點特性在流處理器中是獨一無二的,程式執行時有flink自動生成,

它使得flink可以準確的維持狀態,實現資料的一致性(exactly-once),並且高效的重新處理資料。

檢查點介紹

Flink的檢查點機制實現了標準的Chandy-Lamport演算法,並用來實現分散式快照。在分散式快照當中,有一個核心的元素:Barrier。屏障作為資料流的一部分隨著記錄被注入到資料流中。屏障永遠不會趕超通常的流記錄,它會嚴格遵循順序。屏障將資料流中的記錄隔離成一系列的記錄集合,並將一些集合中的資料加入到當前的快照中,而另一些資料加入到下一個快照中。每一個屏障攜帶著快照的ID,快照記錄著ID並且將其放在快照資料的前面。屏障不會中斷流處理,因此非常輕量級。來自不同快照的多個屏障可能同時出現在流中,這意味著多個快照可能併發地發生

舉例說明就像多個人一起數一串項鍊的珠子數量,幾個人在說話,可能某一時刻,忘記數量是多少了,此時如果我們每五個珠子就栓一條不同的顏色,並且提前設定好規則。比如紅的代表數五個,黃色的代表數了10珠子,以次類推,那麼當我們忘記數了個珠子的時候多少時,就可以看一下繩子的顏色,就知道最新的繩子代表的珠子說,重新從繩子哪裡繼續數珠子的個數。

下圖是checkpoint的整體邏輯圖,其中ckpt是檢查點屏障。在資料流中,每一天資料都會嚴格按照檢查點前和檢查點後的規定,被處理。檢查點屏障也會像資料一樣在運算元之前流動。當flink運算元遇到檢查點屏障時,它會將檢查點在資料流的位置記錄下來,如果資料來自kafak那麼位置就是偏移量。

當檢查點操作完成,結果狀態和位置會備份到穩定的儲存介質中如下圖。需要注意的是:如果檢查點操作失敗了,flink會丟棄該檢查點繼續正常執行,因為之後的某一個檢查點很大程度會成功,雖然這樣恢復時間有點長,但是對狀態的保障依舊很有力,只有在一系列連的檢查點操作失敗flink才會報錯。

故障緊跟檢查點的情況

當檢查點操作已經完成,但是故障緊隨其後。這種情況下,flink會重新拓撲,將輸入流倒回到上一個檢查點,然後恢復狀態值並從該出重新繼續計算,可以保證在剩下的記錄被處理後,得到的map運算元的狀態與沒有發生故障的狀態一致,值得注意的是有些資料會重複計算,也就是資料可能會出現區域性的重複。但是我們可以將資料流寫入到特殊的系統中(比如檔案系統,資料庫)來解決這個問題。

啟用和配置檢查點

預設情況下,禁用檢查點。為了使檢查點在StreamExecutionEnvironment上,呼叫

enableCheckpointing(n),其中Ñ是以毫秒為單位的檢查點間隔

檢查點的其他引數包括:

完全一次與至少一次:您可以選擇將模式傳遞給enableCheckpointing(n)方法,以在兩個保證級別之間進行選擇。對於大多數應用來說,恰好一次是優選的。至少一次可能與某些超低延遲(始終為幾毫秒)的應用程式相關。

checkpoint timeout(檢查點超時):如果當前檢查點未完成,則中止檢查點的時間。

minimum time between checkpoints檢查點之間的最短時間:為確保流應用程式在檢查點之間取得一定進展,可以定義檢查點之間需要經過多長時間。如果將此值設定為例如5000,則無論檢查點持續時間和檢查點間隔如何,下一個檢查點將在上一個檢查點完成後不遲於5秒啟動。請注意,這意味著檢查點間隔永遠不會小於此引數。

通過定義“檢查點之間的時間”而不是檢查點間隔來配置應用程式通常更容易,因為“檢查點之間的時間”不易受檢查點有時需要比平均時間更長的事實的影響(例如,如果目標儲存系統暫時很慢)。

請注意,此值還表示併發檢查點的數量為一。

number of concurrent checkpoints併發檢查點數:預設情況下,當一個檢查點仍處於執行狀態時,系統不會觸發另一個檢查點。這可確保拓撲不會在檢查點上花費太多時間,也不會在處理流方面取得進展。可以允許多個重疊檢查點,這對於具有特定處理延遲的管道(例如,因為函式呼叫需要一些時間來響應的外部服務)而感興趣,但是仍然希望執行非常頻繁的檢查點(100毫秒) )在失敗時重新處理很少。

當定義檢查點之間的最短時間時,不能使用此選項。

externalized checkpoints外部化檢查點:您可以將外圍檢查點配置為外部持久化。外部化檢查點將其元資料寫入持久儲存,並且在作業失敗時不會自動清除。這樣,如果您的工作失敗,您將有一個檢查點可以從中恢復。有關外部化檢查點部署說明中有更多詳細資訊。

fail/continue task on checkpoint errors關於檢查點錯誤的失敗/繼續任務:這確定如果在執行任務的檢查點過程中發生錯誤,任務是否將失敗。這是預設行為。或者,當禁用此選項時,任務將簡單地拒絕檢查點協調器的檢查點並繼續執行

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);
// advanced options:// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);    

視窗

視窗是一種機制。允許許多事件按照時間或者其他特徵進行分組,將每一組作為整體去分析計算。Flink中的視窗主要有時間視窗,計數視窗,回話視窗。並且我們要知道flink是唯一一個支援回話視窗的開源流處理器,這裡主要介紹用處組多的時間視窗。

時間視窗

時間視窗是最簡單,最有用的一種視窗,它支援滾動和滑動,幾個簡單的例子,對感測器的發出的資料進行求和

一分鐘滾動視窗收集最近一分鐘的數值,並在一分鐘結束時輸出總和,如下圖

一分鐘滑動視窗計算最近一分鐘的數值總和,但是每半分鐘滑動一次並輸出結果,如下圖

第一個滑動視窗對 3,2,5,7求和得到17,半分鐘後窗口滑動,然後對2,5,7,1求和得到結果15以此類推。

時間視窗程式碼

一分鐘的滑動視窗:

Stream.timeWindows(Time.minute(1))

每半分鐘(30秒)滑動一次的一分鐘滑動視窗

Stream.timeWindows(Time.minute(1),Time.second(30))

計數視窗

計數視窗的分組依據不再是時間,而是元素的數量。例如在上面的圖-2也可以解釋為由4個元素組成的計數視窗,並且每兩個元素滑動一次,滾動和滑動計數視窗定義如下

Stream.countWindow(4)

Stream.countWindow(4,2)

注意;

計數視窗不如時間視窗那麼嚴謹,要謹慎使用,比如其定義的元素數量為100,然而某一個key對應的元素永遠達不到100個,那麼計數視窗就會永遠不關閉,則被該窗口占用的記憶體就浪費了,一種解決辦法就是用時間視窗觸發超時。

會話視窗

會話指的是活動階段,其前後都是非活動階段,例如某使用者在與網站進行一系列的互動之後,關閉瀏覽器或者不在互動(非活動階段)。會話需要有自己的處理機制,因為他們通常沒有固定的持續時間,或者說固定的互動次數(有的可能點選3次就購買了物品,有的可能點選40次才購買物品)。

在flink中。會話視窗由時間設定。既希望等待多久認為會話已經結束。舉例來說,以下程式碼表示,使用者處於非活動時間超過五分鐘既認為會話結束

Stream.window(sessionWindow.withGap(Time.minutes(5)))

水印

現在有一個問題就是:如何判斷所有的事件是否都已經到達,以及何時計算和輸出視窗的結果?換言之就是:如何追蹤事件時間,並知曉輸入資料已經流入到某個事件時間呢?為了追蹤事件時間,需要依靠由資料驅動的時鐘,而不是系統時間。

Flink通過水印來推進事件時間。水印是嵌入在流中的常規記錄。計算程式通常通過水獲知某個時間點已到。比如對於一分鐘的滾動視窗,假設水印標記時時間為:10:01:00,那麼收到水印的視窗就知道不會再有早於該時間的記錄出現,因為所有時間戳小於或等於該時間的事件都已經到達。這時,視窗就可以安全的計算並給出結果。水印使得事件時間和處理時間完全無關。遲到的水印並不會影響到結果的正確性,而會影響到結果的速度。

水印如何生成

在flink中,水印的生成由開發人員生成,這通常需要對相應的領域有一定的瞭解。完美的水印:時間戳小於水印標記時間的事件不會再出現。在特殊情況下(如非亂序事件流),最近一次事件的時間戳就可能是完美的水印。啟發式水印則相反,它只估計時間,因此有可能出錯,既遲到的時間:晚於水印出現。如果知道時間的遲到時間不會超過5秒,就可以將水印時間設為收到最大時間戳減去5秒。另一種做法是,採用一個flink作業的監控事件流,學習事件的遲到規律,並以此構成水印的生成模型。

有狀態的計算

流失計算分為有狀態計算和無狀態計算。無狀態計算是觀察每一個獨立時間,並根據最後一個時間輸出時間結果,有狀態計算則是根據多個事件輸出結果。

例如:

計算過去一個小時的平均溫度就是有狀態的計算,需要涉及多個事件共同計算出的結果。

廣播變數

廣播變數允許您為操作的所有並行例項提供資料集。這對於輔助資料集或與資料相關的引數化非常有用。然後,操作員可以將資料集作為集合訪問。

  • 廣播:廣播集通過名稱註冊withBroadcastSet(DataSet, String)
  • 訪問:可通過getRuntimeContext().getBroadcastVariable(String)目標運營商訪問。
val data = env.fromElements("a", "b")
data.map(new RichMapFunction[String, String]() {
    var broadcastSet: Traversable[String] = null
    override def open(config: Configuration): Unit = {
      // 3. Access the broadcast DataSet as a Collection   
   broadcastSet =getRuntimeContext().
   getBroadcastVariable[String("broadcastSetName").asScala
    }
    def map(in: String): String = {
          }}).withBroadcastSet(toBroadcast, "broadcastSetName") 

注意由於廣播變數的內容儲存在每個節點的記憶體中,因此不應該變得太大。對於標量值之類的簡單事物,您可以簡單地將引數作為函式閉包的一部分,或者使用該withParameters(...)方法傳遞配置。