從零開始構建Flink開發專案-Scala版
版權宣告:本文為博主原創文章,未經博主允許不得轉載。https://blog.csdn.net/kongxx/article/details/90166338
今天要做一個Flink的測試,因此需要建立一個簡單的Flink專案,於是找到了下面這種方式來建立一個Flink啟動專案。
通過執行下面的命令來建立一個專案
curl https://flink.apache.org/q/quickstart-scala.sh | bash
也可以根據quickstart-scala.sh 檔案中的內容,使用maven命令來生成自己的專案,比如:
mvn archetype:generate\ -DarchetypeGroupId=org.apache.flink\ -DarchetypeArtifactId=flink-quickstart-scala\ -DarchetypeVersion=1.8.0\ -DgroupId=my.flink.quickstart\ -DartifactId=quickstart\ -Dversion=0.1\ -Dpackage=my.flink.quickstart\ -DinteractiveMode=false
工程建立後,檢視一下工程目錄結構,如下:
$ tree quickstart quickstart ├── pom.xml └── src └── main ├── resources │└── log4j.properties └── scala └── my └── flink └── quickstart ├── BatchJob.scala └── StreamingJob.scala
在開始正式編譯執行之前,需要根據自己環境的scala的版本,修改一下 pom.xml 檔案,我環境中的 scala 版本是 2.12.2,因此我做了下面的修改
修改 <scala.binary.version>2.11</scala.binary.version> <scala.version>2.11.12</scala.version> 為 <scala.binary.version>2.12</scala.binary.version> <scala.version>2.12.2</scala.version>
下面使用一個例子來測試一下工程,在 src/main/scala/my/flink/quickstart 目錄下建立一個 WordCount.scala 檔案,內容如下:
package my.flink.quickstart import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.api.scala._ object WordCount { def main(args: Array[String]) { if (args.length != 1) { println("Please give input parameter.") System.exit(1) } val env = ExecutionEnvironment.getExecutionEnvironment val text = env.readTextFile(args(0)) val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } .map { (_, 1) } .groupBy(0) .sum(1) counts.print() } }
編譯打包
$ cd quickstart $ mvn clean package
執行 WordCount 程式
$ ${FLINK_HOME}/bin/flink run -c my.flink.quickstart.WordCount target/quickstart-0.1.jar ./