1. 程式人生 > >java多執行緒設計模式 -- 流水線模式(Pipeline)

java多執行緒設計模式 -- 流水線模式(Pipeline)

十一、流水線模式(Pipeline)
1、核心思想
將一個任務處理分解為若干個處理階段,其中每個處理階段的輸出作為下一個處理階段的輸入,並且各個處理階段都有相應的工作者執行緒去執行相應的計算。
2、評價:
充分利用CPU,提高其計算效率。
允許子任務間存在依賴關係的條件下實現平行計算。
非常便於採用單執行緒模型實現對子任務的處理。
有錯誤處理 PipeContext
3、適用場景
a、適合於處理規模較大的任務,否則可能得不償失。各個處理階段所使用的工作者執行緒或者執行緒池、輸入輸出物件的建立和轉移都有自身的時間和空間消耗。

/**
 * 對處理階段的抽象。
 * 負責對輸入進行處理,並將輸出作為下一處理階段的輸入
 * @author huzhiqiang
 *
 * @param <IN>
 * @param <OUT>
 */
public interface Pipe<IN, OUT> { /** * 設定當前Pipe例項的下個Pipe例項 * @param nextPipe */ public void setNextPipe(Pipe<?,?> nextPipe); /** * 對輸入的元素進行處理,並將處理結果作為下一個Pipe例項的輸入 * @param input * @throws InterruptedException */ public void process(IN input) throws InterruptedException; public
void init(PipeContext pipeCtx); public void shutdown(long timeout, TimeUnit unit); } /** * 對複合Pipe的抽象。一個Pipeline例項可包含多個Pipe例項 * @author huzhiqiang * * @param <IN> * @param <OUT> */ public interface PipeLine<IN, OUT> extends Pipe<IN, OUT> { void addPipe(Pipe<?,?> pipe); } public
abstract class AbsractPipe<IN, OUT> implements Pipe<IN, OUT> { protected volatile Pipe<?, ?> nextPipe = null; protected volatile PipeContext PipeCtx = null; @Override public void setNextPipe(Pipe<?, ?> nextPipe) { this.nextPipe = nextPipe; } @SuppressWarnings("unchecked") @Override public void process(IN input) throws InterruptedException { try { OUT out = doProcess(input); if(null != nextPipe){ if(null != out){ ((Pipe<OUT, ?>) nextPipe).process(out); } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (PipeException e) { PipeCtx.handleError(e); } } @Override public void init(PipeContext pipeCtx) { this.PipeCtx = pipeCtx; } @Override public void shutdown(long timeout, TimeUnit unit) { //什麼也不做 } /** * 留給子類實現,用於子類實現其任務處理邏輯 */ public abstract OUT doProcess(IN input) throws PipeException; } public abstract class AbstractParallePipe<IN, OUT, V> extends AbsractPipe<IN, OUT> { private final ExecutorService executorService; public AbstractParallePipe(BlockingQueue<IN> queue, ExecutorService executorService) { super(); this.executorService = executorService; } /** * 留給子類實現,用於根據指定的輸入元素input構造一組子任務 * @param input * @return * @throws Exception */ protected abstract List<Callable<V>> buildTasks(IN input) throws Exception; /** * 留給子類實現,對各個子任務的處理結果進行合併,形成相應輸入元素的輸出結果 * @param subTaskResults * @return * @throws Exception */ protected abstract OUT combineResults(List<Future<V>> subTaskResults) throws Exception; /** * 以並行的方式執行一組子任務 * @param tasks * @return * @throws Exception */ protected List<Future<V>> invokeParallel(List<Callable<V>> tasks) throws Exception{ return executorService.invokeAll(tasks); } @Override public OUT doProcess(IN input) throws PipeException { OUT out = null; try { out = combineResults(invokeParallel(buildTasks(input))); } catch (Exception e) { throw new PipeException(this, input, "Task failed", e); } return out; } } public class SimplePipeline<IN, OUT> extends AbsractPipe<IN, OUT> implements PipeLine<IN, OUT> { private final Queue<Pipe<?, ?>> pipes = new LinkedList<Pipe<?, ?>>(); private final ExecutorService helperService; public SimplePipeline() { //建立固定執行緒數為1的執行緒池,整型的最大數的LinkedBlockingQueue的快取佇列 this(Executors.newSingleThreadExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r, "SimplePpeLine-Helper"); t.setDaemon(true); return t; } })); } public SimplePipeline(final ExecutorService helperService) { super(); this.helperService = helperService; } @Override public void shutdown(long timeout, TimeUnit unit) { Pipe<?,?> pipe; while(null != (pipe = pipes.poll())){ pipe.shutdown(timeout, unit); } helperService.shutdown(); } @Override public void addPipe(Pipe<?, ?> pipe) { pipes.add(pipe); } @Override public OUT doProcess(IN input) throws PipeException { // TODO Auto-generated method stub return null; } @Override public void process(IN input) throws InterruptedException { @SuppressWarnings("unchecked") Pipe<IN, ?> firstPipe = (Pipe<IN, ?>) pipes.peek(); firstPipe.process(input); } @Override public void init(PipeContext pipeCtx) { LinkedList<Pipe<?, ?>> pipesList = (LinkedList<Pipe<?, ?>>) pipes; Pipe<?, ?> prevPipe = this; //設定處理任務的先後順序 for(Pipe<?, ?> pipe: pipesList){ prevPipe.setNextPipe(pipe); prevPipe = pipe; } Runnable task = new Runnable() { @Override public void run() { for(Pipe<?, ?> pipe: pipes){ pipe.init(pipeCtx); } } }; helperService.submit(task); } public <INPUT, OUTPUT> void addAsWorkerThreadBasedPipe(Pipe<INPUT, OUTPUT> delegate, int workCount){ addPipe(new WorkThreadPipeDecorator<INPUT, OUTPUT>(delegate, workCount)); } public <INPUT, OUTPUT> void addAsThreadBasedPipe(Pipe<INPUT, OUTPUT> delegate, ExecutorService executorService){ addPipe(new ThreadPoolPipeDecorator<INPUT, OUTPUT>(delegate, executorService)); } public PipeContext newDefaultPipeContext(){ return new PipeContext() { @Override public void handleError(PipeException exp) { helperService.submit(new Runnable() { @Override public void run() { exp.printStackTrace(); } }); } }; } } public class ThreadPoolPipeDecorator<IN, OUT> implements Pipe<IN, OUT> { private final Pipe<IN, OUT> delegate; private final TerminationToken terminationToken; private final ExecutorService executorService; private final CountDownLatch stageProcessDoneLatch = new CountDownLatch(1); public ThreadPoolPipeDecorator(Pipe<IN, OUT> delegate, ExecutorService executorService) { super(); this.delegate = delegate; this.executorService = executorService; terminationToken = TerminationToken.newInstance(executorService); } @Override public void setNextPipe(Pipe<?, ?> nextPipe) { delegate.setNextPipe(nextPipe); } @Override public void process(IN input) throws InterruptedException { Runnable task = new Runnable() { @Override public void run() { int remainingReservations = -1; try { delegate.process(input); } catch (InterruptedException e) { e.printStackTrace(); }finally { remainingReservations = terminationToken.reservations.decrementAndGet(); } if(terminationToken.isToShutDown() && 0 == remainingReservations){ //最後一個任務執行結束 stageProcessDoneLatch.countDown(); } } }; executorService.submit(task); terminationToken.reservations.incrementAndGet(); } @Override public void init(PipeContext pipeCtx) { delegate.init(pipeCtx); } @Override public void shutdown(long timeout, TimeUnit unit) { terminationToken.setIsToShutdown(); if(terminationToken.reservations.get() > 0){ try { if(stageProcessDoneLatch.getCount() > 0){ //保證執行緒池中的所有任務都已經執行結束才delegate.shutdown stageProcessDoneLatch.await(timeout, unit); } } catch (InterruptedException e) { e.printStackTrace(); } } delegate.shutdown(timeout, unit); } private static class TerminationToken extends com.threadDesign.twoPhase.TerminationToken{ private final static ConcurrentHashMap<ExecutorService, TerminationToken> INSTANCE_MAP = new ConcurrentHashMap<ExecutorService, TerminationToken>(); private TerminationToken(){ } void setIsToShutdown(){ this.toShutDown = true; } static TerminationToken newInstance(ExecutorService executorService){ TerminationToken token = INSTANCE_MAP.get(executorService); if(null == token){ token = new TerminationToken(); TerminationToken existingToken = INSTANCE_MAP.putIfAbsent(executorService, token); if(null != existingToken){ token = existingToken; } } return token; } } } /** * 基於工作者執行緒的Pipe實現類 * 提交到該Pipe的任務由指定個數的工作者執行緒共同處理 * @author huzhiqiang * * @param <IN> * @param <OUT> */ public class WorkThreadPipeDecorator<IN, OUT> implements Pipe<IN, OUT> { protected final BlockingQueue<IN> workQueue; protected final Set<AbstractTerminatableThread> workerThreads = new HashSet<AbstractTerminatableThread>(); protected final TerminationToken terminationToken = new TerminationToken(); private final Pipe<IN, OUT> delegate; public WorkThreadPipeDecorator(Pipe<IN, OUT> delegate, int workerCount){ this(new SynchronousQueue<IN>(), delegate, workerCount); } public WorkThreadPipeDecorator(BlockingQueue<IN> workQueue, Pipe<IN, OUT> delegate, int workerCount) { if(workerCount <= 0){ throw new IllegalArgumentException("workerCount should be positive!"); } this.workQueue = workQueue; this.delegate = delegate; for(int i=0; i<workerCount; i++){ workerThreads.add(new AbstractTerminatableThread() { @Override protected void doRun() throws Exception { try { dispatch(); }finally { terminationToken.reservations.decrementAndGet(); } } }); } } private void dispatch() throws InterruptedException { IN input = workQueue.take(); delegate.process(input); } @Override public void setNextPipe(Pipe<?, ?> nextPipe) { delegate.setNextPipe(nextPipe); } @Override public void process(IN input) throws InterruptedException { workQueue.put(input); terminationToken.reservations.incrementAndGet(); } @Override public void init(PipeContext pipeCtx) { delegate.init(pipeCtx); for(AbstractTerminatableThread thread : workerThreads){ thread.start(); } } @Override public void shutdown(long timeout, TimeUnit unit) { for(AbstractTerminatableThread thread : workerThreads){ thread.terminate(); try { thread.join(TimeUnit.MILLISECONDS.convert(timeout, unit)); } catch (InterruptedException e) { } } delegate.shutdown(timeout, unit); } } public class PipeException extends Exception { private static final long serialVersionUID = 8647786507719222800L; /** * 丟擲異常的Pipe例項 */ public final Pipe<?, ?> sourcePipe; public final Object input; public PipeException(Pipe<?, ?> sourcePipe, Object input, String message) { super(message); this.sourcePipe = sourcePipe; this.input = input; } public PipeException(Pipe<?, ?> sourcePipe, Object input, String message, Throwable cause) { super(message, cause); this.sourcePipe = sourcePipe; this.input = input; } } /** * 對各個處理階段的計算環境進行抽象,主要用於異常處理 * @author huzhiqiang */ public interface PipeContext { public void handleError(PipeException exp); } /** * 測試程式碼 * @author huzhiqiang * */ public class ThreadPoolBasedPipeExample { public static void main(String[] args) { final ThreadPoolExecutor threadPoolExecutor; threadPoolExecutor = new ThreadPoolExecutor(1, Runtime.getRuntime().availableProcessors()*2, 60, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy()); final SimplePipeline<String, String> pipeLine = new SimplePipeline<String, String>(); Pipe<String, String> pipe = new AbsractPipe<String, String>() { @Override public String doProcess(String input) throws PipeException { String result = input + "->[pipe1, " + Thread.currentThread().getName() + "]"; System.out.println(result); return result; } }; pipeLine.addAsThreadBasedPipe(pipe, threadPoolExecutor); pipe = new AbsractPipe<String, String>() { @Override public String doProcess(String input) throws PipeException { String result = input + "->[pipe2, " + Thread.currentThread().getName() + "]"; System.out.println(result); try { Thread.sleep(new Random().nextInt(100)); } catch (InterruptedException e) { e.printStackTrace(); } return result; } }; pipeLine.addAsThreadBasedPipe(pipe, threadPoolExecutor); pipe = new AbsractPipe<String, String>() { @Override public String doProcess(String input) throws PipeException { String result = input + "->[pipe3, " + Thread.currentThread().getName() + "]"; System.out.println(result); try { Thread.sleep(new Random().nextInt(200)); } catch (InterruptedException e) { e.printStackTrace(); } return result; } @Override public void shutdown(long timeout, TimeUnit unit) { threadPoolExecutor.shutdown(); try { threadPoolExecutor.awaitTermination(timeout, unit); } catch (InterruptedException e) { e.printStackTrace(); } } }; pipeLine.addAsThreadBasedPipe(pipe, threadPoolExecutor); pipeLine.init(pipeLine.newDefaultPipeContext()); int N = 10; try { for(int i=0; i<N; i++){ pipeLine.process("Task-" + i); } } catch (InterruptedException e) { e.printStackTrace(); } pipeLine.shutdown(10, TimeUnit.SECONDS); } }