1. 程式人生 > >Flink中非同步AsyncIO的實現 (原始碼分析)

Flink中非同步AsyncIO的實現 (原始碼分析)

先上張圖整體瞭解Flink中的非同步io

 

阿里貢獻給flink的,優點就不說了嘛,官網上都有,就是寫庫不會柱塞效能更好

然後來看一下, Flink 中非同步io主要分為兩種

  一種是有序Ordered

  一種是無序UNordered

主要區別是往下游output的順序(注意這裡順序不是寫庫的順序既然都非同步了寫庫的順序自然是無法保證的),有序的會按接收的順序繼續往下游output傳送,無序就是誰先處理完誰就先往下游傳送

兩張圖瞭解這兩種模式的實現

 

有序:record資料會通過非同步執行緒寫庫,Emitter是一個守護程序,會不停的拉取queue頭部的資料,如果頭部的資料非同步寫庫完成,Emitter將頭資料往下游傳送,如果頭元素還沒有非同步寫庫完成,柱塞 

     

無序:record資料會通過非同步執行緒寫庫,這裡有兩個queue,一開始放在uncompleteedQueue,當哪個record非同步寫庫成功後就直接放到completedQueue中,Emitter是一個守護程序,completedQueue只要有資料,會不停的拉取queue資料往下游傳送 

    

可以看到原理還是很簡單的,兩句話就總結完了,就是利用queue和java的非同步執行緒,現在來看下原始碼

這裡AsyncIO在Flink中被設計成operator中的一種,自然去OneInputStreamOperator的實現類中去找

於是來看一下AsyncWaitOperator.java

  

看到它的open方法(open方法會在taskmanager啟動job的時候全部統一呼叫,可以翻一下以前的文章)

這裡啟動了一個守護執行緒Emitter,來看下執行緒具體做了什麼

 

 1處拉取資料,2處就是常規的將拉取到的資料往下游emit,Emitter拉取資料,這裡先不講因為分為有序的和無序的

 這裡已經知道了這個Emitter的作用是迴圈的拉取資料往下游傳送

 回到AsyncWaitOperator.java在它的open方法初始化了Emitter,那它是如何處理接收到的資料的呢,看它的ProcessElement()方法

 

    

 

 其實主要就是三個個方法

先是!!!將record封裝成了一個包裝類StreamRecordQueueEntry,主要是這個包裝類的構造方法中,建立了一個CompleteableFuture(這個的complete方法其實會等到使用者程式碼執行的時候使用者自己決定什麼時候完成)

1處主要就是講元素加入到了對應的queue,這裡也分為兩種有序和無序的

 

這裡也先不講這兩種模式加入資料的區別

接著2處就是呼叫使用者的程式碼了,來看看官網的非同步io的例子

 

 給了一個Future作為引數,使用者自己起了一個執行緒(這裡思考一下就知道了為什麼要新起一個非同步執行緒去執行,因為如果不起執行緒的話,那processElement方法就柱塞了,無法非同步了)去寫庫讀庫等,然後呼叫了這個引數的complete方法(也就是前面那個包裝類中的CompleteableFuture)並且傳入了一個結果

看下complete方法原始碼

 

 這個resultFuture是每個record的包裝類StreamRecordQueueEntry的其中一個屬性是一個CompletableFuture

 那現在就清楚了,使用者程式碼在自己新起的執行緒中當自己的邏輯執行完以後會使這個非同步執行緒結束,並輸入一個結果

 那這個幹嘛用的呢

 

最開始的圖中看到有序和無序實現原理,有序用一個queue,無序用兩個queue分別就對應了

OrderedStreamElementQueue類中

 

 UnorderedStreamElementQueue類中

 

回到前面有兩個地方沒有細講,一是兩種模式的Emitter是如何拉取資料的,二是兩種模式下資料是如何加入OrderedStreamElementQueue的

有序模式:

1.先來看一下有序模式的,Emitter的資料拉取,和資料的加入

    其tryPut()方法

      

      

     onComplete方法

       

       onCompleteHandler方法

         

  這裡比較繞,先將接收的資料加入queue中,然後onComplete()中當上一個非同步執行緒getFuture() 其實就是每個元素包裝類裡面的那個CompletableFuture,當他結束時(會在使用者方法使用者呼叫complete時結束)非同步呼叫傳入的物件的 accept方法,accept方法中呼叫了onCompleteHandler()方法,onCompleteHandler方法中會判斷queue是否為空,以及queue的頭元素是否完成了使用者的非同步方法,當完成的時候,就會將headIsCompleted這個物件signalAll()喚醒

 

2.接著看有序模式Emitter的拉取資料

       

   這裡有序方式拉取資料的邏輯很清晰,如果為空或者頭元素沒有完成使用者的非同步方法,headIsCompleted這個物件會wait住(上面可以知道,當加入元素的到queue且頭元素完成非同步方法的時候會signalAll())然後將頭資料返回,往下游傳送

 

這樣就實現了有序傳送,因為Emitter只拉取頭元素且已經完成使用者非同步方法的頭元素

 

無序模式: 

  這裡和有序模式就大同小異了,只是變成了,接收資料後直接加入uncompletedQueue,當資料完成非同步方法的時候就,放到completedQueue裡面去並signalAll(),只要completedqueue裡面有資料,Emitter就拉取往下發

 

這樣就實現了無序模式,也就是非同步寫入誰先處理完就直接放到完成佇列裡面去,然後往下發,不用管接收資料的順序

&n