Apache Flink on YARN High-Availability (HA) Cluster Deployment
WeChat official account: 深廣大資料Club
Follow the account for more big-data news. For questions or suggestions, please leave a message on the account;
if you find 深廣大資料Club helpful, feel free to share it with your friends.
This article describes how to deploy Apache Flink on YARN (that is, how to run Flink jobs on YARN), using HDP 2.6.5 and Apache Flink 1.7.2.
In the Hadoop ecosystem, YARN takes on the roles of resource management and task scheduling, giving finer control over cluster resources.
HDP installation is not covered here; if you need to install HDP, follow the installation guide on the HDP website.
The official QuickStart documentation describes two ways to launch Flink:
- Start a long-running Flink cluster on YARN (a YARN session)
- Run a single Flink job on YARN
Before discussing the two execution modes, let's walk through installing Flink on YARN on top of HDP.
Installation
Download the Apache Flink distribution
Download the appropriate release from the Apache Flink downloads page (http://flink.apache.org/downloads.html) and unpack it:
curl -O <flink_hadoop2_download_url>
tar xvzf flink-1.8-SNAPSHOT-bin-hadoop2.tgz
Integrating with Hadoop
The Flink on YARN mode needs the Hadoop cluster configuration: set HADOOP_CONF_DIR and HADOOP_CLASSPATH.
Add the following to the ~/.bash_profile configuration file:
$ vi ~/.bash_profile
export HADOOP_CONF_DIR="/etc/hadoop/conf"
export HADOOP_CLASSPATH="/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*"
Source ~/.bash_profile to load the variables, then verify they are set correctly:
source ~/.bash_profile
echo $HADOOP_CONF_DIR
echo $HADOOP_CLASSPATH
Configuration
yarn-session.sh configuration
Because HDP runs Hadoop tasks and accesses HDFS as the hdfs user, the HADOOP_USER_NAME variable must be set before the YARN session starts; otherwise Flink fails to start due to permission errors.
$ vi /usr/local/flink-1.3.3/bin/yarn-session.sh
#!/usr/bin/env bash
...
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

# get Flink config
. "$bin"/config.sh

if [ "$FLINK_IDENT_STRING" = "" ]; then
    FLINK_IDENT_STRING="$USER"
fi

export HADOOP_USER_NAME=hdfs

JVM_ARGS="$JVM_ARGS -Xmx512m"

CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`

log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-yarn-session-$HOSTNAME.log
log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-yarn-session.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback-yarn.xml"

export FLINK_CONF_DIR

$JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.yarn.cli.FlinkYarnSessionCli -j "$FLINK_LIB_DIR"/flink-dist*.jar "$@"
Note: the HADOOP_USER_NAME export must come before the JAVA_RUN invocation; otherwise the launched process cannot read the environment variable.
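As an alternative to editing yarn-session.sh itself, the variable can usually be exported in the launching shell (or in ~/.bash_profile) before the session is started, since child processes inherit the environment. A minimal sketch; the hdfs user name follows the HDP setup described above:

```shell
# Export HADOOP_USER_NAME in the parent shell; yarn-session.sh and the
# JVM it launches inherit the variable from the environment.
export HADOOP_USER_NAME=hdfs

# Verify it is set before starting the session.
echo "$HADOOP_USER_NAME"
```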
conf/flink-conf.yaml (HA configuration)
To start an HA cluster, add the following configuration to conf/flink-conf.yaml.
High-availability mode (required): the high-availability mode must be set to zookeeper in conf/flink-conf.yaml to enable high availability. Alternatively, this option can be set to the FQN of a factory class that Flink should use to create a HighAvailabilityServices instance.
high-availability: zookeeper
ZooKeeper quorum (required): the ZooKeeper quorum is a replicated group of ZooKeeper servers that provides the distributed coordination service.
high-availability.zookeeper.quorum: address1:2181[,...],addressX:2181
Each addressX:port refers to a ZooKeeper server that Flink can reach at the given address and port.
ZooKeeper root (recommended): the root ZooKeeper node, under which all cluster nodes are placed.
high-availability.zookeeper.path.root: /flink
ZooKeeper cluster-id (recommended): the cluster-id ZooKeeper node, under which all of a cluster's required coordination data is placed.
high-availability.cluster-id: /default_ns # important: customize per cluster
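Taken together, path.root and cluster-id determine the ZooKeeper namespace under which the coordination data is stored; roughly (a sketch, not the exact znode names):

```
/flink              <- high-availability.zookeeper.path.root
  /default_ns       <- high-availability.cluster-id
    ...             <- leader latches, pointers to checkpoint metadata, etc.
```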
Storage directory (required): JobManager metadata is persisted in the file system storageDir; only a pointer to this state is stored in ZooKeeper.
high-availability.storageDir: hdfs:///flink/recovery
storageDir holds all of the metadata needed to recover from a JobManager failure.
After configuring the masters and the ZooKeeper quorum, you can use the provided cluster startup scripts as usual; they will start an HA cluster. Keep in mind that the ZooKeeper quorum must be running when you call the scripts, and make sure to configure a separate ZooKeeper root path for each HA cluster you start.
In addition to the HA configuration, the maximum number of attempts must also be configured in conf/flink-conf.yaml:
yarn.application-attempts: 10
This means the application can be restarted 9 times before YARN considers it failed (9 retries + 1 initial attempt).
Since our Hadoop cluster was built with HDP, a ZooKeeper ensemble is already in place, so we use the existing ZooKeeper for the HA configuration:
high-availability: zookeeper
high-availability.zookeeper.quorum: flink-dc-01:2181,flink-dc-02:2181,flink-dc-03:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: hdfs://ns1/flink/recovery
yarn.application-attempts: 10
Configuring Hadoop's yarn-site.xml
Configure the maximum number of application attempts:
<property>
  <name>yarn.resourcemanager.am.max-attempts</name>
  <value>4</value>
  <description>
    The maximum number of application master execution attempts.
  </description>
</property>
The default in current YARN versions is 2 (i.e., a single JobManager failure is tolerated).
The java.lang.IllegalAccessError: tried to access method problem
On the HDP platform, the uber-shaded Hadoop jar must be removed and the MapReduce jars added to the YARN application classpath; otherwise the following error occurs:
Exception in thread "main" java.lang.IllegalAccessError: tried to access method org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object; from class org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
Delete/rename the jar:
rm -f /root/flink-1.7.1/lib/flink-shaded-hadoop2-uber-1.7.1.jar
Add the MapReduce jars to the YARN application classpath
In the Ambari UI, navigate to Service -> YARN -> Configs -> Advanced -> Advanced yarn-site -> yarn.application.classpath and append:
/usr/hdp/current/hadoop-mapreduce-client/*,/usr/hdp/current/hadoop-mapreduce-client/lib/*
After the change, the affected YARN components must be restarted; the Ambari UI shows what to restart and handles it with one click.
Flink logging configuration
Flink ships with two logging setups by default: log4j and logback.
If neither is removed, starting a Flink cluster or running a Flink job prints a warning suggesting that one of them be removed:
org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/root/flink-1.7.1/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
Either removing or renaming the file works. For example:
$ mv logback.xml logback.xml_bak
Sample configuration:
$ vi /usr/local/flink-1.3.3/conf/log4j.properties
log4j.appender.file.append=true
# maximum file size
log4j.appender.file.MaxFileSize=100MB
# maximum number of backup files
log4j.appender.file.MaxBackupIndex=10
Starting Flink
This section covers the two ways to launch Flink.
Start a long-running Flink cluster
A long-running Flink cluster is deployed via yarn-session.sh.
yarn-session.sh usage
$ ./bin/yarn-session.sh
Usage:
   Required
     -n,--container <arg>            Number of YARN container to allocate (=Number of Task Managers)
   Optional
     -D <property=value>             use value for given property
     -d,--detached                   If present, runs the job in detached mode
     -h,--help                       Help for the Yarn session CLI.
     -id,--applicationId <arg>       Attach to running YARN session
     -j,--jar <arg>                  Path to Flink jar file
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
     -m,--jobmanager <arg>           Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
     -n,--container <arg>            Number of YARN container to allocate (=Number of Task Managers)
     -nl,--nodeLabel <arg>           Specify YARN node label for the YARN application
     -nm,--name <arg>                Set a custom name for the application on YARN
     -q,--query                      Display available YARN resources (memory, cores)
     -qu,--queue <arg>               Specify YARN queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -sae,--shutdownOnAttachedExit   If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.
     -st,--streaming                 Start Flink in streaming mode
     -t,--ship <arg>                 Ship files in the specified directory (t for transfer)
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -yd,--yarndetached              If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode
Key options:
1. -n: number of TaskManagers
2. -jm: JobManager memory
3. -m: address of the JobManager
4. -tm: memory per TaskManager
5. -D: dynamic properties
6. -d: detached mode; once the YARN session is deployed to YARN, the client shuts itself down.
7. -j: path to the Flink jar
Deploy a long-running Flink on YARN instance
bin/yarn-session.sh -n 8 -s 5 -jm 2048 -tm 4096 -nm pinpoint-flink-job
This example requests:
- 8 TaskManagers
- 5 slots per TaskManager
- 4 GB of memory per TaskManager
- 2 GB of memory for the JobManager
- pinpoint-flink-job as the application name
Note: after deploying a long-running Flink on YARN instance, the TaskManager and slot counts shown in the Flink web UI are both 0. Resources are only allocated when a job is actually submitted to the session.
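If the client was started detached or has since been closed, it can reattach to the running session via the -id option shown in the usage above. A sketch; the application id below is a placeholder, substitute your own:

```
# Reattach the CLI to an already-running YARN session
# (the application id is a placeholder; use your own).
./bin/yarn-session.sh -id application_1548213441093_0011
```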
Submit a job to the long-running Flink on YARN instance
Run the job submission command:
$ bin/flink run ./examples/batch/WordCount.jar --input hdfs://xdata2/tmp/LICENSE-2.0.txt --output hdfs://xdata2/tmp/wordcount_result.txt
Input file: hdfs://xdata2/tmp/LICENSE-2.0.txt
Output file: hdfs://xdata2/tmp/wordcount_result.txt
The command produces log output like the following:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/root/flink-1.7.1/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/flink-1.7.1/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/hdp/2.6.2.0-205/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2019-01-24 16:05:26,059 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-root.
2019-01-24 16:05:26,358 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN properties set default parallelism to 40
YARN properties set default parallelism to 40
2019-01-24 16:05:26,618 INFO  org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at vigor-dc-38/192.168.2.38:10200
2019-01-24 16:05:26,628 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-01-24 16:05:26,638 INFO  org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider - Looking for the active RM in [rm1, rm2]...
2019-01-24 16:05:26,773 INFO  org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider - Found active RM [rm1]
2019-01-24 16:05:26,779 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Found application JobManager host name 'vigor-dc-41' and port '39925' from supplied application id 'application_1548213441093_0011'
2019-01-24 16:05:27,186 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
Starting execution of program
Program execution finished
Job with JobID 7ab3cf90748c8d05c7aa2e7cbce85730 has finished.
Job Runtime: 8979 ms
After submission, the job and its execution status are visible in the Flink web UI.
Check the job result
Use the hadoop command to view the result:
[root@vigor-dc-38 flink-1.7.1]# hadoop fs -cat /tmp/wordcount_result.txt
...
above 1
acceptance 1
accepting 3
act 1
acting 1
acts 1
add 2
addendum 1
additional 5
additions 1
advised 1
against 2
agree 1
agreed 3
agreement 1
all 3
...
Run a single Flink job on YARN
If you want to start Flink on YARN for a single job only, you can do so directly with bin/flink run.
Example:
./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
The command-line options of the YARN session are also available to ./bin/flink; prefix them with y (or yarn for long option names).
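For example, the session options -n, -jm, -tm, and -s from the usage above become -yn, -yjm, -ytm, and -ys in a per-job submission. A sketch; the memory and slot values are illustrative:

```
# Per-job submission with y-prefixed session options:
#   -yn 2     two TaskManager containers
#   -yjm 1024 JobManager memory (MB)
#   -ytm 2048 memory per TaskManager (MB)
#   -ys 4     slots per TaskManager
./bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 2048 -ys 4 \
    ./examples/batch/WordCount.jar
```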
After the command runs, YARN starts a dedicated Flink on YARN instance just for this job, and the job appears in the Flink web UI.
Execution result:
Printing result to stdout. Use --output to specify output path.
(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
(arms,1)
(arrows,1)
...
Summary
Choose between the two Flink on YARN deployment modes based on your own needs; you can use one alone or combine both.
For important jobs, a dedicated per-job instance is recommended; other jobs can share a long-running session. Either way, resources are released when they are no longer needed.
Standalone mode will be covered in a follow-up article.
References
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/hadoop.html
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/yarn_setup.html
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/jobmanager_high_availability.html
Follow the official account