1. 程式人生 > >在使用Flink廣播變數broadcast時遇到的坑

在使用Flink廣播變數broadcast時遇到的坑

在使用Flink廣播變數遇到的坑
如下程式碼中需要特別注意:
(1)需要手動匯入org.apache.flink.api.scala._
(2)需要手動匯入scala.collection.JavaConverters._
【如果不手動匯入該包,導致asScala使用隱式轉換失敗】

package testbrocast

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.
flink.api.scala._ import org.apache.flink.configuration.Configuration import scala.collection.JavaConverters._ //asScala需要使用隱式轉換 /** * @description: ${description} * @author: fangchangtan * @create: 2018-11-23 19:31 **/ object BroadCastTest { def main(args: Array[String]): Unit = { val env =
ExecutionEnvironment.getExecutionEnvironment val dataset1 = env.fromElements("11", "22", "33") val dataset2 = env.fromElements("aa", "bb", "cc") dataset1.map(new RichMapFunction[String, (String, String)] { private var dataset2: Traversable[String] = null override def open(
parameters: Configuration) { //import scala.collection.JavaConverters._ //asScala需要使用隱式轉換,切記!! dataset2 = getRuntimeContext.getBroadcastVariable[String]("broadCast").asScala } def map(t: String): (String, String) = { var result = "" for (broadVariable <- dataset2) { result = result + broadVariable + " " } (t, result) } }).withBroadcastSet(dataset2, "broadCast").print() } }

最終輸出結果:
在這裡插入圖片描述

Broadcast 廣播變數:可以理解為是一個公共的共享變數,我們可以把一個dataset 或者不變的快取物件(例如map list集合物件等)資料集廣播出去,然後不同的任務在節點上都能夠獲取到,並在每個節點上只會存在一份,而不是在每個併發執行緒中存在。如果不使用broadcast,則在每個節點中的每個任務中都需要拷貝一份dataset資料集,比較浪費記憶體(也就是一個節點中可能會存在多份dataset資料)。

因此在廣播小資料量的dataset 和或者不大的不可變快取物件的時候,特別適合使用Broadcast 廣播變數