Flink編程入門(二)
Flink 有三種部署模式,分別是 Local、Standalone Cluster 和 Yarn Cluster。
1.1. Local模式
對於 Local 模式來說,JobManager 和 TaskManager 會公用一個 JVM 來完成 Workload。如果要驗證一個簡單的應用,Local 模式是最方便的。實際應用中大多使用 Standalone 或者 Yarn Cluster,而local模式只是將安裝包解壓啟動(./bin/start-local.sh)即可,在這裏不在演示。
1.2. Standalone 模式
1.2.1. 下載
安裝包下載地址:http://flink.apache.org/downloads.html
快速入門教程地址:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/setup_quickstart.html
1.2.2. 上傳安裝包到linux系統
使用rz命令
1.2.3. 解壓
tar –zxvf flink-1.3.2-bin-hadoop26-scala_2.10.tgz
1.2.4. 重命名
mv flink-1.3.2 flink
1.2.5. 修改環境變量
切換到root用戶配置
export FLINK_HOME=/home/hadoop/flink
export PATH=$PATH:$FLINK_HOME/bin
配置結束後切換會普通用戶
source /etc/profile
1.2.6. 修改配置文件
修改flink/conf/masters
master1:8081
修改flink/conf/slaves
master1ha
master2
master2ha
修改flink/conf/flink-conf.yaml
taskmanager.numberOfTaskSlots: 2
jobmanager.rpc.address: master1
1.2.7. 啟動flink
/home/Hadoop/flink/bin/start-cluster.sh
1.2.8. Flink 的 Rest API
Flink 和其他大多開源的框架一樣,提供了很多有用的 Rest API。不過 Flink 的 RestAPI,目前還不是很強大,只能支持一些 Monitor 的功能。Flink Dashboard 本身也是通過其 Rest 來查詢各項的結果數據。在 Flink RestAPI 基礎上,可以比較容易的將 Flink 的 Monitor 功能和其他第三方工具相集成,這也是其設計的初衷。
在 Flink 的進程中,是由 JobManager 來提供 Rest API 的服務。因此在調用 Rest 之前,要確定 JobManager 是否處於正常的狀態。正常情況下,在發送一個 Rest 請求給 JobManager 之後,Client 就會收到一個 JSON 格式的返回結果。由於目前 Rest 提供的功能還不多,需要增強這塊功能的讀者可以在子項目 flink-runtime-web 中找到對應的代碼。其中最關鍵一個類 WebRuntimeMonitor,就是用來對所有的 Rest 請求做分流的,如果需要添加一個新類型的請求,就需要在這裏增加對應的處理代碼。下面我例舉幾個常用 Rest API。
1.查詢 Flink 集群的基本信息: /overview。示例命令行格式以及返回結果如下:
$ curl http://localhost:8081/overview
{"taskmanagers":1,"slots-total":16,
"slots-available":16,"jobs-running":0,"jobs-finished":0,"jobs-cancelled":0,"jobs-failed":0}
2.查詢當前 Flink 集群中的 Job 信息:/jobs。示例命令行格式以及返回結果如下:
$ curl http://localhost:8081/jobs
{"jobs-running":[],"jobs-finished":
["f91d4dd4fdf99313d849c9c4d29f8977"],"jobs-cancelled":[],"jobs-failed":[]}
3.查詢一個指定的 Job 信息: /jobs/jobid。這個查詢的結果會返回特別多的詳細的內容,這是我在瀏覽器中進行的測試,如下圖:
想要了解更多 Rest 請求內容的讀者,可以去 Apache Flink 的頁面中查找。
1.2.9. 運行測試任務
./bin/flink run -m master1:8082 ./examples/batch/WordCount.jar --input hdfs://master1:9000/words.txt --output hdfs://master1:9000/clinkout
1.3. Flink 的 HA
首先,我們需要知道 Flink 有兩種部署的模式,分別是 Standalone 以及 Yarn Cluster 模式。對於 Standalone 來說,Flink 必須依賴於 Zookeeper 來實現 JobManager 的 HA(Zookeeper 已經成為了大部分開源框架 HA 必不可少的模塊)。在 Zookeeper 的幫助下,一個 Standalone 的 Flink 集群會同時有多個活著的 JobManager,其中只有一個處於工作狀態,其他處於 Standby 狀態。當工作中的 JobManager 失去連接後(如宕機或 Crash),Zookeeper 會從 Standby 中選舉新的 JobManager 來接管 Flink 集群。
對於 Yarn Cluaster 模式來說,Flink 就要依靠 Yarn 本身來對 JobManager 做 HA 了。其實這裏完全是 Yarn 的機制。對於 Yarn Cluster 模式來說,JobManager 和 TaskManager 都是被 Yarn 啟動在 Yarn 的 Container 中。此時的 JobManager,其實應該稱之為 Flink Application Master。也就說它的故障恢復,就完全依靠著 Yarn 中的 ResourceManager(和 MapReduce 的 AppMaster 一樣)。由於完全依賴了 Yarn,因此不同版本的 Yarn 可能會有細微的差異。這裏不再做深究。
1.3.1. 修改配置文件
修改flink-conf.yaml
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://master1:9000/flink-checkpoints
high-availability: zookeeper
high-availability.storageDir: hdfs://master1:9000/flink/ha/
high-availability.zookeeper.quorum: master1ha:2181,master2:2181,master2ha:2181
high-availability.zookeeper.client.acl: open
修改conf
server.1=master1ha:2888:3888
server.2=master2:2888:3888
server.3=master2ha:2888:3888
修改masters
master1:8082
master1ha:8082
修改slaves
master1ha
master2
master2ha
1.3.2. 啟動
/home/Hadoop/flink/bin/start-cluster.sh
1.4. Yarn Cluster 模式
1.4.1. 引入
在一個企業中,為了最大化的利用集群資源,一般都會在一個集群中同時運行多種類型的 Workload。因此 Flink 也支持在 Yarn 上面運行。首先,讓我們通過下圖了解下 Yarn 和 Flink 的關系。
在圖中可以看出,Flink 與 Yarn 的關系與 MapReduce 和 Yarn 的關系是一樣的。Flink 通過 Yarn 的接口實現了自己的 App Master。當在 Yarn 中部署了 Flink,Yarn 就會用自己的 Container 來啟動 Flink 的 JobManager(也就是 App Master)和 TaskManager。
1.4.2. 修改環境變量
export HADOOP_CONF_DIR= /home/hadoop/hadoop/etc/hadoop
1.4.3. 部署啟動
yarn-session.sh -d -s 2 -tm 800 -n 2
上面的命令的意思是,同時向Yarn申請3個container,其中 2 個 Container 啟動 TaskManager(-n 2),每個 TaskManager 擁有兩個 Task Slot(-s 2),並且向每個 TaskManager 的 Container 申請 800M 的內存,以及一個ApplicationMaster(Job Manager)。
Flink部署到Yarn Cluster後,會顯示Job Manager的連接細節信息。
Flink on Yarn會覆蓋下面幾個參數,如果不希望改變配置文件中的參數,可以動態的通過-D選項指定,如 -Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368
jobmanager.rpc.address:因為JobManager會經常分配到不同的機器上
taskmanager.tmp.dirs:使用Yarn提供的tmp目錄
parallelism.default:如果有指定slot個數的情況下
yarn-session.sh會掛起進程,所以可以通過在終端使用CTRL+C或輸入stop停止yarn-session。
如果不希望Flink Yarn client長期運行,Flink提供了一種detached YARN session,啟動時候加上參數-d或—detached
在上面的命令成功後,我們就可以在 Yarn Application 頁面看到 Flink 的紀錄。如下圖。
如果在虛擬機中測試,可能會遇到錯誤。這裏需要註意內存的大小,Flink 向 Yarn 會申請多個 Container,但是 Yarn 的配置可能限制了 Container 所能申請的內存大小,甚至 Yarn 本身所管理的內存就很小。這樣很可能無法正常啟動 TaskManager,尤其當指定多個 TaskManager 的時候。因此,在啟動 Flink 之後,需要去 Flink 的頁面中檢查下 Flink 的狀態。這裏可以從 RM 的頁面中,直接跳轉(點擊 Tracking UI)。這時候 Flink 的頁面如圖
yarn-session.sh啟動命令參數如下:
Usage:
Required
-n,--container <arg> Number of YARN container to allocate (=Number of Task Managers)
Optional
-D <arg> Dynamic properties
-d,--detached Start detached
-jm,--jobManagerMemory <arg> Memory for JobManager Container [in MB]
-nm,--name Set a custom name for the application on YARN
-q,--query Display available YARN resources (memory, cores)
-qu,--queue <arg> Specify YARN queue.
-s,--slots <arg> Number of slots per TaskManager
-st,--streaming Start Flink in streaming mode
-tm,--taskManagerMemory <arg> Memory per TaskManager Container [in MB]
1.4.4. 提交任務
之後,我們可以通過這種方式提交我們的任務
./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
以上命令在參數前加上y前綴,-yn表示TaskManager個數。
在這個模式下,同樣可以使用-m yarn-cluster提交一個"運行後即焚"的detached yarn(-yd)作業到yarn cluster。
1.4.5. 停止yarn cluster
yarn application -kill application_1507603745315_0001
Flink編程入門(二)