1. 程式人生 > >使用Apache Flink開始批處理

使用Apache Flink開始批處理

如果您最近一直在關注軟體開發新聞,那麼您可能聽說過名為Apache Flink的新專案。我已經在這裡這裡寫了一些內容,但如果您不熟悉它,Apache Flink是新一代大資料處理工具,可以處理有限的資料集(這也稱為批處理)或潛在的無限的資料流(流處理)。在新功能方面,許多人認為Apache Flink是一款遊戲規則改變者,甚至可以在未來取代Apache Spark。

在本文中,我將向您介紹如何使用Apache Flink實現簡單的批處理演算法。我們將從設定開發環境開始,然後我們將看到如何載入資料,處理資料集以及將資料寫回外部系統。

 

為什麼批處理?

您可能聽說過流處理是“現在新的熱點”,Apache Flink是一個流處理工具。這可能會提出一個問題,為什麼我們需要學習如何實現批處理應用程式。

雖然確實如此,但流處理變得越來越普遍; 許多工仍然需要批量處理。另外,如果您剛開始使用Apache Flink,我認為最好從批處理開始,因為它更簡單,並且在某種程度上類似於使用資料庫。一旦您完成了批量處理,您就可以瞭解Apache Flink真正發揮作用的流處理!

如何遵循示例

如果您想自己實現一些Apache Flink應用程式,首先需要建立一個Flink專案。在本文中,我們將用Java編寫應用程式,但您也可以在Scala,Python或R中編寫Flink應用程式。

要建立Flink Java專案,請執行以下命令:

 mvn archetype:generate                              \
      -DarchetypeGroupId=org.apache.flink            \
      -DarchetypeArtifactId=flink-quickstart-java    \
      -DarchetypeVersion=1.3.2

輸入組ID,工件ID和專案版本後,此命令將建立以下專案結構:

.
├── pom.xml
└── src
    └── main
        ├── java
        │   └── flinkProject
        │       ├── BatchJob.java
        │       ├── SocketTextStreamWordCount.java
        │       ├── StreamingJob.java
        │       └── WordCount.java
        └── resources
            └── log4j.properties

這裡最重要的是pom.xml指定所有必需依賴項的大量內容。自動建立的Java類是您可以檢視的一些簡單Flink應用程式的示例,但我們並不需要它們用於我們的目的。

要開始開發您的第一個Flink應用程式,請使用如下main方法建立一個類:

public class FilterMovies {

    public static void main(String[] args) throws Exception {
        // Create Flink execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        
        // We will write our code here
        
        // Start Flink application
        env.execute();
    }
}

這種main方法沒什麼特別之處。我們所要做的就是新增一些樣板程式碼。

首先,我們需要建立一個Flink執行環境,如果在本地計算機或Flink叢集中執行它,它的行為會有所不同:

  • 在本地計算機上,它將建立一個包含多個本地節點的完整Flink叢集。這是測試應用程式在實際環境中如何工作的好方法
  • 在Flink叢集上,它不會建立任何內容,而是使用現有的叢集資源

或者,您可以建立一個這樣的集合環境:

 ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();

這將建立一個Flink執行環境,而不是在本地群集上執行Flink應用程式將在單個Java程序中使用記憶體中集合模擬所有操作。您的應用程式將執行得更快,但此環境與具有多個節點的本地群集有一些細微差別。

我們從哪裡開始?

在我們可以做任何事情之前,我們需要將資料讀入Apache Flink。我們可以從眾多系統中讀取資料,包括:本地檔案系統,S3,HDFS,HBase,Cassandra等。無論我們從何處讀取資料集,Apache Flink都允許我們使用DataSet類以統一的方式處理資料:

DataSet<Integer> numbers = ...

資料集中的所有項應具有相同的型別。single generics引數指定儲存在資料集中的資料型別。

要從檔案中讀取資料,我們可以使用readTextFile逐行讀取檔案中的行並返回型別的資料集的方法String

DataSet<String> lines = env.readTextFile("path/to/file.txt");

如果指定這樣的檔案路徑,Flink將嘗試讀取本地檔案。如果要從HDFS讀取檔案,則需要指定hdfs://協議:

env.readCsvFile("hdfs:///path/to/file.txt")

Flink還支援CSV檔案,但在這種情況下,它不會返回字串資料集。它將嘗試解析每一行並返回Tuple例項的資料集:

DataSet<Tuple2<Long, String>> lines = env.readCsvFile("data.csv")
                .types(Long.class, String.class);

Tuple2是儲存不可改變的一對兩個場中的一類,但也有其他類似Tuple0Tuple1Tuple3,高達Tuple25該儲存從零到25的欄位。稍後我們將看到如何使用這些類。

types方法指定CSV檔案中的列型別和數量,因此Flink可以讀取它們進行解析。

我們還可以建立非常適合小型實驗和單元測試的小型資料集:

/ Create from a list
DataSet<String> letters = env.fromCollection(Arrays.asList("a", "b", "c"));
// Create from an array
DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);

您可能會問的問題是我們可以在DataSet中儲存哪些資料?並非每種Java型別都可以在資料集中使用,並且您可以使用四種不同型別的型別:

  • 內建Java型別和POJO類
  • Flink Tuples和Scala案例類
  • 值 - 這些是Java原始型別的特殊可變包裝器,您可以使用它來提高效能(我將在下一篇文章中寫到這一點)
  • Hadoop可寫介面的實現

使用Apache Flink處理資料

現在到資料處理部分!如何實現處理資料的演算法?為此,您可以使用許多類似於Java 8流操作的操作,例如:

  • map - 使用使用者定義的函式轉換資料集中的專案。每個輸入元素都轉換為一個輸出元素
  • filter - 根據使用者定義的函式過濾資料集中的專案
  • flatMap - 類似於map運算子,但允許返回零個,一個或多個元素
  • groupBy - 按鍵分組元素。類似於GROUP BYSQL中的運算子
  • project - 選擇元組資料集中的指定欄位,類似於SELECTSQL中的運算子
  • reduce - 使用使用者定義的函式將資料集中的元素組合成單個值

請記住,Java流與這些操作之間的最大區別在於,Java 8可以處理記憶體中的資料並可以訪問本地資料,而Flink可以處理分散式環境中群集上的資料。

我們來看一個使用這些操作的簡單示例。以下示例非常簡單。它建立一個數字資料集,對每個數字進行平方並過濾掉所有奇數。

// Create a dataset of numbers
DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7);

// Square every number
DataSet<Integer> result = numbers.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer integer) throws Exception {
        return integer * integer;
    }
})
// Leave only even numbers
.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer integer) throws Exception {
        return integer % 2 == 0;
    }
});

如果你對Java 8有任何經驗,你可能想知道我為什麼不在這裡使用lambdas。我們可以在這裡使用lambdas,但它可能會導致一些併發症,正如我在這裡寫的那樣。

儲存資料

在我們完成資料處理之後,儲存我們辛勤工作的結果是有意義的。Flink可以將資料儲存到許多第三方系統,如HDFS,S3,Cassandra等。

例如,要將資料寫入檔案,我們需要使用類中的writeAsText方法DataSet

DataSet<Integer> ds = ...

ds.writeAsText("path/to/file");

出於除錯/測試目的,Flink可以將資料寫入標準輸出或標準輸出:

DataSet<Integer> ds = ...

// Output dataset to the standard output
ds.print();

// Output dataset to the standard err
ds.printToErr()

更復雜的例子

要實現一些有意義的演算法,我們首先需要下載Grouplens電影資料集。它包含多個CSV檔案,其中包含有關電影和電影評級的資訊。我們將使用movies.csv此資料集中的檔案,其中包含所有電影的列表,如下所示:

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action
10,GoldenEye (1995),Action|Adventure|Thriller

它有三列:

  • movieId - 此資料集中電影的唯一電影ID
  • 標題 - 電影的標題
  • 流派 - 每部電影的“|”分類列表

我們現在可以在Apache Flink中載入此CSV檔案並執行一些有意義的處理。在這裡,我們將從本地檔案系統載入檔案,而在現實環境中,您將讀取更大的資料集,它可能駐留在分散式系統中,例如S3或HDFS。

在這個演示中,讓我們找到所有“動作”型別的電影。這是一個執行此操作的程式碼段:

// Load dataset of movies
DataSet<Tuple3<Long, String, String>> lines = env.readCsvFile("movies.csv")
            .ignoreFirstLine()
            .parseQuotedStrings('"')
            .ignoreInvalidLines()
            .types(Long.class, String.class, String.class);


DataSet<Movie> movies = lines.map(new MapFunction<Tuple3<Long,String,String>, Movie>() {
    @Override
    public Movie map(Tuple3<Long, String, String> csvLine) throws Exception {
        String movieName = csvLine.f1;
        String[] genres = csvLine.f2.split("\\|");
        return new Movie(movieName, new HashSet<>(Arrays.asList(genres)));
    }
});
DataSet<Movie> filteredMovies = movies.filter(new FilterFunction<Movie>() {
    @Override
    public boolean filter(Movie movie) throws Exception {
        return movie.getGenres().contains("Action");
    }
});

filteredMovies.writeAsText("output.txt");

讓我們分解吧。首先,我們使用以下readCsvFile方法讀取CSV檔案:

DataSet<Tuple3<Long, String, String>> lines = env.readCsvFile("movies.csv")
    // ignore CSV header
    .ignoreFirstLine()
    // Set strings quotes character
    .parseQuotedStrings('"')
    // Ignore invalid lines in the CSV file
    .ignoreInvalidLines()
    // Specify types of columns in the CSV file
    .types(Long.class, String.class, String.class);

使用輔助方法,我們指定如何解析CSV檔案中的字串以及我們需要跳過第一行。在最後一行中,我們指定CSV檔案中每列的型別,Flink將為我們解析資料。

現在,當我們在Flink叢集中載入資料集時,我們可以進行一些資料處理。首先,我們使用以下map方法解析每部電影的流派列表:

DataSet<Movie> movies = lines.map(new MapFunction<Tuple3<Long,String,String>, Movie>() {
    @Override
    public Movie map(Tuple3<Long, String, String> csvLine) throws Exception {
        String movieName = csvLine.f1;
         String[] genres = csvLine.f2.split("\\|");
         return new Movie(movieName, new HashSet<>(Arrays.asList(genres)));
    }
});

要轉換我們需要實現的每部電影MapFunction,它將接收每個CSV記錄作為Tuple3例項,並將其轉換為MoviePOJO類:

class Movie {
    private String name;
    private Set<String> genres;

    public Movie(String name, Set<String> genres) {
        this.name = name;
        this.genres = genres;
    }

    public String getName() {
        return name;
    }

    public Set<String> getGenres() {
        return genres;
    }
}

如果您回想起CSV檔案的結構,則第二列包含電影的名稱,第三列包含型別列表。因此,我們分別使用欄位f1和列來訪問這些列f2

現在,當我們有一個電影資料集時,我們可以實現演算法的核心部分並過濾所有動作電影:

DataSet<Movie> filteredMovies = movies.filter(new FilterFunction<Movie>() {
    @Override
    public boolean filter(Movie movie) throws Exception {
        return movie.getGenres().contains("Action");
    }
});

這將僅返回在集合型別中包含“Action”的電影。

現在最後一步非常簡單; 我們將結果資料儲存到一個檔案中:

filteredMovies.writeAsText("output.txt");

這只是將結果資料儲存到本地文字檔案中,但與readTextFile方法一樣,我們可以通過指定協議來將此檔案寫入HDFS或S3 hdfs://

更多資訊

這是一篇介紹性文章,Apache Flink還有很多內容。我會在不久的將來寫更多關於Flink的文章,敬請期待!您可以在這裡閱讀我的其他文章,或者您可以檢視我的Pluralsight課程,其中更詳細地介紹了Apache Flink:瞭解Apache Flink。這是本課程的簡短預覽

原文部落格連結:https://brewing.codes/2017/10/01/start-flink-batch/