1. 程式人生 > >Flink Broadcast 廣播變數應用案例實戰-Flink牛刀小試

Flink Broadcast 廣播變數應用案例實戰-Flink牛刀小試

版權宣告:本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。版權宣告:禁止轉載,歡迎學習。QQ郵箱地址:[email protected],如有任何問題,可隨時聯絡。

1.1 Broadcast 真假曹操

  • DataStreaming Broadcast (元素廣播):元素廣播,重複處理

    • 把元素廣播給所有的分割槽,資料會被重複處理,類似於storm中的allGrouping
    • 使用技巧:dataStream.broadcast()
  • Flink Broadcast(廣播變數)

    • 廣播變數建立後,它可以執行在叢集中的任何function上,而不需要多次傳遞給叢集節點。 另外需要記住,不應該修改廣播變數,這樣才能確保每個節點獲取到的值都是一致的。

    • 一句話解釋,可以理解為是一個公共的共享變數,我們可以把一個dataset 資料集廣播出去,然後不同的task在節點上都能夠獲取到,這個資料在每個節點上只會存 在一份。

    • 如果不使用broadcast,則在每個節點中的每個task中都需要拷貝一份dataset資料集,比較浪費記憶體(也就是一個節點中可能會存在多份dataset資料)。

    • 用法如下:

        1:初始化資料
        DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3)
        2:廣播資料
        .withBroadcastSet(toBroadcast, "broadcastSetName");
        3:獲取資料
        Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
        
        注意:
        1:廣播出去的變數存在於每個節點的記憶體中,所以這個資料集不能太大。因為廣播出去的資料,會常駐記憶體,除非程式執行結束
        2:廣播變數在初始化廣播出去以後不支援修改,這樣才能保證每個節點的資料都是一致的。
      複製程式碼

2 元素廣播案例實戰

2.1 實現元素的重複廣播,設定source的並行度為1

public class StreamingDemoWithMyNoPralalleSourceBroadcast {

    public static void main(String[] args) throws Exception {
        //獲取Flink的執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        //獲取資料來源
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:針對此source,並行度只能設定為1

        DataStream<Long> num = text.broadcast().map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                long id = Thread.currentThread().getId();
                System.out.println("執行緒id:"+id+",接收到資料:" + value);
                return value;
            }
        });

        //每2秒鐘處理一次資料
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);

        //列印結果
        sum.print().setParallelism(1);

        String jobName = StreamingDemoWithMyNoPralalleSourceBroadcast.class.getSimpleName();
        env.execute(jobName);
    }
}
複製程式碼

2.2 自定義接收器MyNoParalleSource

    public class MyNoParalleSource implements SourceFunction<Long>{

        private long count = 1L;
    
        private boolean isRunning = true;
    
        /**
         * 主要的方法
         * 啟動一個source
         * 大部分情況下,都需要在這個run方法中實現一個迴圈,這樣就可以迴圈產生資料了
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            while(isRunning){
                ctx.collect(count);
                count++;
                //每秒產生一條資料
                Thread.sleep(1000);
            }
        }
    
        /**
         * 取消一個cancel的時候會呼叫的方法
         *
         */
        @Override
        public void cancel() {
            isRunning = false;
        }
}
複製程式碼

2.3 結果展示

發現整個Map元素別處理了4次:

執行緒id:44,接收到資料:1
執行緒id:46,接收到資料:1
執行緒id:42,接收到資料:1
執行緒id:48,接收到資料:1
4
複製程式碼

3 廣播變數

3.1 第一步:封裝DataSet,呼叫withBroadcastSet。

3.2 第二步:getRuntimeContext().getBroadcastVariable,獲得廣播變數

3.3 第三步:RichMapFunction中執行獲得廣播變數的邏輯

public class BatchDemoBroadcast {

    public static void main(String[] args) throws Exception{

        //獲取執行環境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //1:準備需要廣播的資料
        ArrayList<Tuple2<String, Integer>> broadData = new ArrayList<>();
        broadData.add(new Tuple2<>("zs",18));
        broadData.add(new Tuple2<>("ls",20));
        broadData.add(new Tuple2<>("ww",17));
        DataSet<Tuple2<String, Integer>> tupleData = env.fromCollection(broadData);


        //1.1:處理需要廣播的資料,把資料集轉換成map型別,map中的key就是使用者姓名,value就是使用者年齡
        DataSet<HashMap<String, Integer>> toBroadcast = tupleData.map(new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
            @Override
            public HashMap<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
                HashMap<String, Integer> res = new HashMap<>();
                res.put(value.f0, value.f1);
                return res;
            }
        });

        //源資料
        DataSource<String> data = env.fromElements("zs", "ls", "ww");

        //注意:在這裡需要使用到RichMapFunction獲取廣播變數
        DataSet<String> result = data.map(new RichMapFunction<String, String>() {

            List<HashMap<String, Integer>> broadCastMap = new ArrayList<HashMap<String, Integer>>();
            HashMap<String, Integer> allMap = new HashMap<String, Integer>();

            /**
             * 這個方法只會執行一次
             * 可以在這裡實現一些初始化的功能
             *
             * 所以,就可以在open方法中獲取廣播變數資料
             *
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                //3:獲取廣播資料
                this.broadCastMap = getRuntimeContext().getBroadcastVariable("broadCastMapName");
                for (HashMap map : broadCastMap) {
                    allMap.putAll(map);
                }

            }

            @Override
            public String map(String value) throws Exception {
                Integer age = allMap.get(value);
                return value + "," + age;
            }
        }).withBroadcastSet(toBroadcast, "broadCastMapName");//2:執行廣播資料的操作
        result.print();
    }
}
複製程式碼

3.4 結果展示

    zs,18
    ls,20
    ww,17
複製程式碼

總結

簡單成文,方便Flink整體體系構成,感謝Github FLink 原始碼作者,讓我學到很多東西。辛苦成文,各自珍惜,謝謝!

版權宣告:本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。版權宣告:禁止轉載,歡迎學習。QQ郵箱地址:[email protected],如有任何問題,可隨時聯絡。

秦凱新 於深圳 20181608