Dump Plugin並行化實踐
先簡單介紹下Dump Plugin的由來,在搜尋Dump中心服務化的專案中,我們把Dump中心的增量資料產出分為2個階段,Loader階段和Join階段,Loader階段把資料準備成Key-Values形式,Join階段將資料取出,計算各種業務邏輯併產出最終資料。業務邏輯的計算是相當繁瑣且易出錯,這類事情做一遍足以,所以設計了一個介面,按照業務自身劃分成一個個小塊邏輯實現介面。這些個小業務邏輯模組即構成Dump的業務Plugin。
這樣做的好處:
1, 按業務本身劃分,結構相對清晰,容易維護。
2, 架構和業務通過介面互動,重構架構將盡可能少的影響業務程式碼
3, 每個業務模組的耗時能準確統計出並能做針對性的優化。
在最初的版本中,先根據依賴關係計算好plugin的執行順序,然後順序執行,是一個序列的過程,如下圖:
此種方式,計算耗時與業務的複雜程度成正比。而目前Dump中心已經有十幾個個業務邏輯Plugin,並且plugin之間有複雜的依賴關係。所以我們嘗試用更高效的併發方式去執行這些plugin。這個專案用的開發語言是Java,Java的多執行緒有多種成熟的設計模式,結合現有框架,我們設計了兩種方案並分別嘗試。
方案1,以單條資料為粒度,在一條資料的執行內部實現並行化,如下圖:
簡單的來說,就是起一個工作執行緒組來執行plugin,來一條資料後,工作執行緒根據依賴關係獲取當前可執行的plugin,當所有plugin都執行完畢後,輸出資料。類似於Work Thread模式,工作執行緒沒資料就等著,來了資料就做。主要程式碼流程如下:
public class Main { private Semaphore mainSemaphore, workSemaphore; private Data data; private int workThreadNum; public Data run(Data data) { this.data = data; workSemaphore.release(workThreadNum); mainSemaphore.acquire(workThreadNum); return this.data; } class WorkThread implements Runnable { private boolean loop = true; public void run() { while(loop) { workSemaphore.acquire(); //getValidPlugin: 一個synchronized的呼叫,獲得未執行的Plguin Plugin plugin = getValidPlugin(); if(plugin != null) plugin.run(data); else mainSemaphore.release(1); } } } }
程式碼中使用兩個Semaphore訊號量來同步主執行緒和工作執行緒,每條資料都需要啟用和同步,並有一個synchronized的方法來獲取當前可執行的Plugin,執行緒同步開銷比較大。實現過程中,採用重任務優先,預先計算等方法,降低並行額外引入的開銷。在單個Plugin耗時長,關鍵路徑和非關鍵路徑上的plugin耗時相差不大的情況下,此種方案效果不錯。但在目前的業務情況下,效果提升不明顯,實測約提升了10%。
通過分析plugin的依賴關係,發現目前業務邏輯下,有兩個耗時大的plugin均是關鍵路徑上的,方案1的並行是針對單個寶貝的,我們想能否在批量資料或資料流中實現資料維度的並行。資料維度的並行,最簡單的方案是將資料逐條扔給ThreadPoolExecutor,每個執行緒序列執行,但這種方案對於現有結構來說不合適,原因是plugin的程式碼無法保證執行緒安全,於是就有了方案2,如下圖:
每個Plugin都起一個工作執行緒,資料像流水線一樣從Plugin中間流過,plugin的依賴關係決定資料的流向,類似於Guarded Suspension模式,工作執行緒維護一個Queue來快取,等plugin準備好,就從Queue中取資料處理。主要程式碼流程如下:
public interface QueuePutter { public void put(Data data); } public class Main implements QueuePutter{ private BlockingQueue <data> resultQueue = new LinkedBlockingQueue <data> (); public List <data> run(List <data> dataList) { List <data> resultList = new ArrayList <data> (); for(Data data : dataList) { firstPluginThread.put(data); } putLastData(); while(true) { Data data = resultQueue.take(); if(isLastData(data)) break; resultList.add(data); } return resultList; } public void put(Data data) { this.resultQueue.put(data); } } public class PluginThread implements Runnable,QueuePutter { private Plugin plugin = null; private PluginThread nextPluginThread = null; private boolean loop = true; private BlockingQueue <data> queue = new LinkedBlockingQueue <data> (10); public PluginThread(Plugin plugin, QueuePutter next) { this.plugin = plugin; this.nextPluginThread = next; } public void run() { while(loop) { Data data = this.queue.take(); data = this.plugin.run(data); this.nextPluginThread.put(data); } } public void put(Data data) { this.queue.put(data); } } </data> </data> </data> </data> </data> </data> </data> </data>
程式碼中同步操作通過BlockingQueue來實現。主執行緒將資料分發給第一個plugin執行緒,而最後一個plugin執行緒負責將資料寫回給主執行緒。主執行緒用一條特殊的資料來標識這組資料的結尾,而後在主執行緒佇列裡一直掃描特殊資料,FIFO佇列保證了處理的時序。邏輯上來說,方案2的單條資料的處理還是序列,而是多條資料之間的並行,整體效能只取決於最慢的Plugin的耗時,實測中對於批量資料來說,效果要好於方案1。
總結:實踐Dump Plugin並行的兩種實現方式,對單資料的列並行和對批量資料/資料流的行並行。