1. 程式人生 > >Flink的Time Extractor和Watermark Emitter

Flink的Time Extractor和Watermark Emitter

前言

Flink提供了抽象類來讓開發者賦值自己的時間戳併發送他們自己的Watermark。更具體來說,開發者需要依照不同用例情況來實現介面AssignerWithPeriodicWatermarks或介面AssignerWithPunctuatedWatermarks。簡而言之,前一個介面將會週期性傳送Watermark,而第二個介面根據一些到達資料的屬性,例如一旦在流中碰到一個特殊的element便傳送Watermark。下面有一個AssignerWithPunctuatedWatermarks的例子:

public class ConnectedCarAssigner implements
AssignerWithPunctuatedWatermarks<ConnectedCarEvent> { @Override public long extractTimestamp(ConnectedCarEvent event, long previousElementTimestamp) { return event.timestamp; } @Override public Watermark checkAndGetNextWatermark(ConnectedCarEvent event, long extractedTimestamp) { // 每個事件都發射一個watermark
return new Watermark(extractedTimestamp - 30000); } }

為了進一步簡化開發者開發類似的task,Flink自帶了一些預先實現的timestamp assigner。本節提供了它們的一個列表。除過引用即用的函式,這些預先實現的assigner還可以作為自定義assigner的實現示例。

遞增時間戳的Assigner

最簡單的週期性Watermark生成的特例便是由一個給定的Source task所見的時間戳都以遞增順序發生的情況。在這種情況下,由於不會有比當前時間戳更早的時間戳到達,故總是可以將當前時間戳看作是一個Watermark。

注意上述情況僅在每個並行資料來源task的時間戳都是以遞增順序到達時才是必要的,例如,在某特定部署中,一個Kafka分割槽是由一個並行性資料來源讀取的,那麼上述情況僅在每個Kafka分割槽內的時間戳都是遞增順序出現時才是必要的。Flink的Watermark合併機制保證會在任何並行流在進行shuffle、 union、 connect或merge後都可以生成正確的Watermark。

DataStream<MyEvent> stream = ...
DataStream<MyEvent> withTimestampsAndWatermarks = 
  stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {

  @Override
  public long extractAscendingTimestamp(MyEvent element) {
    return element.getCreationTime();
  }
});

允許固定量的遲到資料的Assigner

另一個週期性Watermark生成的例子是Watermark落在流中的一個固定時間段內觀察到的最大(事件時間的)時間戳的後面。該情況同樣包括預先知道在流中將會遇到的最大遲到量(lateness)的情況,例如建立的一個測試用的自定義source中,它的element的時間戳會分佈在一個固定的時間段內。Flink為這種情況提供了BoundedOutofOrdernessTimestampExtractor介面,該介面需要引數maxOutofOrderness,即在一個element被給定視窗在計算最終結果時忽略之前(即該element過期前),所允許該element遲到的最大lateness。lateness的值為"t-t_w",其中t是一個element的(事件時間的)時間戳,t_w是前一個watermark。如果lateness > 0,則我們就認為該element已經遲到,並且在job計算對應視窗的結果時忽略它。

	public static class TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor<TaxiRide> {

		public TaxiRideTSExtractor() {
			super(Time.seconds(60));
		}

		@Override
		public long extractTimestamp(TaxiRide ride) {
			if (ride.isStart) {
				return ride.startTime.getMillis();
			}
			else {
				return ride.endTime.getMillis();
			}
		}
	}