1. 程式人生 > >如何實現執行緒池的暫停和恢復功能

如何實現執行緒池的暫停和恢復功能

很多時候我們需要暫停執行緒池,而不是shutdown執行緒池,暫停執行緒池可以為我們儲存任務,稍後可以繼續執行,從而避免不必要的開銷。

這裡我提供一種暫停執行緒池的方法;

首先拿到ThreadPoolExecutor.java原始碼,將其變為自己包內的私有類;

接下來修改執行緒池,

先線上程池類中新增一下方法和變數:

BlockingQueue<Runnable> pauseQueue=new ArrayBlockingQueue<>(1);//暫停時用來則塞執行緒的空任務佇列

isPause=true;//暫停
public void pause(){//暫停執行緒池,但是仍然接受任務
isPause=true; System.out.println("暫停了"+isPause+exit); } public void resume(){//恢復執行緒池,開始接著執行任務 isPause=false; if (workQueue.isEmpty()) { return; }pauseQueue.offer(workQueue.poll());}
然後修改以下方法:
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf
(c); if (isPause){ try { return pauseQueue.take(); } catch (InterruptedException e) { } }// Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 ||workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) { return null; } continue; } try { Runnable r = timed ?// TODO: 2017/5/14 keepAliveTime為空閒執行緒存活的時間 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :// TODO: 2017/5/14 在空閒執行緒關閉之前嘗試取走佇列頭的任務,如果還沒有任務則返回null workQueue.take();// TODO: 2017/5/14 獲取佇列頭的任務 然後 在從佇列移除該任務; if (r != null) { return r; } timedOut = true;// TODO: 2017/5/14 如果取不到任務則迴圈重取,如果佇列已空則在上面返回空,當返回空時代表所有任務已完成,那麼工作機器人會關閉並銷燬 } catch (InterruptedException retry) { timedOut = false; } } }
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    
if (isPause) {
       if (!workQueue.offer(command))
          reject(command);
       return;
    }
/* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker( command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker( command, false)) reject(command);} OK,先在只要呼叫pause()和resume()方法就能實現暫停和恢復。

原始碼下載