1. 程式人生 > >Flink on Yarn三部曲之三:提交Flink任務

Flink on Yarn三部曲之三:提交Flink任務

### 歡迎訪問我的GitHub [https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos) 內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等; 本文是《Flink on Yarn三部曲》系列的終篇,先簡單回顧前面的內容: 1. 《Flink on Yarn三部曲之一:準備工作》:準備好機器、指令碼、安裝包; 2. 《Flink on Yarn三部曲之二:部署和設定》:完成CDH和Flink部署,並在管理頁面做好相關的設定; 現在Flink、Yarn、HDFS都就緒了,接下來實踐提交Flink任務到Yarn執行; ### 全文連結 1. [《Flink on Yarn三部曲之一:準備工作》](https://xinchen.blog.csdn.net/article/details/105356306) 2. [《Flink on Yarn三部曲之二:部署和設定》](https://xinchen.blog.csdn.net/article/details/105356347) 3. [《Flink on Yarn三部曲之三:提交Flink任務》](https://xinchen.blog.csdn.net/article/details/105356399) ### 兩種Flink on YARN模式 實踐之前,對Flink on YARN先簡單瞭解一下,如下圖所示,Flink on Yarn在使用的時候分為兩種模式,Job Mode和Session Mode: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202010/485422-20201021074509377-846294365.png) Session Mode:在YARN中提前初始化一個Flink叢集,以後所有Flink任務都提交到這個叢集,如下圖: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202010/485422-20201021074509814-1411148898.png) Job Mode:每次提交Flink任務都會建立一個專用的Flink叢集,任務完成後資源釋放,如下圖: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202010/485422-20201021074510045-1724714968.png) 接下來分別實戰這兩種模式; ### 準備實戰用的資料(CDH伺服器) 接下來提交的Flink任務是經典的WordCount,先在HDFS中準備一份文字檔案,後面提交的Flink任務都會讀取這個檔案,統計裡面每個單詞的數字,準備文字的步驟如下: 1. SSH登入CDH伺服器; 2. 切換到hdfs賬號:su - hdfs 3. 下載實戰用的txt檔案: ```shell wget https://github.com/zq2599/blog_demos/blob/master/files/GoneWiththeWind.txt ``` 4. 建立hdfs資料夾:hdfs dfs -mkdir /input 5. 將文字檔案上傳到/input目錄:hdfs dfs -put ./GoneWiththeWind.txt /input 準備工作完成,可以提交任務試試了。 ### Session Mode實戰 1. SSH登入CDH伺服器; 2. 切換到hdfs賬號:su - hdfs 3. 進入目錄:/opt/flink-1.7.2/ 4. 執行如下命令建立Flink叢集,-n引數表示TaskManager的數量,-jm表示JobManager的記憶體大小,-tm表示每個TaskManager的記憶體大小: ```shell ./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 ``` 5. 建立成功後,控制檯輸出如下圖,注意紅框中的提示,表明可以通過38301埠訪問Flink: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202010/485422-20201021074511142-1725380687.png) 6. 瀏覽器訪問CDH伺服器的38301埠,可見Flink服務已經啟動: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202010/485422-20201021074511639-1161626983.png) 7. 瀏覽器訪問CDH伺服器的8088埠,可見YARN的Application(即Flink叢集)建立成功,如下圖,紅框中是任務ID,稍後結束Application的時候會用到此ID: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202010/485422-20201021074512477-273168465.png) 8. 再開啟一個終端,SSH登入CDH伺服器,切換到hdfs賬號,進入目錄:/opt/flink-1.7.2 9. 執行以下命令,就會提交一個Flink任務(安裝包自帶的WordCount例子),並指明將結果輸出到HDFS的wordcount-result.txt檔案中: ```shell bin/flink run ./examples/batch/WordCount.jar \ -input hdfs://192.168.50.134:8020/input/GoneWiththeWind.txt \ -output hdfs://192.168.50.134:8020/wordcount-result.txt ``` 10. 執行完畢後,控制檯輸出如下: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202010/485422-20201021074513166-459589503.png) 11. flink的WordCount任務結果儲存在hdfs,我們將結果取出來看看:hdfs dfs -get /wordcount-result.txt 12. vi開啟wordcount-result.txt檔案,如下圖,可見任務執行成功,指定文字中的每個單詞數量都統計出來了: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202010/485422-20201021074513630-1871727754.png) 13. 瀏覽器訪問Flink頁面(CDH伺服器的38301埠),也能看到任務的詳細情況: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202010/485422-20201021074514005-1876168239.png) 14. 銷燬這個Flink叢集的方法是在控制檯執行命令:yarn application -kill application_1580173588985_0002 ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202010/485422-20201021074514595-283277234.png) Session Mode的實戰就完成了,接下來我們來嘗試Job Mode; ### Job Mode 1. 執行以下命令,建立一個Flink叢集,該叢集只用於執行引數中指定的任務(wordCount.jar),結果輸出到hdfs的wordcount-result-1.txt檔案: ```shell bin/flink run -m yarn-cluster \ -yn 2 \ -yjm 1024 \ -ytm 1024 \ ./examples/batch/WordCount.jar \ -input hdfs://192.168.50.134:8020/input/GoneWiththeWind.txt \ -output hdfs://192.168.50.134:8020/wordcount-result-1.txt ``` 2. 控制檯輸出如下,表明任務執行完成: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202010/485422-20201021074515051-130999244.png) 3. 如果您的記憶體和CPU核數充裕,可以立即執行以下命令再建立一個Flink叢集,該叢集只用於執行引數中指定的任務(wordCount.jar),結果輸出到hdfs的wordcount-result-2.txt檔案: ```shell bin/flink run -m yarn-cluster \ -yn 2 \ -yjm 1024 \ -ytm 1024 \ ./examples/batch/WordCount.jar \ -input hdfs://192.168.50.134:8020/input/GoneWiththeWind.txt \ -output hdfs://192.168.50.134:8020/wordcount-result-2.txt ``` 4. 在YARN管理頁面可見任務已經結束: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202010/485422-20201021074515367-505494786.png) 5. 執行命令hdfs dfs -ls /檢視結果檔案,已經成功生成: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202010/485422-20201021074515911-153236153.png) 6. 執行命令hdfs dfs -get /wordcount-result-1.txt下載結果檔案到本地,檢查資料正常; 7. 至此,Flink on Yarn的部署、設定、提交都實踐完成,《Flink on Yarn三部曲》系列也結束了,如果您也在學習Flink,希望本文能夠給您一些參考,也建議您根據自身情況和需求,修改ansible指令碼,搭建更適合自己的環境; ### 歡迎關注公眾號:程式設計師欣宸 > 微信搜尋「程式設計師欣宸」,我是欣宸,期待與您一同暢遊Java世界... [https://github.com/zq2599/blog_demos](https://github.com/zq2599/blo