前言、flink介紹:
Apache Flink 是一個分散式處理引擎,用於在無界和有界資料流上進行有狀態的計算。憑藉精確的時間控制和狀態管理,Flink 能夠執行任何處理無界流的應用;對於有界流,Flink 則在內部採用專為固定大小資料集設計的演算法和資料結構進行處理,從而獲得更好的效能。
1、flink特性
(1)Flink是一個開源的流處理框架,它具有以下特點:
- 分散式:Flink程式可以執行在多臺機器上。
- 高效能:處理效能比較高。
- 高可用:Flink 提供高可用配置(例如基於 ZooKeeper 的 JobManager 主備切換),元件故障時可以自動恢復,持續對外提供服務。
- 準確:Flink可以保證資料處理的準確性。
Flink主要由Java程式碼實現,它同時支援實時流處理和批處理。對於Flink而言,作為一個流處理框架,批資料只是流資料的一個極限特例而已。此外,Flink還支援迭代計算、記憶體管理和程式優化,這是它的原生特性。
(2)優勢:
- 流式優先:Flink可以連續處理流式資料。
- 容錯:Flink提供有狀態的計算,可以記錄資料的處理狀態,當資料處理失敗的時候,能夠無縫地從失敗中恢復,並保持Exactly-once。
- 可伸縮:Flink中的一個叢集支援上千個節點。
- 效能:Flink支援高吞吐(單位時間內能處理大量資料)和低延遲(資料從進入系統到產生結果的時間很短)。
2、flink架構
Flink架構可以分為4層,包括Deploy層、Core層、API層和Library層
- Deploy層:該層主要涉及Flink的部署模式,Flink支援多種部署模式——本地、叢集(Standalone/YARN)和雲伺服器(GCE/EC2)。
- Core層:該層提供了支援Flink計算的全部核心實現,為API層提供基礎服務。
- API層:該層主要實現了面向無界Stream的流處理和面向Batch的批處理API,其中流處理對應DataStream API,批處理對應DataSet API。
- Library層:該層也被稱為Flink應用框架層,根據API層的劃分,在API層之上構建滿足特定應用的計算框架,也分別對應於面向流處理和面向批處理兩類。面向流處理支援CEP(複雜事件處理)、基於SQL-like的操作(基於Table的關係操作);面向批處理支援FlinkML(機器學習庫)、Gelly(圖處理)、Table 操作。
一、相關概念:
1、watermark
watermark是一種衡量Event Time進展的機制,它作為一種特殊的記錄隨資料流一起流動。通常基於Event Time處理的資料,自身都攜帶一個timestamp。
1)作用:
watermark是用於處理亂序事件的,通常將watermark機制與window結合使用。事件從產生、流經source、再到operator,中間有傳輸過程和耗時。大部分情況下,流入operator的資料是按照事件產生的時間順序到達的,但也不排除由於網路等原因產生亂序(out-of-order或late element)。對於late element,我們不能無限期地等下去,必須有一種機制保證在某個特定時間之後觸發window進行計算,這個機制就是watermark。
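下面給出一個為資料流指定事件時間與 watermark 的示意寫法(基於 Flink 1.12 的 WatermarkStrategy API;其中的 MyEvent 型別與 5 秒亂序上限均為假設,僅作說明):
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

// 假設 MyEvent 帶有一個 long 型的 timestamp 欄位(毫秒)
DataStream<MyEvent> events = ...;

DataStream<MyEvent> withTimestampsAndWatermarks = events.assignTimestampsAndWatermarks(
        WatermarkStrategy
                // 允許最多 5 秒的亂序:watermark = 目前觀察到的最大事件時間 - 5 秒
                .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                // 告訴 Flink 如何從事件中抽取事件時間戳
                .withTimestampAssigner((event, recordTimestamp) -> event.timestamp));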
2、 CheckPoint
(2.1)概述
為了保證State的容錯性,Flink需要對State進行CheckPoint。CheckPoint是Flink實現容錯機制的核心功能,它能夠根據配置週期性地基於Stream中各個Operator/Task的狀態來生成快照,從而將這些狀態資料定期持久化儲存下來。Flink程式一旦意外崩潰,重新執行程式時可以有選擇地從這些快照進行恢復,從而修正因為故障帶來的程式資料異常。
(2.2)使用說明
1)Checkpoint 在預設的情況下僅用於恢復失敗的作業,並不保留,當程式取消時 checkpoint 就會被刪除。
2) 預設情況下,CheckPoint功能是Disabled(禁用)的,使用時需要先開啟它。
env.enableCheckpointing(1000)
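下面是一段常見的 checkpoint 配置示意(其中的間隔、超時時間與 HDFS 路徑都是假設值,需按實際作業調整):
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 每 60 秒觸發一次 checkpoint,並採用 exactly-once 語義
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);

// 兩次 checkpoint 之間至少間隔 30 秒,單次 checkpoint 超過 10 分鐘視為失敗
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setCheckpointTimeout(600000);

// 作業被取消時保留 checkpoint,以便之後手動恢復
env.getCheckpointConfig().enableExternalizedCheckpoints(
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// 假設使用 HDFS 作為狀態後端的儲存路徑
env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));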
(2.3)目錄結構
checkpoint 由元資料檔案、資料檔案(與 state backend 相關)組成。可通過配置檔案中 “state.checkpoints.dir” 配置項來指定元資料檔案和資料檔案的儲存路徑,另外也可以在程式碼中針對單個作業特別指定該配置項。
當前的 checkpoint 目錄結構如下所示:
/user-defined-checkpoint-dir
/{job-id}
|
+ --shared/
+ --taskowned/
+ --chk-1/
+ --chk-2/
+ --chk-3/
...
在hdfs中的儲存結構
其中 SHARED 目錄儲存了可能被多個 checkpoint 引用的檔案,TASKOWNED 儲存了不會被 JobManager 刪除的檔案,EXCLUSIVE 則儲存那些僅被單個 checkpoint 引用的檔案。
3、Flink基本元件
Flink中提供了3個元件,包括DataSource、Transformation和DataSink。
- DataSource:表示資料來源元件,主要用來接收資料,目前官網提供了readTextFile、socketTextStream、fromCollection以及一些第三方的Source。
- Transformation:表示運算元,主要用來對資料進行處理,比如Map、FlatMap、Filter、Reduce、Aggregation等。
- DataSink:表示輸出元件,主要用來把計算的結果輸出到其他儲存介質中,比如writeAsText以及Kafka、Redis、Elasticsearch等第三方Sink元件。因此,想要組裝一個Flink Job,至少需要這3個元件。
即Flink Job=DataSource+Transformation+DataSink
二、Flink DataStreams API
1、DataStreams操作
獲得一個StreamExecutionEnvironment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("file:///path/to/file");
使用轉換函式在DataStream上呼叫方法轉換(通過將原始集合中的每個String轉換為Integer,將建立一個新的DataStream)
DataStream<String> input = ...;
DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
@Override
public Integer map(String value) {
return Integer.parseInt(value);
}
});
得到包含最終結果的DataStream之後,可以通過writeAsText(String path)、print()等sink將結果寫出或列印。
1)同步執行:一旦呼叫StreamExecutionEnvironment上的execute(),就會根據ExecutionEnvironment型別的不同,在本地計算機上觸發執行或將job提交到叢集上執行。execute()方法會等待作業完成,然後返回JobExecutionResult,其中包含執行時間和累加器結果。
2)非同步執行:呼叫StreamExecutionEnvironment上的executeAsync()會觸發非同步的作業執行,它返回一個JobClient,可以用來與剛提交的作業進行通訊。
final JobClient jobClient = env.executeAsync();
final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();
eg:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
2、dataSource
基於檔案:
- readTextFile(path) / TextInputFormat:逐行讀取文字檔案(即符合 TextInputFormat 規範的檔案),並將每一行作為字串返回。
- readFile(fileInputFormat, path):根據指定的檔案輸入格式讀取(一次)檔案。
- readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo):前兩個方法在內部呼叫的就是它。它根據給定的 fileInputFormat 讀取 path 下的檔案。根據所提供的 watchType,該 source 可以定期(每 interval 毫秒)監視路徑中的新資料(FileProcessingMode.PROCESS_CONTINUOUSLY),或者對路徑中當前的資料處理一次後退出(FileProcessingMode.PROCESS_ONCE)。通過 pathFilter,使用者可以進一步排除不需要處理的檔案。
基於套接字:
- socketTextStream:從套接字讀取,元素可以由定界符分隔。
基於集合:
- fromCollection(Collection):從 Java 的 java.util.Collection 建立資料流,集合中的所有元素必須具有相同的型別。
- fromCollection(Iterator, Class):從迭代器建立資料流,Class 引數指定迭代器返回元素的資料型別。
- fromElements(T ...):從給定的物件序列建立資料流,所有物件必須具有相同的型別。
- fromParallelCollection(SplittableIterator, Class):從迭代器並行地建立資料流,Class 引數指定迭代器返回元素的資料型別。
- generateSequence(from, to):在給定區間內並行生成數字序列。
自定義:
- addSource:附加新的 source function。例如,要從 Apache Kafka 讀取資料,可以使用 addSource(new FlinkKafkaConsumer<>(...))。
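以 Kafka 為例,一個假設的 addSource 用法大致如下(topic、broker 位址等引數均為示例,需要引入 flink-connector-kafka 依賴):
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // Kafka broker 位址(示例)
props.setProperty("group.id", "flink-demo");              // 消費者組(示例)

DataStream<String> kafkaStream = env.addSource(
        new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props));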
eg:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// read text file from local files system
DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");

// read text file from an HDFS running at nnHost:nnPort
DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");

// read a CSV file with three fields
DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
        .types(Integer.class, String.class, Double.class);

// read a CSV file with five fields, taking only two of them
DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
        .includeFields("10010")  // take the first and the fourth field
        .types(String.class, Double.class);

// read a CSV file with three fields into a POJO (Person.class) with corresponding fields
DataSet<Person> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
        .pojoType(Person.class, "name", "age", "zipcode");

// read a file from the specified path of type SequenceFileInputFormat
DataSet<Tuple2<IntWritable, Text>> tuples =
        env.createInput(HadoopInputs.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file"));

// creates a set from some given elements
DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar");

// generate a number sequence
DataSet<Long> numbers = env.generateSequence(1, 10000000);

// Read data from a relational database using the JDBC input format
DataSet<Tuple2<String, Integer>> dbData =
        env.createInput(
                JdbcInputFormat.buildJdbcInputFormat()
                        .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                        .setDBUrl("jdbc:derby:memory:persons")
                        .setQuery("select name, age from persons")
                        .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
                        .finish()
        );

// Note: Flink's program compiler needs to infer the data types of the data items which are returned
// by an InputFormat. If this information cannot be automatically inferred, it is necessary to
// manually provide the type information as shown in the examples above.
對於基於檔案的輸入,當輸入路徑為目錄時,預設情況下不會列舉巢狀檔案,而是隻讀取基本目錄中的檔案,忽略巢狀檔案。可以通過 recursive.file.enumeration 配置引數啟用巢狀檔案的遞迴列舉,如以下示例所示:
// enable recursive enumeration of nested input files
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// create a configuration object
Configuration parameters = new Configuration();

// set the recursive enumeration parameter
parameters.setBoolean("recursive.file.enumeration", true);

// pass the configuration to the data source
DataSet<String> logs = env.readTextFile("file:///path/with.nested/files")
    .withParameters(parameters);
3、DataStream轉換
(1)資料接收器
- writeAsText() / TextOutputFormat:將元素按行寫為字串,字串通過呼叫每個元素的toString()方法獲得。
- writeAsCsv(...) / CsvOutputFormat:將元組寫為逗號分隔值(CSV)檔案,行和欄位的定界符均可配置,每個欄位的值來自物件的toString()方法。
- print() / printToErr():在標準輸出/標準錯誤流上列印每個元素的toString()值。可以選擇提供一個字首(msg)附加在輸出之前,以幫助區分不同的print呼叫。如果並行度大於1,輸出前還會帶上產生該輸出的任務識別符號。
- writeUsingOutputFormat() / FileOutputFormat:自定義檔案輸出的方法和基類,支援自定義物件到位元組的轉換。
- writeToSocket:根據SerializationSchema將元素寫入套接字。
- addSink:呼叫自定義sink function。Flink捆綁了與其他系統(例如Apache Kafka)的聯結器,這些聯結器都實現為sink function。
請注意:
1)DataStream上的write*()方法主要用於除錯目的。它們不參與Flink的檢查點,這意味著這些函式通常只有至少一次(at-least-once)語義。
2)資料重新整理到目標系統的時機取決於OutputFormat的實現,即並非所有傳送到OutputFormat的元素都會立即出現在目標系統中。同樣,在失敗的情況下,這些記錄可能會丟失。
3)為了將流可靠地、精確一次地寫入檔案系統,請使用StreamingFileSink。此外,通過.addSink(...)方法接入的自定義實現也可以參與Flink精確一次語義的檢查點。
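下面給出一個假設的 StreamingFileSink 用法示意(輸出路徑僅為示例;要獲得精確一次語義,作業需要開啟 checkpoint):
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("hdfs://namenode:9000/flink/output"),
                      new SimpleStringEncoder<String>("UTF-8"))
        .withRollingPolicy(DefaultRollingPolicy.builder().build()) // 使用預設的檔案滾動策略
        .build();

// 假設 stream 是一個 DataStream<String>
stream.addSink(sink);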
eg:
// text data
DataSet<String> textData = // [...]

// write DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS");

// write DataSet to a file on an HDFS with a namenode running at nnHost:nnPort
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS");

// write DataSet to a file and overwrite the file if it exists
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE);

// tuples as lines with pipe as the separator "a|b|c"
DataSet<Tuple3<String, Integer, Double>> values = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|");

// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.writeAsText("file:///path/to/the/result/file");

// this writes values as strings using a user-defined TextFormatter object
values.writeAsFormattedText("file:///path/to/the/result/file",
    new TextFormatter<Tuple2<Integer, Integer>>() {
        public String format (Tuple2<Integer, Integer> value) {
            return value.f1 + " - " + value.f0;
        }
    });
自定義輸出格式
DataSet<Tuple3<String, Integer, Double>> myResult = [...]

// write Tuple DataSet to a relational database
myResult.output(
    // build and configure OutputFormat
    JdbcOutputFormat.buildJdbcOutputFormat()
        .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
        .setDBUrl("jdbc:derby:memory:persons")
        .setQuery("insert into persons (name, age, height) values (?,?,?)")
        .finish()
    );
本地排序輸出
可以使用元組欄位位置或欄位表示式按指定順序在指定欄位上對資料接收器的輸出進行本地排序。這適用於每種輸出格式。
eg:
DataSet<Tuple3<Integer, String, Double>> tData = // [...]
DataSet<Tuple2<BookPojo, Double>> pData = // [...]
DataSet<String> sData = // [...]

// sort output on String field in ascending order
tData.sortPartition(1, Order.ASCENDING).print();
// sort output on Double field in descending and Integer field in ascending order
tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print();
// sort output on the "author" field of nested BookPojo in descending order
pData.sortPartition("f0.author", Order.DESCENDING).writeAsText(...);
// sort output on the full tuple in ascending order
tData.sortPartition("*", Order.ASCENDING).writeAsCsv(...);
// sort atomic type (String) output in descending order
sData.sortPartition("*", Order.DESCENDING).writeAsText(...);
注:目前尚不支援全域性排序的輸出。
(2)控制延遲(設定使用流處理、批處理)
預設情況下,元素不會在網路上一個一個地單獨傳輸(那會產生不必要的網路開銷),而是會先在緩衝區中累積。緩衝區的大小可以在 Flink 配置檔案中設定。為了在吞吐量和延遲之間做權衡,可以在執行環境(或各個運算子)上使用 env.setBufferTimeout(timeoutMillis) 設定緩衝區填充的最大等待時間;超過該時間後,即使緩衝區未滿也會自動傳送。
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis); // 預設是 100ms
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
為了最大程度地提高吞吐量,可以設定 setBufferTimeout(-1),表示永不超時,僅在緩衝區已滿時才重新整理;為了最大程度地減少延遲,可以將超時設定為接近 0 的值(例如 5 或 10 毫秒),但應避免把緩衝區超時設為 0,那可能導致嚴重的效能下降。
4、執行模式(批處理/流式傳輸)
DataStream API支援不同的執行時執行模式,可以根據用例的要求和工作特徵從中選擇執行模式。
DataStream API有“經典”的執行行為,我們稱之為 STREAMING 執行模式,適用於需要連續增量處理、並且預期會無限期保持線上的無界作業。另外還有一種批處理式的執行模式,我們稱為 BATCH 執行模式,它以類似 MapReduce 等批處理框架的方式執行作業,適用於具有已知固定輸入、不會連續執行的有界作業。
Apache Flink 對流和批處理採用統一的處理方式,這意味著無論配置哪種執行模式,在有界輸入上執行的 DataStream 應用程式都會產生相同的最終結果。需要注意這裡“最終”的含義:以 STREAMING 模式執行的作業可能會產生增量更新(類似資料庫中的 upsert),而 BATCH 作業最後只產生一個最終結果。只要正確解讀,最終結果是相同的,但得到結果的方式可以不同。
通過啟用 BATCH 執行模式,Flink 可以應用一些只有在已知輸入有界時才能做的額外優化。
(1)配置BATCH執行模式
可以通過 execution.runtime-mode 設定執行模式,它有三個可能的值:
- STREAMING:經典的 DataStream 執行模式(預設)
- BATCH:在 DataStream API 上以批處理方式執行
- AUTOMATIC:讓系統根據資料來源的有界性自行決定
可以通過 bin/flink run ... 的命令列引數進行配置,也可以在建立/配置 StreamExecutionEnvironment 時以程式設計方式指定。
通過命令列配置執行模式的方法如下:
$ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar
此示例說明如何在程式碼中配置執行模式:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
5、Keyed DataStream(鍵控資料流)
(1)概念:
在flink中資料集為DataStream,對其按key分割槽後會得到一個KeyedStream,之後就可以使用只適用於KeyedStream的operator以及keyed state(如MapState、ValueState等)。keyBy可以通過列下標選擇分割槽列,也可以使用列名或key selector函式進行分割槽。
eg:
// some ordinary POJO
public class WC {
public String word;
public int count;
public String getWord() { return word; }
}
DataStream<WC> words = // [...]
KeyedStream<WC, String> keyed = words
.keyBy(WC::getWord);
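在 KeyedStream 之上才能使用 keyed state。下面是一個假設的 ValueState 用法示意,按 key 累計出現次數(類名 CountPerKey 為示例):
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountPerKey extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    private transient ValueState<Integer> countState;

    @Override
    public void open(Configuration parameters) {
        // 每個 key 各自擁有一份該狀態
        countState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Integer.class));
    }

    @Override
    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
        Integer current = countState.value();          // 首次訪問時為 null
        int newCount = (current == null ? 0 : current) + value.f1;
        countState.update(newCount);
        out.collect(Tuple2.of(value.f0, newCount));
    }
}

// 用法(假設 words 是 DataStream<Tuple2<String, Integer>>):
// words.keyBy(t -> t.f0).flatMap(new CountPerKey());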
三、Flink DataSet API
Flink中的DataSet程式是常規程式,可對資料集進行轉換(例如,過濾,對映,聯接,分組)。最初從某些來源(例如,通過讀取檔案或從本地集合)建立資料集。結果通過接收器返回,接收器可以例如將資料寫入(分散式)檔案或標準輸出(例如命令列終端)。Flink程式可以在各種上下文中執行,獨立執行或嵌入其他程式中。執行可以在本地JVM或許多計算機的群集中進行。
public class WordCountExample {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.fromElements(
            "Who's there?",
            "I think I hear them. Stand, ho! Who's there?");

        DataSet<Tuple2<String, Integer>> wordCounts = text
            .flatMap(new LineSplitter())
            .groupBy(0)
            .sum(1);

        wordCounts.print();
    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
1、DataSet轉換
(1)資料轉換將一個或多個資料集轉換為新的資料集。程式可以將多種轉換組合成複雜的程式。
| Transformation | Description |
|---|---|
| Map | Takes one element and produces one element. |
| FlatMap | Takes one element and produces zero, one, or more elements. |
| MapPartition | Transforms a parallel partition in a single function call. The function gets the partition as an Iterable stream and can produce an arbitrary number of result values. The number of elements in each partition depends on the degree-of-parallelism and previous operations. |
| Filter | Evaluates a boolean function for each element and retains those for which the function returns true. IMPORTANT: The system assumes that the function does not modify the elements on which the predicate is applied. Violating this assumption can lead to incorrect results. |
| Reduce | Combines a group of elements into a single element by repeatedly combining two elements into one. Reduce may be applied on a full data set or on a grouped data set. If the reduce was applied to a grouped data set then you can specify the way that the runtime executes the combine phase of the reduce by supplying a CombineHint. |
| Aggregate | Aggregates a group of values into a single value. Aggregation functions can be thought of as built-in reduce functions. Aggregate may be applied on a full data set, or on a grouped data set. You can also use short-hand syntax for minimum, maximum, and sum aggregations. |
| Distinct | Returns the distinct elements of a data set. It removes the duplicate entries from the input DataSet, with respect to all fields of the elements, or a subset of fields. Distinct is implemented using a reduce function. You can specify the way that the runtime executes the combine phase of the reduce by supplying a CombineHint. |
| Join | Joins two data sets by creating all pairs of elements that are equal on their keys. Optionally uses a JoinFunction to turn the pair of elements into a single element, or a FlatJoinFunction to turn the pair of elements into arbitrarily many (including none) elements. See the keys section to learn how to define join keys. You can specify the way that the runtime executes the join via Join Hints. The hints describe whether the join happens through partitioning or broadcasting, and whether it uses a sort-based or a hash-based algorithm. Note that the join transformation works only for equi-joins. Other join types need to be expressed using OuterJoin or CoGroup. |
| OuterJoin | Performs a left, right, or full outer join on two data sets. Outer joins are similar to regular (inner) joins and create all pairs of elements that are equal on their keys. In addition, records of the "outer" side (left, right, or both in case of full) are preserved if no matching key is found in the other side. Matching pairs of elements (or one element and a null value for the other input) are given to a JoinFunction to turn the pair of elements into a single element, or to a FlatJoinFunction to turn the pair of elements into arbitrarily many (including none) elements. |
| Cross | Builds the Cartesian product (cross product) of two inputs, creating all pairs of elements. Optionally uses a CrossFunction to turn the pair of elements into a single element. Note: Cross is potentially a very compute-intensive operation which can challenge even large compute clusters! It is advised to hint the system with the DataSet sizes by using crossWithTiny() and crossWithHuge(). |
| Union | Produces the union of two data sets. |
| First-n | Returns the first n (arbitrary) elements of a data set. First-n can be applied on a regular data set, a grouped data set, or a grouped-sorted data set. Grouping keys can be specified as key-selector functions or field position keys. |
(2)在元組的資料集上可以進行以下轉換:
| Transformation | Description |
|---|---|
| Project | Selects a subset of fields from the tuples. |
| MinBy / MaxBy | Selects a tuple from a group of tuples whose values of one or more fields are minimum (maximum). The fields which are used for comparison must be valid key fields, i.e., comparable. If multiple tuples have minimum (maximum) field values, an arbitrary tuple of these tuples is returned. MinBy (MaxBy) may be applied on a full data set or a grouped data set. |
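下面給出這兩類轉換的一個小示意(資料與欄位選擇僅為示例,假設 env 是一個 ExecutionEnvironment):
DataSet<Tuple3<Integer, String, Double>> in = env.fromElements(
        Tuple3.of(1, "a", 3.0),
        Tuple3.of(2, "a", 1.5),
        Tuple3.of(3, "b", 2.0));

// Project:只保留第 3 和第 1 個欄位,得到 Tuple2<Double, Integer>
DataSet<Tuple2<Double, Integer>> projected = in.project(2, 0);

// MinBy:按第 2 個欄位分組,取每組中第 3 個欄位最小的那一條記錄
DataSet<Tuple3<Integer, String, Double>> minima = in.groupBy(1).minBy(2);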
(3)指定key
某些轉換(join、coGroup、groupBy)要求在元素集合上定義鍵;另一些轉換(Reduce、GroupReduce、Aggregate)則允許在應用轉換之前先按鍵對資料進行分組。
DataSet<...> input = // [...]
DataSet<...> reduced = input
.groupBy(/*define key here*/)
.reduceGroup(/*do something*/);
Flink的資料模型不是基於鍵值對的,因此無需把資料集的型別真正打包成鍵和值。key是“虛擬的”:它們被定義為作用在實際資料之上的函式,用來引導分組運算元。
(4)使用者自定義函式(User-Defined Functions)
實現介面方式
class MyMapFunction implements MapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
};
data.map(new MyMapFunction());
匿名類方式
data.map(new MapFunction<String, Integer> () {
public Integer map(String value) { return Integer.parseInt(value); }
});
Java 8 Lambdas(Flink在Java API中還支援Java 8 Lambda)
data.filter(s -> s.startsWith("http://"));
data.reduce((i1,i2) -> i1 + i2);
所有需要使用者自定義函式的轉換,都可以改用“富函式”(Rich Function)作為引數。
eg:
class MyMapFunction implements MapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
};
可以替換為以下寫法
class MyMapFunction extends RichMapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
};
將函式照常傳遞給map轉換:
data.map(new MyMapFunction());
也可以定義為匿名類:
data.map (new RichMapFunction<String, Integer>() {
public Integer map(String value) { return Integer.parseInt(value); }
});
累加器(Accumulators)
首先,在要使用它的使用者自定義轉換函式中建立一個累加器物件(此處是一個計數器):
private IntCounter numLines = new IntCounter();
其次,在rich函式的open()方法中註冊累加器物件,並在此處為它定義名稱:
getRuntimeContext().addAccumulator("num-lines", this.numLines);
之後就可以在運算元函式中的任何位置(包括open()和close()方法中)使用累加器:
this.numLines.add(1);
最終結果會儲存在由執行環境execute()方法返回的JobExecutionResult物件中(僅當執行等待作業完成時才有效):
myJobExecutionResult.getAccumulatorResult("num-lines")
所有累加器為每個作業共享一個名稱空間。因此,可以在作業的不同操作功能中使用同一累加器。Flink將在內部合併所有具有相同名稱的累加器。
注:累加器的結果僅在整個作業結束後才可用。
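把上面幾個步驟串起來,一個假設的完整用法大致如下(類名 LineCounter 與作業名稱均為示例):
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class LineCounter extends RichMapFunction<String, String> {

    private final IntCounter numLines = new IntCounter();

    @Override
    public void open(Configuration parameters) {
        // 註冊累加器,名稱在整個作業內共享
        getRuntimeContext().addAccumulator("num-lines", numLines);
    }

    @Override
    public String map(String value) {
        numLines.add(1); // 每處理一條記錄計數加一
        return value;
    }
}

// 作業結束後讀取結果:
// JobExecutionResult result = env.execute("accumulator demo");
// Integer lines = result.getAccumulatorResult("num-lines");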
自定義累加器:
要實現自己的累加器,只需要編寫累加器介面的實現即可。若希望自定義的累加器隨 Flink 一起提供,也歡迎建立 pull request。可以選擇實現 Accumulator 或 SimpleAccumulator:
1)Accumulator<V, R> 最靈活:它為要累加的值定義型別 V,併為最終結果定義結果型別 R。例如,對於直方圖,V 是一個數值,而 R 是一個直方圖。
2)SimpleAccumulator 適用於兩種型別相同的情況,例如計數器。
四、Operators
將一個或多個DataStream轉換為新的DataStream。程式可以將多種轉換組合成複雜的資料流拓撲。
1、DataStream Transformations
| Transformation | Description |
|---|---|
| Map (DataStream → DataStream) | Takes one element and produces one element. For example, a map function that doubles the values of the input stream. |
| FlatMap (DataStream → DataStream) | Takes one element and produces zero, one, or more elements. For example, a flatmap function that splits sentences to words. |
| Filter (DataStream → DataStream) | Evaluates a boolean function for each element and retains those for which the function returns true. For example, a filter that filters out zero values. |
| KeyBy (DataStream → KeyedStream) | Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning. There are different ways to specify keys. This transformation returns a KeyedStream, which is, among other things, required to use keyed state. Attention: a type cannot be a key if it is a POJO type that does not override the hashCode() method (relying on Object.hashCode()), or if it is an array of any type. |
| Reduce (KeyedStream → DataStream) | A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value. For example, a reduce function that creates a stream of partial sums. |
| Aggregations (KeyedStream → DataStream) | Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy). |
| Window (KeyedStream → WindowedStream) | Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows. |
| WindowAll (DataStream → AllWindowedStream) | Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows. WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator. |
| Window Apply (WindowedStream → DataStream) | Applies a general function to the window as a whole, e.g., a function that manually sums the elements of a window. Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead. |
| Window Reduce (WindowedStream → DataStream) | Applies a functional reduce function to the window and returns the reduced value. |
| Aggregations on windows (WindowedStream → DataStream) | Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy). |
| Union (DataStream* → DataStream) | Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself you will get each element twice in the resulting stream. |
| Window Join (DataStream, DataStream → DataStream) | Join two data streams on a given key and a common window. |
| Interval Join (KeyedStream, KeyedStream → DataStream) | Join two elements e1 and e2 of two keyed streams with a common key over a given time interval, so that e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound. |
| Window CoGroup (DataStream, DataStream → DataStream) | Cogroups two data streams on a given key and a common window. |
| Connect (DataStream, DataStream → ConnectedStreams) | "Connects" two data streams retaining their types. Connect allows for shared state between the two streams. |
| CoMap, CoFlatMap (ConnectedStreams → DataStream) | Similar to map and flatMap on a connected data stream. |
| Iterate (DataStream → IterativeStream → DataStream) | Creates a "feedback" loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. For example, an iteration body can be applied continuously where elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. |
以下轉換用於tuples的dataStream上:
| Transformation | Description |
|---|---|
| Project (DataStream → DataStream) | Selects a subset of fields from the tuples. |
2、物理分割槽
Flink還通過以下函式,提供對轉換之後的流如何分割槽的底層控制。
| Transformation | Description |
|---|---|
| Custom partitioning (DataStream → DataStream) | Uses a user-defined Partitioner to select the target task for each element. |
| Random partitioning (DataStream → DataStream) | Partitions elements randomly according to a uniform distribution. |
| Rebalancing / Round-robin partitioning (DataStream → DataStream) | Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew. |
| Rescaling (DataStream → DataStream) | Partitions elements, round-robin, to a subset of downstream operations. This is useful if you want to have pipelines where you, for example, fan out from each parallel instance of a source to a subset of several mappers to distribute load but don't want the full rebalance that rebalance() would incur. This requires only local data transfers instead of transferring data over the network, depending on other configuration values such as the number of slots of TaskManagers. The subset of downstream operations to which the upstream operation sends elements depends on the degree of parallelism of both the upstream and downstream operation. For example, if the upstream operation has parallelism 2 and the downstream operation has parallelism 6, then one upstream operation would distribute elements to three downstream operations while the other upstream operation would distribute to the other three downstream operations. If, on the other hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 6, then three upstream operations would distribute to one downstream operation while the other three upstream operations would distribute to the other downstream operation. In cases where the different parallelisms are not multiples of each other, one or several downstream operations will have a differing number of inputs from upstream operations. |
| Broadcasting (DataStream → DataStream) | Broadcasts elements to every partition. |
3、任務鏈和資源組
連結兩個後續的轉換意味著將它們共同定位在同一執行緒內以獲得更好的效能。預設情況下Flink會連結運算子(例如,兩個後續的對映轉換),API可以對連結進行細粒度的控制。如果要在整個作業中禁用連結,使用StreamExecutionEnvironment.disableOperatorChaining()。
備註:這些函式只能在DataStream轉換後使用,因為它們引用的是先前的轉換。例如,可以使用someStream.map(...).startNewChain(),但不能使用someStream.startNewChain()。
資源組是Flink中的slot。
| Transformation | Description |
|---|---|
| Start new chain | Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper. |
| Disable chaining | Do not chain the map operator. |
| Set slot sharing group | Set the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot while keeping operations that don't have the slot sharing group in other slots. This can be used to isolate slots. The slot sharing group is inherited from input operations if all input operations are in the same slot sharing group. The name of the default slot sharing group is "default"; operations can explicitly be put into this group by calling slotSharingGroup("default"). |
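下面用一小段示意程式碼說明這幾個 API 的用法(假設 someStream 是一個 DataStream<String>,slot 共享組名稱 "heavy" 僅為示例):
SingleOutputStreamOperator<String> result = someStream
        .filter(s -> !s.isEmpty())
        .map(String::toUpperCase)
        .startNewChain()              // 從這個 map 開始一條新的運算元鏈
        .map(String::trim)
        .disableChaining()            // 這個 map 不與前後運算元連結
        .slotSharingGroup("heavy");   // 把該運算元放到名為 "heavy" 的 slot 共享組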
4、Windows
Windows是處理無限流的核心。Windows將流分成有限大小的“儲存桶”。
(0)視窗化Flink程式的一般結構如下所示:
Keyed Windows(鍵控流)
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
Non-Keyed Windows(非鍵控流)
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
在上面,方括號([…])中的命令是可選的。Flink允許以多種不同方式自定義視窗邏輯,使其最切合需求。
(1)Window Assigners(視窗分配器)
視窗分配器定義瞭如何將元素分配給視窗。WindowAssigner 在 window(...)(鍵控流)或 windowAll(...)(非鍵控流)呼叫中指定,負責將每個傳入元素分配給一個或多個視窗。Flink 為最常見的用例提供了預定義的視窗分配器,即滾動視窗、滑動視窗、會話視窗和全域性視窗,也可以通過擴充套件 WindowAssigner 類來實現自定義視窗分配器。所有內建視窗分配器(全域性視窗除外)都根據時間將元素分配給視窗,時間可以是處理時間或事件時間。
基於時間的視窗用一個開始時間戳(包含)和一個結束時間戳(不包含)來表示視窗範圍。在程式碼中,Flink 在處理基於時間的視窗時使用 TimeWindow 型別,它提供了查詢開始和結束時間戳的方法,以及返回給定視窗最大允許時間戳的 maxTimestamp() 方法。
下圖視覺化描述每個分配器的工作原理。紫色圓圈代表流的元素,它們由某個key(在本例中為user 1、user 2和user 3)分割槽。x 軸顯示時間的進展。
(1.1)翻滾視窗
翻滾視窗分配器分配每一個元素到固定大小的視窗(滾動視窗具有固定的大小且不重疊)。
eg:如果指定大小為5分鐘的翻滾視窗,從當前視窗開始計算,每五分鐘將啟動一個新視窗,如下圖所示
程式碼示例:
DataStream<T> input = ...;

// tumbling event-time windows
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// tumbling processing-time windows
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// daily tumbling event-time windows offset by -8 hours.
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);
時間間隔可以通過 Time.milliseconds(x)、Time.seconds(x)、Time.minutes(x) 等指定。如上面最後一個示例所示,滾動視窗分配器還可以接受一個可選的 offset 引數,用於更改視窗的對齊方式。若沒有 offset,按小時的滾動視窗與 epoch 對齊,即會得到 1:00:00.000 - 1:59:59.999、2:00:00.000 - 2:59:59.999 這樣的視窗;如果把 offset 設為 15 分鐘,則會得到 1:15:00.000 - 2:14:59.999、2:15:00.000 - 3:14:59.999 這樣的視窗。offset 的一個重要用途是把視窗調整到 UTC-0 以外的時區,例如使用中國時區時,需要指定偏移量 Time.hours(-8)。
(1.2)滑動窗
類似於滾動視窗分配器,視窗的大小由視窗大小引數配置,同時視窗滑動引數控制滑動視窗啟動的頻率。因此,如果slide引數小於視窗大小,則滑動視窗可能會重疊。在這種情況下,元素被分配給多個視窗。
例如,可以將大小為10分鐘的視窗滑動5分鐘。這樣,每隔5分鐘就會得到一個視窗,其中包含最近10分鐘內到達的事件,如下圖所示:
DataStream<T> input = ...;

// sliding event-time windows
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

// sliding processing-time windows
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

// sliding processing-time windows offset by -8 hours
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);
(1.3)會話視窗
與滾動視窗和滑動視窗相比,會話視窗不重疊,也沒有固定的開始和結束時間。當會話視窗在一段時間內沒有收到元素時(即出現不活動間隙),它就會關閉。會話視窗分配器既可以配置靜態的會話間隙,也可以配置一個會話間隙提取函式來動態決定不活動時長。當這段時間到期後,當前會話關閉,後續元素將被分配到新的會話視窗。
DataStream<T> input = ...;

// event-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);

// event-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return session gap
    }))
    .<windowed transformation>(<window function>);

// processing-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);

// processing-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return session gap
    }))
    .<windowed transformation>(<window function>);
靜態間隙可以通過 Time.milliseconds(x)、Time.seconds(x)、Time.minutes(x) 等設定;動態間隙則是通過實現 SessionWindowTimeGapExtractor 介面指定的。
注意:由於會話視窗沒有固定的開始和結束,在內部,會話視窗運算子會為每個到達的記錄建立一個新視窗,如果視窗彼此之間的間隔小於已定義的 gap,就將它們 merge 在一起。為了能夠 merge,會話視窗需要一個支援 merge 的觸發器以及可以 merge 的視窗函式,例如 ReduceFunction、AggregateFunction 或 ProcessWindowFunction。
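下面是一個實現 SessionWindowTimeGapExtractor 來指定動態間隙的示意(依 key 區分間隙的規則與數值僅為假設):
DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(t -> t.f0)
    .window(ProcessingTimeSessionWindows.withDynamicGap(
            new SessionWindowTimeGapExtractor<Tuple2<String, Long>>() {
                @Override
                public long extract(Tuple2<String, Long> element) {
                    // 依事件內容決定會話間隙(毫秒),此處的規則僅為示例
                    return "vip".equals(element.f0) ? 60000L : 10000L;
                }
            }))
    .sum(1);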
(1.4)全域性視窗
全域性視窗分配器會把具有相同key的所有元素分配到同一個全域性視窗中。只有在同時指定了自定義觸發器時,這種視窗才有用;否則不會執行任何計算,因為全域性視窗沒有可以自然結束、從而觸發聚合函式執行的終點。
DataStream<T> input = ...;

input
    .keyBy(<key selector>)
    .window(GlobalWindows.create())
    .<windowed transformation>(<window function>);
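例如,可以為全域性視窗搭配計數觸發器,讓每個 key 每累積 100 條記錄就觸發一次計算(以下寫法僅為示意,效果類似 countWindow):
DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(t -> t.f0)
    .window(GlobalWindows.create())
    // 每累積 100 個元素觸發一次計算並清空視窗
    .trigger(PurgingTrigger.of(CountTrigger.of(100)))
    .sum(1);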
(2)Window Functions
(2.1)Reduce功能
ReduceFunction指定如何將輸入中的兩個元素組合起來,產生一個型別相同的輸出元素。Flink使用ReduceFunction來對視窗中的元素進行增量聚合。
eg:
DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
        public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
            return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
        }
    });
(2.2)聚合函式
AggregateFunction是ReduceFunction的一般化版本,它有三種類型:輸入型別(IN)、累加器型別(ACC)和輸出型別(OUT)。輸入型別是輸入流中元素的型別,AggregateFunction提供把一個輸入元素加到累加器上的方法;該介面還提供建立初始累加器、把兩個累加器合併為一個,以及從累加器中提取輸出(OUT型別)的方法。我們將在下面的示例中看到它的工作原理。
與ReduceFunction一樣,Flink會在視窗的輸入元素到達時對它們進行增量聚合。
一個AggregateFunction可以像下面這樣定義和使用:
/**
 * The accumulator is used to keep a running sum and a count. The {@code getResult} method
 * computes the average.
 */
private static class AverageAggregate
    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {

  @Override
  public Tuple2<Long, Long> createAccumulator() {
    return new Tuple2<>(0L, 0L);
  }

  @Override
  public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  }

  @Override
  public Double getResult(Tuple2<Long, Long> accumulator) {
    return ((double) accumulator.f0) / accumulator.f1;
  }

  @Override
  public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  }
}

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate());
(2.3)ProcessWindowFunction
ProcessWindowFunction獲得一個Iterable,該Iterable包含視窗的所有元素,以及一個可以訪問時間和狀態資訊的Context物件,這使其能夠比其他視窗函式提供更大的靈活性。這是以效能和資源消耗為代價的,因為不能增量聚合元素,而是需要在內部對其進行緩衝,直到認為該視窗已準備好進行處理為止。
ProcessWindowFunction的簽名如下:
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {

    /**
     * Evaluates the window and outputs none or several elements.
     *
     * @param key The key for which this window is evaluated.
     * @param context The context in which the window is being evaluated.
     * @param elements The elements in the window being evaluated.
     * @param out A collector for emitting elements.
     *
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    public abstract void process(
            KEY key,
            Context context,
            Iterable<IN> elements,
            Collector<OUT> out) throws Exception;

    /**
     * The context holding window metadata.
     */
    public abstract class Context implements java.io.Serializable {

        /**
         * Returns the window that is being evaluated.
         */
        public abstract W window();

        /** Returns the current processing time. */
        public abstract long currentProcessingTime();

        /** Returns the current event-time watermark. */
        public abstract long currentWatermark();

        /**
         * State accessor for per-key and per-window state.
         *
         * <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
         * by implementing {@link ProcessWindowFunction#clear(Context)}.
         */
        public abstract KeyedStateStore windowState();

        /**
         * State accessor for per-key global state.
         */
        public abstract KeyedStateStore globalState();
    }
}
注意:如果key是Tuple型別,必須手動將其強制轉換為正確大小的元組才能提取key欄位。
ProcessWindowFunction可以像下面這樣定義和使用(此例用ProcessWindowFunction統計視窗中的元素個數):
DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(t -> t.f0)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .process(new MyProcessWindowFunction());

/* ... */

public class MyProcessWindowFunction
        extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {

    @Override
    public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
        long count = 0;
        for (Tuple2<String, Long> in : input) {
            count++;
        }
        out.collect("Window: " + context.window() + " count: " + count);
    }
}
注意:用ProcessWindowFunction做簡單聚合(例如count)的效率很低。
(2.4)具有增量聚合的ProcessWindowFunction
ProcessWindowFunction可以與ReduceFunction或AggregateFunction組合使用:由後者在元素到達視窗時進行增量聚合,視窗關閉後,ProcessWindowFunction再拿到聚合結果。這樣既可以增量地計算視窗,又能訪問ProcessWindowFunction提供的其他視窗元資訊。
注意:也可以使用舊版的WindowFunction代替ProcessWindowFunction來做增量視窗聚合。
具有ReduceFunction的增量視窗聚合
eg:如何將增量的ReduceFunction與ProcessWindowFunction組合,以返回視窗中的最小事件以及該視窗的開始時間:
DataStream<SensorReading> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new MyReduceFunction(), new MyProcessWindowFunction());

// Function definitions

private static class MyReduceFunction implements ReduceFunction<SensorReading> {

    public SensorReading reduce(SensorReading r1, SensorReading r2) {
        return r1.value() > r2.value() ? r2 : r1;
    }
}

private static class MyProcessWindowFunction
        extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {

    public void process(String key,
                        Context context,
                        Iterable<SensorReading> minReadings,
                        Collector<Tuple2<Long, SensorReading>> out) {
        SensorReading min = minReadings.iterator().next();
        out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
    }
}
具有AggregateFunction的增量視窗聚合
eg:如何將增量的AggregateFunction與ProcessWindowFunction組合以計算平均值,並將key和視窗資訊與平均值一起輸出:
DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate(), new MyProcessWindowFunction());

// Function definitions

/**
 * The accumulator is used to keep a running sum and a count. The {@code getResult} method
 * computes the average.
 */
private static class AverageAggregate
    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {

  @Override
  public Tuple2<Long, Long> createAccumulator() {
    return new Tuple2<>(0L, 0L);
  }

  @Override
  public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  }

  @Override
  public Double getResult(Tuple2<Long, Long> accumulator) {
    return ((double) accumulator.f0) / accumulator.f1;
  }

  @Override
  public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  }
}

private static class MyProcessWindowFunction
    extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {

  public void process(String key,
                      Context context,
                      Iterable<Double> averages,
                      Collector<Tuple2<String, Double>> out) {
    Double average = averages.iterator().next();
    out.collect(new Tuple2<>(key, average));
  }
}
(3)Triggers
Trigger決定了視窗(由視窗分配器形成)何時準備好交給視窗函式處理。每個WindowAssigner都帶有一個預設的Trigger。如果預設觸發器不符合需求,可以使用trigger(...)指定自定義觸發器。
觸發器介面有五個方法,讓Trigger可以對不同的事件做出反應:
- onElement():對於新增到視窗中的每個元素都會呼叫該方法。
- onEventTime():當註冊的事件時間計時器觸發時呼叫該方法。
- onProcessingTime():當註冊的處理時間計時器觸發時呼叫該方法。
- onMerge():與有狀態觸發器相關,在兩個觸發器對應的視窗合併時(例如使用會話視窗時)合併它們的狀態。
- clear():在刪除相應視窗後,執行所需的清理操作。
關於上述方法,需要注意兩件事:
1)前三個方法通過返回一個TriggerResult來決定如何處理到來的事件。該動作可以是以下之一:
- CONTINUE:什麼都不做
- FIRE:觸發計算
- PURGE:清除視窗中的元素
- FIRE_AND_PURGE:觸發計算,然後清除視窗中的元素
2)這些方法中的任何一個都可以用來為後續動作註冊處理時間計時器或事件時間計時器。
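作為參考,下面給出一個假設的自定義觸發器示意:每當視窗內累積到 maxCount 個元素就觸發一次計算,計數通過 TriggerContext 的 partitioned state 維護(類名與邏輯均為示例):
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class CountFireTrigger extends Trigger<Object, TimeWindow> {

    private final long maxCount;
    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    public CountFireTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE;   // 觸發計算,但不清除視窗內容
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countDesc).clear();
    }

    private static class Sum implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }
}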
(4)允許遲到
(1)定義
當使用事件時間視窗時,元素可能會延遲到達,即Flink用來跟蹤事件時間進度的watermark已經越過了元素所屬視窗的結束時間戳。預設情況下,當watermark越過視窗末尾後,遲到的元素會被丟棄。但是,Flink允許為視窗運算子指定一個最大允許延遲(allowed lateness),即元素在被丟棄之前還可以遲到多久,其預設值為0。在watermark已經越過視窗末尾、但尚未越過“視窗末尾 + 允許延遲”之前到達的元素,仍會被加入視窗。根據所使用的觸發器,遲到但未被丟棄的元素可能會導致視窗再次觸發,使用EventTimeTrigger時就是如此。為了實現這一點,Flink會保留視窗的狀態,直到允許的延遲到期;一旦到期,Flink就會移除視窗並刪除其狀態。
預設情況下,允許的延遲為0,即在watermark越過視窗末尾之後到達的元素將被丟棄。
允許的延遲程式碼示例:
DataStream<T> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .<windowed transformation>(<window function>);
注意:當使用GlobalWindows視窗分配器時,不會有資料被認為是遲到的,因為全域性視窗的結束時間戳是Long.MAX_VALUE。
(2)獲取延遲資料作為側流輸出
使用Flink的側流輸出功能,可以獲取由於遲到而被丟棄的資料組成的資料流。
final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};

DataStream<T> input = ...;

SingleOutputStreamOperator<T> result = input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .sideOutputLateData(lateOutputTag)
    .<windowed transformation>(<window function>);

DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
(5)狀態大小注意事項
Windows可以定義在很長的時間段上(例如幾天、幾週或幾個月),因此會累積非常大的狀態。在估算視窗計算的儲存需求時,有以下幾條經驗規則:
- Flink會為每個視窗中的每個元素保留一份副本。因此,滾動視窗只保留每個元素的一份副本(一個元素恰好屬於一個視窗,除非它因遲到被丟棄);相反,滑動視窗會為每個元素建立多份副本。所以,大小為1天、每1秒滑動一次的滑動視窗通常不是一個好主意。
- ReduceFunction和AggregateFunction可以極大地減少儲存需求,因為它們會就地聚合元素,每個視窗僅儲存一個值;相反,使用ProcessWindowFunction需要緩衝視窗內的所有元素。
- 使用Evictor會阻止任何預聚合,因為視窗中的所有元素都必須先經過evictor處理之後才能進行計算。
(6)WindowAssigner 的預設觸發器
每個WindowAssigner都帶有一個適用於多數情況的預設Trigger。例如,所有事件時間視窗分配器都以EventTimeTrigger作為預設觸發器,一旦watermark越過視窗末尾,這個觸發器就會觸發。
注意:
(1)GlobalWindows的預設觸發器是NeverTrigger(從不觸發),因此使用全域性視窗時需要自定義觸發器。
(2)使用trigger()指定觸發器會覆蓋WindowAssigner的預設觸發器。
五、Side Outputs(側面輸出流)
除了DataStream操作產生的主流之外,還可以讓任意數量的側面輸出流從中分流出來。側流中的資料型別不必與主流中的資料型別一致,不同側流的型別也可以各不相同。在需要拆分資料流時,如果不使用側流,通常必須複製整個流,再從每個副本中過濾掉不需要的資料,而側面輸出可以避免這種開銷。
定義一個用於標識側面輸出流的OutputTag:
// 這需要是一個匿名的內部類,以便我們分析型別
OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
可以通過以下函式將資料傳送到側面輸出:
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- KeyedCoProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
在上述函式中,可以通過暴露給使用者的Context引數,把資料傳送到由OutputTag標識的側流中。
eg:從ProcessFunction傳送資料到側流輸出
DataStream<Integer> input = ...;

final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};

SingleOutputStreamOperator<Integer> mainDataStream = input
    .process(new ProcessFunction<Integer, Integer>() {

        @Override
        public void processElement(
                Integer value,
                Context ctx,
                Collector<Integer> out) throws Exception {
            // emit data to regular output
            out.collect(value);
            // emit data to side output
            ctx.output(outputTag, "sideout-" + String.valueOf(value));
        }
    });
在DataStream運算結果上使用getSideOutput(OutputTag)方法獲取側流,會得到一個元素型別與側面輸出流型別一致的DataStream。
eg:
final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<Integer> mainDataStream = ...;
DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
flink官方文件:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/concepts/index.html
借鑑了不少文章,感謝各路大神分享,如需轉載請註明出處,謝謝:https://www.cnblogs.com/huyangshu-fs/p/14489114.html