使用Apache Flink開始批處理
如果您最近一直在關注軟體開發新聞,那麼您可能聽說過名為Apache Flink的新專案。我已經在這裡和這裡寫了一些內容,但如果您不熟悉它,Apache Flink是新一代大資料處理工具,可以處理有限的資料集(這也稱為批處理)或潛在的無限的資料流(流處理)。在新功能方面,許多人認為Apache Flink是一款遊戲規則改變者,甚至可以在未來取代Apache Spark。
在本文中,我將向您介紹如何使用Apache Flink實現簡單的批處理演算法。我們將從設定開發環境開始,然後我們將看到如何載入資料,處理資料集以及將資料寫回外部系統。
為什麼批處理?
您可能聽說過流處理是“現在新的熱點”,Apache Flink是一個流處理工具。這可能會提出一個問題,為什麼我們需要學習如何實現批處理應用程式。
雖然確實如此,但流處理變得越來越普遍; 許多工仍然需要批量處理。另外,如果您剛開始使用Apache Flink,我認為最好從批處理開始,因為它更簡單,並且在某種程度上類似於使用資料庫。一旦您完成了批量處理,您就可以瞭解Apache Flink真正發揮作用的流處理!
如何遵循示例
如果您想自己實現一些Apache Flink應用程式,首先需要建立一個Flink專案。在本文中,我們將用Java編寫應用程式,但您也可以在Scala,Python或R中編寫Flink應用程式。
要建立Flink Java專案,請執行以下命令:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.3.2
輸入組ID,工件ID和專案版本後,此命令將建立以下專案結構:
.
├── pom.xml
└── src
└── main
├── java
│ └── flinkProject
│ ├── BatchJob.java
│ ├── SocketTextStreamWordCount.java
│ ├── StreamingJob.java
│ └── WordCount.java
└── resources
└── log4j.properties
這裡最重要的是pom.xml
指定所有必需依賴項的大量內容。自動建立的Java類是您可以檢視的一些簡單Flink應用程式的示例,但我們並不需要它們用於我們的目的。
要開始開發您的第一個Flink應用程式,請使用如下main
方法建立一個類:
public class FilterMovies {
public static void main(String[] args) throws Exception {
// Create Flink execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// We will write our code here
// Start Flink application
env.execute();
}
}
這種main
方法沒什麼特別之處。我們所要做的就是新增一些樣板程式碼。
首先,我們需要建立一個Flink執行環境,如果在本地計算機或Flink叢集中執行它,它的行為會有所不同:
- 在本地計算機上,它將建立一個包含多個本地節點的完整Flink叢集。這是測試應用程式在實際環境中如何工作的好方法
- 在Flink叢集上,它不會建立任何內容,而是使用現有的叢集資源
或者,您可以建立一個這樣的集合環境:
ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
這將建立一個Flink執行環境,而不是在本地群集上執行Flink應用程式將在單個Java程序中使用記憶體中集合模擬所有操作。您的應用程式將執行得更快,但此環境與具有多個節點的本地群集有一些細微差別。
我們從哪裡開始?
在我們可以做任何事情之前,我們需要將資料讀入Apache Flink。我們可以從眾多系統中讀取資料,包括:本地檔案系統,S3,HDFS,HBase,Cassandra等。無論我們從何處讀取資料集,Apache Flink都允許我們使用DataSet
類以統一的方式處理資料:
DataSet<Integer> numbers = ...
資料集中的所有項應具有相同的型別。single generics引數指定儲存在資料集中的資料型別。
要從檔案中讀取資料,我們可以使用readTextFile
逐行讀取檔案中的行並返回型別的資料集的方法String
:
DataSet<String> lines = env.readTextFile("path/to/file.txt");
如果指定這樣的檔案路徑,Flink將嘗試讀取本地檔案。如果要從HDFS讀取檔案,則需要指定hdfs://
協議:
env.readCsvFile("hdfs:///path/to/file.txt")
Flink還支援CSV檔案,但在這種情況下,它不會返回字串資料集。它將嘗試解析每一行並返回Tuple
例項的資料集:
DataSet<Tuple2<Long, String>> lines = env.readCsvFile("data.csv")
.types(Long.class, String.class);
Tuple2
是儲存不可改變的一對兩個場中的一類,但也有其他類似Tuple0
,Tuple1
,Tuple3
,高達Tuple25
該儲存從零到25的欄位。稍後我們將看到如何使用這些類。
該types
方法指定CSV檔案中的列型別和數量,因此Flink可以讀取它們進行解析。
我們還可以建立非常適合小型實驗和單元測試的小型資料集:
/ Create from a list
DataSet<String> letters = env.fromCollection(Arrays.asList("a", "b", "c"));
// Create from an array
DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);
您可能會問的問題是我們可以在DataSet中儲存哪些資料?並非每種Java型別都可以在資料集中使用,並且您可以使用四種不同型別的型別:
- 內建Java型別和POJO類
- Flink Tuples和Scala案例類
- 值 - 這些是Java原始型別的特殊可變包裝器,您可以使用它來提高效能(我將在下一篇文章中寫到這一點)
- Hadoop可寫介面的實現
使用Apache Flink處理資料
現在到資料處理部分!如何實現處理資料的演算法?為此,您可以使用許多類似於Java 8流操作的操作,例如:
- map - 使用使用者定義的函式轉換資料集中的專案。每個輸入元素都轉換為一個輸出元素
- filter - 根據使用者定義的函式過濾資料集中的專案
- flatMap - 類似於map運算子,但允許返回零個,一個或多個元素
- groupBy - 按鍵分組元素。類似於
GROUP BY
SQL中的運算子 - project - 選擇元組資料集中的指定欄位,類似於
SELECT
SQL中的運算子 - reduce - 使用使用者定義的函式將資料集中的元素組合成單個值
請記住,Java流與這些操作之間的最大區別在於,Java 8可以處理記憶體中的資料並可以訪問本地資料,而Flink可以處理分散式環境中群集上的資料。
我們來看一個使用這些操作的簡單示例。以下示例非常簡單。它建立一個數字資料集,對每個數字進行平方並過濾掉所有奇數。
// Create a dataset of numbers
DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7);
// Square every number
DataSet<Integer> result = numbers.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer integer) throws Exception {
return integer * integer;
}
})
// Leave only even numbers
.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer integer) throws Exception {
return integer % 2 == 0;
}
});
如果你對Java 8有任何經驗,你可能想知道我為什麼不在這裡使用lambdas。我們可以在這裡使用lambdas,但它可能會導致一些併發症,正如我在這裡寫的那樣。
儲存資料
在我們完成資料處理之後,儲存我們辛勤工作的結果是有意義的。Flink可以將資料儲存到許多第三方系統,如HDFS,S3,Cassandra等。
例如,要將資料寫入檔案,我們需要使用類中的writeAsText
方法DataSet
:
DataSet<Integer> ds = ...
ds.writeAsText("path/to/file");
出於除錯/測試目的,Flink可以將資料寫入標準輸出或標準輸出:
DataSet<Integer> ds = ...
// Output dataset to the standard output
ds.print();
// Output dataset to the standard err
ds.printToErr()
更復雜的例子
要實現一些有意義的演算法,我們首先需要下載Grouplens電影資料集。它包含多個CSV檔案,其中包含有關電影和電影評級的資訊。我們將使用movies.csv
此資料集中的檔案,其中包含所有電影的列表,如下所示:
movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action
10,GoldenEye (1995),Action|Adventure|Thriller
它有三列:
- movieId - 此資料集中電影的唯一電影ID
- 標題 - 電影的標題
- 流派 - 每部電影的“|”分類列表
我們現在可以在Apache Flink中載入此CSV檔案並執行一些有意義的處理。在這裡,我們將從本地檔案系統載入檔案,而在現實環境中,您將讀取更大的資料集,它可能駐留在分散式系統中,例如S3或HDFS。
在這個演示中,讓我們找到所有“動作”型別的電影。這是一個執行此操作的程式碼段:
// Load dataset of movies
DataSet<Tuple3<Long, String, String>> lines = env.readCsvFile("movies.csv")
.ignoreFirstLine()
.parseQuotedStrings('"')
.ignoreInvalidLines()
.types(Long.class, String.class, String.class);
DataSet<Movie> movies = lines.map(new MapFunction<Tuple3<Long,String,String>, Movie>() {
@Override
public Movie map(Tuple3<Long, String, String> csvLine) throws Exception {
String movieName = csvLine.f1;
String[] genres = csvLine.f2.split("\\|");
return new Movie(movieName, new HashSet<>(Arrays.asList(genres)));
}
});
DataSet<Movie> filteredMovies = movies.filter(new FilterFunction<Movie>() {
@Override
public boolean filter(Movie movie) throws Exception {
return movie.getGenres().contains("Action");
}
});
filteredMovies.writeAsText("output.txt");
讓我們分解吧。首先,我們使用以下readCsvFile
方法讀取CSV檔案:
DataSet<Tuple3<Long, String, String>> lines = env.readCsvFile("movies.csv")
// ignore CSV header
.ignoreFirstLine()
// Set strings quotes character
.parseQuotedStrings('"')
// Ignore invalid lines in the CSV file
.ignoreInvalidLines()
// Specify types of columns in the CSV file
.types(Long.class, String.class, String.class);
使用輔助方法,我們指定如何解析CSV檔案中的字串以及我們需要跳過第一行。在最後一行中,我們指定CSV檔案中每列的型別,Flink將為我們解析資料。
現在,當我們在Flink叢集中載入資料集時,我們可以進行一些資料處理。首先,我們使用以下map
方法解析每部電影的流派列表:
DataSet<Movie> movies = lines.map(new MapFunction<Tuple3<Long,String,String>, Movie>() {
@Override
public Movie map(Tuple3<Long, String, String> csvLine) throws Exception {
String movieName = csvLine.f1;
String[] genres = csvLine.f2.split("\\|");
return new Movie(movieName, new HashSet<>(Arrays.asList(genres)));
}
});
要轉換我們需要實現的每部電影MapFunction
,它將接收每個CSV記錄作為Tuple3
例項,並將其轉換為Movie
POJO類:
class Movie {
private String name;
private Set<String> genres;
public Movie(String name, Set<String> genres) {
this.name = name;
this.genres = genres;
}
public String getName() {
return name;
}
public Set<String> getGenres() {
return genres;
}
}
如果您回想起CSV檔案的結構,則第二列包含電影的名稱,第三列包含型別列表。因此,我們分別使用欄位f1
和列來訪問這些列f2
。
現在,當我們有一個電影資料集時,我們可以實現演算法的核心部分並過濾所有動作電影:
DataSet<Movie> filteredMovies = movies.filter(new FilterFunction<Movie>() {
@Override
public boolean filter(Movie movie) throws Exception {
return movie.getGenres().contains("Action");
}
});
這將僅返回在集合型別中包含“Action”的電影。
現在最後一步非常簡單; 我們將結果資料儲存到一個檔案中:
filteredMovies.writeAsText("output.txt");
這只是將結果資料儲存到本地文字檔案中,但與readTextFile
方法一樣,我們可以通過指定協議來將此檔案寫入HDFS或S3 hdfs://
。
更多資訊
這是一篇介紹性文章,Apache Flink還有很多內容。我會在不久的將來寫更多關於Flink的文章,敬請期待!您可以在這裡閱讀我的其他文章,或者您可以檢視我的Pluralsight課程,其中更詳細地介紹了Apache Flink:瞭解Apache Flink。這是本課程的簡短預覽。