
Pitfalls Encountered When Setting Up and Using a Flink Cluster

I. Project overview

The goal was to use Flink to test checkpointing of intermediate state and restoring a job from a checkpoint.

II. Problems encountered during setup

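When setting up the Flink cluster, you must configure where intermediate state is stored (this project keeps it in HDFS).
The following cluster settings in flink-conf.yaml are required if intermediate state is to be persisted: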

## Address of the web UI used to access the cluster. The default should also work.
jobmanager.web.address: 192.168.11.100

## Store checkpoints on a file system
state.backend: filesystem
## File system path for checkpoints. I left this out at first, which caused many errors.
state.checkpoints.dir: hdfs://192.168.xx.xx:9000/flink/persist
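Leaving out `state.checkpoints.dir` caused many of the errors, so it may help to check the configuration before starting the cluster. Below is a minimal sketch that assumes bash and grep. `check_checkpoint_conf` is a hypothetical helper and not part of Flink. The file location `$FLINK_HOME/conf/flink-conf.yaml` is an assumption about the install layout.

```bash
#!/usr/bin/env bash
# Hypothetical pre-flight helper (not part of Flink): returns non-zero if the
# given flink-conf.yaml lacks an uncommented, non-empty entry for either of
# the two checkpoint-related keys used above.
# Usage (assumed layout): check_checkpoint_conf "$FLINK_HOME/conf/flink-conf.yaml"
check_checkpoint_conf() {
  local conf="$1" key missing=0
  for key in state.backend state.checkpoints.dir; do
    # Require "key: value" at the start of a line with a non-empty value.
    # Lines starting with "#" never match. The dots in the key act as regex
    # wildcards, which is harmless for these two keys.
    if ! grep -Eq "^[[:space:]]*${key}[[:space:]]*:[[:space:]]*[^[:space:]#]" "$conf"; then
      echo "missing or empty: ${key}" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

The check only confirms that both keys are present and non-empty. It does not verify that the HDFS path is reachable.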

(1) Jobs can be submitted through the Flink web UI. Alternatively, upload the jar to the cluster and submit the job from the command line:

### Submit the job to the cluster
flink run -c com.testMain /home/myhome100/FlinkTest_Tank-1.0-SNAPSHOT-jar-with-dependencies.jar

(2) Restore from a checkpoint stored in HDFS:

flink run \
  -s hdfs://192.168.xx.xx:9000/flink/current/kafka2flink/5ea0c67a29b4186d2a900bb9e4dbc1ce/chk-4/a91c3a93-3892-458d-9314-ab4d96133200 \
  -c com.testMain /home/myhome100/FlinkTest_Tank-1.0-SNAPSHOT-jar-with-dependencies.jar
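The `-s` argument has to point at one specific checkpoint. Choosing the newest `chk-N` by hand becomes tedious once a job has produced many checkpoints. The sketch below assumes bash and a list of checkpoint paths on stdin, one per line. `latest_checkpoint` is a hypothetical helper, not a Flink or Hadoop command.

```bash
#!/usr/bin/env bash
# Hypothetical helper: reads checkpoint paths (one per line) on stdin and
# prints the path whose chk-N component has the largest N.
# Lines without a chk-N component are ignored.
latest_checkpoint() {
  # Prefix each matching line with its checkpoint number, e.g.
  # ".../chk-12" -> "12 .../chk-12". Then sort numerically, keep the
  # last (largest) entry, and strip the numeric prefix again.
  sed -n 's#.*/chk-\([0-9][0-9]*\).*#\1 &#p' \
    | sort -n \
    | tail -n 1 \
    | cut -d ' ' -f 2-
}
```

One way to feed the helper assumes the HDFS client is on the PATH. Here `<job-id>` is a placeholder for the job's ID: `hdfs dfs -ls -d 'hdfs://192.168.xx.xx:9000/flink/current/kafka2flink/<job-id>/chk-*' | awk '{print $NF}' | latest_checkpoint`. Depending on the Flink version, `-s` may need the `chk-N` directory itself or a metadata file inside it, as in the command above. Check the documentation for the version you run.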

The relevant part of the pom.xml in my Maven project:

<properties>
    <scala.version>2.11.12</scala.version>
    <hadoop.version>2.7.2</hadoop.version>
    <flink.version>1.4.2</flink.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-hadoop2</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-rabbitmq_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Keep connectors on the same version as the other Flink modules; mixing versions can cause runtime errors -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-filesystem_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.10 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- We need protobuf for chill-protobuf -->
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>2.5.0</version>
    </dependency>
</dependencies>