多執行緒程式設計模型:Pipeline模式 上
簡介
核心思想:將一個任務分解為若干個階段(Stage),前階段的輸出為下階段的輸入,各個階段由不同的工作者執行緒負責執行。
多個任務的各個階段是並行(Parallel)處理的。
對於這一批任務中的某一個任務而言,其處理仍然是序列的,即完成一個任務的處理要依次執行各個階段,但從整體任務上看,各個處理階段的執行是並行的。
例子
從顧客的迎接和安排座位、點菜、菜餚烹煮、上菜到買單,每個階段都有相應的工作人員(迎賓、服務員、廚師、傳菜員和收銀員)負責,而不是由一名工作人員對一桌客人負責到底,從而減少了顧客的等待,提高了服務效率。
-
- Pipeline模式示意圖
架構——分類
將任務分解為若干個階段,並將每個階段抽象為一個物件。階段物件都有相應的工作者執行緒負責對輸入進行處理,並將輸出作為下一個處理階段的輸入。
分類(是否包含併發處理階段):線性Pipeline、非線性Pipeline。
架構——參與者
● Pipe(輸入、輸出和處理):對階段的抽象。負責對輸入進行處理,並將輸出作為下一個階段的輸入。
◇ process:處理前一個階段的輸入。
◇ init:初始化Pipe。
◇ shutdown:關閉Pipe。
◇ setNextPipe:設定下一個階段。
● PipeContext:階段的計算環境,主要用於異常處理。
◇ handleError:對丟擲的異常進行處理。
● AbstractPipe:Pipe介面的抽象實現類。
◇ process:呼叫doProcess方法對前一個階段的輸入進行處理,並處理結果提交給下一個階段。
◇ init:儲存傳遞過來的PipeContext例項,子類可根據需要覆蓋該方法。
◇ shutdown:預設實現什麼也不做。
◇ setNextPipe:設定當前階段的下一個處理階段。
◇ doProcess:留給子類實現的抽象方法。待子類實現的處理邏輯。
● WorkerThreadPipeDecorator:工作者執行緒的Pipe。將輸入存入佇列,由指定個數的工作者執行緒對佇列中的輸入進行處理。該類自身主要負責工作者執行緒的生命週期的管理。它通過呼叫delegate例項變數(委託Pipe例項)的相應方法實現Pipe介面中定義的各個方法。
◇ process:將前一個階段的輸入存入佇列,由工作者執行緒執行時取出進行處理。
◇ init:啟動工作者執行緒,並呼叫委託Pipe例項的init方法。
◇ shutdown:停止工作者執行緒,並呼叫委託Pipe例項的shutdown方法。
◇ setNextPipe:呼叫委託Pipe例項的setNextPipe方法。
◇ dispatch:取出佇列中的輸入並呼叫委託Pipe例項的process方法。
● ThreadPoolPipeDecorator:執行緒池的Pipe。Pipe介面的實現委託給delegate例項變數(委託Pipe例項)。
◇ process:接收前一個階段的輸入,並向執行緒池提交一個對該輸入進行相應處理的任務。
◇ init:呼叫委託Pipe例項的init方法。
◇ shutdown:關閉當前服務,並呼叫委託Pipe例項的shutdown方法。
◇ setNextPipe:呼叫委託Pipe例項的setNextPipe方法。
● AbstractParallelPipe:AbstractPipe的子類,支援並行處理的Pipe實現類。該類對其每個輸入(原始任務)生成相應的一組子任務,並以並行的方式去執行這些子任務。各個子任務的執行結果會被合併為相應原始任務的輸出結果。
◇ buildTasks:留給子類實現的抽象方法。用於根據指定的輸入構造一組子任務。
◇ combineResults:留給子類實現的抽象方法。對各個並行子任務的處理結果進行合併,形成相應輸入元素的輸出結果。
◇ invokeParallel:實現以並行的方式執行一組任務。
◇ doProcess:實現該類對其輸入的處理邏輯。
● ConcreteParallelPipe:由應用定義的AbstractParallelPipe的子類。
◇ buildTasks:根據指定的輸入構造一組子任務。
◇ combineResults:對各個並行子任務的處理結果進行合併,形成相應輸入元素的輸出結果。
● Pipeline:複合Pipe,可包含多個Pipe。
◇ addPipe:新增一個Pipe。
● SimplePipeline:基於AbstractPipe的Pipeline。
◇ addPipe:新增一個Pipe。
◇ addAsWorkerThreadBasedPipe:將WorkerThreadPipeDecorator包裝後的Pipe加入Pipeline。
◇ addAsThreadPoolBasedPipe:將ThreadPoolPipeDecorator包裝後的Pipe加入Pipeline。
序列圖
第1步:客戶端建立Pipeline。
第2~4步:客戶端建立各個Pipe。
第5步:客戶端呼叫Pipeline的add方法新增各個Pipe。
第6步:客戶端建立PipeContext。
第7步:客戶端呼叫Pipeline的init方法。
第8~13步:init方法呼叫各個Pipe的init方法,初始化各個Pipe。
第14步:init方法呼叫返回。
第1步:客戶端呼叫Pipeline的process方法。
第2、3步:process方法呼叫第一個Pipe的process方法。
第4步:process方法返回。
第5步:第一個Pipe例項開始處理任務,其dispatch方法被工作者執行緒(或者執行緒池中的工作者執行緒)呼叫。
第6步:dispatch方法呼叫pipe1的委託Pipe例項delegate1的process方法。
第7步:delegate1的process方法呼叫其doProcess方法對輸入元素inputA進行處理,相應處理結果為outA。
第8、9步:delegate1的process方法獲取其下一個Pipe例項(同時也是pipe1的下一個Pipe例項)pipe2,並呼叫pipe2的process方法,將outA作為輸入元素提交給pipe2處理。
第10步:pipe1的process方法返回。