Flink Official Documentation: Batch Examples
The following example programs showcase different applications of Flink, from simple word counting to graph algorithms. The code samples illustrate the use of Flink's DataSet API.
The full source code of the following and more examples can be found in the flink-examples-batch and flink-examples-streaming modules of the Flink source code.
Running an example
In order to run a Flink example, we assume you have a running Flink instance available. The "Quickstart" and "Setup" tabs in the navigation describe various ways of starting Flink.
The easiest way is to run the ./bin/start-local.sh script, which starts a local JobManager.
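In the releases this page was written for, the matching shutdown script should be ./bin/stop-local.sh (an assumption worth checking, since the script names changed in later Flink versions):
./bin/stop-local.sh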
Each binary release of Flink contains an examples directory with jar files for each of the examples on this page.
To run the WordCount example, issue the following command:
./bin/flink run ./examples/batch/WordCount.jar
The other examples can be started in a similar way.
Note that if you run an example without arguments, it uses built-in default data. To run WordCount on real data, you have to pass the path to the data:
./bin/flink run ./examples/batch/WordCount.jar --input /path/to/some/text/data --output /path/to/result
Note that non-local file systems require a schema prefix, such as hdfs://.
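For example, a run against HDFS could look as follows (the namenode host and port here are placeholders):
./bin/flink run ./examples/batch/WordCount.jar --input hdfs://namenode:8020/path/to/some/text/data --output hdfs://namenode:8020/path/to/result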
Word Count
Word Count is the "Hello, World!" of big data processing systems: it computes the frequency of words in a text collection. The algorithm works in two steps. First, the texts are split into individual words. Second, the words are grouped and counted.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<String> text = env.readTextFile("/path/to/file");

DataSet<Tuple2<String, Integer>> counts =
    // split up the lines in pairs (2-tuples) containing: (word,1)
    text.flatMap(new Tokenizer())
        // group by the tuple field "0" and sum up tuple field "1"
        .groupBy(0)
        .sum(1);

counts.writeAsCsv(outputPath, "\n", " ");

// User-defined functions
public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");

        // emit the pairs
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<String, Integer>(token, 1));
            }
        }
    }
}
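For completeness, the snippet above assumes the usual DataSet API imports; with Flink's Java API these are:

// imports assumed by the Java snippet above
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;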
The WordCount example implements the above described algorithm. It requires the following parameters to run: --input <path> --output <path>. Any text file will do as test data.

Scala:
val env = ExecutionEnvironment.getExecutionEnvironment
// get input data
val text = env.readTextFile("/path/to/file")
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.groupBy(0)
.sum(1)
counts.writeAsCsv(outputPath, "\n", " ")
Page Rank
The PageRank algorithm computes the "importance" of pages in a graph defined by links, which point from one page to another. It is an iterative graph algorithm, which means that it repeatedly applies the same computation. In each iteration, each page distributes its current rank over all its neighbors, and computes its new rank as a weighted sum of the ranks it received from its neighbors. The PageRank algorithm was popularized by the Google search engine, which uses the importance of webpages to rank the results of search queries.
In this simple example, PageRank is implemented with a bulk iteration and a fixed number of iterations.
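For reference, each iteration of the program below computes the standard PageRank update. With damping factor d (DAMPENING_FACTOR in the code) and N pages (numPages), the new rank of a page p is

rank'(p) = (1 - d) / N + d * sum_{q -> p} ( rank(q) / outDegree(q) )

where the sum runs over all pages q that link to p. In the code, JoinVertexWithEdgesMatch distributes rank(q) / outDegree(q) along the outgoing links, and Dampener applies the damping factor and the (1 - d) / N term.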
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// read the pages and initial ranks by parsing a CSV file
DataSet<Tuple2<Long, Double>> pagesWithRanks = env.readCsvFile(pagesInputPath)
.types(Long.class, Double.class);
// the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
DataSet<Tuple2<Long, Long[]>> pageLinkLists = getLinksDataSet(env);
// set iterative data set
IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(maxIterations);
DataSet<Tuple2<Long, Double>> newRanks = iteration
// join pages with outgoing edges and distribute rank
.join(pageLinkLists).where(0).equalTo(0).flatMap(new JoinVertexWithEdgesMatch())
// collect and sum ranks
.groupBy(0).sum(1)
// apply dampening factor
.map(new Dampener(DAMPENING_FACTOR, numPages));
DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(
newRanks,
newRanks.join(iteration).where(0).equalTo(0)
// termination condition
.filter(new EpsilonFilter()));
finalPageRanks.writeAsCsv(outputPath, "\n", " ");
// User-defined functions
public static final class JoinVertexWithEdgesMatch
implements FlatJoinFunction<Tuple2<Long, Double>, Tuple2<Long, Long[]>,
Tuple2<Long, Double>> {
@Override
public void join(Tuple2<Long, Double> page, Tuple2<Long, Long[]> adj,
Collector<Tuple2<Long, Double>> out) {
Long[] neighbors = adj.f1;
double rank = page.f1;
double rankToDistribute = rank / ((double) neighbors.length);
for (int i = 0; i < neighbors.length; i++) {
out.collect(new Tuple2<Long, Double>(neighbors[i], rankToDistribute));
}
}
}
public static final class Dampener implements MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> {
private final double dampening, randomJump;
public Dampener(double dampening, double numVertices) {
this.dampening = dampening;
this.randomJump = (1 - dampening) / numVertices;
}
@Override
public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
value.f1 = (value.f1 * dampening) + randomJump;
return value;
}
}
public static final class EpsilonFilter
implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {
@Override
public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
}
}
The PageRank program implements the above example. It requires the following parameters to run: --pages <path> --links <path> --output <path> --numPages <n> --iterations <n>.
Scala:
// User-defined types
case class Link(sourceId: Long, targetId: Long)
case class Page(pageId: Long, rank: Double)
case class AdjacencyList(sourceId: Long, targetIds: Array[Long])
// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// read the pages and initial ranks by parsing a CSV file
val pages = env.readCsvFile[Page](pagesInputPath)
// the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
val links = env.readCsvFile[Link](linksInputPath)
// assign initial ranks to pages
val pagesWithRanks = pages.map(p => Page(p.pageId, 1.0 / numPages))
// build adjacency list from link input
val adjacencyLists = links
// initialize lists
.map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
// concatenate lists
.groupBy("sourceId").reduce {
(l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
}
// start iteration
val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
currentRanks =>
val newRanks = currentRanks
// distribute ranks to target pages
.join(adjacencyLists).where("pageId").equalTo("sourceId") {
(page, adjacent, out: Collector[Page]) =>
for (targetId <- adjacent.targetIds) {
out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
}
}
// collect ranks and sum them up
.groupBy("pageId").aggregate(SUM, "rank")
// apply dampening factor
.map { p =>
Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages))
}
// terminate if no rank update was significant
val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
(current, next, out: Collector[Int]) =>
// check for significant update
if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
}
(newRanks, termination)
}
val result = finalRanks
// emit result
result.writeAsCsv(outputPath, "\n", " ")
Input files are plain text files and must be formatted as follows:
- Pages are represented as (long) IDs separated by new-line characters. For example, "1\n2\n12\n42\n63\n" gives five pages with the IDs 1, 2, 12, 42, and 63.
- Links are represented as pairs of page IDs separated by a space character; individual links are separated by new-line characters. For example, "1 2\n2 12\n1 12\n42 63\n" gives four (directed) links: (1)->(2), (2)->(12), (1)->(12), and (42)->(63).
For this simple implementation it is required that each page has at least one incoming link and one outgoing link (a page can link to itself).
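Assuming the jar name from the binary distribution (an assumption; it may differ between releases), a run over such files could look like this:
./bin/flink run ./examples/batch/PageRank.jar --pages /path/to/pages --links /path/to/links --output /path/to/result --numPages 5 --iterations 10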
Connected Components
The Connected Components algorithm identifies parts of a larger graph which are connected, by assigning all vertices in the same connected part the same component ID. Similar to PageRank, Connected Components is an iterative algorithm. In each step, each vertex propagates its current component ID to all of its neighbors. A vertex accepts the component ID of a neighbor if it is smaller than its own component ID.
This implementation uses a delta iteration: vertices that have not changed their component ID do not take part in the next step. This yields much better performance, because the later iterations typically deal only with a few outlier vertices.
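As a minimal illustration on hypothetical data, consider vertices 1, 2, 3, 4 with the undirected edges (1)-(2) and (2)-(3). Initially, every vertex uses its own ID as component ID. In the first step, vertex 2 receives the candidate IDs {1, 3} and adopts 1, and vertex 3 receives {2} and adopts 2; vertex 1 receives no smaller ID and vertex 4 has no neighbors at all, so both drop out of the workset. In the second step, only vertices 2 and 3 are still active: vertex 3 receives 2's new ID 1 and adopts it, while vertex 2 receives nothing smaller and drops out. The third step produces no further changes, so the iteration terminates with components {1, 2, 3} (ID 1) and {4} (ID 4).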
// read vertex and edge data
DataSet<Long> vertices = getVertexDataSet(env);
DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env).flatMap(new UndirectEdge());
// assign the initial component IDs (equal to the vertex ID)
DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());
// open a delta iteration
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
verticesWithInitialId.iterateDelta(verticesWithInitialId, maxIterations, 0);
// apply the step logic:
DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset()
// join with the edges
.join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
// select the minimum neighbor component ID
.groupBy(0).aggregate(Aggregations.MIN, 1)
// update if the component ID of the candidate is smaller
.join(iteration.getSolutionSet()).where(0).equalTo(0)
.flatMap(new ComponentIdFilter());
// close the delta iteration (delta and new workset are identical)
DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);
// emit result
result.writeAsCsv(outputPath, "\n", " ");
// User-defined functions
public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {
@Override
public Tuple2<T, T> map(T vertex) {
return new Tuple2<T, T>(vertex, vertex);
}
}
public static final class UndirectEdge
implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();
@Override
public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
invertedEdge.f0 = edge.f1;
invertedEdge.f1 = edge.f0;
out.collect(edge);
out.collect(invertedEdge);
}
}
public static final class NeighborWithComponentIDJoin
implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
@Override
public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
}
}
public static final class ComponentIdFilter
implements FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>,
Tuple2<Long, Long>> {
@Override
public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value,
Collector<Tuple2<Long, Long>> out) {
if (value.f0.f1 < value.f1.f1) {
out.collect(value.f0);
}
}
}
Scala:
// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// read vertex and edge data
// assign the initial components (equal to the vertex id)
val vertices = getVerticesDataSet(env).map { id => (id, id) }
// undirected edges by emitting for each input edge the input edges itself and an inverted
// version
val edges = getEdgesDataSet(env).flatMap { edge => Seq(edge, (edge._2, edge._1)) }
// open a delta iteration
val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array(0)) {
(s, ws) =>
// apply the step logic: join with the edges
val allNeighbors = ws.join(edges).where(0).equalTo(0) { (vertex, edge) =>
(edge._2, vertex._2)
}
// select the minimum neighbor
val minNeighbors = allNeighbors.groupBy(0).min(1)
// update if the component of the candidate is smaller
val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) {
(newVertex, oldVertex, out: Collector[(Long, Long)]) =>
if (newVertex._2 < oldVertex._2) out.collect(newVertex)
}
// delta and new workset are identical
(updatedComponents, updatedComponents)
}
verticesWithComponents.writeAsCsv(outputPath, "\n", " ")
The ConnectedComponents program implements the above example. It requires the following parameters to run: --vertices <path> --edges <path> --output <path> --iterations <n>.
Input files are plain text files and must be formatted as follows:
- Vertices are represented as IDs separated by new-line characters. For example, "1\n2\n12\n42\n63\n" gives five vertices: (1), (2), (12), (42), and (63).
- Edges are represented as pairs of vertex IDs separated by a space character; individual edges are separated by new-line characters. For example, "1 2\n2 12\n1 12\n42 63\n" gives four (undirected) edges: (1)-(2), (2)-(12), (1)-(12), and (42)-(63).
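As with the previous examples, a run could look like this (the jar name is an assumption and may differ between releases):
./bin/flink run ./examples/batch/ConnectedComponents.jar --vertices /path/to/vertices --edges /path/to/edges --output /path/to/result --iterations 100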
Relational Query
The Relational Query example assumes two tables: one with orders and one with lineitems, as generated by the TPC-H decision support benchmark. TPC-H is a standard benchmark in the database industry. See below for instructions on how to generate the input data.
The example implements the following SQL query:
SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue
FROM orders, lineitem
WHERE l_orderkey = o_orderkey
AND o_orderstatus = "F"
AND YEAR(o_orderdate) > 1993
AND o_orderpriority LIKE "5%"
GROUP BY l_orderkey, o_shippriority;
The Flink program implements the above SQL query as follows:
// get orders data set: (orderkey, orderstatus, orderdate, orderpriority, shippriority)
DataSet<Tuple5<Integer, String, String, String, Integer>> orders = getOrdersDataSet(env);
// get lineitem data set: (orderkey, extendedprice)
DataSet<Tuple2<Integer, Double>> lineitems = getLineitemDataSet(env);
// orders filtered by year: (orderkey, shippriority)
DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear =
// filter orders
orders.filter(
new FilterFunction<Tuple5<Integer, String, String, String, Integer>>() {
@Override
public boolean filter(Tuple5<Integer, String, String, String, Integer> t) {
// status filter
if(!t.f1.equals(STATUS_FILTER)) {
return false;
// year filter
} else if(Integer.parseInt(t.f2.substring(0, 4)) <= YEAR_FILTER) {
return false;
// order priority filter
} else if(!t.f3.startsWith(OPRIO_FILTER)) {
return false;
}
return true;
}
})
// project fields out that are no longer required
.project(0,4).types(Integer.class, Integer.class);
// join orders with lineitems: (orderkey, shippriority, extendedprice)
DataSet<Tuple3<Integer, Integer, Double>> lineitemsOfOrders =
ordersFilteredByYear.joinWithHuge(lineitems)
.where(0).equalTo(0)
.projectFirst(0,1).projectSecond(1)
.types(Integer.class, Integer.class, Double.class);
// extendedprice sums: (orderkey, shippriority, sum(extendedprice))
DataSet<Tuple3<Integer, Integer, Double>> priceSums =
// group by order and sum extendedprice
lineitemsOfOrders.groupBy(0,1).aggregate(Aggregations.SUM, 2);
// emit result
priceSums.writeAsCsv(outputPath);
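A note on joinWithHuge in the snippet above: it is a size hint telling the optimizer to treat the lineitems input as the much larger side of the join (joinWithTiny is its counterpart); a plain join(lineitems) would work as well and leaves the choice of join strategy entirely to the optimizer.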
(Translator's note: a Scala version of this example is not included.)
The Relational Query program implements the above query. It requires the following parameters to run: --orders <path> --lineitem <path> --output <path>.
The orders and lineitem files can be generated using the TPC-H benchmark suite's data generator tool (DBGEN). Take the following steps to generate input files of arbitrary size for the Flink program:
1. Download and unpack DBGEN.
2. Make a copy of makefile.suite called Makefile and perform the following changes:
DATABASE = DB2
MACHINE = LINUX
WORKLOAD = TPCH
CC = gcc
3. Build DBGEN using make.
4. Generate the lineitem and orders tables using dbgen. A scale factor (-s) of 1 results in a generated dataset of about 1 GB:
./dbgen -T o -s 1
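dbgen writes the generated tables as orders.tbl and lineitem.tbl into its working directory; these are the files to pass to the Flink program via the --orders and --lineitem parameters.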