1. 程式人生 > >Beam編程系列之Java SDK Quickstart(官網的推薦步驟)

Beam編程系列之Java SDK Quickstart(官網的推薦步驟)

rate start mark http 單獨 org 托管 pipe bucket

  不多說,直接上幹貨!

https://beam.apache.org/get-started/beam-overview/

技術分享圖片

https://beam.apache.org/get-started/quickstart-java/

技術分享圖片

Apache Beam Java SDK Quickstart

  This Quickstart will walk you through executing your first Beam pipeline to run WordCount, written using Beam’s Java SDK, on a runner of your choice.

  • Set up your Development Environment
  • Get the WordCount Code
  • Run WordCount
  • Inspect the results
  • Next Steps

  我這裏為了方便大家快速入手,翻譯並整理為中文。

  本博文通過使用 Java SDK 來完成,你可以嘗試運行在不同的執行引擎上。

第一步:設置開發環境

  1. 下載並安裝 Java Development Kit (JDK) 1.7 或更高版本。檢查 JAVA_HOME 環境變量已經設置並指向你的 JDK 安裝目錄。
  2. 照著 Maven 的 安裝指南 下載並安裝適合你的操作系統的 Apache Maven 。

第二步:獲取 示例的WordCount 代碼

  獲得一份 WordCount 管線代碼拷貝最簡單的方法,就是使用下列指令來生成一個簡單的、包含基於 Beam 最新版的 WordCount 示例和構建的 Maven 項目:

  Apache Beam 的源代碼在 Github 有托管,可以到 Github 下載對應的源碼,下載地址:https://github.com/apache/beam

  然後,將其中的示例代碼進行打包,命令如下所示:(這是最新穩定版本)(所以一般用這個)

技術分享圖片
$ mvn archetype:generate       -DarchetypeRepository=https://repository.apache.org/content/groups/snapshots       -DarchetypeGroupId=org.apache.beam       -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples       -DarchetypeVersion=LATEST 
-DgroupId=org.example -DartifactId=word-count-beam -Dversion="0.1" -Dpackage=org.apache.beam.examples -DinteractiveMode=false
技術分享圖片

  這是官網推薦的

技術分享圖片
$ mvn archetype:generate       -DarchetypeGroupId=org.apache.beam       -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples       -DarchetypeVersion=2.1.0       -DgroupId=org.example       -DartifactId=word-count-beam       -Dversion="0.1"       -Dpackage=org.apache.beam.examples       -DinteractiveMode=false
技術分享圖片

  那是因為,最新的Bean為2.1.0。

  這將創建一個叫 word-count-beam 的目錄,其中包含了一份簡單的 pom.xml 文件和一套示例管線,用來計算某個文本文件中的各個單詞的數量。

技術分享圖片
$ cd word-count-beam/

$ ls
pom.xml    src

$ ls src/main/java/org/apache/beam/examples/
DebuggingWordCount.java    WindowedWordCount.java    common
MinimalWordCount.java    WordCount.java
技術分享圖片

  關於這些示例中用到的 Beam 的概念的詳細介紹,請閱讀 WordCount Example Walkthrough 一文。這裏我們只聚焦於如何執行 WordCount.java 上。

運行 WordCount 示例代碼

  一個 Beam 程序可以運行在多個 Beam 的可執行引擎上,包括 ApexRunner,FlinkRunner,SparkRunner 或者 DataflowRunner。 另外還有 DirectRunner。不需要特殊的配置就可以在本地執行,方便測試使用。

  下面,你可以按需選擇你想執行程序的引擎,即哪個runner後:

  1. 對引擎進行相關配置,確保你已經正確配置了該runner。
  2. 使用不同的命令:通過 --runner=<runner>參數指明引擎類型,默認是 DirectRunner;添加引擎相關的參數;指定輸出文件和輸出目錄,當然這裏需要保證文件目錄是執行引擎可以訪問到的,比如本地文件目錄是不能被外部集群訪問的。
  3. 運行示例程序,你的第一個WordCount 管線。

技術分享圖片

Direct

$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount      -Dexec.args="--inputFile=pom.xml --output=counts" -Pdirect-runner

Apex

$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount      -Dexec.args="--inputFile=pom.xml --output=counts --runner=ApexRunner" -Papex-runner

Flink-Local

$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount      -Dexec.args="--runner=FlinkRunner --inputFile=pom.xml --output=counts" -Pflink-runner

Flink-Cluster

$ mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount      -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar                   --inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner

You can monitor the running job by visiting the Flink dashboard at http://<flink master>:8081

  然後,你可以通過訪問 http://<flink master>:8081 來監測運行的應用程序。

Spark

$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount      -Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner

Dataflow

$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount      -Dexec.args="--runner=DataflowRunner --project=<your-gcp-project>                   --gcpTempLocation=gs://<your-gcs-bucket>/tmp                   --inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://<your-gcs-bucket>/counts"      -Pdataflow-runner

 

運行結果

  當程序運行完成後,你可以看到有多個文件以 count 開頭,個數取決於執行引擎的類型。當你查看文件的內容的時候,每個唯一的單詞後面會顯示其出現次數,但是前後順序是不固定的,也是分布式引擎為了提高效率的一種常用方式。

  一旦管線完成運行,你可以查看結果。你會註意到有多個以 count 打頭的輸出文件。具體會有幾個這樣的文件是由 runner 決定的。這樣能方便 runner 進行高效的分布式執行。

  當你查看文件內容的時候,你會看到裏面包含每個單詞的出現數量。文件中的元素順序可能會和這裏看到的不同。因為 Beam 模型通常並不保障順序,以便於 runner 優化效率。

Direct

技術分享圖片
$ ls counts*

$ more counts*
api: 9
bundled: 1
old: 4
Apache: 2
The: 1
limitations: 1
Foundation: 1
...
技術分享圖片

Apex

技術分享圖片
$ cat counts*
BEAM: 1
have: 1
simple: 1
skip: 4
PAssert: 1
...
技術分享圖片

Flink-Local

技術分享圖片
$ ls counts*

$ more counts*
The: 1
api: 9
old: 4
Apache: 2
limitations: 1
bundled: 1
Foundation: 1
...
技術分享圖片

Flink-Cluster

技術分享圖片
$ ls /tmp/counts*

$ more /tmp/counts*
The: 1
api: 9
old: 4
Apache: 2
limitations: 1
bundled: 1
Foundation: 1
...
技術分享圖片

Spark

技術分享圖片
$ ls counts*

$ more counts*
beam: 27
SF: 1
fat: 1
job: 1
limitations: 1
require: 1
of: 11
profile: 10
...
技術分享圖片

Dataflow

技術分享圖片
$ gsutil ls gs://<your-gcs-bucket>/counts*

$ gsutil cat gs://<your-gcs-bucket>/counts*
feature: 15
smother‘st: 1
revelry: 1
bashfulness: 1
Bashful: 1
Below: 2
deserves: 32
barrenly: 1
...
技術分享圖片

總結

  Apache Beam 主要針對理想並行的數據處理任務,並通過把數據集拆分多個子數據集,讓每個子數據集能夠被單獨處理,從而實現整體數據集的並行化處理。當然,也可以用 Beam 來處理抽取,轉換和加載任務和數據集成任務(一個ETL過程)。進一步將數據從不同的存儲介質中或者數據源中讀取,轉換數據格式,最後加載到新的系統中。

Beam編程系列之Java SDK Quickstart(官網的推薦步驟)