1. 程式人生 > >【Apache Beam系列】Apache Beam Pipeline設計

【Apache Beam系列】Apache Beam Pipeline設計

微信公眾號(SZBigdata-Club):後續部落格的文件都會轉到微信公眾號中。  1、公眾號會持續給大家推送技術文件、學習視訊、技術書籍、資料集等。  2、接受大家投稿支援。  3、對於各公司hr招聘的,可以私下聯絡我,把招聘資訊發給我我會在公眾號中進行推送。 

è¿éåå¾çæè¿° 技術交流群:59701880 深圳廣州hadoop好友會 

è¿éåå¾çæè¿°

本文主要介紹開發者在開發Apache Beam管道的時候,需要考慮哪些因素以及設計的方法有哪幾種。 最終你所要設計的Apache Beam管道都是基於這些設計方法之上的拓展。

設計Pipeline時需要考慮什麼?

當我們設計我們的beam pipeline時,需要考慮的一些基礎問題:

  • 你的輸入資料儲存在哪裡?這將決定Read在pipeline開始時需要使用哪些型別的轉換

  • 你的資料是怎麼樣的?

  • 你想用資料做什麼?

  • 你的輸出資料是什麼樣的,應該儲存到哪裡?這個決定write在pipeline末端使用哪些型別的transforms

一條基本的Pipeline

最簡單的Pipeline體現為一個線性的操作流程。如下圖

但是,在實際場景中,Pipeline比基本的Pipeline要複雜的多。Pipeline代表一個有步驟的有向無環圖。它可能有多個輸入源,多個輸出接收器,並且其操作PTransforms可以讀取和輸出多個PCollections。

分支PCollections

重要的是要了解Transforms不消耗PCollections,相反,他們認為每一個獨立的元素都是一個PCollection,且建立一個新的PCollection用於輸出。這樣,我們就可以對同一個PCollection中的不同元素做不同的操作。

多個PTransforms處理相同的PCollections

可以使用相同的PCollection輸入用於多個轉換,而不消耗輸入或更改它。 圖中所示的流水線從單個數據庫源讀取輸入資料,並建立一個PCollection錶行。然後,Pipeline將多個transforms應用到同一個PCollection。轉換a讀取所有以A字母開頭的PCollection,轉換b讀取所有以B開頭的PCollection。轉換a與轉換b的輸入是同一個PCollection

PCollection<String> dbRowCollection = ...;
PCollection<String> aCollection = dbRowCollection.apply("aTrans", ParDo.of(new DoFn<String, String>(){
  @ProcessElement
  public void processElement(ProcessContext c) {
    if(c.element().startsWith("A")){
      c.output(c.element());
    }
  }
}));
PCollection<String> bCollection = dbRowCollection.apply("bTrans", ParDo.of(new DoFn<String, String>(){
  @ProcessElement
  public void processElement(ProcessContext c) {
    if(c.element().startsWith("B")){
      c.output(c.element());
    }
  }
}));

單個PTransforms產生多個輸出

分支管道的另一種方法是通過使用帶標籤的輸出將單個變換輸出轉換為多個。轉換產生超過一個的輸出處理輸入中的每一個元素,以及輸出0到多個PCollections 下面的圖3示出了上述相同的示例,但是是一個轉換產生多個輸出。以“A”開頭的名稱將新增到主輸出中PCollection,以“B”開頭的名稱將新增到其他輸出PCollection。

圖2中的Pipeline包含兩個處理相同PCollection元素的轉換,一個轉換使用以下邏輯:

if(以'A'開頭){outputToPCollectionA}

而另一個轉換則是:

if(以'B'開頭){outputToPCollectionB}

因為每個轉換都讀取整個PCollection,所以PCollection中的每個元素都被處理兩次。 圖3中的Pipeline以不同的方式執行相同的操作。下面的邏輯只使用一個轉換

if(以'A'開頭){outputToPCollectionA} else if(以'B'開頭){outputToPCollectionB}

其中輸入PCollection中的每個元素

// Define two TupleTags, one for each output.
final TupleTag<String> startsWithATag = new TupleTag<String>(){};
final TupleTag<String> startsWithBTag = new TupleTag<String>(){};
PCollectionTuple mixedCollection =
    dbRowCollection.apply(ParDo
        .of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            if (c.element().startsWith("A")) {
              // Emit to main output, which is the output with tag startsWithATag.
              c.output(c.element());
            } else if(c.element().startsWith("B")) {
              // Emit to output with tag startsWithBTag.
              c.output(startsWithBTag, c.element());
            }
          }
        })
        // Specify main output. In this example, it is the output
        // with tag startsWithATag.
        .withOutputTags(startsWithATag,
        // Specify the output with tag startsWithBTag, as a TupleTagList.
                        TupleTagList.of(startsWithBTag)));
// Get subset of the output with tag startsWithATag.
mixedCollection.get(startsWithATag).apply(...);
// Get subset of the output with tag startsWithBTag.
mixedCollection.get(startsWithBTag).apply(...);

您可以使用任一機制來產生多個輸出PCollection。然而,如果變換的每個元素的計算是耗時的,則使用額外的輸出更有意義

合併PCollections

通常,通過多次轉換將PCollection分成多個PCollections後,您將需要將部分或全部PCollections合併在一起。 您可以使用以下方法之一:

  • Flatten - 您可以使用Beam SDK中的Flatten變換來合併多個相同型別的PCollections。

  • Join - 您可以使用Beam SDK中的CoGroupByKey變換來執行兩個PCollections之間的關係連線。 PCollections必須鍵入(即它們必須是鍵/值對的集合),並且它們必須使用相同的鍵型別。

下圖中,在分成兩個PCollections之後,一個名字以’A’開頭,一個名字以’B’開頭,管道將兩個合併成一個PCollection,現在包含以“A”或“B”。 在這裡,使用Flatten是合理的,因為合併的PCollections都包含相同的型別。

//merge the two PCollections with Flatten
PCollectionList<String> collectionList = PCollectionList.of(aCollection).and(bCollection);
PCollection<String> mergedCollectionWithFlatten = collectionList
    .apply(Flatten.<String>pCollections());
// continue with the new merged PCollection     
mergedCollectionWithFlatten.apply(...);

多個來源

管道可以從一個或多個來源讀取其輸入。 如果管道從多個來源讀取,並且來自這些來源的資料是相關的,則可以將輸入連線在一起。 在下圖所示的示例中,管道從資料庫表中讀取名稱和地址,以及從Kafka主題讀取名稱和訂單號。 然後,管道使用CoGroupByKey來加入這個資訊,其中的關鍵是名稱; 結果PCollection包含名稱,地址和訂單的所有組合。

PCollection<KV<String, String>> userAddress = pipeline.apply(JdbcIO.<KV<String, String>>read()...);
PCollection<KV<String, String>> userOrder = pipeline.apply(KafkaIO.<String, String>read()...);
final TupleTag<String> addressTag = new TupleTag<String>();
final TupleTag<String> orderTag = new TupleTag<String>();
// Merge collection values into a CoGbkResult collection.
PCollection<KV<String, CoGbkResult>> joinedCollection =
  KeyedPCollectionTuple.of(addressTag, userAddress)
                       .and(orderTag, userOrder)
                       .apply(CoGroupByKey.<String>create());
coGbkResultCollection.apply(...);