Flink流計算程式設計--Flink中allowedLateness詳細介紹及思考
1、簡介
Flink中藉助watermark以及window和trigger來處理基於event time的亂序問題,那麼如何處理“late element”呢?
也許有人會問,out-of-order element與late element有什麼區別?不都是一回事麼?
答案是一回事,都是為了處理亂序問題而產生的概念。要說區別,可以總結如下:
1、通過watermark機制來處理out-of-order的問題,屬於第一層防護,屬於全域性性的防護,通常說的亂序問題的解決辦法,就是指這類;
2、通過視窗上的allowedLateness機制來處理out-of-order的問題,屬於第二層防護,屬於特定window operator 的防護,late element的問題就是指這類。
下面我們重點介紹allowedLateness。
2、allowedLateness介紹
預設情況下,當watermark通過end-of-window之後,再有之前的資料到達時,這些資料會被刪除。
為了避免有些遲到的資料被刪除,因此產生了allowedLateness的概念。
簡單來講,allowedLateness就是針對event time而言,對於watermark超過end-of-window之後,還允許有一段時間(也是以event time來衡量)來等待之前的資料到達,以便再次處理這些資料。
下圖是其API:
預設情況下,如果不指定allowedLateness,其值是0,即對於watermark超過end-of-window之後,還有此window的資料到達時,這些資料被刪除掉了。
注意:對於trigger是預設的EventTimeTrigger的情況下,allowedLateness會再次觸發視窗的計算,而之前觸發的資料,會buffer起來,直到watermark超過end-of-window + allowedLateness()的時間,視窗的資料及元資料資訊才會被刪除。再次計算就是DataFlow模型中的Accumulating的情況。
同時,對於sessionWindow的情況,當late element在allowedLateness範圍之內到達時,可能會引起視窗的merge,這樣,之前視窗的資料會在新視窗中累加計算,這就是DataFlow模型中的AccumulatingAndRetracting的情況。
3、allowedLateness的例子
3.1、TumblingEventTime視窗
這裡watermark允許3秒的亂序,allowedLateness允許資料遲到5秒。
import java.text.SimpleDateFormat
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
/**
* windowedStream.allowedLateness() test,this is also called window accumulating.
* allowedLateness will trigger window again when 'late element' arrived to the window
*/
object TumblingWindowAccumulatingTest {
def main(args : Array[String]) : Unit = {
if (args.length != 2) {
System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>")
return
}
val hostName = args(0)
val port = args(1).toInt
val env = StreamExecutionEnvironment.getExecutionEnvironment //獲取流處理執行環境
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //設定Event Time作為時間屬性
//env.setBufferTimeout(10)
//env.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE)
val input = env.socketTextStream(hostName,port) //socket接收資料
val inputMap = input.map(f=> {
val arr = f.split("\\W+")
val code = arr(0)
val time = arr(1).toLong
(code,time)
})
/**
* 允許3秒的亂序
*/
val watermarkDS = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String,Long)] {
var currentMaxTimestamp = 0L
val maxOutOfOrderness = 3000L
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
override def getCurrentWatermark: Watermark = {
new Watermark(currentMaxTimestamp - maxOutOfOrderness)
}
override def extractTimestamp(t: (String,Long), l: Long): Long = {
val timestamp = t._2
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
timestamp
}
})
/**
* 對於此視窗而言,允許5秒的遲到資料,即第一次觸發是在watermark > end-of-window時
* 第二次(或多次)觸發的條件是watermark < end-of-window + allowedLateness時間內,這個視窗有late資料到達
*/
val accumulatorWindow = watermarkDS
.keyBy(_._1)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.allowedLateness(Time.seconds(5))
.apply(new AccumulatingWindowFunction)
.name("window accumulate test")
.setParallelism(2)
accumulatorWindow.print()
env.execute()
}
}
AccumulatingWindowFunction:
主要是計算視窗內的元素個數以及累計個數:
import java.text.SimpleDateFormat
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.function.RichWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
class AccumulatingWindowFunction extends RichWindowFunction[(String, Long),(String,String,String,Int, String, Int),String,TimeWindow]{
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
var state: ValueState[Int] = _
var count = 0
override def open(config: Configuration): Unit = {
state = getRuntimeContext.getState(new ValueStateDescriptor[Int]("AccumulatingWindow Test", classOf[Int], 0))
}
override def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[(String, String, String, Int, String, Int)]): Unit = {
count = state.value() + input.size
state.update(count)
// key,window start time, window end time, window size, system time, total size
out.collect(key, format.format(window.getStart),format.format(window.getEnd),input.size, format.format(System.currentTimeMillis()),count)
}
}
測試:
輸出結果:
解釋:
對於key1而言,1487225040000代表2017-02-16 14:04:00.000,10秒鐘的視窗範圍是[2017-02-16 14:04:00.000,2017-02-16 14:04:10.000),因此當watermark超過2017-02-16 14:04:10.000時,視窗會被觸發,由於watermark是允許3秒亂序,因此當標籤時間是1487225053000的資料到來時,視窗被觸發了。我們看到結果中的第一行就是第一次觸發的視窗,視窗內的元素個數是2,累加個數也是2。
由於此視窗還允許5秒的late,因此在watermark < end-of-window + allowedLateness(2017-02-16 14:04:15.000)之內到達的資料,都會被再次觸發視窗的計算。我們看標籤是key1,1487225045000的資料,此時的watermark是2017-02-16 14:04:12.000(1487225055000 - 3000得來),因此符合觸發條件,立刻觸發視窗的計算。也就是我們看到的第二條資料的結果。此時視窗內的元素個數是3,累加個數卻是5!!!
同理,key1,1487225048000這條資料第三次觸發了視窗的計算。視窗內的個數是4,累加個數是9!!!
當key1,1487225058000的資料到來時,此時的watermark是2017-02-16 14:04:15.000,已經超過了這個視窗的觸發範圍(視窗都是前閉後開的區間),因此,這個視窗再有遲到的資料,將直接被刪除。
我們看到key1,1487225049000這條資料到達時,系統沒有任何的反應,即此資料太晚了,被刪除了。
最後的一條資料是key1,1487225063000,其watermark是2017-02-16 14:04:20.000,已經超過了下一個視窗的觸發時間,因此會觸發下一個視窗的計算,結果就是我們看到的最後一條輸出的資料,視窗內的元素個數是4(分別是1487225050000,1487225053000,1487225055000和1487225058000),但是累加個數是13!!!
思考:
對於TumblingEventTime視窗的累加處理,很好的一點是及時更新了最後的結果,但是也有一個令人無法忽視的問題,即再次觸發的視窗,視窗內的UDF狀態錯了!這就是我們要關注的問題,我們要做的是去重操作。
至於這個去重怎麼做,這個就要看UDF中的狀態具體要算什麼內容了,例如本例中的state只是簡單的sum累計值,此時可以在UDF中新增一個hashMap,map中的key就設計為視窗的起始與結束時間,如果map中已經存在這個key,則state.update時,就不要再加上window.size,而是直接加1即可。這裡不再做具體的演示。
附上一張這個測試程式的DAG圖:
可以看到,window操作接到10條資料(輸入),觸發了4次window計算(輸出)。而且Flink1.2中對與DAG也增加了一些reason元資料資訊,例如每個邊的型別(Hash或Rebalance)。
3.2、Session Window的視窗(EventTimeSessionWindows)
修改window的型別為SessionWindow並調整windowFunction,去掉state,程式碼如下:
import java.text.SimpleDateFormat
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
object SessionWindowAccumulatingAndRetractingTest {
def main(args : Array[String]) : Unit = {
if (args.length != 2) {
System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>")
return
}
val hostName = args(0)
val port = args(1).toInt
val env = StreamExecutionEnvironment.getExecutionEnvironment //獲取流處理執行環境
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //設定Event Time作為時間屬性
//env.setBufferTimeout(10)
//env.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE)
val input = env.socketTextStream(hostName,port)
val inputMap = input.map(f=> {
val arr = f.split("\\W+")
val code = arr(0)
val time = arr(1).toLong
(code,time)
})
val watermarkDS = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String,Long)] {
var currentMaxTimestamp = 0L
val maxOutOfOrderness = 3000L
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
override def getCurrentWatermark: Watermark = {
new Watermark(currentMaxTimestamp - maxOutOfOrderness)
}
override def extractTimestamp(t: (String,Long), l: Long): Long = {
val timestamp = t._2
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
timestamp
}
})
// allow 5 sec for late element after watermark passed the
val accumulatorWindow = watermarkDS
.keyBy(_._1)
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
.allowedLateness(Time.seconds(5))
.apply(new AccumulatingAndRetractingWindowFunction)
.name("window accumulate test")
.setParallelism(2)
accumulatorWindow.print()
env.execute()
}
}
AccumulatingAndRetractingWindowFunction:
import java.text.SimpleDateFormat
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
class AccumulatingAndRetractingWindowFunction extends WindowFunction[(String, Long),(String,String,String,Int, String),String,TimeWindow]{
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
override def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[(String, String, String, Int, String)]): Unit = {
// key,window start time, window end time, window size, system time
out.collect(key,format.format(window.getStart),format.format(window.getEnd),input.size,format.format(System.currentTimeMillis()))
}
}
測試:
輸出結果:
解釋:
sessionWindow由於存在視窗的merge,所以對於late element,並不是來一條就重新觸發一次視窗。
第2條資料的watermark沒有超過第1條資料的end-of-window時間,因此沒有發生gap。
對於第3條資料key1,1487225063000到達時,其watermark是2017-02-16 14:04:20.000,第2條資料的end-of-window的時間,因此第一個視窗觸發了,即我們看到的第一條輸出記錄,其視窗數量是2(前2條輸入資料)。
第4條資料key1,1487225065000,其watermark是2017-02-16 14:04:22.000,因此和上一條資料沒有gap。
第5條資料key1,1487225057000,其watermar依然是2017-02-16 14:04:22.000,但此資料所在的視窗範圍在[2017-02-16 14:04:17.000,2017-02-16 14:04:27.000),屬於第一個視窗。雖然此時第一個視窗已經觸發,但是第一個視窗的結束時間2017-02-16 14:04:20.000 + allowedLateness(5秒) = 2017-02-16 14:04:25.000,而此時的watermark是2017-02-16 14:04:22.000,因此這條資料不能被刪除,而是重新納入到第一個視窗中,被累加起來。注意,此時視窗發生了合併,但是還未到新的視窗的觸發條件。
第6條資料key1,1487225078000到達時,其watermark提高到2017-02-16 14:04:35.000,比此前1487225065000的視窗結束時間(正好是2017-02-16 14:04:35.000)大,因此符合新視窗的觸發條件。
即我們看到的輸出中第2條記錄。此時的視窗範圍是從[2017-02-16 14:04:00.000,2017-02-16 14:04:35.000),其中late element和之前的視窗發生了merge,因此前一條輸出中的視窗時間變了。新視窗中一共包含了5條資料(前5條資料)。
思考:
這個session window late element的例子,並沒有包含UDF的state的內容。但是這裡同樣不能忽視這個問題,假如你的應用程式中包含UDF的state,那麼也要考慮當late element到達且沒有被刪除的情況下,當發生或者不發生視窗merge的情況下,如何處理state的問題。
對於session window而言,allowedLateness的引入會使得部分late element可以被正確處理,同時也會增加state處理複雜度,這裡主要涉及late element可能導致原視窗的merge或者不發生merge。
4、allowedLateness原始碼
此方法位於WindowedStream中,僅僅是對成員變數allowedLateness賦予初值(但不能指定負數值):
/**
* Sets the time by which elements are allowed to be late. Elements that
* arrive behind the watermark by more than the specified time will be dropped.
* By default, the allowed lateness is {@code 0L}.
*
* <p>Setting an allowed lateness is only valid for event-time windows.
*/
@PublicEvolving
public WindowedStream<T, K, W> allowedLateness(Time lateness) {
final long millis = lateness.toMilliseconds();
checkArgument(millis >= 0, "The allowed lateness cannot be negative.");
this.allowedLateness = millis;
return this;
}
我們再來看下windowedStream之上執行的apply方法:
/**
* Applies the given window function to each window. The window function is called for each
* evaluation of the window for each key individually. The output of the window function is
* interpreted as a regular non-windowed stream.
*
* <p>
* Not that this function requires that all data in the windows is buffered until the window
* is evaluated, as the function provides no means of incremental aggregation.
*
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function) {
TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
function, WindowFunction.class, true, true, getInputType(), null, false);//拿到Window的型別(Tumbling、Sliding或Session)以及輸入型別T等資訊
return apply(function, resultType);
}
返回apply(function, resultType)方法:
/**
* Applies the given window function to each window. The window function is called for each
* evaluation of the window for each key individually. The output of the window function is
* interpreted as a regular non-windowed stream.
*
* <p>
* Note that this function requires that all data in the windows is buffered until the window
* is evaluated, as the function provides no means of incremental aggregation.
*
* @param function The window function.
* @param resultType Type information for the result type of the window function
* @return The data stream that is the result of applying the window function to the window.
*/
public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
//clean the closure
function = input.getExecutionEnvironment().clean(function);
String callLocation = Utils.getCallLocationName();
String udfName = "WindowedStream." + callLocation;
SingleOutputStreamOperator<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
if (result != null) {
return result;
}
LegacyWindowOperatorType legacyWindowOpType = getLegacyWindowType(function);//根據WindowAssigner判斷windowOperator的型別,用於儲存window的狀態
String opName;
KeySelector<T, K> keySel = input.getKeySelector(); //根據keyedStream獲取key
WindowOperator<K, T, Iterable<T>, R, W> operator;
if (evictor != null) {
@SuppressWarnings({"unchecked", "rawtypes"})
TypeSerializer<StreamRecord<T>> streamRecordSerializer =
(TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
ListStateDescriptor<StreamRecord<T>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);
opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
operator =
new EvictingWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
new InternalIterableWindowFunction<>(function),
trigger,
evictor,
allowedLateness);
} else {
// 視窗中的ListState,用於在視窗觸發前,buffer視窗中的元素
ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
input.getType().createSerializer(getExecutionEnvironment().getConfig()));
opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")"; // 拼接唯一的operator名字,記錄標誌
operator =
new WindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
new InternalIterableWindowFunction<>(function),
trigger,
allowedLateness,
legacyWindowOpType);
}
return input.transform(opName, resultType, operator);//根據operator name,視窗函式的型別,以及window operator,執行keyedStream.transaform操作
}
我們看到,最終是生成了WindowOperator,初始化時包含了trigger以及allowedLateness的值。然後經過transform轉換,實際上是執行了DataStream中的transform方法。
其中,WindowOperator中有一個方法onEventTime,其中,此方法中判斷當element的watermark時間超過視窗的結束時間+allowedLateness時,就會清空window中的快取的所有資料,這裡不再贅述。
5、總結
Flink中處理亂序依賴watermark+window+trigger,屬於全域性性的處理;
同時,對於window而言,還提供了allowedLateness方法,使得更大限度的允許亂序,屬於區域性性的處理;
其中,allowedLateness只針對Event Time有效;
allowedLateness可用於TumblingEventTimeWindow、SlidingEventTimeWindow以及EventTimeSessionWindows,要注意這可能使得視窗再次被觸發,相當於對前一次視窗的視窗的修正(累加計算或者累加撤回計算);
要注意再次觸發視窗時,UDF中的狀態值的處理,要考慮state在計算時的去重問題。
最後要注意的問題,就是sink的問題,由於同一個key的同一個window可能被sink多次,因此sink的資料庫要能夠接收此類資料。