1. 程式人生 > >flink on yarn模式

flink on yarn模式

flink on yarn模式的相關知識點(重要):https://blog.csdn.net/xu470438000/article/details/79576989


在flink on yarn模式中,flink yarn-session的兩種提交方式

兩種提交方式

1.yarn-session為flink app開闢公用資源—記憶體集中管理模式

在yarn中初始化一個flink叢集,開闢指定的資源,以後提交任務都向這裡提交。這個flink叢集會常駐在yarn叢集中,除非手工停止。
2.每個job提供一個yarn-session —記憶體job管理模式
每次提交都會建立一個新的flink叢集,任務之間互相獨立,互不影響,方便管理。任務執行完成之後建立的叢集也會消失。

第一種方式:

1.首先啟動yarn session,並且會啟動Flink的兩個必要服務:JobManager和TaskManagers,然後你可以向叢集提交作業。同一個Session中可以提交多個Flink作業。

我們先看看這個指令碼支援哪些引數:

./bin/yarn-session.sh
Usage:
   Required
     -n,--container <arg>   Number of YARN container to allocate (=Number of Task Managers) 申請多少資源
   Optional
     -D <arg>                    Dynamic properties動態屬性
     -d,–detached              Start detached類似於spark的叢集模式spark-cluster:後臺執行,預設類似與spark-client
     -f6,--flip6                     Specify this option to start a Flip-6 Yarn session cluster.
     -id,--applicationId <arg>       Attach to running YARN session
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container [in MB] 
     -nm,--name<arg>             Set a custom name for the application on YARN
     -q,–query                Display available YARN resources (memory, cores)
     -qu,--queue <arg>            Specify YARN queue.
     -s,--slots  <arg>           Number of slots per TaskManager 沒個TaskManager 中的執行緒數量(核心數)
     -st,--streaming             Start Flink in streaming mode 
     -tm,--taskManagerMemory <arg>    Memory per TaskManager Container [in MB]
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode

線上指令碼: bin/yarn-session.sh -n 7 -s 8 -jm 3072 -tm 32768 -qu root..-nm - -d
其中申請7個taskManager 每個8核 每個taskmanager有32768M記憶體 ,後臺執行
我的在yarn申請的flink資源命令:

HADOOP_HOME=/hadoop/hadoop-2.7.2/ HADOOP_CONF_DIR=/hadoop/hadoop-2.7.2/etc/hadoop/ ./yarn-session.sh -nm "Flink test" -n 15 -jm 8192 -tm 8192

此時我在yarn我申請的是叢集名稱為“Flink test”,15個taskManager的記憶體為8g,然後1個jobManager的記憶體為8g,yarn上顯示總的使用記憶體資源為128g

這樣我們就啟動了一個yarn-session 就可以提交flink任務了。
注意我的-d為後臺cluster執行,我的此時預設是client模式執行;

2.我們可以使用./bin/flink指令碼提交作業,同樣我們來看看這個指令碼支援哪些引數:

bin/flink
./flink [OPTIONS] [ARGUMENTS]

The following actions are available:
Action “run” compiles and runs a program.
Syntax: run [OPTIONS]
“run” action options:
-c,–class Class with the program entry point(“main” method or “getPlan()” method.Only needed if the JAR file does not specify the class in its manifest.
-C,–classpath Adds a URL to each user code classloader on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share). You can use this ption multiple times for specifying more than one URL. The protocol must be supported by the {@link java.net.URLClassLoader}.
-d,–detached If present, runs the job in detached mode
-m,–jobmanager host:port Address of the JobManager (master) to which to connect. Specify yarn-cluster’ as the JobManager to deploy a YARN cluster for the job. Use this flag to connect to a different JobManager than the one specified in the configuration.
-p,–parallelism The parallelism with which to run the program. Optional flag to override the default value specified in the configuration.
-q,–sysoutLogging If present, supress logging output to standard out.
-s,–fromSavepoint Path to a savepoint to reset the job back to (for example file:///flink/savepoint-1537).

我們可以使用run選項執行Flink作業。這個指令碼可以自動獲取到YARN session的地址

線上指令碼: nohup bin/flink run -s hdfs:///flink/savepoints/savepoint-bcabee-bf3f54a3b924 -c **** jars/**** test > Flink-RealtimeDAU.log 2>&1 &
或者使用flink的web-ui介面來提交flink應用

通過-s 可以指定savepoints地址,來重跑job。
Savepoint是什麼
Flink的savepoint是一個全域性的、一致性的快照(snapshot)。其包含兩方面:

資料來源所有資料的位置;
並行操作的狀態
“全域性一致”是指所有的輸入源資料在指定的位置,所有的並行操作的狀態都被完全checkpoint了。
如果你的應用在過去某個時間點做了savepoint,那你隨時可以從前面的savepoint更新發布應用。這時,新的應用會從savepoint中的操作狀態進行初始化,並從savepoint的資料來源位置開始重新處理所有資料。

3.啟動之後如何停止執行的程式

關閉jobmanager
線上指令碼:bin/flink cancel -s hdfs:///flink/savepoints /savepoints-* -yid application_1535964220442_0034
通過cancel命令進行停止

或者通過yarn application -kill applicationId 直接將yarn-session停止掉

或者通過 flink list 獲得 jobId
bin/flink cancel -s hdfs:///flink/savepoints/savepoint-* jobId
其中-s為可選操作

第二種方式

每個job提供一個yarn-session,job執行完成就釋放
nohup bin/flink run -m yarn-cluster -yn 15 -s hdfs:///flink/savepoints/111* -c . jars/**** test >./Flink-my-log.log 2>&1 &

其中的-yn是指TaskManager的個數,必須指定。


參考:https://blog.csdn.net/asfjgvajfghaklsbf/article/details/82899872
https://blog.csdn.net/qq_40990732/article/details/81028619