1. 程式人生 > >flink on yarn叢集搭建

flink on yarn叢集搭建

前面一篇部落格中已經搭建了flink Standalone的叢集,需要的可以進去看一下,今天主要來說一下flink on yarn 叢集的搭建以及怎麼提交任務.這篇部落格寫的比較詳細,內容較多,希望大家耐心看完,都是乾貨.

版本資訊:

flink-1.6.0

zookeeper-3.4.10

hadoop-2.9.0

zk和hadoop叢集的搭建,今天這裡就不說了,主要說一下flink的.

(1). vi flink-conf.yaml (裡面具體的引數大小可以根據自己的機器實際情況而定)

env.java.home: /home/jason/bigdata/jdk/jdk1.8.0_11
jobmanager.rpc.address: master
taskmanager.heap.size: 4096
taskmanager.numberOfTaskSlots: 4
#parallelism.default: 8
taskmanager.tmp.dirs: /tmp
jobmanager.heap.size: 1024
jobmanager.web.port: 8085
jobmanager.rpc.port: 6123
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
#taskmanager.network.numberOfBuffers: 64000
fs.hdfs.hadoopconf: /home/jason/bigdata/hadoop/hadoop-2.9.0/etc/hadoop
recovery.mode: zookeeper
recovery.zookeeper.quorum: master:2181,storm1:2181,storm2:2181
recovery.zookeeper.storageDir: hdfs:///flink/recovery
recovery.zookeeper.path.root: /flinkOnYarn
recovery.zookeeper.path.namespace: /cluster_yarn
yarn.application-attempts: 4
akka.watch.heartbeat.interval: 5 s
akka.watch.heartbeat.pause: 20 s
akka.ask.timeout: 60 s
akka.framesize: 20971520b

注意冒號後面一定要有空格

(2).vi masters (這個裡面寫主節點和埠號)

master:8082

(3).vi slaves (把所有的子節點都寫上)

storm1
storm2

(4).scp -r flink [email protected]:/home/jason/bigdata/,scp -r flink [email protected]:/home/jason/bigdata/ (把flink資料夾複雜到子節點)

(5).接下來就可以啟動flink叢集了,這裡特別說明一下flink on yarn的啟動模式

Flink on Yarn

   Flink提供了兩種yarn的部署方式:

1.Start a long-running Flink cluster on YARN     通過命令yarn-session.sh來實現,本質上是在yarn叢集上啟動一個flink叢集。 由yarn預先給flink叢集分配若干個container給flink使用,在yarn的介面上只能看到一個Flink session with X TaskManagers的任務。只有一個Flink介面,可以從Yarn的ApplicationMaster連結進入。使用bin/flink run命令釋出任務時,本質上是使用Flink自帶的排程,與普通的在Flink叢集上釋出任務並沒有不同。不同的任務可能在一個TaskManager中,也即是在一個JVM程序中,無法實現資源隔離。

2.Run a Flink job on YARN     通過命令bin/flink run -m yarn-cluster實現,一次只發佈一個任務,本質上給每個flink任務啟動了一個叢集。yarn不事先給flink分配container,而是在任務釋出時,啟動JobManager(對應Yarn的AM)和TaskManager,如果一個任務指定了n個TaksManager(-yn n),則會啟動n+1個Container,其中一個是JobManager。釋出m個應用,則有m個Flink介面,對比方式一,同樣釋出m個應用,會多出m-1個JobManager的。釋出任務時,實際上是使用了Yarn的呼叫。不同的任務不可能在一個Container(JVM)中,也即是實現了資源隔離。

啟動yarn-session的引數解釋如下:

-n(--container)	taskmanager的數量	 
-s(--slots)	用啟動應用所需的slot數量/ -s 的值向上取整,有時可以多一些taskmanager,做冗餘 每個taskmanager的slot數量,預設一個slot一個core,預設每個taskmanager的slot的個數為1	6~10
-jm	jobmanager的記憶體(單位MB)	3072
-tm	每個taskmanager的記憶體(單位MB)根據core 與記憶體的比例來設定,-s的值* (core與記憶體的比)來算
-nm	yarn 的appName(現在yarn的ui上的名字)|	 
-d	後臺執行

啟動flink應用的引數解釋如下

-j	執行flink 應用的jar所在的目錄
-a	執行flink 應用的主方法的引數
-p	執行flink應用的並行度
-c	執行flink應用的主類, 可以通過在打包設定主類
-nm	flink 應用名字,在flink-ui 上面展示
-d	後臺執行
--fromsavepoint	flink 應用啟動的狀態恢復點

接下來我們分別啟動這兩種模式看一下效果:

先啟動yarn-session模式:

命令:./yarn-session.sh -n 8 -jm 1024 -tm 1024 -s 4 -nm FlinkOnYarnSession -d,表示啟動8個TaskManager,每個記憶體為1G,slots數為4個的Flink叢集。如下圖所以:

可以看到會在yarn上啟動一個長期執行的程式,除非手動殺死,是不會停的,其實這個可以理解為我們啟動了一個很大的容器,然後可以在這個容器裡面提交多個任務.

然後我們提交一個job上去,

命令:./flink run -c flink.flinkStreaming /home/jason/bigdata/jar/bj_parse_json-1.0-SNAPSHOT.jar 

點開Tracking UI中的ApplicationMaster,可以看到Flink的頁面,如下圖所示:

這裡需要注意一下如果沒有提交job,只啟動yarn-session的話,開啟flink的ui介面是看不到上面箭頭所指的資源數的.只有提交一個job後,才會顯示.上圖是我提交了一個job,當然還可以提交多個job到這個容器中執行.這種方式沒有實現資源的隔離.

Flink頁面中能看到目前只啟動了一個TaskMananger(一個JVM程序),並且有FreeSlot,新啟動的Flink Job會在這些slots中啟動,直到沒有更多FreeSlots了才會分配新的TaskMananger。

單個任務模式

提交命令:./flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 1024 /home/jason/bigdata/bj_parse_json-1.0-SNAPSHOT.jar

./flink run -m yarn-cluster -c flink.flinkStreamingJason -yn 2 -ys 4  -yjm 1024 -ytm 1024 /home/jason/bigdata/jar/bj_parse_json-1.0-SNAPSHOT.jar &

可以看到yarn上面顯示兩個job點進入看flink的ui介面如下:

這個地方還有點問題,理論上應該顯示兩個TaskMananger才對,但是我jps看程序是有兩個YarnSessionClusterEntrypoint的,還需要查詢一下原因.這種方式實現了資源的隔離.

slots數的指定:對於standalone cluster而言,由於一臺機器上只有一個TaskManager,slots數應與機器核數相同。對於single job on yarn模式和yarn cluster模式而言,一臺機器上可能有多個TaskManager(取決於yarn在該機器上分配的container數),理論上應該與該Container分配的核數一致為佳。 Flink命令執行後,在任務執行完之前不會返回,控制檯是不能退出的。可以在命令後加-d 引數,表示detached,但此時無法再通過flink命令結束任務,需要通過yarn命令yarn application -kill < appId > 結束任務。