Flink Backpressure指標和機制
頁面上反壓指標意義
為了判斷是否進行反壓,jobmanager會每50ms觸發100次stack traces。
Web界面中顯示阻塞在內部方法調用的stacktraces占所有的百分比。
例如,0.01,代表著100次中有一次阻塞在內部調用。
? OK: 0 <= Ratio <= 0.10
? LOW: 0.10 < Ratio <= 0.5
? HIGH: 0.5 < Ratio <= 1
出現反壓時候典型的線程堆棧阻塞情況
"offline_data_source (42/48)" #225 prio=5 os_prio=0 tid=0x00007fb379712000 nid=0xbc5ba in Object.wait() [0x00007fb32b307000] java.lang.Thread.State: TIMED_WAITING (on object monitor) at java.lang.Object.wait(Native Method) at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:261) - locked <0x00000006bf00f958> (a java.util.ArrayDeque) at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107) at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:717) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:695) at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) - locked <0x00000006bfed93f8> (a java.lang.Object) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:745)
堆棧阻塞原理過程
其中RecordWriter將數據(封裝為BufferBuilder)被寫入ResultPartition下的各個subPartition裏過程中會有通過memorySegment來限流阻塞的過程。
具體做法是構造BufferBuilder時候LocalBufferPool通過requestMemorySegment()方法從NetworkBufferPool中申請memorySegment。
最多會從網絡棧中申請和currentPoolSize數量相等的memorySegment對象來傳遞數據。
申請到的memorySegment都會用來處理傳遞真實數據,如果下遊處理堵塞,上遊netty發送數據時無法flush數據
PartitionRequestQueue.java
// Write and flush and wait until this is done before
// trying to continue with the next buffer.
channel.writeAndFlush(msg).addListener(writeListener);
進而導致無法執行回調,無法回調影響了LocalBufferPool中recycle()。無法將正在傳遞數據的memorySegment重新釋放並加入availableMemorySegments。此時用來存儲回收memorySegment對象的availableMemorySegments隊列是空的,因為所有從網絡棧中申請來的memorySegment都卡在flush數據環節,無法被recycle到availableMemorySegments隊列。
正因如此,下面代碼中會陷入死循環,一直在wait。
從而導致jobmanager每50ms觸發100次stack traces都看到operator線程阻塞在這裏,然後認為該operator出現了反壓。
LocalBufferPool.java
private MemorySegment requestMemorySegment(boolean isBlocking) throws InterruptedException, IOException {
synchronized (availableMemorySegments) {
returnExcessMemorySegments();
boolean askToRecycle = owner.isPresent();
// fill availableMemorySegments with at least one element, wait if required
while (availableMemorySegments.isEmpty()) {
if (isDestroyed) {
throw new IllegalStateException("Buffer pool is destroyed.");
}
if (numberOfRequestedMemorySegments < currentPoolSize) {
final MemorySegment segment = networkBufferPool.requestMemorySegment();
if (segment != null) {
numberOfRequestedMemorySegments++;
return segment;
}
}
if (askToRecycle) {
owner.get().releaseMemory(1);
}
if (isBlocking) {
availableMemorySegments.wait(2000);
}
else {
return null;
}
}
return availableMemorySegments.poll();
}
}
監控
可以通過flink metrics中的output buffer usage指標來確定是否下遊出現了背壓,具體指標為flink_taskmanager_job_task_buffers_outPoolUsage
如下圖,如果usage打滿到100%則說明出現了嚴重的背壓情況
其它
所以出現反壓的時候,罪魁禍首往往不是flink UI上backpressure HIGH的那個operator,而是最後一個HIGH operator下遊的那個operator
參考文章
Monitoring Back Pressure
數據流轉:Flink的數據抽象及數據交換過程
Flink Backpressure指標和機制