1. 程式人生 > >Flink 事件時間的陷進及解決思路

Flink 事件時間的陷進及解決思路

0x1 摘要

大家都知道Flink引入了事件時間(eventTime)這個重要概念,來提升資料統計的準確性,但引入事件時間後在具體業務實現時存在一些問題必需要合理去解決,否則會造成非常嚴重的問題。

0x2 Flink 時間概念介紹

Flink 支援不同的時間概念,包括:

  • Event Time :事件時間
  • Processing Time :處理時間
  • Ingestion Time :訊息提取時間

參考下圖可以清晰的知道這三者的關係:
時間概念圖
Ingestion Time是介於Event TimeProcessing Time之間的概念。
程式中可以通過env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

指定使用時間型別。

0x3 事件時間存在的問題

事件時間存在什麼樣的問題呢?下面先看一個簡單的業務場景。
比如:要統計APP上搜索按鈕每1分鐘的點選次數。
前端埋點資料結構:

欄位名 欄位型別 描述
eventCode String 事件編碼
clickTime Long 點選時間

基於以上資料結構我們可設計如下水印處理器:

public static class TimestampExtractor implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {

 private long currentMaxTimestamp = 0L;

 @Override
 public Watermark getCurrentWatermark() {
  return new Watermark(currentMaxTimestamp -3000);
 }

 @Override
 public long extractTimestamp(Tuple2<String, Long> tuple, long previousElementTimestamp) {
  long eventTime = tuple.f1;
  currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTime);
  return eventTime;
 }
}

extractTimestamp方法會拿事件時間和上一次事件時間比較,並取較大值來更新當前水印值。
假設前端傳送了以下這些資料,方便直觀看資料clickTime直接採用格式化後的值,並以逗號分隔資料。

001,2018-12-17 13:30:00
001,2018-12-17 13:30:01
001,2018-12-17 13:30:02
001,2018-12-18 13:30:00
001,2018-12-17 13:30:03
001,2018-12-17 13:30:04
001,2018-12-17 13:30:05

正常資料都是17號,突然來了一條18號的資料,再結合上面的水印邏輯,一旦出現這種問題資料,直接導致水位上升到18號,後面再來17號的資料全部無法處理。針對業務來講這樣的錯誤是致命的,統計結果出現斷層。

0x4 解決思路

針對以上問題我們可以對水印實現類做如下改造:

public static class TimestampExtractor implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {

 private long currentMaxTimestamp = 0L;

 @Override
 public Watermark getCurrentWatermark() {
  return new Watermark(currentMaxTimestamp -3000);
 }

 @Override
 public long extractTimestamp(Tuple2<String, Long> tuple, long previousElementTimestamp) {
  long eventTime = tuple.f1;
  if((currentMaxTimestamp == 0) || (eventTime - currentMaxTimestamp < MESSAGE_FORWARD_TIME)) {
            currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTime);
        }
  return eventTime;
 }
}

MESSAGE_FORWARD_TIME變數是自定義的訊息最大跳躍時間,如果超出這個範圍則不更新最大水印時間。