(四): Flink1.6.1 standalone叢集模式安裝部署,幾個常用引數配置
NameNode檢查點異常
安裝flink之前,觀察到一個NameNode檢查點異常:
沒去找官方解釋,按照此文的第4點解決了問題:
namenode的Cluster ID 與 secondnamenode的Cluster ID 不一致,對比/dfs/nn/current/VERSION 和/dfs/snn/current/VERSION中的Cluster ID 來確認,如果不一致改成一致後重啟應該可以解決
- 備份/dfs/snn/current/VERSION
- 修改/dfs/snn/current/VERSION中的Cluster ID與/dfs/nn/current/VERSION中的一致
- 重啟namenode
- 刪除備份
但是隨後發現,該方式治標不治本。
還有一個節點今天發現了Input/output error錯誤,linux-like相關erros解決中的第二個問題,暫時沒有解決,也沒有去機房重啟(估計重啟後問題更多)。
希望有經驗的前輩可以指點一二。
Flink1.6.1安裝
要求: Java 8.x
【補充:執行Flink程式,使用Flink的話,Hadoop並不是必須的,hadoop版本需要低於等於2.8】
【我這是jdk1.8.0_131+hadoop2.6.0+cdh5.13.2+2687,所以我下載的是】
linux上安裝:
- 從下載頁面下載二進位制檔案。
如果您計劃將Apache Flink與Apache Hadoop一起使用(在YARN上執行Flink,連線到HDFS,連線到HBase,或使用一些基於Hadoop的檔案系統聯結器),確保選擇與您的Hadoop版本匹配的Flink包
。
wget -c http://mirrors.shu.edu.cn/apache/flink/flink-1.6.1/flink-1.6.1-bin-hadoop26-scala_2.11.tgz
- 解壓
tar -zxf flink-1.6.1-bin-hadoop26-scala_2.11.tgz
可以看出,進入18年,Flink社群特別活躍:
“quickstart” 頁面的後續操作不建議執行了,是本地模式,接下來看看叢集模式安裝。
- standalone cluster
doc地址,因為最近有一兩個NodeManager不穩定,所以就先試試standalone cluster。YARN cluster會麻煩一些。
前提
- jdk 1.8+
- 每臺機器已經設定JAVA_HOME
- ssh免密登入
配置Flink
配置檔案位於conf/下:
配置master節點
- 選擇一個節點作為master節點(JobManager),在conf/flink-conf.yaml中設定jobmanager.rpc.address 配置項為該節點的IP或者主機名。
- 確保所有節點有有一樣的jobmanager.rpc.address 配置。
注意!
- 此設定適用於standalone模式
- 該值可能會被JobManager節點的可執行檔案bin/jobmanager.sh指定的- -host <hostname>引數覆蓋
- 在高可用性模式下,如果你使用bin/start-cluster.sh指令碼(來啟動),並且在conf/masters檔案設定了(多個節點),(那麼,使用哪個節點作為Jobmanager)是自動處理的。
JVM記憶體
- jobmanager.heap.size
- taskmanager.heap.size
(如果是YARN,這兩個值自動配置為TaskManager的YARN容器的大小,減去一定的容差值) - 以MB為單位
slaves
- 與HDFS配置類似,編輯檔案conf/slaves並輸入每個工作節點的IP/主機名。每個工作節點稍後將執行TaskManager。
- master負擔重的話,依然可以選擇master不作為TaskManager節點(去掉localhost)。
下面是以三個節點為例的配置示意圖:
taskmanager.numberOfTaskSlots
- 如果此值大於1,TaskManager可以使用多個CPU核心,單個TaskManager會將獲取函式或運算子並行執行。但同時,可用記憶體是公用的。此值通常與TaskManager的計算機具有的物理CPU核心數成比例(例如,等於核心數,或核心數的一半)。
- 這裡,我設定為4
jobstore.cache
- 作業的快取大小,預設52428800(以位元組為單位),也就是50M
- 我這裡,機器記憶體48G,我設定為300M:314527800
- 預設配置檔案裡面沒有,需要自己新增
臨時I/O目錄
- 記憶體不夠用時,寫入到taskmanager.tmp.dirs指定的目錄中
- 如果未顯式指定引數,Flink會將臨時資料寫入作業系統的臨時目錄,例如Linux系統中的/ tmp
暫時先處理這些配置,以後用到了,再補充。
更多的配置資訊見配置頁面
啟動叢集
注意!
啟動指令碼前,還需要配置HADOOP_CONF_DIR
否則:
vim /etc/profile
export HADOOP_CONF_DIR=/etc/hadoop/conf
source /etc/profile
bin/start-cluster.sh
- 在master節點上執行該指令碼啟動JobManager,它會並通過SSH連線到從slaves檔案中列出的所有工作節點,以在相應節點上啟動TaskManager。
- JobManager程序通過配置好的RPC埠(預設6123)來接收Job的提交的作業。
- 停止Flink:
bin/stop-cluster.sh
- web dashboard(jobmanager:8081)
目前dashboard看得出來功能不多,但是簡潔明瞭;
但是,這幾個數字怎麼算的?
將JobManager / TaskManager例項新增到群集
我的理解是,這些指令碼的應用場景是:
- JobManager 或者TaskManager(HA)因為某些原因退出了叢集,我們需要單獨啟動
- 單獨stop某一節點上的程序
新增JobManager
bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all
新增TaskManager
bin/taskmanager.sh start|start-foreground|stop|stop-all
bin下面的指令碼:
Maven依賴
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.6.1</version>
</dependency>
下篇文章寫兩個demo提交到叢集進行測試。