CentOS7.5搭建Flink1.6.1分散式叢集
轉發自:https://www.cnblogs.com/frankdeng/p/9400627.html
一. Flink的下載
安裝包下載地址:http://flink.apache.org/downloads.html ,選擇對應Hadoop的Flink版本下載
[[email protected] software]$ wget http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.6.1/flink-1.6.1-bin-hadoop27-scala_2.11.tgz [[email protected]software]$ ll -rw-rw-r-- 1 admin admin 301867081 Sep 15 15:47 flink-1.6.1-bin-hadoop27-scala_2.11.tgz
Flink 有三種部署模式,分別是 Local、Standalone Cluster 和 Yarn Cluster。
二. Local模式
對於 Local 模式來說,JobManager 和 TaskManager 會公用一個 JVM 來完成 Workload。如果要驗證一個簡單的應用,Local 模式是最方便的。實際應用中大多使用 Standalone 或者 Yarn Cluster,而local模式只是將安裝包解壓啟動(./bin/start-local.sh)即可,在這裡不在演示。
三. Standalone 模式
快速入門教程地址:https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/setup_quickstart.html
1. 軟體要求
- Java 1.8.x或更高版本,
- ssh(必須執行sshd才能使用管理遠端元件的Flink指令碼)
叢集部署規劃
節點名稱 | master | worker | zookeeper |
node21 | master | zookeeper | |
node22 | master | worker | zookeeper |
node23 | worker | zookeeper |
2. 解壓
[[email protected] software]$ tar zxvf flink-1.6.1-bin-hadoop27-scala_2.11.tgz -C /opt/module/ [[email protected] software]$ cd /opt/module/ [[email protected] module]$ ll drwxr-xr-x 8 admin admin 125 Sep 15 04:47 flink-1.6.1
3. 修改配置檔案
[[email protected] conf]$ ls flink-conf.yaml log4j-console.properties log4j-yarn-session.properties logback.xml masters sql-client-defaults.yaml log4j-cli.properties log4j.properties logback-console.xml logback-yarn.xml slaves zoo.cfg
修改flink/conf/masters,slaves,flink-conf.yaml
[[email protected] conf]$ sudo vi masters node21:8081 [[email protected] conf]$ sudo vi slaves node22 node23 [[email protected] conf]$ sudo vi flink-conf.yaml taskmanager.numberOfTaskSlots:2 jobmanager.rpc.address: node21
可選配置:
- 每個JobManager(
jobmanager.heap.mb
)的可用記憶體量, - 每個TaskManager(
taskmanager.heap.mb
)的可用記憶體量, - 每臺機器的可用CPU數量(
taskmanager.numberOfTaskSlots
), - 叢集中的CPU總數(
parallelism.default
)和 - 臨時目錄(
taskmanager.tmp.dirs
)
4. 拷貝安裝包到各節點
[[email protected] module]$ scp -r flink-1.6.1/ [email protected]:`pwd` [[email protected] module]$ scp -r flink-1.6.1/ [email protected]:`pwd`
5. 配置環境變數
配置所有節點Flink的環境變數
[[email protected] flink-1.6.1]$ sudo vi /etc/profile export FLINK_HOME=/opt/module/flink-1.6.1 export PATH=$PATH:$FLINK_HOME/bin [[email protected] flink-1.6.1]$ source /etc/profile
6. 啟動flink
[[email protected] flink-1.6.1]$ ./bin/start-cluster.sh Starting cluster. Starting standalonesession daemon on host node21. Starting taskexecutor daemon on host node22. Starting taskexecutor daemon on host node23.
jps檢視程序
7. WebUI檢視
8. Flink 的 HA
首先,我們需要知道 Flink 有兩種部署的模式,分別是 Standalone 以及 Yarn Cluster 模式。對於 Standalone 來說,Flink 必須依賴於 Zookeeper 來實現 JobManager 的 HA(Zookeeper 已經成為了大部分開源框架 HA 必不可少的模組)。在 Zookeeper 的幫助下,一個 Standalone 的 Flink 叢集會同時有多個活著的 JobManager,其中只有一個處於工作狀態,其他處於 Standby 狀態。當工作中的 JobManager 失去連線後(如宕機或 Crash),Zookeeper 會從 Standby 中選舉新的 JobManager 來接管 Flink 叢集。
對於 Yarn Cluaster 模式來說,Flink 就要依靠 Yarn 本身來對 JobManager 做 HA 了。其實這裡完全是 Yarn 的機制。對於 Yarn Cluster 模式來說,JobManager 和 TaskManager 都是被 Yarn 啟動在 Yarn 的 Container 中。此時的 JobManager,其實應該稱之為 Flink Application Master。也就說它的故障恢復,就完全依靠著 Yarn 中的 ResourceManager(和 MapReduce 的 AppMaster 一樣)。由於完全依賴了 Yarn,因此不同版本的 Yarn 可能會有細微的差異。這裡不再做深究。
1) 修改配置檔案
修改flink-conf.yaml,HA模式下,jobmanager不需要指定,在master file中配置,由zookeeper選出leader與standby。
#jobmanager.rpc.address: node21 high-availability:zookeeper #指定高可用模式(必須) high-availability.zookeeper.quorum:node21:2181,node22:2181,node23:2181 #ZooKeeper仲裁是ZooKeeper伺服器的複製組,它提供分散式協調服務(必須) high-availability.storageDir:hdfs:///flink/ha/ #JobManager元資料儲存在檔案系統storageDir中,只有指向此狀態的指標儲存在ZooKeeper中(必須) high-availability.zookeeper.path.root:/flink #根ZooKeeper節點,在該節點下放置所有叢集節點(推薦) high-availability.cluster-id:/flinkCluster #自定義叢集(推薦) state.backend: filesystem state.checkpoints.dir: hdfs:///flink/checkpoints state.savepoints.dir: hdfs:///flink/checkpoints
修改conf/zoo.cfg
server.1=node21:2888:3888 server.2=node22:2888:3888 server.3=node23:2888:3888
修改conf/masters
node21:8081 node22:8081
修改slaves
node22 node23
同步配置檔案conf到各節點
2) 啟動HA
先啟動zookeeper叢集各節點(測試環境中也可以用Flink自帶的start-zookeeper-quorum.sh),啟動dfs ,再啟動flink
[[email protected] flink-1.6.1]$ start-cluster.sh
WebUI檢視,這是會自動產生一個主Master,如下
3) 驗證HA
手動殺死node22上的master,此時,node21上的備用master轉為主mater。
4)手動將JobManager / TaskManager例項新增到群集
您可以使用bin/jobmanager.sh
和bin/taskmanager.sh
指令碼將JobManager和TaskManager例項新增到正在執行的叢集中。
新增JobManager
bin/jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all
新增TaskManager
bin/taskmanager.sh start|start-foreground|stop|stop-all
[[email protected] flink-1.6.1]$ jobmanager.sh start node22
新新增的為從master。
9. 執行測試任務
[[email protected] flink-1.6.1]$ flink run -m node21:8081 ./examples/batch/WordCount.jar --input /opt/wcinput/wc.txt --output /opt/wcoutput/ [[email protected] flink-1.6.1]$ flink run -m node21:8081 ./examples/batch/WordCount.jar --input hdfs:///user/admin/input/wc.txt --output hdfs:///user/admin/output2
四. Yarn Cluster模式
1. 引入
在一個企業中,為了最大化的利用叢集資源,一般都會在一個叢集中同時執行多種型別的 Workload。因此 Flink 也支援在 Yarn 上面執行。首先,讓我們通過下圖瞭解下 Yarn 和 Flink 的關係。
在圖中可以看出,Flink 與 Yarn 的關係與 MapReduce 和 Yarn 的關係是一樣的。Flink 通過 Yarn 的介面實現了自己的 App Master。當在 Yarn 中部署了 Flink,Yarn 就會用自己的 Container 來啟動 Flink 的 JobManager(也就是 App Master)和 TaskManager。
啟動新的Flink YARN會話時,客戶端首先檢查所請求的資源(容器和記憶體)是否可用。之後,它將包含Flink和配置的jar上傳到HDFS(步驟1)。
客戶端的下一步是請求(步驟2)YARN容器以啟動ApplicationMaster(步驟3)。由於客戶端將配置和jar檔案註冊為容器的資源,因此在該特定機器上執行的YARN的NodeManager將負責準備容器(例如,下載檔案)。完成後,將啟動ApplicationMaster(AM)。
該JobManager和AM在同一容器中執行。一旦它們成功啟動,AM就知道JobManager(它自己的主機)的地址。它正在為TaskManagers生成一個新的Flink配置檔案(以便它們可以連線到JobManager)。該檔案也上傳到HDFS。此外,AM容器還提供Flink的Web介面。YARN程式碼分配的所有埠都是臨時埠。這允許使用者並行執行多個Flink YARN會話。
之後,AM開始為Flink的TaskManagers分配容器,這將從HDFS下載jar檔案和修改後的配置。完成這些步驟後,即可建立Flink並準備接受作業。
2. 修改環境變數
export HADOOP_CONF_DIR= /opt/module/hadoop-2.7.6/etc/hadoop
3. 部署啟動
[[email protected] flink-1.6.1]$ yarn-session.sh -d -s 2 -tm 800 -n 2
-n : TaskManager的數量,相當於executor的數量
-s : 每個JobManager的core的數量,executor-cores。建議將slot的數量設定每臺機器的處理器數量
-tm : 每個TaskManager的記憶體大小,executor-memory
-jm : JobManager的記憶體大小,driver-memory
上面的命令的意思是,同時向Yarn申請3個container,其中 2 個 Container 啟動 TaskManager(-n 2),每個 TaskManager 擁有兩個 Task Slot(-s 2),並且向每個 TaskManager 的 Container 申請 800M 的記憶體,以及一個ApplicationMaster(Job Manager)。
Flink部署到Yarn Cluster後,會顯示Job Manager的連線細節資訊。
Flink on Yarn會覆蓋下面幾個引數,如果不希望改變配置檔案中的引數,可以動態的通過-D選項指定,如 -Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368
jobmanager.rpc.address:因為JobManager會經常分配到不同的機器上
taskmanager.tmp.dirs:使用Yarn提供的tmp目錄
parallelism.default:如果有指定slot個數的情況下
yarn-session.sh會掛起程序,所以可以通過在終端使用CTRL+C或輸入stop停止yarn-session。
如果不希望Flink Yarn client長期執行,Flink提供了一種detached YARN session,啟動時候加上引數-d或—detached
在上面的命令成功後,我們就可以在 Yarn Application 頁面看到 Flink 的紀錄。如下圖。
如果在虛擬機器中測試,可能會遇到錯誤。這裡需要注意記憶體的大小,Flink 向 Yarn 會申請多個 Container,但是 Yarn 的配置可能限制了 Container 所能申請的記憶體大小,甚至 Yarn 本身所管理的記憶體就很小。這樣很可能無法正常啟動 TaskManager,尤其當指定多個 TaskManager 的時候。因此,在啟動 Flink 之後,需要去 Flink 的頁面中檢查下 Flink 的狀態。這裡可以從 RM 的頁面中,直接跳轉(點選 Tracking UI)。這時候 Flink 的頁面如圖
yarn-session.sh啟動命令引數如下:
[[email protected] flink-1.6.1]$ yarn-session.sh --help Usage: Required -n,--container <arg> Number of YARN container to allocate (=Number of Task Managers) Optional -D <property=value> use value for given property -d,--detached If present, runs the job in detached mode -h,--help Help for the Yarn session CLI. -id,--applicationId <arg> Attach to running YARN session -j,--jar <arg> Path to Flink jar file -jm,--jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB) -m,--jobmanager <arg> Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified i n the configuration. -n,--container <arg> Number of YARN container to allocate (=Number of Task Managers) -nl,--nodeLabel <arg> Specify YARN node label for the YARN application -nm,--name <arg> Set a custom name for the application on YARN -q,--query Display available YARN resources (memory, cores) -qu,--queue <arg> Specify YARN queue. -s,--slots <arg> Number of slots per TaskManager -st,--streaming Start Flink in streaming mode -t,--ship <arg> Ship files in the specified directory (t for transfer) -tm,--taskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB) -yd,--yarndetached If present, runs the job in detached mode (deprecated; use non-YARN specific option instead) -z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode
4. 提交任務
之後,我們可以通過這種方式提交我們的任務
[[email protected] flink-1.6.1]$ ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar --input /opt/wcinput/wc.txt --output /opt/wcoutput/
以上命令在引數前加上y字首,-yn表示TaskManager個數。
在這個模式下,同樣可以使用-m yarn-cluster提交一個"執行後即焚"的detached yarn(-yd)作業到yarn cluster。
5. 停止yarn cluster
yarn application -kill application_1539058959130_0001
6. Yarn模式的HA
應用最大嘗試次數(yarn-site.xml),您必須配置為嘗試應用的最大數量的設定yarn-site.xml,當前YARN版本的預設值為2(表示允許單個JobManager失敗)。
<property> <name>yarn.resourcemanager.am.max-attempts</name> <value>4</value> <description>The maximum number of application master execution attempts</description> </property>
申請嘗試(flink-conf.yaml),您還必須配置最大嘗試次數conf/flink-conf.yaml
: yarn.application-attempts:10
示例:高度可用的YARN會話
-
配置HA模式和zookeeper法定人數在
conf/flink-conf.yaml
:high-availability: zookeeper high-availability.zookeeper.quorum: node21:2181,node22:2181,node23:2181 high-availability.storageDir: hdfs:///flink/recovery high-availability.zookeeper.path.root: /flink yarn.application-attempts: 10
-
配置ZooKeeper的伺服器中
conf/zoo.cfg
(目前它只是可以執行每臺機器的單一的ZooKeeper伺服器):server.1=node21:2888:3888 server.2=node22:2888:3888 server.3=node23:2888:3888
-
啟動ZooKeeper仲裁:
$ bin / start-zookeeper-quorum.sh
-
啟動HA群集:
$ bin / yarn-session.sh -n 2
五.錯誤異常
1.身份認證失敗
[[email protected] flink-1.6.1]# flink run examples/streaming/SocketWindowWordCount.jar --port 9000 Starting execution of program ------------------------------------------------------------ The program finished with the following exception: org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: b7a99ac5db242290413dbebe32ba52b0) at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:267) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486) at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66) at org.apache.flink.streaming.examples.socket.SocketWindowWordCount.main(SocketWindowWordCount.java:92) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:426) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:804) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:280) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1044) at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120) Caused by: java.net.ConnectException: Connection refused (Connection refused) at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:589) at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:96) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:748)
通過檢視日誌,發現有如下報錯
2018-10-20 02:32:19,668 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - Authentication failed
解決法案:新增定時任務認證kerberos