1. 程式人生 > >Flink中傳送端反壓以及Credit機制(原始碼分析)

Flink中傳送端反壓以及Credit機制(原始碼分析)

上一篇《Flink接收端反壓機制》說到因為Flink每個Task的接收端和傳送端是共享一個bufferPool的,形成了天然的反壓機制,當Task接收資料的時候,接收端會根據積壓的資料量以及可用的buffer數量(可用的memorySegment數)來決定是否向上遊傳送Credit(簡而言之就是當我還有空間的時候,我向上游也就是上一個Task的傳送端傳送一個ack訊息,表明我還有空間你可以傳送資料過來,如果下游沒有給你Credit就證明下游已經堵了,沒有空間了也就不能繼續往下游傳送了)

現在從原始碼來看一下Task的資料傳送端,也就是Netty的Server端的實現

先看Task初始化的時候TaskManagerRunner.java中startTaskManager()方法中

 這個connectionManager其實分為兩種,Netty,local一看就知道netty這種肯定是對應需要通過網路傳輸,本地模式這裡就不講了

 

這個地方看到Flink的client和server都初始化了,需要注意的是其實這個地方client端只是初始化了一些配置,並沒有呼叫bind()方法啟動起來,這裡看過上一篇文章的同學就會知道,client只有當第一次需要拉取上游subpatition資料的時候才會啟動起來也就是bind(),

而server端在這裡也就是task啟動的時候就啟動起來了,繼續看server端如何啟動的server.init()方法

 init方法中,這裡可以看到,這是Flink1.6以前只有基於netty的tcp網路層反壓,這裡是通過bootstrap的兩個引數

    ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK  大小為兩倍的memorySegment大小

    ChannelOption.WRITE_BUFFER_LOW_WATER_MARK   大小為memorySegment + 1

接著

  

       

1處2處常規的Netty定長編解碼器,沒有什麼好說的

看看3處,4這裡先不講後面會提到

 看到3是一個inboundHandler,反壓機制時他的用處是用來接收來自下游響應的Credit,來看他的ChannelRead0方法

 當接收到的訊息是一個Credit信任的時候

先是

增加了這個reader的可用的Credit可用數

然後

 其實瞭解了接收端的反壓,傳送端接收到了下游的credit,那傳送資料的時候肯定有一個地方會先判斷是否有可用的Credit才決定是否往下發資料

其實就是這個帶星號的地方判斷,然後下面就是常規的從queue中拉取reader往netty下游writeAndFlash()資料了,沒什麼好講的

來看一下他判斷Credit是否滿足的地方

可以看到只有當

    有資料且可用的Credit數量大於0

    或者有資料且資料是一個事件而不是record的時候,才返回true往下游傳送

 

可以看到這個 enqueueAvailableReader()方法比較重要,裡面包含了判斷credit以及往後下游傳送資料的邏輯

那這個enqueueAvailableReader()方法除了會在接收到下游的Credit的時候觸發一次,還有哪會被觸發呢

既然是往下游傳送資料那我們task處理完資料以後應該也會呼叫這個方法

於是來看一下Task傳送資料,以前的文章講過,這裡就不贅述了,直接看到RecordWriterOutput的emit()

會先將record寫入到這個Serializer裡面去

然後copyFromSerializerToTargetChannel()方法中

 先去localBufferPool中請求buffer,這裡就是反壓了

請求到buffer了以後

這個呼叫鏈有點長不全列出來了

最後

 這個requestQueue其實是前面Netty初始化時具體邏輯中的4,是一個ChannelInboundHandlerAdapter

 這個Inbound一開始我也很疑惑,這個Inbound沒有重寫他的channelRead()方法,那這個不就直接轉發資料了嗎,那他的作用是幹嘛的呢

繼續往下看

原來發送資料的時候會觸發這個inbound的eventTrigger

看下userEventTriggered()具體觸發了什麼

 

這個方法就很眼熟了,就是前面到接收到下游傳送過來的Credit時會觸發一次的方法,用來判斷是否有Credit以及通過netty往下游傳送資料

這裡在傳送資料的時候果然又觸發了,後面就是判斷是否有Credit滿足往下游傳送資料的條件,然後往下游傳送資料

也就是說

    當接收到下游返回的Credit的時候會觸發一次,是否能往下游寫資料的判斷並拉queue資料寫資料

    每次Task處理完資料以後emit,也會觸發一次判斷並拉queue資料寫資料

&n