Hadoop Notes - [Flink] Installing and Deploying Flink's Three Run Modes and Implementing WordCount (Repost)
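Preface
Flink can run in three modes: Local, Standalone, and On YARN. Once deployment succeeds, WordCount is implemented in both Scala (streaming) and Java (batch).
Environment
Version: Flink 1.6.2
Cluster environment: Hadoop 2.6
Development tool: IntelliJ IDEA
I. Local Mode
Extract: tar -zxvf flink-1.6.2-bin-hadoop26-scala_2.11.tgz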
cd flink-1.6.2
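Start: ./bin/start-cluster.sh
Stop: ./bin/stop-cluster.sh
The cluster status can be monitored in the web UI at master:8081 (master being this machine's hostname)
II. Standalone Mode
Cluster installation
1: Edit conf/flink-conf.yaml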
jobmanager.rpc.address: hadoop100
2: Edit conf/slaves
hadoop101
hadoop102
3: Copy the installation to the other nodes
scp -rq /usr/local/flink-1.6.2 hadoop101:/usr/local
scp -rq /usr/local/flink-1.6.2 hadoop102:/usr/local
4: Start the cluster from the hadoop100 (master) node
bin/start-cluster.sh
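5: Open http://hadoop100:8081
III. Flink On YARN Mode
How Flink runs on YARN
Option 1 [yarn-session.sh (allocate resources) + flink run (submit the job)]
Start a long-running Flink cluster on YARN (-n: number of TaskManager containers, -jm and -tm: JobManager and TaskManager memory in MB, -d: optional, run detached)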
./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 [-d]
Attach to an existing Flink YARN session
./bin/yarn-session.sh -id application_1463870264508_0029
Run a job
./bin/flink run ./examples/batch/WordCount.jar -input hdfs://hadoop100:9000/LICENSE -output hdfs://hadoop100:9000/wordcount-result.txt
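Option 2 [flink run -m yarn-cluster (allocate resources and submit the job in one step)]
Start a cluster and run the job (-yn: number of TaskManager containers, -yjm and -ytm: JobManager and TaskManager memory in MB)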
./bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar
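Note: the client must have the YARN_CONF_DIR, HADOOP_CONF_DIR, or HADOOP_HOME environment variable set. Flink reads the YARN and HDFS configuration through that variable, and startup fails without it.
IV. WordCount
Code
Scala implementation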
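Because a missing variable only shows up as a failed startup, it can help to check the environment before submitting. The following is a minimal sketch in plain Java, not part of Flink: the class name HadoopEnvCheckSketch and the method firstConfigured are hypothetical, and the order in which the three variables are checked is an assumption made for illustration, not Flink's own resolution order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical pre-flight check; not a Flink API.
class HadoopEnvCheckSketch {
    // The three variables named in the note above (order is illustrative only)
    static final List<String> CANDIDATES =
            Arrays.asList("YARN_CONF_DIR", "HADOOP_CONF_DIR", "HADOOP_HOME");

    /** Returns the first candidate variable that is set to a non-empty value, if any. */
    static Optional<String> firstConfigured(Map<String, String> env) {
        return CANDIDATES.stream()
                .filter(name -> env.get(name) != null && !env.get(name).trim().isEmpty())
                .findFirst();
    }

    public static void main(String[] args) {
        Optional<String> found = firstConfigured(System.getenv());
        if (found.isPresent()) {
            System.out.println("Using " + found.get() + "=" + System.getenv(found.get()));
        } else {
            System.err.println(
                    "Set YARN_CONF_DIR, HADOOP_CONF_DIR or HADOOP_HOME before submitting to YARN");
        }
    }
}
```

Running it on the client machine before yarn-session.sh or flink run -m yarn-cluster shows which variable, if any, is available to the client.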
package com.skyell

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Sliding window computation.
 *
 * Every 1 second, count the words received in the last 2 seconds and print the result to the console.
 */
object SocketWindowWordCountScala {
  def main(args: Array[String]): Unit = {
    // Read the socket port from the arguments, falling back to 9002
    val port: Int = try {
      ParameterTool.fromArgs(args).getInt("port")
    } catch {
      case e: Exception =>
        System.err.println("No port set use default port 9002--scala")
        9002
    }
    // Get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Connect to the socket to receive data
    val text = env.socketTextStream("master", port, '\n')
    // Import the implicit conversions; compilation fails without them
    import org.apache.flink.api.scala._
    // Parse the data (flatten lines into words), group, apply the window, and sum
    val windowCount = text.flatMap(line => line.split("\\s"))
      .map(w => WordWithCount(w, 1))
      .keyBy("word") // group identical words together
      .timeWindow(Time.seconds(2), Time.seconds(1)) // window size 2 s, slide 1 s
      .sum("count")
    windowCount.print().setParallelism(1) // print with a parallelism of 1
    env.execute("Socket window count")
  }

  // A case class can be instantiated directly, without new
  case class WordWithCount(word: String, count: Long)
}
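With a 2-second window that slides every 1 second, each incoming word is counted in two overlapping windows. The arithmetic behind that can be sketched in plain Java as below. This is an illustration only, not Flink's API: the class SlidingWindowSketch and the method windowStarts are hypothetical names, and the sketch assumes non-negative timestamps in milliseconds and windows aligned to multiples of the slide.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of sliding-window assignment; not a Flink class.
class SlidingWindowSketch {
    /** Returns the start times (ms) of every window [start, start + size) that contains the timestamp. */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Start of the latest window that begins at or before the timestamp
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Step back one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // A word arriving at t = 3500 ms, with size = 2 s and slide = 1 s
        System.out.println(windowStarts(3500L, 2000L, 1000L)); // prints [3000, 2000]
    }
}
```

The word arriving at 3500 ms falls into the windows starting at 3000 ms and 2000 ms, which is why the same word shows up in two consecutive printed results.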
Java implementation
package com.skyell;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCountJava {
    public static void main(String[] args) throws Exception {
        String inputPath = "D:\\DATA\\file";
        String outPath = "D:\\DATA\\result";
        // Get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Read the contents of the local file
        DataSource<String> text = env.readTextFile(inputPath);
        // groupBy(0): group by field 0 (the word); sum(1): sum field 1 (the count)
        DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).groupBy(0).sum(1);
        // Write one "word count" pair per line, separated by a space, into a single file
        counts.writeAsCsv(outPath, "\n", " ").setParallelism(1);
        env.execute("batch word count");
    }

    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // Lowercase the line and split it on runs of non-word characters
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}
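To see what the Tokenizer followed by groupBy(0).sum(1) produces without starting a Flink job, the same logic can be sketched in plain Java with an ordinary map. This is only an illustration: the class WordCountLogicSketch and the method countWords are hypothetical names, and a TreeMap is used purely to make the printed order deterministic (Flink does not guarantee any output order).

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for Tokenizer + groupBy(0).sum(1); not a Flink class.
class WordCountLogicSketch {
    static Map<String, Integer> countWords(Iterable<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            // Same tokenization as the Tokenizer above: lowercase, split on non-word characters
            for (String token : line.toLowerCase().split("\\W+")) {
                if (token.length() > 0) {
                    // Equivalent of emitting (token, 1) and summing per key
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countWords(Arrays.asList("Hello Flink", "hello, world")));
        // prints {flink=1, hello=2, world=1}
    }
}
```

The batch job writes the same pairs to the output path, one "word count" pair per line.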
Maven dependencies (pom.xml)
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.6.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.6.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.6.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.6.2</version>
    <scope>provided</scope>
</dependency>