1. 程式人生 > >Flink 從0到1學習 —— Flink 中如何管理配置?

Flink 從0到1學習 —— Flink 中如何管理配置?

前言

如果你瞭解 Apache Flink 的話,那麼你應該熟悉該如何像 Flink 傳送資料或者如何從 Flink 獲取資料。但是在某些情況下,我們需要將配置資料傳送到 Flink 叢集並從中接收一些額外的資料。

在本文的第一部分中,我將描述如何將配置資料傳送到 Flink 叢集。我們需要配置很多東西:方法引數、配置檔案、機器學習模型。Flink 提供了幾種不同的方法,我們將介紹如何使用它們以及何時使用它們。在本文的第二部分中,我將描述如何從 Flink 叢集中獲取資料。

如何傳送資料給 TaskManager?

在我們深入研究如何在 Apache Flink 中的不同元件之間傳送資料之前,讓我們先談談 Flink 叢集中的元件,下圖展示了 Flink 中的主要元件以及它們是如何相互作用的:

當我們執行 Flink 應用程式時,它會與 Flink JobManager 進行互動,這個 Flink JobManager 儲存了那些正在執行的 Job 的詳細資訊,例如執行圖。
JobManager 它控制著 TaskManager,每個 TaskManager 中包含了一部分資料來執行我們定義的資料處理方法。

在許多的情況下,我們希望能夠去配置 Flink Job 中某些執行的函式引數。根據用例,我們可能需要設定單個變數或者提交具有靜態配置的檔案,我們下面將討論在 Flink 中該如何實現?

除了向 TaskManager 傳送配置資料外,有時我們可能還希望從 Flink Job 的函式方法中返回資料。

如何配置使用者自定義函式?

假設我們有一個從 CSV 檔案中讀取電影列表的應用程式(它要過濾特定型別的所有電影):

//讀取電影列表資料集合
DataSet<Tuple3<Long, String, String>> lines = env.readCsvFile("movies.csv")
        .ignoreFirstLine()
        .parseQuotedStrings('"')
        .ignoreInvalidLines()
        .types(Long.class, String.class, String.class);

lines.filter((FilterFunction<Tuple3<Long, String, String>>) movie -> {
    // 以“|”符號分隔電影型別
    String[] genres = movie.f2.split("\\|");

    // 查詢所有 “動作” 型別的電影
    return Stream.of(genres).anyMatch(g -> g.equals("Action"));
}).print();

我們很可能想要提取不同型別的電影,為此我們需要能夠配置我們的過濾功能。 當你要實現這樣的函式時,最直接的配置方法是實現建構函式:

// 傳遞型別名稱
lines.filter(new FilterGenre("Action"))
    .print();

...

class FilterGenre implements FilterFunction<Tuple3<Long, String, String>> {
    //型別
    String genre;
    //初始化構造方法
    public FilterGenre(String genre) {
        this.genre = genre;
    }

    @Override
    public boolean filter(Tuple3<Long, String, String> movie) throws Exception {
        String[] genres = movie.f2.split("\\|");

        return Stream.of(genres).anyMatch(g -> g.equals(genre));
    }
}

或者,如果你使用 lambda 函式,你可以簡單地使用它的閉包中的一個變數:

final String genre = "Action";

lines.filter((FilterFunction<Tuple3<Long, String, String>>) movie -> {
    String[] genres = movie.f2.split("\\|");

    //使用變數
    return Stream.of(genres).anyMatch(g -> g.equals(genre));
}).print();

Flink 將序列化此變數並將其與函式一起傳送到叢集。

如果你需要將大量變數傳遞給函式,那麼這些方法就會變得非常煩人了。 為了解決這個問題,Flink 提供了 withParameters 方法。 要使用它,你需要實現那些 Rich 函式,比如你不必實現 MapFunction 介面,而是實現 RichMapFunction。

Rich 函式允許你使用 withParameters 方法傳遞許多引數:

// Configuration 類來儲存引數
Configuration configuration = new Configuration();
configuration.setString("genre", "Action");

lines.filter(new FilterGenreWithParameters())
        // 將引數傳遞給函式
        .withParameters(configuration)
        .print();

要讀取這些引數,我們需要實現 "open" 方法並讀取其中的引數:

class FilterGenreWithParameters extends RichFilterFunction<Tuple3<Long, String, String>> {

    String genre;

    @Override
    public void open(Configuration parameters) throws Exception {
        //讀取配置
        genre = parameters.getString("genre", "");
    }

    @Override
    public boolean filter(Tuple3<Long, String, String> movie) throws Exception {
        String[] genres = movie.f2.split("\\|");

        return Stream.of(genres).anyMatch(g -> g.equals(genre));
    }
}

所有這些選項都可以使用,但如果需要為多個函式設定相同的引數,則可能會很繁瑣。在 Flink 中要處理此種情況, 你可以設定所有 TaskManager 都可以訪問的全域性環境變數。

為此,首先需要使用 ParameterTool.fromArgs 從命令列讀取引數:

public static void main(String... args) {
    //讀取命令列引數
    ParameterTool parameterTool = ParameterTool.fromArgs(args);
    ...
}

然後使用 setGlobalJobParameters 設定全域性作業引數:

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameterTool);
...

//該函式將能夠讀取這些全域性引數
lines.filter(new FilterGenreWithGlobalEnv()) //這個函式是自己定義的
                .print();

現在我們來看看這個讀取這些引數的函式,和上面說的一樣,它是一個 Rich 函式:

class FilterGenreWithGlobalEnv extends RichFilterFunction<Tuple3<Long, String, String>> {

    @Override
    public boolean filter(Tuple3<Long, String, String> movie) throws Exception {
        String[] genres = movie.f2.split("\\|");
        //獲取全域性的配置
        ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        //讀取配置
        String genre = parameterTool.get("genre");

        return Stream.of(genres).anyMatch(g -> g.equals(genre));
    }
}

要讀取配置,我們需要呼叫 getGlobalJobParameter 來獲取所有全域性引數,然後使用 get 方法獲取我們要的引數。

廣播變數

如果你想將資料從客戶端傳送到 TaskManager,上面文章中討論的方法都適合你,但如果資料以資料集的形式存在於 TaskManager 中,該怎麼辦? 在這種情況下,最好使用 Flink 中的另一個功能 —— 廣播變數。 它只允許將資料集傳送給那些執行你 Job 裡面函式的工作管理員。

假設我們有一個數據集,其中包含我們在進行文字處理時應忽略的單詞,並且我們希望將其設定為我們的函式。 要為單個函式設定廣播變數,我們需要使用 withBroadcastSet 方法和資料集。

DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);
// 獲取要忽略的單詞集合
DataSet<String> wordsToIgnore = ...

data.map(new RichFlatMapFunction<String, String>() {

    // 儲存要忽略的單詞集合. 這將儲存在 TaskManager 的記憶體中
    Collection<String> wordsToIgnore;

    @Override
    public void open(Configuration parameters) throws Exception {
        //讀取要忽略的單詞的集合
        wordsToIgnore = getRuntimeContext().getBroadcastVariable("wordsToIgnore");
    }

    @Override
    public String map(String line, Collector<String> out) throws Exception {
        String[] words = line.split("\\W+");
        for (String word : words)
            //使用要忽略的單詞集合
            if (wordsToIgnore.contains(word))
                out.collect(new Tuple2<>(word, 1));
    }
    //通過廣播變數傳遞資料集
}).withBroadcastSet(wordsToIgnore, "wordsToIgnore");

你應該記住,如果要使用廣播變數,那麼資料集將會儲存在 TaskManager 的記憶體中,如果資料集和越大,那麼佔用的記憶體就會越大,因此使用廣播變數適用於較小的資料集。

如果要向每個 TaskManager 傳送更多資料並且不希望將這些資料儲存在記憶體中,可以使用 Flink 的分散式快取向 TaskManager 傳送靜態檔案。 要使用 Flink 的分散式快取,你首先需要將檔案儲存在一個分散式檔案系統(如 HDFS)中,然後在快取中註冊該檔案:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

//從 HDFS 註冊檔案
env.registerCachedFile("hdfs:///path/to/file", "machineLearningModel")

...

env.execute()

為了訪問分散式快取,我們需要實現一個 Rich 函式:

class MyClassifier extends RichMapFunction<String, Integer> {

    @Override
    public void open(Configuration config) {
      File machineLearningModel = getRuntimeContext().getDistributedCache().getFile("machineLearningModel");
      ...
    }

    @Override
    public Integer map(String value) throws Exception {
      ...
    }
}

請注意,要訪問分散式快取中的檔案,我們需要使用我們用於註冊檔案的 key,比如上面程式碼中的 machineLearningModel

Accumulator(累加器)

我們前面已經介紹瞭如何將資料傳送給 TaskManager,但現在我們將討論如何從 TaskManager 中返回資料。 你可能想知道為什麼我們需要做這種事情。 畢竟,Apache Flink 就是建立資料處理流水線,讀取輸入資料,處理資料並返回結果。

為了表達清楚,讓我們來看一個例子。假設我們需要計算每個單詞在文字中出現的次數,同時我們要計算文字中有多少行:

//要處理的資料集合
DataSet<String> lines = ...

// Word count 演算法
lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
        String[] words = line.split("\\W+");
        for (String word : words) {
            out.collect(new Tuple2<>(word, 1));
        }
    }
})
.groupBy(0)
.sum(1)
.print();

// 計算要處理的文字中的行數
int linesCount = lines.count()
System.out.println(linesCount);

問題是如果我們執行這個應用程式,它將執行兩個 Flink 作業!首先得到單詞統計數,然後計算行數。

這絕對是低效的,但我們怎樣才能避免這種情況呢?一種方法是使用累加器。它們允許你從 TaskManager 傳送資料,並使用預定義的功能聚合此資料。 Flink 有以下內建累加器:

  • IntCounter,LongCounter,DoubleCounter:允許將 TaskManager 傳送的 int,long,double 值彙總在一起

  • AverageAccumulator:計算雙精度值的平均值

  • LongMaximum,LongMinimum,IntMaximum,IntMinimum,DoubleMaximum,DoubleMinimum:累加器,用於確定不同型別的最大值和最小值

  • 直方圖 - 用於計算 TaskManager 的值分佈

要使用累加器,我們需要建立並註冊一個使用者定義的函式,然後在客戶端上讀取結果。下面我們來看看該如何使用呢:

lines.flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {

    //建立一個累加器
    private IntCounter linesNum = new IntCounter();

    @Override
    public void open(Configuration parameters) throws Exception {
        //註冊一個累加器
        getRuntimeContext().addAccumulator("linesNum", linesNum);
    }

    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
        String[] words = line.split("\\W+");
        for (String word : words) {
            out.collect(new Tuple2<>(word, 1));
        }
        
        // 處理每一行資料後 linesNum 遞增
        linesNum.add(1);
    }
})
.groupBy(0)
.sum(1)
.print();

//獲取累加器結果
int linesNum = env.getLastJobExecutionResult().getAccumulatorResult("linesNum");
System.out.println(linesNum);

這樣計算就可以統計輸入文字中每個單詞出現的次數以及它有多少行。

如果需要自定義累加器,還可以使用 Accumulator 或 SimpleAccumulator 介面實現自己的累加器。

最後

本篇文章由 zhisheng 翻譯,禁止任何無授權的轉載。

翻譯後地址:http://www.54tianzhisheng.cn/2019/03/28/flink-additional-data/

原文地址:https://brewing.codes/2017/10/24/flink-additional-data/

本文部分程式碼地址:https://github.com/zhisheng17/flink-learning/tree/master/flink-learning-examples/src/main/java/com/zhisheng/examples/batch/accumulator

微信公眾號:zhisheng

另外我自己整理了些 Flink 的學習資料,目前已經全部放到微信公眾號了。你可以加我的微信:zhisheng_tian,然後回覆關鍵字:Flink 即可無條件獲取到。

更多私密資料請加入知識星球!

部落格

1、Flink 從0到1學習 —— Apache Flink 介紹

2、Flink 從0到1學習 —— Mac 上搭建 Flink 1.6.0 環境並構建執行簡單程式入門

3、Flink 從0到1學習 —— Flink 配置檔案詳解

4、Flink 從0到1學習 —— Data Source 介紹

5、Flink 從0到1學習 —— 如何自定義 Data Source ?

6、Flink 從0到1學習 —— Data Sink 介紹

7、Flink 從0到1學習 —— 如何自定義 Data Sink ?

8、Flink 從0到1學習 —— Flink Data transformation(轉換)

9、Flink 從0到1學習 —— 介紹 Flink 中的 Stream Windows

10、Flink 從0到1學習 —— Flink 中的幾種 Time 詳解

11、Flink 從0到1學習 —— Flink 讀取 Kafka 資料寫入到 ElasticSearch

12、Flink 從0到1學習 —— Flink 專案如何執行?

13、Flink 從0到1學習 —— Flink 讀取 Kafka 資料寫入到 Kafka

14、Flink 從0到1學習 —— Flink JobManager 高可用性配置

15、Flink 從0到1學習 —— Flink parallelism 和 Slot 介紹

16、Flink 從0到1學習 —— Flink 讀取 Kafka 資料批量寫入到 MySQL

17、Flink 從0到1學習 —— Flink 讀取 Kafka 資料寫入到 RabbitMQ

18、Flink 從0到1學習 —— Flink 讀取 Kafka 資料寫入到 HBase

19、Flink 從0到1學習 —— Flink 讀取 Kafka 資料寫入到 HDFS

20、Flink 從0到1學習 —— Flink 讀取 Kafka 資料寫入到 Redis

21、Flink 從0到1學習 —— Flink 讀取 Kafka 資料寫入到 Cassandra

22、Flink 從0到1學習 —— Flink 讀取 Kafka 資料寫入到 Flume

23、Flink 從0到1學習 —— Flink 讀取 Kafka 資料寫入到 InfluxDB

24、Flink 從0到1學習 —— Flink 讀取 Kafka 資料寫入到 RocketMQ

25、Flink 從0到1學習 —— 你上傳的 jar 包藏到哪裡去了

26、Flink 從0到1學習 —— 你的 Flink job 日誌跑到哪裡去了

27、阿里巴巴開源的 Blink 實時計算框架真香

28、Flink 從0到1學習 —— Flink 中如何管理配置?

29、Flink 從0到1學習—— Flink 不可以連續 Split(分流)?

30、Flink 從0到1學習—— 分享四本 Flink 國外的書和二十多篇 Paper 論文

31、Flink 架構、原理與部署測試

32、為什麼說流處理即未來?

33、OPPO 資料中臺之基石:基於 Flink SQL 構建實時資料倉庫

34、流計算框架 Flink 與 Storm 的效能對比

35、Flink狀態管理和容錯機制介紹

36、Apache Flink 結合 Kafka 構建端到端的 Exactly-Once 處理

37、360深度實踐:Flink與Storm協議級對比

38、如何基於Flink+TensorFlow打造實時智慧異常檢測平臺?只看這一篇就夠了

39、Apache Flink 1.9 重大特性提前解讀

40、Flink 全網最全資源(視訊、部落格、PPT、入門、實戰、原始碼解析、問答等持續更新)

41、Flink 靈魂兩百問,這誰頂得住?

原始碼解析

1、Flink 原始碼解析 —— 原始碼編譯執行

2、Flink 原始碼解析 —— 專案結構一覽

3、Flink 原始碼解析—— local 模式啟動流程

4、Flink 原始碼解析 —— standalone session 模式啟動流程

5、Flink 原始碼解析 —— Standalone Session Cluster 啟動流程深度分析之 Job Manager 啟動

6、Flink 原始碼解析 —— Standalone Session Cluster 啟動流程深度分析之 Task Manager 啟動

7、Flink 原始碼解析 —— 分析 Batch WordCount 程式的執行過程

8、Flink 原始碼解析 —— 分析 Streaming WordCount 程式的執行過程

9、Flink 原始碼解析 —— 如何獲取 JobGraph?

10、Flink 原始碼解析 —— 如何獲取 StreamGraph?

11、Flink 原始碼解析 —— Flink JobManager 有什麼作用?

12、Flink 原始碼解析 —— Flink TaskManager 有什麼作用?

13、Flink 原始碼解析 —— JobManager 處理 SubmitJob 的過程

14、Flink 原始碼解析 —— TaskManager 處理 SubmitJob 的過程

15、Flink 原始碼解析 —— 深度解析 Flink Checkpoint 機制

16、Flink 原始碼解析 —— 深度解析 Flink 序列化機制

17、Flink 原始碼解析 —— 深度解析 Flink 是如何管理好記憶體的?

18、Flink Metrics 原始碼解析 —— Flink-metrics-core

19、Flink Metrics 原始碼解析 —— Flink-metrics-datadog

20、Flink Metrics 原始碼解析 —— Flink-metrics-dropwizard

21、Flink Metrics 原始碼解析 —— Flink-metrics-graphite

22、Flink Metrics 原始碼解析 —— Flink-metrics-influxdb

23、Flink Metrics 原始碼解析 —— Flink-metrics-jmx

24、Flink Metrics 原始碼解析 —— Flink-metrics-slf4j

25、Flink Metrics 原始碼解析 —— Flink-metrics-statsd

26、Flink Metrics 原始碼解析 —— Flink-metrics-prometheus

26、Flink Annotations 原始碼解析

27、Flink 原始碼解析 —— 如何獲取 ExecutionGraph ?

28、大資料重磅炸彈——實時計算框架 Flink

29、Flink Checkpoint-輕量級分散式快照

30、Flink Clients 原始碼解析原文出處:zhisheng的部落格,歡迎關注我的公眾號:zhish