1. 程式人生 > >ExecutorCompletionService公司遇到的問題

ExecutorCompletionService公司遇到的問題

1.近期工作的時候,運維通知一個系統的記憶體一直在增長,leader叫我去排查,我開始看了一下,沒處理,leader自己去看了一下,發現是執行緒池的問題,我開頭沒注意那塊,一看才發現,確實因為CompletionService裡的結果佇列引起的。CompletionService裡面有一個BlockingQueue維護結果,如果不去取結果就會導致一直裡面一直增長

  @SuppressWarnings("unchecked")
    public void doExecute(Msg msg, List<Object> actList) {
        try {
            // 1、開啟任務處理mq訊息
            service.submit(new ActMqTask(msg, actList));
        } catch (Exception e) {
            LOG.error(prefix + " doExecute is Exception", e);
            msg.setStatus(MqMsgStatus.PROCESS);
            msg.setResultDesc("訊息處理異常" + e.getMessage());
        }

    }

就這段程式碼,裡面沒有去消費這個結果佇列,導致結果佇列一直增長。

 

已經找原因了,那現在分析下這個ExecutorCompletionService

分析前,我是會預設當前讀者是會使用執行緒池以及瞭解FutureTask了,不熟悉的原始碼強烈建議看下這篇博文Java執行緒池原始碼分析,讀完可能理解就輕鬆許多

接下來我們就進入分析階段

 

1.ExecutorCompletionService

來看下這段程式碼,網上都有的

public static void main(String[] args) throws InterruptedException, ExecutionException {

    Random random = new Random();
    ExecutorService pool = Executors.newFixedThreadPool(3);

    CompletionService<String> service = new ExecutorCompletionService<String>(pool);

    for(int i = 0; i<4; i++) {

        service.submit(() -> {
            Thread.sleep(random.nextInt(1000));
            System.out.println(Thread.currentThread().getName()+"|完成任務");
            return "data"+random.nextInt(10);
        });
    }

    for(int j = 0; j < 4; j++) {
        Future<String> take = service.take(); //這一行沒有完成的任務就阻塞
        String result = take.get(); // 這一行在這裡不會阻塞,引入放入佇列中的都是已經完成的任務
        System.out.println("獲取到結果:"+result);
    }   
} 

CompletionService裡的結果集,就是take出來的結果,不是先進先出原則,先完成先出

所以你放入blockingQueue<Future<V>>都是已經完成的執行結果。所以take去拿的時候都是由結果的不會去阻塞

public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;


    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }
    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }
    .......
}

這裡主要重寫了FutureTask<Void>裡的done方法,執行完之後把結果集放入blockQueue裡

 

 

再貼一段日常的結果集程式碼,與之對比

public static void main(String[] args) throws InterruptedException, ExecutionException {

    Random random = new Random();
    ExecutorService pool = Executors.newFixedThreadPool(5);     
    List<Future<String>> resultFuture = new ArrayList<>();

    for(int i = 0; i<4; i++) {
        final int tmp = i;
        Future<String> future = pool.submit(() -> {
            Thread.sleep(1000+10*tmp);
            System.out.println(Thread.currentThread().getName()+"|完成任務");
            return "data"+random.nextInt(10);
        });
        resultFuture.add(future);
    }
    System.out.println("--------------");

    for(Future<String> future:resultFuture) {
        String result = future.get();
        System.out.println("執行結果"+result);      
    }
}

 

區別對比

1.上面這段程式碼裡沒有維護一個結果集的佇列

2.取出的結果的不同和執行效率的不同。ExecutorCompletionService裡拿結果是最快的,他是根據裡面的任務完成就取出。而上面那段程式碼是根據任務先後順序然後取出結果集。

 

注意:

一:結果集的順序,因為ExecutorCompletionService是根據完成的先後,順序是不定的