1. 程式人生 > >[原始碼分析] 從例項和原始碼入手看 Flink 之廣播 Broadcast

[原始碼分析] 從例項和原始碼入手看 Flink 之廣播 Broadcast

# [原始碼分析] 從例項和原始碼入手看 Flink 之廣播 Broadcast ## 0x00 摘要 本文將通過原始碼分析和例項講解,帶領大家熟悉Flink的廣播變數機制。 ## 0x01 業務需求 ### 1. 場景需求 對黑名單中的IP進行檢測過濾。IP黑名單的內容會隨時增減,因此是可以隨時動態配置的。 該黑名單假設存在mysql中,Flink作業啟動時候會把這個黑名單從mysql載入,作為一個變數由Flink運算元使用。 ### 2. 問題 我們不想重啟作業以便重新獲取這個變數。所以就需要一個能夠動態修改運算元裡變數的方法。 ### 3. 解決方案 使用廣播的方式去解決。去做配置的動態更新。 廣播和普通的流資料不同的是:廣播流的1條流資料能夠被運算元的所有分割槽所處理,而資料流的1條流資料只能夠被運算元的某一分割槽處理。因此廣播流的特點也決定適合做配置的動態更新。 ## 0x02 概述 廣播這部分有三個難點:使用步驟;如何自定義函式;如何存取狀態。下面就先為大家概述下。 ### 1. broadcast的使用步驟 - 建立MapStateDescriptor - 通過DataStream.broadcast方法返回廣播資料流BroadcastStream - 通過DataStream.connect方法,把業務資料流和BroadcastStream進行連線,返回BroadcastConnectedStream - 通過BroadcastConnectedStream.process方法分別進行processElement及processBroadcastElement處理 ### 2. 使用者自定義處理函式 - BroadcastConnectedStream.process接收兩種型別的function:KeyedBroadcastProcessFunction 和 BroadcastProcessFunction - 兩種型別的function都定義了processElement、processBroadcastElement抽象方法,只是KeyedBroadcastProcessFunction多定義了一個onTimer方法,預設是空操作,允許子類重寫 - processElement處理業務資料流 - processBroadcastElement處理廣播資料流 ### 3. Broadcast State - Broadcast State始終表示為MapState,即map format。這是Flink提供的最通用的狀態原語。是託管狀態的一種,託管狀態是由Flink框架管理的狀態,如ValueState, ListState, MapState等。 - 使用者必須建立一個 `MapStateDescriptor`,才能得到對應的狀態控制代碼。 這儲存了狀態名稱, 狀態所持有值的型別,並且可能包含使用者指定的函式 - checkpoint的時候也會checkpoint broadcast state - Broadcast State只在記憶體有,沒有RocksDB state backend - Flink 會將state廣播到每個task,注意該state並不會跨task傳播,對其修改僅僅是作用在其所在的task - downstream tasks接收到broadcast event的順序可能不一樣,所以依賴其到達順序來處理element的時候要小心 ## 0x03. 示例程式碼 ### 1. 示例程式碼 我們直接從Flink原始碼入手可以找到理想的示例。 以下程式碼直接摘錄 Flink 原始碼 StatefulJobWBroadcastStateMigrationITCase,我會在裡面加上註釋說明。 ```scala @Test def testRestoreSavepointWithBroadcast(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 以下兩個變數是為了確定廣播流發出的資料型別,廣播流可以同時發出多種型別的資料 lazy val firstBroadcastStateDesc = new MapStateDescriptor[Long, Long]( "broadcast-state-1", BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]) lazy val secondBroadcastStateDesc = new MapStateDescriptor[String, String]( "broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO) env.setStateBackend(new MemoryStateBackend) env.enableCheckpointing(500) env.setParallelism(4) env.setMaxParallelism(4) // 資料流,這裡資料流和廣播流的Source都是同一種CheckpointedSource。資料流這裡做了一系列運算元操作,比如flatMap val stream = env .addSource( new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedSource") .keyBy( new KeySelector[(Long, Long), Long] { override def getKey(value: (Long, Long)): Long = value._1 } ) .flatMap(new StatefulFlatMapper) .keyBy( new KeySelector[(Long, Long), Long] { override def getKey(value: (Long, Long)): Long = value._1 } ) // 廣播流 val broadcastStream = env .addSource( new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedBroadcastSource") .broadcast(firstBroadcastStateDesc, secondBroadcastStateDesc) // 把資料流和廣播流結合起來 stream .connect(broadcastStream) .process(new VerifyingBroadcastProcessFunction(expectedFirstState, expectedSecondState)) .addSink(new AccumulatorCountingSink) } } // 使用者自定義的處理函式 class TestBroadcastProcessFunction extends KeyedBroadcastProcessFunction [Long, (Long, Long), (Long, Long), (Long, Long)] { // 重點說明,這裡的 firstBroadcastStateDesc,secondBroadcastStateDesc 其實和之前廣播流的那兩個MapStateDescriptor無關。 // 這裡兩個MapStateDescriptor是為了存取BroadcastState,這樣在 processBroadcastElement和processElement之間就可以傳遞變量了。我們完全可以定義新的MapStateDescriptor,只要processBroadcastElement和processElement之間認可就行。 // 這裡引數 "broadcast-state-1" 是name, flink就是用這個 name 來從Flink執行時系統中存取MapStateDescriptor lazy val firstBroadcastStateDesc = new MapStateDescriptor[Long, Long]( "broadcast-state-1", BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]) val secondBroadcastStateDesc = new MapStateDescriptor[String, String]( "broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO) override def processElement( value: (Long, Long), ctx: KeyedBroadcastProcessFunction [Long, (Long, Long), (Long, Long), (Long, Long)]#ReadOnlyContext, out: Collector[(Long, Long)]): Unit = { // 這裡Flink原始碼中是直接把接受到的業務變數直接再次轉發出去 out.collect(value) } override def processBroadcastElement( value: (Long, Long), ctx: KeyedBroadcastProcessFunction [Long, (Long, Long), (Long, Long), (Long, Long)]#Context, out: Collector[(Long, Long)]): Unit = { // 這裡是把最新傳來的廣播變數儲存起來,processElement中可以取出再次使用. 具體是通過firstBroadcastStateDesc 的 name 來獲取 BroadcastState ctx.getBroadcastState(firstBroadcastStateDesc).put(value._1, value._2) ctx.getBroadcastState(secondBroadcastStateDesc).put(value._1.toString, value._2.toString) } } // 廣播流和資料流的Source private class CheckpointedSource(val numElements: Int) extends SourceFunction[(Long, Long)] with CheckpointedFunction { private var isRunning = true private var state: ListState[CustomCaseClass] = _ // 就是簡單的定期傳送 override def run(ctx: SourceFunction.SourceContext[(Long, Long)]) { ctx.emitWatermark(new Watermark(0)) ctx.getCheckpointLock synchronized { var i = 0 while (i < numElements) { ctx.collect(i, i) i += 1 } } // don't emit a final watermark so that we don't trigger the registered event-time // timers while (isRunning) Thread.sleep(20) } } ``` ### 2. 技術難點 #### MapStateDescriptor 首先要說明一些概念: - Flink中包含兩種基礎的狀態:Keyed State和Operator State。 - Keyed State和Operator State又可以 以兩種形式存在:原始狀態和託管狀態。 - 託管狀態是由Flink框架管理的狀態,如ValueState, ListState, MapState等。 - raw state即原始狀態,由使用者自行管理狀態具體的資料結構,框架在做checkpoint的時候,使用byte[]來讀寫狀態內容,對其內部資料結構一無所知。 - MapState是託管狀態的一種:即狀態值為一個map。使用者通過`put`或`putAll`方法新增元素。 回到我們的例子,廣播變數就是OperatorState的一部分,是以託管狀態的MapState形式儲存的。具體getBroadcastState函式就是DefaultOperatorStateBackend中的實現
所以我們需要用MapStateDescriptor描述broadcast state,這裡MapStateDescriptor的使用比較靈活,因為是key,value類似使用,所以個人覺得value直接使用類,這樣更方便,尤其是對於從其他語言轉到scala的同學。 #### processBroadcastElement ```scala // 因為主要起到控制作用,所以這個函式的處理相對簡單 override def processBroadcastElement(): Unit = { // 這裡可以把最新傳來的廣播變數儲存起來,processElement中可以取出再次使用,比如 ctx.getBroadcastState(firstBroadcastStateDesc).put(value._1, value._2) } ``` #### processElement ```java // 這個函式需要和processBroadcastElement配合起來使用 override def processElement(): Unit = { // 可以取出processBroadcastElement之前儲存的廣播變數,然後用此來處理業務變數,比如 val secondBroadcastStateDesc = new MapStateDescriptor[String, String]( "broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO) var actualSecondState = Map[String, String]() for (entry <- ctx.getBroadcastState(secondBroadcastStateDesc).immutableEntries()) { val v = secondExpectedBroadcastState.get(entry.getKey).get actualSecondState += (entry.getKey -> entry.getValue) } // 甚至這裡只要和processBroadcastElement一起關聯好,可以儲存任意型別的變數。不必須要和廣播變數的型別一致。重點是宣告新的對應的MapStateDescriptor // MapStateDescriptor繼承了StateDescriptor,其中state為MapState型別,value為Map型別 } ``` #### 結合起來使用 因為某些限制,所以下面只能從網上找一個例子給大家講講。 ```java // 模式始終儲存在MapState中,並將null作為鍵。broadcast state始終表示為MapState,這是Flink提供的最通用的狀態原語。 MapStateDe