1. 程式人生 > >Flink編程入門(二)

Flink編程入門(二)

detach 文件中 bili set ger led http boa 示例

Flink 有三種部署模式,分別是 LocalStandalone Cluster Yarn Cluster

1.1. Local模式

對於 Local 模式來說,JobManager TaskManager 會公用一個 JVM 來完成 Workload。如果要驗證一個簡單的應用,Local 模式是最方便的。實際應用中大多使用 Standalone 或者 Yarn Cluster,而local模式只是將安裝包解壓啟動(./bin/start-local.sh)即可,在這裏不在演示。

1.2. Standalone 模式

1.2.1. 下載

安裝包下載地址:http://flink.apache.org/downloads.html

快速入門教程地址:

https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/setup_quickstart.html

1.2.2. 上傳安裝包到linux系統

使用rz命令

1.2.3. 解壓

tar –zxvf flink-1.3.2-bin-hadoop26-scala_2.10.tgz

1.2.4. 重命名

mv flink-1.3.2 flink

1.2.5. 修改環境變量

切換到root用戶配置

export FLINK_HOME=/home/hadoop/flink

export PATH=$PATH:$FLINK_HOME/bin

配置結束後切換會普通用戶

source /etc/profile

1.2.6. 修改配置文件

修改flink/conf/masters

master1:8081

修改flink/conf/slaves

master1ha

master2

master2ha

修改flink/conf/flink-conf.yaml

taskmanager.numberOfTaskSlots: 2

jobmanager.rpc.address: master1

1.2.7. 啟動flink

/home/Hadoop/flink/bin/start-cluster.sh

1.2.8. Flink Rest API

Flink 和其他大多開源的框架一樣,提供了很多有用的 Rest API。不過 Flink RestAPI,目前還不是很強大,只能支持一些 Monitor 的功能。Flink Dashboard 本身也是通過其 Rest 來查詢各項的結果數據。在 Flink RestAPI 基礎上,可以比較容易的將 Flink Monitor 功能和其他第三方工具相集成,這也是其設計的初衷。

Flink 的進程中,是由 JobManager 來提供 Rest API 的服務。因此在調用 Rest 之前,要確定 JobManager 是否處於正常的狀態。正常情況下,在發送一個 Rest 請求給 JobManager 之後,Client 就會收到一個 JSON 格式的返回結果。由於目前 Rest 提供的功能還不多,需要增強這塊功能的讀者可以在子項目 flink-runtime-web 中找到對應的代碼。其中最關鍵一個類 WebRuntimeMonitor,就是用來對所有的 Rest 請求做分流的,如果需要添加一個新類型的請求,就需要在這裏增加對應的處理代碼。下面我例舉幾個常用 Rest API

1.查詢 Flink 集群的基本信息: /overview。示例命令行格式以及返回結果如下:

$ curl http://localhost:8081/overview

{"taskmanagers":1,"slots-total":16,

"slots-available":16,"jobs-running":0,"jobs-finished":0,"jobs-cancelled":0,"jobs-failed":0}

2.查詢當前 Flink 集群中的 Job 信息:/jobs。示例命令行格式以及返回結果如下:

$ curl http://localhost:8081/jobs

{"jobs-running":[],"jobs-finished":

["f91d4dd4fdf99313d849c9c4d29f8977"],"jobs-cancelled":[],"jobs-failed":[]}

3.查詢一個指定的 Job 信息: /jobs/jobid。這個查詢的結果會返回特別多的詳細的內容,這是我在瀏覽器中進行的測試,如下圖:

想要了解更多 Rest 請求內容的讀者,可以去 Apache Flink 的頁面中查找。

1.2.9. 運行測試任務

./bin/flink run -m master1:8082 ./examples/batch/WordCount.jar --input hdfs://master1:9000/words.txt --output hdfs://master1:9000/clinkout

1.3. Flink HA

首先,我們需要知道 Flink 有兩種部署的模式,分別是 Standalone 以及 Yarn Cluster 模式。對於 Standalone 來說,Flink 必須依賴於 Zookeeper 來實現 JobManager HAZookeeper 已經成為了大部分開源框架 HA 必不可少的模塊)。在 Zookeeper 的幫助下,一個 Standalone Flink 集群會同時有多個活著的 JobManager,其中只有一個處於工作狀態,其他處於 Standby 狀態。當工作中的 JobManager 失去連接後(如宕機或 Crash),Zookeeper 會從 Standby 中選舉新的 JobManager 來接管 Flink 集群。

對於 Yarn Cluaster 模式來說,Flink 就要依靠 Yarn 本身來對 JobManager HA 了。其實這裏完全是 Yarn 的機制。對於 Yarn Cluster 模式來說,JobManager TaskManager 都是被 Yarn 啟動在 Yarn Container 中。此時的 JobManager,其實應該稱之為 Flink Application Master。也就說它的故障恢復,就完全依靠著 Yarn 中的 ResourceManager(和 MapReduce AppMaster 一樣)。由於完全依賴了 Yarn,因此不同版本的 Yarn 可能會有細微的差異。這裏不再做深究。

1.3.1. 修改配置文件

修改flink-conf.yaml

state.backend: filesystem

state.backend.fs.checkpointdir: hdfs://master1:9000/flink-checkpoints

high-availability: zookeeper

high-availability.storageDir: hdfs://master1:9000/flink/ha/

high-availability.zookeeper.quorum: master1ha:2181,master2:2181,master2ha:2181

high-availability.zookeeper.client.acl: open

修改conf

server.1=master1ha:2888:3888

server.2=master2:2888:3888

server.3=master2ha:2888:3888

修改masters

master1:8082

master1ha:8082

修改slaves

master1ha

master2

master2ha

1.3.2. 啟動

/home/Hadoop/flink/bin/start-cluster.sh

1.4. Yarn Cluster 模式

1.4.1. 引入

在一個企業中,為了最大化的利用集群資源,一般都會在一個集群中同時運行多種類型的 Workload。因此 Flink 也支持在 Yarn 上面運行。首先,讓我們通過下圖了解下 Yarn Flink 的關系。

在圖中可以看出,Flink Yarn 的關系與 MapReduce Yarn 的關系是一樣的。Flink 通過 Yarn 的接口實現了自己的 App Master。當在 Yarn 中部署了 FlinkYarn 就會用自己的 Container 來啟動 Flink JobManager(也就是 App Master)和 TaskManager

1.4.2. 修改環境變量

export HADOOP_CONF_DIR= /home/hadoop/hadoop/etc/hadoop

1.4.3. 部署啟動

yarn-session.sh -d -s 2 -tm 800 -n 2

上面的命令的意思是,同時向Yarn申請3container,其中 2 Container 啟動 TaskManager-n 2),每個 TaskManager 擁有兩個 Task Slot-s 2),並且向每個 TaskManager Container 申請 800M 的內存,以及一個ApplicationMasterJob Manager)。

Flink部署到Yarn Cluster後,會顯示Job Manager的連接細節信息。

Flink on Yarn會覆蓋下面幾個參數,如果不希望改變配置文件中的參數,可以動態的通過-D選項指定,如 -Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368

jobmanager.rpc.address:因為JobManager會經常分配到不同的機器上

taskmanager.tmp.dirs:使用Yarn提供的tmp目錄

parallelism.default:如果有指定slot個數的情況下

yarn-session.sh會掛起進程,所以可以通過在終端使用CTRL+C或輸入stop停止yarn-session

如果不希望Flink Yarn client長期運行,Flink提供了一種detached YARN session,啟動時候加上參數-d—detached

在上面的命令成功後,我們就可以在 Yarn Application 頁面看到 Flink 的紀錄。如下圖。

如果在虛擬機中測試,可能會遇到錯誤。這裏需要註意內存的大小,Flink Yarn 會申請多個 Container,但是 Yarn 的配置可能限制了 Container 所能申請的內存大小,甚至 Yarn 本身所管理的內存就很小。這樣很可能無法正常啟動 TaskManager,尤其當指定多個 TaskManager 的時候。因此,在啟動 Flink 之後,需要去 Flink 的頁面中檢查下 Flink 的狀態。這裏可以從 RM 的頁面中,直接跳轉(點擊 Tracking UI)。這時候 Flink 的頁面如圖

yarn-session.sh啟動命令參數如下:

Usage:

Required

-n,--container <arg> Number of YARN container to allocate (=Number of Task Managers)

Optional

-D <arg> Dynamic properties

-d,--detached Start detached

-jm,--jobManagerMemory <arg> Memory for JobManager Container [in MB]

-nm,--name Set a custom name for the application on YARN

-q,--query Display available YARN resources (memory, cores)

-qu,--queue <arg> Specify YARN queue.

-s,--slots <arg> Number of slots per TaskManager

-st,--streaming Start Flink in streaming mode

-tm,--taskManagerMemory <arg> Memory per TaskManager Container [in MB]

1.4.4. 提交任務

之後,我們可以通過這種方式提交我們的任務

./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar

以上命令在參數前加上y前綴,-yn表示TaskManager個數。

在這個模式下,同樣可以使用-m yarn-cluster提交一個"運行後即焚"detached yarn-yd)作業到yarn cluster

1.4.5. 停止yarn cluster

yarn application -kill application_1507603745315_0001

Flink編程入門(二)