1. 程式人生 > >Beam系列一 Beam介紹及簡單使用.md

Beam系列一 Beam介紹及簡單使用.md

1.簡介

簡單地說,Apache Beam是一個實時處理、流處理的大資料框架,由Google DataFlow貢獻給 Apache 基金會孵化而來。

2.應用場景

以下為應用場景的幾個例子:
1.Beam 可以用於 ETL Job 任務
Beam 的資料可以通過 SDKs 的 IO 接入,通過管道可以用後面的 Runners 做清洗。

2.Beam 資料倉庫快速切換、跨倉庫
由於 Beam 的資料來源是多樣 IO,所以用 Beam 可以快速切換任何資料倉庫。

3.Beam 計算處理平臺切換、跨平臺
Runners 目前提供了 3-4 種可以切換的平臺,如Hadoop,Spark,Flink

3.資料處理流程

3.1Modes

Modes 是 Beam 的模型或叫資料來源的 IO,它是由多種資料來源或倉庫的 IO 組成,資料來源支援批處理和流處理。
Modes要處理的問題有兩個,一個是資料來源型別,大致分為兩類,有界的資料集(如資料庫資料)和無界的資料流(如訊息中介軟體)。
第二個是時間處理,有兩種,一種是全量計算,另一種是部分增量計算。Beam Model 處理的目標資料是無界的時間亂序資料流,不考慮時間順序或有界的資料集可看做是無界亂序資料流的一個特例。

3.2 Pipeline

Pipeline 是 Beam 的管道,所有的批處理或流處理都要通過這個管道把資料傳輸到後端的計算平臺。這個管道現在是唯一的。資料來源可以切換多種,計算平臺或處理平臺也支援多種。需要注意的是,管道只有一條,它的作用是連線資料和 Runtimes 平臺。

3.3 Runtimes

Runtimes 是大資料計算或處理平臺,目前支援 Apache Flink、Apache Spark、Direct Pipeline 和 Google Clound Dataflow 四種。其中 Apache Flink 和 Apache Spark 同時支援本地和雲端。Direct Pipeline 僅支援本地,Google Clound Dataflow 僅支援雲端。

4. 第一個Beam 程式

4.1 新建一個Maven 專案

<dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-core</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-direct-java</artifactId>
        <version>2.8.0</version>
    </dependency>

4.2

static final List<String> LINES = Arrays.asList(
              "To be, or not to be: that is the question: ",
              "Whether 'tis nobler in the mind to suffer ",
              "The slings and arrows of outrageous fortune, ",
              "Or to take arms against a sea of troubles, ");
    public static void main(String[] args) {     
        wordLenth();
    }
    
    public static void wordLenth() {
         PipelineOptions options =  PipelineOptionsFactory.create();
         Pipeline p = Pipeline.create(options);
         PCollection<String> words = p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of());
         PCollection<Integer> wordLengths = words.apply(
                  "ComputeWordLengths",  ParDo.of(new ComputeWordLengthFn()));
         p.run().waitUntilFinish();
    }
    
    /**
     * 傳遞給ParDo的DoFn物件中包含對輸入集合中的元素的進行處理,DoFn從輸入的PCollection一次處理一個元素
     */
    static class ComputeWordLengthFn extends DoFn<String, Integer> {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // Get the input element from ProcessContext.
            String word = c.element();
            // Use ProcessContext.output to emit the output element.
            System.out.println(word.length());
            c.output(word.length());
          }
        }

解釋一下上面的程式碼
1.第一步建立一個管道Pipeline
2.PCollection words = p.apply(Create.of(LINES)) 使用原始資料建立資料集
3.
words.apply(“ComputeWordLengths”, ParDo.of(new ComputeWordLengthFn())),統計每一個字串的長度。
4.p.run().waitUntilFinish();執行管道例項

5.解釋程式出現的方法

1.options = PipelineOptionsFactory.create();建立管道引數,設定Runtime型別,當我們不指定的時候,會預設使用DirectRunner這種型別
// pipe.setRunner(DirectRunner.class);

2.PCollection表示Beam中任何大小的輸入和輸出資料。pipeLine讀取資料輸入,生成PCollection作為輸出。
3.p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of());
使用Beam提供的Create從記憶體中的Java集合建立PCollection,Create接受Java Collection和一個Coder物件作為引數,在Coder指定的Collection中的元素如何編碼。
4.apply方法轉化管道中的資料,轉換採用PCollection(或多個PCollection)作為輸入,在該集合中的每個元素上執行指定的操作,並生成新的輸出PCollection。下面是轉換格式
[Output PCollection] = [Input PCollection].apply([Transform])

5.ParDo是用於通用並行處理的Beam轉換。ParDo的處理範例類似於map/shuffle/reduce形式的演算法中的“Map”操作:一個ParDo轉換考慮到了輸入PCollection中的每個元素,在該元素上執行一些處理函式(使用者程式碼),併發送0個,1個或多個元素到輸出PCollection