1. 程式人生 > >(四): Flink1.6.1 standalone叢集模式安裝部署,幾個常用引數配置

(四): Flink1.6.1 standalone叢集模式安裝部署,幾個常用引數配置

NameNode檢查點異常

安裝flink之前,觀察到一個NameNode檢查點異常:
在這裡插入圖片描述

沒去找官方解釋,按照此文的第4點解決了問題:

namenode的Cluster ID 與 secondnamenode的Cluster ID 不一致,對比/dfs/nn/current/VERSION 和/dfs/snn/current/VERSION中的Cluster ID 來確認,如果不一致改成一致後重啟應該可以解決

  1. 備份/dfs/snn/current/VERSION
  2. 修改/dfs/snn/current/VERSION中的Cluster ID與/dfs/nn/current/VERSION中的一致
  3. 重啟namenode
  4. 刪除備份
    但是隨後發現,該方式治標不治本。

還有一個節點今天發現了Input/output error錯誤linux-like相關erros解決中的第二個問題,暫時沒有解決,也沒有去機房重啟(估計重啟後問題更多)。

希望有經驗的前輩可以指點一二。
在這裡插入圖片描述

Flink1.6.1安裝

要求: Java 8.x
【補充:執行Flink程式,使用Flink的話,Hadoop並不是必須的,hadoop版本需要低於等於2.8
【我這是jdk1.8.0_131+hadoop2.6.0+cdh5.13.2+2687,所以我下載的是

linux上安裝:

  1. 下載頁面下載二進位制檔案。

如果您計劃將Apache Flink與Apache Hadoop一起使用(在YARN上執行Flink,連線到HDFS,連線到HBase,或使用一些基於Hadoop的檔案系統聯結器),確保選擇與您的Hadoop版本匹配的Flink包

wget -c http://mirrors.shu.edu.cn/apache/flink/flink-1.6.1/flink-1.6.1-bin-hadoop26-scala_2.11.tgz

  1. 解壓
    tar -zxf flink-1.6.1-bin-hadoop26-scala_2.11.tgz

可以看出,進入18年,Flink社群特別活躍:
在這裡插入圖片描述

“quickstart” 頁面的後續操作不建議執行了,是本地模式,接下來看看叢集模式安裝。

  1. standalone cluster

doc地址,因為最近有一兩個NodeManager不穩定,所以就先試試standalone cluster。YARN cluster會麻煩一些。

在這裡插入圖片描述

前提

  • jdk 1.8+
  • 每臺機器已經設定JAVA_HOME
  • ssh免密登入

配置Flink

配置檔案位於conf/下:
在這裡插入圖片描述

配置master節點

  • 選擇一個節點作為master節點(JobManager),在conf/flink-conf.yaml中設定jobmanager.rpc.address 配置項為該節點的IP或者主機名。
  • 確保所有節點有有一樣的jobmanager.rpc.address 配置。
    在這裡插入圖片描述

注意!

  • 此設定適用於standalone模式
  • 該值可能會被JobManager節點的可執行檔案bin/jobmanager.sh指定的- -host <hostname>引數覆蓋
  • 在高可用性模式下,如果你使用bin/start-cluster.sh指令碼(來啟動),並且在conf/masters檔案設定了(多個節點),(那麼,使用哪個節點作為Jobmanager)是自動處理的。

JVM記憶體

  • jobmanager.heap.size
  • taskmanager.heap.size
    (如果是YARN,這兩個值自動配置為TaskManager的YARN容器的大小,減去一定的容差值)
  • 以MB為單位

slaves

  • 與HDFS配置類似,編輯檔案conf/slaves並輸入每個工作節點的IP/主機名。每個工作節點稍後將執行TaskManager。
  • master負擔重的話,依然可以選擇master不作為TaskManager節點(去掉localhost)。

下面是以三個節點為例的配置示意圖:
在這裡插入圖片描述

taskmanager.numberOfTaskSlots

  • 如果此值大於1,TaskManager可以使用多個CPU核心,單個TaskManager會將獲取函式或運算子並行執行。但同時,可用記憶體是公用的。此值通常與TaskManager的計算機具有的物理CPU核心數成比例(例如,等於核心數,或核心數的一半)。
  • 這裡,我設定為4

jobstore.cache

  • 作業的快取大小,預設52428800(以位元組為單位),也就是50M
  • 我這裡,機器記憶體48G,我設定為300M:314527800
  • 預設配置檔案裡面沒有,需要自己新增

臨時I/O目錄

  • 記憶體不夠用時,寫入到taskmanager.tmp.dirs指定的目錄中
  • 如果未顯式指定引數,Flink會將臨時資料寫入作業系統的臨時目錄,例如Linux系統中的/ tmp

暫時先處理這些配置,以後用到了,再補充。
更多的配置資訊見配置頁面

啟動叢集

注意!
啟動指令碼前,還需要配置HADOOP_CONF_DIR

否則:
在這裡插入圖片描述

vim /etc/profile
export HADOOP_CONF_DIR=/etc/hadoop/conf
source /etc/profile

bin/start-cluster.sh

  • 在master節點上執行該指令碼啟動JobManager,它會並通過SSH連線到從slaves檔案中列出的所有工作節點,以在相應節點上啟動TaskManager。
  • JobManager程序通過配置好的RPC埠(預設6123)來接收Job的提交的作業。
  • 停止Flink:bin/stop-cluster.sh
  • web dashboard(jobmanager:8081)
    目前dashboard看得出來功能不多,但是簡潔明瞭;
    但是,這幾個數字怎麼算的?
    在這裡插入圖片描述

將JobManager / TaskManager例項新增到群集

我的理解是,這些指令碼的應用場景是:

  • JobManager 或者TaskManager(HA)因為某些原因退出了叢集,我們需要單獨啟動
  • 單獨stop某一節點上的程序

新增JobManager
bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all
新增TaskManager
bin/taskmanager.sh start|start-foreground|stop|stop-all

bin下面的指令碼:
在這裡插入圖片描述

Maven依賴

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>1.6.1</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.6.1</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.11</artifactId>
  <version>1.6.1</version>
</dependency>

下篇文章寫兩個demo提交到叢集進行測試。

參考