1. 程式人生 > >flink開發實戰之 flink on yarn

flink開發實戰之 flink on yarn

flink 執行模式

Flink 和spark一樣有三種部署模式,分別是 Local、Standalone Cluster 和 Yarn Cluster。

實戰開發主要使用Yarn Cluster模式,所以本文主要介紹yarn  模式下flink任務的執行和資源分配。

Yarn Cluster 模式

在圖中可以看出,Flink 與 Yarn 的關係與 MapReduce 和 Yarn 的關係是一樣的。Flink 通過 Yarn 的介面實現了自己的 App Master。當在 Yarn 中部署了 Flink,Yarn 就會用自己的 Container 來啟動 Flink 的 JobManager

(也就是 App Master)和 TaskManager。

flink 任務提交到yarn上的全流程

第一步:

向資源管理器(ResourceManager)請求,要執行一個程式。中獲取新的作業ID(jobId),以及程式資源儲存路徑

第二步

ResourceManager檢查作業的輸出說明,然後返回一個存放程式資源的路徑以及jobId,這個路徑在hdfs的tmp資料夾中,如果程式中沒有指定輸出目錄或指定的輸出目錄已經存在,作業就不提交,錯誤返回給flink程式

就是這個路徑存放程式資源,包括程式的jar包,job的配置檔案等。

第三步

將作業資源(包括JAR、配置和資訊)複製到HDFS。

第五步:

通過呼叫資源管理器上的submitApplication()方法提交作業。

第六步

資源管理器收到呼叫它的submitApplication()訊息後,如果容器不夠,任務會現在等待佇列中等待,之後便將請求傳遞給排程器(Schedule),排程器分配一個容器,然後資源管理器在節點管理器的管理下在容器中啟動應用程式master程序也就是MRAPPMaster。

flink作業的application master是一個Java應用程式,它的主類是MRAPPMaster他對作業進行初始化,通過建立多個薄記物件以保持對作業進度的跟蹤,因為他將接受來自任務的進度和完成報告

第七步

MRAPPMaster根據配置資訊,獲知要啟動多少個TaskManger,向ResourceManager請求容器

第八步

一旦資源管理器的排程器為任務分配了容器,MRAPPMaster(application master) 就通過與節點管理器NodeManager通訊來啟動容器向已獲得容器的TaskManger發從啟動命令,也就是主類為YarnChild程式

Flink在yarn上執行兩種的模式

第一種:

一種是讓 Yarn 直接啟動 JobManager 和 TaskManager

在yarn上執行一個flink job


./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/batch/WordCount.jar

第二種:

是在執行 Flink Workload 的時候啟動 Flink 的模組。前者相當於讓 Flink 的模組處於 Standby 的狀態。這裡,我也主要介紹下前者

在下載和解壓 Flink 的安裝包之後,需要在環境中增加環境變數 HADOOP_CONF_DIR 或者 YARN_CONF_DIR,其指向 Yarn 的配置目錄

export HADOOP_CONF_DIR=/etc/hadoop/conf

這是因為 Flink 實現了 Yarn 的 Client,因此需要 Yarn 的一些配置和 Jar 包。在配置好環境變數後,只需簡單的執行如下的指令碼,Yarn 就會啟動 Flink 的 JobManager 和 TaskManager。

先啟動叢集:

 ./bin/yarn-session.sh  -n  2 -jm  1024 -tm  1024 -s 2

上面的意思是:向 Yarn 申請 2 個 Container 啟動 TaskManager(-n 2),每個 TaskManager 擁有兩個 Task Slot(-s 2),並且向每個 TaskManager 的 Container 申請 1024 的記憶體

再提交任務

./bin/flink run com.demo.florian.WordCount  ./flink-demo-1.0-SNAPSHOT.jar

啟動session的指令引數

 必選

     -n,--container    分配多少個yarn容器 (=taskmanager的數量)
可選

      -d,--detached                   獨立執行

     -jm,--jobManagerMemory     JobManager的記憶體 [in MB]

     -nm,--name                     在YARN上為一個自定義的應用設定一個名字

     -q,--query                      顯示yarn中可用的資源 (記憶體, cpu核數)

     -qu,--queue                指定YARN佇列.

     -s,--slots                 每個TaskManager使用的slots數量

     -tm,--taskManagerMemory    每個TaskManager的記憶體 [in MB]

     -z,--zookeeperNamespace    針對HA模式在zookeeper上建立NameSpace

run 提交任務的指令引數

 -c,--class <classname>           如果沒有在jar包中指定入口類,則需要在這裡通過這個引數指定  
 -m,--jobmanager <host:port>      指定需要連線的jobmanager(主節點)地址  
                                    使用這個引數可以指定一個不同於配置檔案中的jobmanager  
 -p,--parallelism <parallelism>   指定程式的並行度。可以覆蓋配置檔案中的預設值。

slot和parallelism

1.slot是指taskmanager的併發執行能力

在hadoop 1.x 版本中也有slot的概念,有興趣的讀者可以瞭解一下

taskmanager.numberOfTaskSlots:3

每一個taskmanager中的分配3個TaskSlot,3個taskmanager一共有9個TaskSlot

2.parallelism是指taskmanager實際使用的併發能力

parallelism.default:1

執行程式預設的並行度為1,9個TaskSlot只用了1個,有8個空閒。設定合適的並行度才能提高效率。

3.parallelism是可配置、可指定的

 

1.可以通過修改$FLINK_HOME/conf/flink-conf.yaml檔案的方式更改並行度。

2.可以通過設定$FLINK_HOME/bin/flink 的-p引數修改並行度

3.可以通過設定executionEnvironmentk的方法修改並行度

4.可以通過設定flink的程式設計API修改過並行度

5.這些並行度設定優先順序從低到高排序,排序為api>env>p>file.

6.設定合適的並行度,能提高運算效率

7.parallelism不能多與slot個數。

4.slot和parallelism總結

1.slot是靜態的概念,是指taskmanager具有的併發執行能力

2.parallelism是動態的概念,是指程式執行時實際使用的併發能力

3.設定合適的parallelism能提高運算效率,太多了和太少了都不行

4.設定parallelism有多中方式,優先順序為api>env>p>file