
Testing multiple sources + event-time watermarks in Flink

I need to build a monitoring project that computes metrics over our company-wide logs; once it goes live, the volume should be on the order of ten billion records per day.

The SQL for a single source looks like this.


The original SQL:

select pro,throwable,level,ip,`count`,id,`time`,firstl,lastl  

from 
(

select pro,throwable,level,ip,
count(*) as `count`,
lastStrInGroupSkipNull(CONCAT_WS('_',KAFKA_TOPIC,CAST(KAFKA_PARTITION AS VARCHAR),CAST(KAFKA_OFFSET as VARCHAR))) as id,
firstLong(l) as firstl,
lastLong(l) as lastl,
TUMBLE_END(SPT, INTERVAL '3' SECOND) as `time` 

from input.`ymm-appmetric-dev-self1` 

where 
pro IS NOT NULL and throwable IS NOT NULL and level IS NOT NULL and level='ERROR' and ip IS NOT NULL 
group by pro,throwable,level,ip,TUMBLE(SPT,INTERVAL '3' SECOND)

) 

where 1=uniqueWithin100MS(pro,throwable,level,ip,`time`)

---First, as a technical feasibility check, I wrote the following SQL:


select pro,throwable,level,ip,`count`,id,`time`,firstl,lastl  

from (

select pro,throwable,level,ip,count(*) as `count`,
lastStrInGroupSkipNull(CONCAT_WS('_',KAFKA_TOPIC,CAST(KAFKA_PARTITION AS VARCHAR),CAST(KAFKA_OFFSET as VARCHAR))) as id,
firstLong(l) as firstl,
lastLong(l) as lastl,
TUMBLE_END(SPT, INTERVAL '3' SECOND) as `time` 
from (

select pro,throwable,level,ip
from input.`ymm-appmetric-dev-self1` 
where pro IS NOT NULL and throwable IS NOT NULL and level IS NOT NULL and level='ERROR' and ip IS NOT NULL 
union
select pro,throwable,level,ip
from input.`ymm-appmetric-dev-self2` 
where pro IS NOT NULL and throwable IS NOT NULL and level IS NOT NULL and level='ERROR' and ip IS NOT NULL 

)

group by pro,throwable,level,ip,TUMBLE(SPT,INTERVAL '3' SECOND)

)

where 1=uniqueWithin100MS(pro,throwable,level,ip,`time`)

Then I launched the Flink job to see whether it would start cleanly---and sure enough, it failed:


Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'SPT' not found in any table

Time to track this down. Looking at the SQL I wrote, my guess was that the UNION branches did not carry the SPT time attribute (or the other columns the outer query needs). After adding them, the SQL becomes:


select pro,throwable,level,ip,`count`,id,`time`,firstl,lastl  

from (

select pro,throwable,level,ip,count(*) as `count`,
lastStrInGroupSkipNull(CONCAT_WS('_',KAFKA_TOPIC,CAST(KAFKA_PARTITION AS VARCHAR),CAST(KAFKA_OFFSET as VARCHAR))) as id,
firstLong(l) as firstl,
lastLong(l) as lastl,
TUMBLE_END(SPT, INTERVAL '3' SECOND) as `time` 
from (

select pro,throwable,level,ip,l,KAFKA_TOPIC,KAFKA_PARTITION,KAFKA_OFFSET,SPT
from input.`ymm-appmetric-dev-self1` 
where pro IS NOT NULL and throwable IS NOT NULL and level IS NOT NULL and level='ERROR' and ip IS NOT NULL 
union
select pro,throwable,level,ip,l,KAFKA_TOPIC,KAFKA_PARTITION,KAFKA_OFFSET,SPT
from input.`ymm-appmetric-dev-self2` 
where pro IS NOT NULL and throwable IS NOT NULL and level IS NOT NULL and level='ERROR' and ip IS NOT NULL 

)

group by pro,throwable,level,ip,TUMBLE(SPT,INTERVAL '3' SECOND)

)

where 1=uniqueWithin100MS(pro,throwable,level,ip,`time`)

Restart again---surely it works this time? The SQL now compiles cleanly, but there is still an error.

Strange---I never saw this error before. Great, let's find out where the problem is!

 

Let's open the class hierarchy diagram and take this opportunity to strengthen our understanding of these classes!

---My debugging shows the problem is the UNION: leave it out and nothing goes wrong; add it and the job fails. Let's go back to the call stack.

After a whole afternoon of debugging alone -_-||, it turned out a single change was all that was needed:


union -> union all

Impressive---hats off to the expert! In hindsight this makes sense: UNION implies a DISTINCT deduplication step, and that extra aggregation apparently cannot preserve the SPT rowtime attribute, whereas UNION ALL simply concatenates the two append-only streams.

----Good. Now that it's solved, let's keep debugging down into the underlying mechanics!

A quick test showed that, compared with a single source, the single-source watermark is easy to understand, while the multi-source case is a bit more involved. Let's dig into how it works.

First, take a look at the current job graph, shown below:

Next let's study the threads---time for a jstack.

Let's analyze the threads above and see what we can learn, walking through a few key ones.


"VM Periodic Task Thread" os_prio=0 tid=0x00007f366825e800 nid=0x63d waiting on condition 
A quick search tells us:
This is the JVM's periodic task scheduling thread. It is created by the WatcherThread, is a singleton, and is used heavily inside the JVM, e.g. for periodic memory monitoring and JVM health checks.

The next few are GC threads:
"Gang worker#0 (Parallel GC Threads)" os_prio=0 tid=0x00007f3668031800 nid=0x626 runnable 

"Gang worker#1 (Parallel GC Threads)" os_prio=0 tid=0x00007f3668033800 nid=0x627 runnable 

"Gang worker#2 (Parallel GC Threads)" os_prio=0 tid=0x00007f3668035800 nid=0x628 runnable 

"Gang worker#3 (Parallel GC Threads)" os_prio=0 tid=0x00007f3668037800 nid=0x629 runnable 

"Gang worker#4 (Parallel GC Threads)" os_prio=0 tid=0x00007f3668039800 nid=0x62a runnable 

"Gang worker#5 (Parallel GC Threads)" os_prio=0 tid=0x00007f366803b000 nid=0x62b runnable 

"Gang worker#6 (Parallel GC Threads)" os_prio=0 tid=0x00007f366803d000 nid=0x62c runnable 

"Gang worker#7 (Parallel GC Threads)" os_prio=0 tid=0x00007f366803f000 nid=0x62d runnable 

"Concurrent Mark-Sweep GC Thread" os_prio=0 tid=0x00007f36680b7000 nid=0x630 runnable 

"Gang worker#0 (Parallel CMS Threads)" os_prio=0 tid=0x00007f36680b2800 nid=0x62e runnable 

"Gang worker#1 (Parallel CMS Threads)" os_prio=0 tid=0x00007f36680b4800 nid=0x62f runnable 

---


"main" #1 prio=5 os_prio=0 tid=0x00007f3668019800 nid=0x625 waiting on condition [0x00007f3670010000]
The main thread; inside Flink it waits for everything else to finish.

"New I/O worker #1" #24 prio=5 os_prio=0 tid=0x00007f366995f000 nid=0x648 runnable [0x00007f3642cd1000]
An internal Netty thread.

---


"Source: MyKafka010JsonTableSource -> from: (l, KAFKA_TOPIC, KAFKA_PARTITION, KAFKA_OFFSET, pro, throwable, level, ip, SPT) -> Timestamps/Watermarks -> where: (AND(=(level, _UTF-16LE'ERROR'), IS NOT NULL(pro), IS NOT NULL(throwable), IS NOT NULL(ip))), select: (pro, throwable, CAST(_UTF-16LE'ERROR') AS level, ip, SPT, CONCAT_WS(_UTF-16LE'_', KAFKA_TOPIC, CAST(KAFKA_PARTITION), CAST(KAFKA_OFFSET)) AS $f5, l) (1/1)" #51 prio=5 os_prio=0 tid=0x00007f363d11a800 nid=0x65e in Object.wait() [0x00007f3641ac3000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:502)
    at org.apache.flink.streaming.connectors.kafka.internal.Handover.pollNext(Handover.java:74)
    - locked <0x00000000e6ee2df0> (a java.lang.Object)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:133)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:721)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)

"Source: MyKafka010JsonTableSource -> from: (l, KAFKA_TOPIC, KAFKA_PARTITION, KAFKA_OFFSET, pro, throwable, level, ip, SPT) -> Timestamps/Watermarks -> where: (AND(=(level, _UTF-16LE'ERROR'), IS NOT NULL(pro), IS NOT NULL(throwable), IS NOT NULL(ip))), select: (pro, throwable, CAST(_UTF-16LE'ERROR') AS level, ip, SPT, CONCAT_WS(_UTF-16LE'_', KAFKA_TOPIC, CAST(KAFKA_PARTITION), CAST(KAFKA_OFFSET)) AS $f5, l) (1/1)" #50 prio=5 os_prio=0 tid=0x00007f363d120800 nid=0x65d in Object.wait() [0x00007f3641bc4000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:502)
    at org.apache.flink.streaming.connectors.kafka.internal.Handover.pollNext(Handover.java:74)
    - locked <0x00000000e6ee2e98> (a java.lang.Object)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:133)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:721)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)

Two threads consume the records. They do not read Kafka directly; other threads read Kafka and feed the records to these two (note the Handover in the stacks).

---


"time attribute: (SPT) (1/1)" #53 prio=5 os_prio=0 tid=0x00007f363d8e4000 nid=0x662 in Object.wait() [0x00007f36418c1000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:502)
    at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.waitAndGetNextInputGate(UnionInputGate.java:205)
    - locked <0x00000000e6ee8210> (a java.util.ArrayDeque)
    at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:163)
    at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
This thread corresponds to the UNION operator in our SQL.

---


"groupBy: (pro, throwable, level, ip), window: (TumblingGroupWindow('w$, 'SPT, 3000.millis)), select: (pro, throwable, level, ip, COUNT(*) AS count, lastStrInGroupSkipNull($f5) AS id, firstLong(l) AS firstl, lastLong(l) AS lastl, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime) -> where: (=(1, uniqueWithin100MS(pro, throwable, _UTF-16LE'ERROR', ip, w$end))), select: (pro, throwable, level, ip, count, id, w$end AS time, firstl, lastl) -> to: Row -> Sink: Kafka010JsonTableSink(pro, throwable, level, ip, count, id, time, firstl, lastl) (1/1)" #54 prio=5 os_prio=0 tid=0x00007f363fde3800 nid=0x664 in Object.wait() [0x00007f3641127000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:502)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:533)
    - locked <0x00000000e6ee2d48> (a java.util.ArrayDeque)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:502)
    at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
This one corresponds to the GROUP BY operator.

---The producer


"kafka-producer-network-thread | producer-1" #55 daemon prio=5 os_prio=0 tid=0x00007f364d0f0800 nid=0x667 runnable [0x00007f3640a26000]
   java.lang.Thread.State: RUNNABLE
    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
    - locked <0x00000000e6ef3358> (a sun.nio.ch.Util$3)
    - locked <0x00000000e6ef3340> (a java.util.Collections$UnmodifiableSet)
    - locked <0x00000000e6eedbd8> (a sun.nio.ch.EPollSelectorImpl)
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
    at org.apache.kafka.common.network.Selector.select(Selector.java:489)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:225)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:126)
    at java.lang.Thread.run(Thread.java:748)
This is the Kafka producer, connected directly to Kafka.

---


"Time Trigger for Source: MyKafka010JsonTableSource -> from: (l, KAFKA_TOPIC, KAFKA_PARTITION, KAFKA_OFFSET, pro, throwable, level, ip, SPT) -> Timestamps/Watermarks -> where: (AND(=(level, _UTF-16LE'ERROR'), IS NOT NULL(pro), IS NOT NULL(throwable), IS NOT NULL(ip))), select: (pro, throwable, CAST(_UTF-16LE'ERROR') AS level, ip, SPT, CONCAT_WS(_UTF-16LE'_', KAFKA_TOPIC, CAST(KAFKA_PARTITION), CAST(KAFKA_OFFSET)) AS $f5, l) (1/1)" #57 daemon prio=5 os_prio=0 tid=0x00007f364d264800 nid=0x669 waiting on condition [0x00007f3640624000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000000e6ef84c0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

"Time Trigger for Source: MyKafka010JsonTableSource -> from: (l, KAFKA_TOPIC, KAFKA_PARTITION, KAFKA_OFFSET, pro, throwable, level, ip, SPT) -> Timestamps/Watermarks -> where: (AND(=(level, _UTF-16LE'ERROR'), IS NOT NULL(pro), IS NOT NULL(throwable), IS NOT NULL(ip))), select: (pro, throwable, CAST(_UTF-16LE'ERROR') AS level, ip, SPT, CONCAT_WS(_UTF-16LE'_', KAFKA_TOPIC, CAST(KAFKA_PARTITION), CAST(KAFKA_OFFSET)) AS $f5, l) (1/1)" #56 daemon prio=5 os_prio=0 tid=0x00007f363e937800 nid=0x668 waiting on condition [0x00007f3640725000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000000e6ee2bc8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Each stream has a periodic watermark-emitting thread; since my job has two input streams,
there are two such threads.

---


"Kafka Partition Discovery for Source: MyKafka010JsonTableSource -> from: (l, KAFKA_TOPIC, KAFKA_PARTITION, KAFKA_OFFSET, pro, throwable, level, ip, SPT) -> Timestamps/Watermarks -> where: (AND(=(level, _UTF-16LE'ERROR'), IS NOT NULL(pro), IS NOT NULL(throwable), IS NOT NULL(ip))), select: (pro, throwable, CAST(_UTF-16LE'ERROR') AS level, ip, SPT, CONCAT_WS(_UTF-16LE'_', KAFKA_TOPIC, CAST(KAFKA_PARTITION), CAST(KAFKA_OFFSET)) AS $f5, l) (1/1)" #61 prio=5 os_prio=0 tid=0x00007f364d25f000 nid=0x66c waiting on condition [0x00007f3640121000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
    at java.lang.Thread.sleep(Native Method)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase$2.run(FlinkKafkaConsumerBase.java:701)
    at java.lang.Thread.run(Thread.java:748)
    
    
"Kafka Partition Discovery for Source: MyKafka010JsonTableSource -> from: (l, KAFKA_TOPIC, KAFKA_PARTITION, KAFKA_OFFSET, pro, throwable, level, ip, SPT) -> Timestamps/Watermarks -> where: (AND(=(level, _UTF-16LE'ERROR'), IS NOT NULL(pro), IS NOT NULL(throwable), IS NOT NULL(ip))), select: (pro, throwable, CAST(_UTF-16LE'ERROR') AS level, ip, SPT, CONCAT_WS(_UTF-16LE'_', KAFKA_TOPIC, CAST(KAFKA_PARTITION), CAST(KAFKA_OFFSET)) AS $f5, l) (1/1)" #59 prio=5 os_prio=0 tid=0x00007f363f4bc800 nid=0x66a waiting on condition [0x00007f3640323000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
    at java.lang.Thread.sleep(Native Method)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase$2.run(FlinkKafkaConsumerBase.java:701)
    at java.lang.Thread.run(Thread.java:748)
Two automatic partition-discovery threads.

---


"Kafka 0.10 Fetcher for Source: MyKafka010JsonTableSource -> from: (l, KAFKA_TOPIC, KAFKA_PARTITION, KAFKA_OFFSET, pro, throwable, level, ip, SPT) -> Timestamps/Watermarks -> where: (AND(=(level, _UTF-16LE'ERROR'), IS NOT NULL(pro), IS NOT NULL(throwable), IS NOT NULL(ip))), select: (pro, throwable, CAST(_UTF-16LE'ERROR') AS level, ip, SPT, CONCAT_WS(_UTF-16LE'_', KAFKA_TOPIC, CAST(KAFKA_PARTITION), CAST(KAFKA_OFFSET)) AS $f5, l) (1/1)" #60 daemon prio=5 os_prio=0 tid=0x00007f364d269800 nid=0x66d runnable [0x00007f363bffe000]
   java.lang.Thread.State: RUNNABLE
    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
    - locked <0x00000000e73f0888> (a sun.nio.ch.Util$3)
    - locked <0x00000000e73f0870> (a java.util.Collections$UnmodifiableSet)
    - locked <0x00000000e7279b20> (a sun.nio.ch.EPollSelectorImpl)
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
    at org.apache.kafka.common.network.Selector.select(Selector.java:489)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
    - locked <0x00000000e7497ec0> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257)



"Kafka 0.10 Fetcher for Source: MyKafka010JsonTableSource -> from: (l, KAFKA_TOPIC, KAFKA_PARTITION, KAFKA_OFFSET, pro, throwable, level, ip, SPT) -> Timestamps/Watermarks -> where: (AND(=(level, _UTF-16LE'ERROR'), IS NOT NULL(pro), IS NOT NULL(throwable), IS NOT NULL(ip))), select: (pro, throwable, CAST(_UTF-16LE'ERROR') AS level, ip, SPT, CONCAT_WS(_UTF-16LE'_', KAFKA_TOPIC, CAST(KAFKA_PARTITION), CAST(KAFKA_OFFSET)) AS $f5, l) (1/1)" #58 daemon prio=5 os_prio=0 tid=0x00007f363f4be800 nid=0x66b runnable [0x00007f3640222000]
   java.lang.Thread.State: RUNNABLE
    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
    - locked <0x00000000e6ef0758> (a sun.nio.ch.Util$3)
    - locked <0x00000000e6ef0740> (a java.util.Collections$UnmodifiableSet)
    - locked <0x00000000e6ee0248> (a sun.nio.ch.EPollSelectorImpl)
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
    at org.apache.kafka.common.network.Selector.select(Selector.java:489)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
    - locked <0x00000000e6f03398> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257)
These are the two consumer threads that connect to Kafka directly, one per source.

That's it for the thread inventory. Next, let's look at what each thread does. First, some brief background on records and watermarks.


For each stream, one consumer thread reads messages from Kafka
and hands them over, via an in-memory exchange (the Handover in the stacks above), to a second thread that pushes the records downstream. Meanwhile, a watermark thread periodically checks whether a larger timestamp has appeared; if so, it wraps that timestamp in a watermark event and broadcasts it downstream.
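
As a mental model, these two paths behave like Flink's AssignerWithPeriodicWatermarks: the record path remembers the largest event timestamp, and the periodic path turns it into a watermark. Here is a minimal sketch; the LogEvent type and the 3000 ms bound are assumptions modeled on this job, not code taken from it:

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Hypothetical event type; only the long event-time field `l` matters here.
class LogEvent {
    long l;
    LogEvent(long l) { this.l = l; }
}

public class MaxTimestampWatermarks implements AssignerWithPeriodicWatermarks<LogEvent> {

    private static final long MAX_OUT_OF_ORDERNESS = 3000L; // tolerated delay in ms (assumed)
    private long maxSeenTimestamp = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(LogEvent event, long previousElementTimestamp) {
        // Record path: remember the largest event timestamp seen so far.
        maxSeenTimestamp = Math.max(maxSeenTimestamp, event.l);
        return event.l;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // Timer path: the periodic watermark thread reads the max and emits it,
        // held back by the tolerated delay.
        return maxSeenTimestamp == Long.MIN_VALUE
                ? null
                : new Watermark(maxSeenTimestamp - MAX_OUT_OF_ORDERNESS);
    }
}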

---Let's first debug the Handover thread and see how records are fed to the UnionInputGate thread.

Set a breakpoint at:


stop at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher:154

Run it!

Then send a message to Kafka---the breakpoint is hit.

Now we can watch the record flow step by step.

While the record is processed, the current event's timestamp is recorded. The logic: if the incoming timestamp is larger than the current value, update it; the watermark thread later reads this value periodically to decide whether a watermark needs to be sent.

Good. Following the record further, execution reaches this point:


[1] org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit (RecordWriter.java:104)
  [2] org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit (StreamRecordWriter.java:81)
  [3] org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter (RecordWriterOutput.java:107)
  [4] org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect (RecordWriterOutput.java:89)
  [5] org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect (RecordWriterOutput.java:45)
  [6] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679)
  [7] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657)
  [8] org.apache.flink.streaming.api.operators.TimestampedCollector.collect (TimestampedCollector.java:51)
  [9] org.apache.flink.table.runtime.CRowWrappingCollector.collect (CRowWrappingCollector.scala:37)
  [10] org.apache.flink.table.runtime.CRowWrappingCollector.collect (CRowWrappingCollector.scala:28)
  [11] DataStreamCalcRule$69.processElement (null)
  [12] org.apache.flink.table.runtime.CRowProcessRunner.processElement (CRowProcessRunner.scala:66)
  [13] org.apache.flink.table.runtime.CRowProcessRunner.processElement (CRowProcessRunner.scala:35)
  [14] org.apache.flink.streaming.api.operators.ProcessOperator.processElement (ProcessOperator.java:66)
  [15] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:560)
  [16] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:535)
  [17] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:515)
  [18] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679)
  [19] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657)
  [20] org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement (TimestampsAndPeriodicWatermarksOperator.java:67)
  [21] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:560)
  [22] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:535)
  [23] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:515)
  [24] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679)
  [25] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657)
  [26] org.apache.flink.streaming.api.operators.TimestampedCollector.collect (TimestampedCollector.java:51)
  [27] org.apache.flink.table.runtime.CRowWrappingCollector.collect (CRowWrappingCollector.scala:37)
  [28] org.apache.flink.table.runtime.CRowWrappingCollector.collect (CRowWrappingCollector.scala:28)
  [29] DataStreamSourceConversion$23.processElement (null)
  [30] org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement (CRowOutputProcessRunner.scala:67)
  [31] org.apache.flink.streaming.api.operators.ProcessOperator.processElement (ProcessOperator.java:66)
  [32] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:560)
  [33] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:535)
  [34] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:515)
  [35] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679)
  [36] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657)
  [37] org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp (StreamSourceContexts.java:310)
  [38] org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp (StreamSourceContexts.java:409)
  [39] org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp (AbstractFetcher.java:398)
  [40] org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord (Kafka010Fetcher.java:89)
  [41] org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop (Kafka09Fetcher.java:154)
  [42] org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run (FlinkKafkaConsumerBase.java:721)
  [43] org.apache.flink.streaming.api.operators.StreamSource.run (StreamSource.java:87)
  [44] org.apache.flink.streaming.api.operators.StreamSource.run (StreamSource.java:56)
  [45] org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run (SourceStreamTask.java:99)
  [46] org.apache.flink.streaming.runtime.tasks.StreamTask.invoke (StreamTask.java:306)
  [47] org.apache.flink.runtime.taskmanager.Task.run (Task.java:703)
  [48] java.lang.Thread.run (Thread.java:748)

Here is the code about to execute:


    public void emit(T record) throws IOException, InterruptedException {
        for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
            sendToTarget(record, targetChannel);
        }
    }

Printing numChannels here gives:
numChannels = 1 --->because we have a UNION, which naturally funnels all sources into one channel. That checks out!

---Finally the record is enqueued and the consuming thread is notified; the full call stack:


[1] org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.queueChannel (SingleInputGate.java:623)
  [2] org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.notifyChannelNonEmpty (SingleInputGate.java:612)
  [3] org.apache.flink.runtime.io.network.partition.consumer.InputChannel.notifyChannelNonEmpty (InputChannel.java:121)
  [4] org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.notifyDataAvailable (LocalInputChannel.java:202)
  [5] org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView.notifyDataAvailable (PipelinedSubpartitionView.java:56)
  [6] org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.notifyDataAvailable (PipelinedSubpartition.java:290)
  [7] org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.flush (PipelinedSubpartition.java:76)
  [8] org.apache.flink.runtime.io.network.partition.ResultPartition.flush (ResultPartition.java:269)
  [9] org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget (RecordWriter.java:149)
  [10] org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit (RecordWriter.java:105)
  [11] org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit (StreamRecordWriter.java:81)
  [12] org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter (RecordWriterOutput.java:107)
  [13] org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect (RecordWriterOutput.java:89)
  [14] org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect (RecordWriterOutput.java:45)
  [15] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679)
  [16] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657)
  [17] org.apache.flink.streaming.api.operators.TimestampedCollector.collect (TimestampedCollector.java:51)
  [18] org.apache.flink.table.runtime.CRowWrappingCollector.collect (CRowWrappingCollector.scala:37)
  [19] org.apache.flink.table.runtime.CRowWrappingCollector.collect (CRowWrappingCollector.scala:28)
  [20] DataStreamCalcRule$69.processElement (null)
  [21] org.apache.flink.table.runtime.CRowProcessRunner.processElement (CRowProcessRunner.scala:66)
  [22] org.apache.flink.table.runtime.CRowProcessRunner.processElement (CRowProcessRunner.scala:35)
  [23] org.apache.flink.streaming.api.operators.ProcessOperator.processElement (ProcessOperator.java:66)
  [24] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:560)
  [25] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:535)
  [26] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:515)
  [27] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679)
  [28] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657)
  [29] org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement (TimestampsAndPeriodicWatermarksOperator.java:67)
  [30] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:560)
  [31] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:535)
  [32] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:515)
  [33] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679)
  [34] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657)
  [35] org.apache.flink.streaming.api.operators.TimestampedCollector.collect (TimestampedCollector.java:51)
  [36] org.apache.flink.table.runtime.CRowWrappingCollector.collect (CRowWrappingCollector.scala:37)
  [37] org.apache.flink.table.runtime.CRowWrappingCollector.collect (CRowWrappingCollector.scala:28)
  [38] DataStreamSourceConversion$23.processElement (null)
  [39] org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement (CRowOutputProcessRunner.scala:67)
  [40] org.apache.flink.streaming.api.operators.ProcessOperator.processElement (ProcessOperator.java:66)
  [41] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:560)
  [42] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:535)
  [43] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:515)
  [44] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679)
  [45] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657)
  [46] org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp (StreamSourceContexts.java:310)
  [47] org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp (StreamSourceContexts.java:409)
  [48] org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp (AbstractFetcher.java:398)
  [49] org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord (Kafka010Fetcher.java:89)
  [50] org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop (Kafka09Fetcher.java:154)
  [51] org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run (FlinkKafkaConsumerBase.java:721)
  [52] org.apache.flink.streaming.api.operators.StreamSource.run (StreamSource.java:87)
  [53] org.apache.flink.streaming.api.operators.StreamSource.run (StreamSource.java:56)
  [54] org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run (SourceStreamTask.java:99)
  [55] org.apache.flink.streaming.runtime.tasks.StreamTask.invoke (StreamTask.java:306)
  [56] org.apache.flink.runtime.taskmanager.Task.run (Task.java:703)
  [57] java.lang.Thread.run (Thread.java:748)

---Watermark handling should be similar, so next let's look at the thread running the union.

Let's revisit this thread's call stack from earlier:


"time attribute: (SPT) (1/1)" #53 prio=5 os_prio=0 tid=0x00007f363d8e4000 nid=0x662 in Object.wait() [0x00007f36418c1000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:502)
    at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.waitAndGetNextInputGate(UnionInputGate.java:205)
    - locked <0x00000000e6ee8210> (a java.util.ArrayDeque)
    at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:163)
    at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
This thread corresponds to the UNION operator in our SQL.

As the stack shows, the thread waits for a message to arrive and then pulls it; data on any input triggers extraction, otherwise the thread wait()s.
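
As a rough model of this wait-then-pull pattern (illustrative only, not Flink's UnionInputGate code---note the lock on a java.util.ArrayDeque in the stack above):

import java.util.ArrayDeque;

public class MiniUnionGate {
    private final ArrayDeque<Integer> gatesWithData = new ArrayDeque<>();

    /** Called by producing threads when input `gateIndex` becomes non-empty. */
    public void notifyNonEmpty(int gateIndex) {
        synchronized (gatesWithData) {
            gatesWithData.add(gateIndex);
            gatesWithData.notifyAll();
        }
    }

    /** Called by the union thread: blocks until some input has data. */
    public int waitForNextGate() throws InterruptedException {
        synchronized (gatesWithData) {
            while (gatesWithData.isEmpty()) {
                gatesWithData.wait();
            }
            return gatesWithData.poll();
        }
    }
}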

---Note: there are four kinds of stream elements here (StreamRecord, Watermark, LatencyMarker, StreamStatus); normally we only need to care about records and watermarks.

The exact spot is:

---A word about LatencyMarker: by default one is emitted every 2 seconds, as shown below.

Everything else, record or watermark, is forwarded downstream.

Now let's set breakpoints in the union for both records and watermarks---care to guess which fires first?

The breakpoints (for flink-1.5):


Breakpoints set:
    breakpoint org.apache.flink.streaming.runtime.io.StreamInputProcessor:184
    breakpoint org.apache.flink.streaming.runtime.io.StreamInputProcessor:198

They fire in the following order:

---Just as expected! Next, let's study the groupby thread.


"groupBy: (pro, throwable, level, ip), window: (TumblingGroupWindow('w$, 'SPT, 3000.millis)), select: (pro, throwable, level, ip, COUNT(*) AS count, lastStrInGroupSkipNull($f5) AS id, firstLong(l) AS firstl, lastLong(l) AS lastl, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime) -> where: (=(1, uniqueWithin100MS(pro, throwable, _UTF-16LE'ERROR', ip, w$end))), select: (pro, throwable, level, ip, count, id, w$end AS time, firstl, lastl) -> to: Row -> Sink: Kafka010JsonTableSink(pro, throwable, level, ip, count, id, time, firstl, lastl) (1/1)" #54 prio=5 os_prio=0 tid=0x00007f363fde3800 nid=0x664 in Object.wait() [0x00007f3641127000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:502)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:533)
    - locked <0x00000000e6ee2d48> (a java.util.ArrayDeque)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:502)
    at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
This one corresponds to the GROUP BY operator.

For group by, the crucial part is the same as in the union thread---it also happens in:


org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput

which is where events are dispatched, so the breakpoints are identical.

---

The main point here is where groupby handles watermarks (this is exactly where things go wrong easily with multiple sources):

At this point I realized it was too early to watch watermarks in the groupby thread: the union thread's watermark handling still hides a few secrets.

So let's go back to the union thread and dig out those secrets---suspend the groupby thread with the suspend command and debug only the union thread.

---Set a breakpoint (for flink-1.5):


stop at org.apache.flink.streaming.runtime.io.StreamInputProcessor:184

After poking around, the principle is roughly clear. Put simply, the thread model looks like this:


stream 1 -----
             |
             |----------> union thread's watermark ----------> groupby thread's watermark
             |
stream 2 -----

Here, stream 1 and stream 2 each send the largest timestamp they have seen so far downstream (when they see a smaller one, they do nothing).

The union dynamically tracks the maximum timestamp seen by each of stream 1 and stream 2, takes Min(stream 1's max timestamp, stream 2's max timestamp), and compares it with the previous value.

If it is greater than the previous Min, it is forwarded to group by.
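
In code, the union-side bookkeeping is roughly the following; this is a sketch of the idea described above, not Flink's actual StatusWatermarkValve implementation:

// Sketch: per-input max-watermark tracking plus min-advance forwarding.
public class UnionWatermarkTracker {
    private final long[] channelWatermarks; // max timestamp reported per input stream
    private long lastEmitted = Long.MIN_VALUE;

    public UnionWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        java.util.Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Returns the watermark to forward to group by, or null if nothing advanced. */
    public Long onWatermark(int channel, long watermark) {
        // Each stream only ever raises its own reported maximum.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);

        // The union's global view is the minimum across all inputs ...
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        // ... and it is forwarded only when that minimum strictly advances.
        if (min > lastEmitted) {
            lastEmitted = min;
            return min;
        }
        return null;
    }
}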

---I suspect most readers are thoroughly confused by now, so let me spell out the idea.


To emphasize: records themselves are not intercepted along the way; they travel straight to the final WindowOperator, where the window-late check decides whether to drop them!
===========================================================================================
Stream 1 keeps sending the largest timestamp it has seen downstream, as if to say: "Hello downstream, anything below this timestamp counts as late to me---handle it as you see fit."
Stream 2 does exactly the same.
---The union is a bit more involved:
It compares min(stream 1's max timestamp, stream 2's max timestamp) against the previous min(stream 1's max timestamp, stream 2's max timestamp);
if it has increased, it sends the new, larger min downstream: "Hello downstream, globally, anything below this timestamp counts as late. This is the best I can do---I've held the timestamp back as long as I could; handle it as you see fit."

---As for groupby: it receives these timestamps, keeps the maximum, and consults it to quickly decide whether each record is late (the maximum minus the tolerated delay).


So, with multiple sources, whether a record is globally late is effectively decided by min(stream 1's max timestamp, stream 2's max timestamp).
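
On the groupby/window side, the check described above could be modeled like this (my reading of the behavior, not Flink's exact WindowOperator code; the 3-second window size matches this job's TUMBLE interval):

public class WindowLateCheck {
    private long currentWatermark = Long.MIN_VALUE;

    public void onWatermark(long wm) {
        // Keep only the maximum watermark received so far.
        currentWatermark = Math.max(currentWatermark, wm);
    }

    /** A record is late once its window's last timestamp is covered by the watermark. */
    public boolean isElementLate(long elementTimestamp, long windowSizeMs) {
        long windowStart = elementTimestamp - (elementTimestamp % windowSizeMs);
        long windowEnd = windowStart + windowSizeMs; // e.g. windowSizeMs = 3000
        return windowEnd - 1 <= currentWatermark;
    }
}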
---
Let's step back and think it over. I suspect the most puzzling part is why the union takes the minimum across streams rather than the maximum.
Think of it this way: if it took the maximum, most of the data from the slower stream would become late and be dropped, and the union would take the blame.
So, to stay out of trouble, the union takes min(each stream), and nobody can complain.
The union's inner monologue: "I already take the global minimum over each stream's own maximum timestamp---what more do you want?"
Even the slowest stream has nothing to say, because the value used is exactly the maximum that this very stream reported itself.

That was the technical view; now let's raise the level of abstraction.
The point is to keep as much data as possible out of the "late" bucket and inside the normal computation; the advance of min(stream 1's max timestamp, stream 2's max timestamp) is what drives the global WindowOperator to compute and emit results. Correspondingly, the slowest-consuming stream delays the final business output.

 

---Readers may want to mull over the details; now let's run a logic test to verify we have truly understood the rules of the game!


Setup: tolerated delay of 3000 ms.
Each line below has the format: stream name + timestamp; one record is sent at a time.
1) stream 1 + 1545703896000
2) stream 1 + 1545703896000
3) stream 2 + 1545703896000
4) stream 2 + 1545703898999
5) stream 2 + 1545703899000
6) stream 1 + 1545703899000
7) stream 1 + 1545703900000
8) stream 2 + 1545703902000-1 ---> this does not trigger the WindowOperator's output, because stream 1's side of the min is still too low
9) stream 1 + 1545703902000-1 ---> this one does trigger the WindowOperator's output
The output was correct---the worked check below confirms the arithmetic. Remember: it takes both streams.
【advancing in lockstep, theory and practice hand in hand】
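
To verify the arithmetic, here is a worked check, assuming each source's watermark is (largest timestamp seen - 3000 ms) and the tumbling window [1545703896000, 1545703899000) fires once min(wm1, wm2) >= window end - 1:

public class WindowFiringCheck {
    static final long TOLERANCE = 3000L;            // tolerated delay from the test setup
    static final long WINDOW_END = 1545703899000L;  // tumbling window [...896000, ...899000)

    // The window fires once the union's min watermark reaches windowEnd - 1.
    static boolean fires(long maxTsStream1, long maxTsStream2) {
        long wm1 = maxTsStream1 - TOLERANCE;
        long wm2 = maxTsStream2 - TOLERANCE;
        return Math.min(wm1, wm2) >= WINDOW_END - 1;
    }

    public static void main(String[] args) {
        // After step 8: stream 1's max is 1545703900000, stream 2's is 1545703901999.
        System.out.println(fires(1545703900000L, 1545703901999L)); // false -> no output yet
        // After step 9: stream 1's max also reaches 1545703901999.
        System.out.println(fires(1545703901999L, 1545703901999L)); // true  -> window fires
    }
}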

But the story doesn't quite end here---to find out what happens next, stay tuned for the next installment :)
