1. 程式人生 > >java 執行緒池詳解及四種執行緒池用法介紹

java 執行緒池詳解及四種執行緒池用法介紹

java 執行緒池詳解

     Executor框架是一種將執行緒的建立和執行分離的機制。它基於Executor和ExecutorService介面,及這兩個介面的實現類ThreadPoolExecutor展開,Executor有一個內部執行緒池,並提供了將任務傳遞到池中執行緒以獲得執行的方法,可傳遞的任務有如下兩種:通過Runnable介面實現的任務和通過Callable介面實現的任務。在這兩種情況下,只需要傳遞任務到執行器,執行器即可使用執行緒池中的執行緒或新建立的執行緒來執行任務。執行器也決定了任務執行時間。

     java提供了四種執行緒池的實現:

     (1)newCachedThreadPool建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。
     (2)newFixedThreadPool 建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。
     (3)newScheduledThreadPool 建立一個定長執行緒池,支援定時及週期性任務執行。
     (4)newSingleThreadExecutor 建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。

    下面就對這些執行緒池的使用方式進行簡要的程式碼介紹:

    首先是可快取執行緒池:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolDemo {
	public static void main (String[] args) {
		ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
		for(int i = 0; i < 10; i++) {
			final int index = i;
			try {
				Thread.sleep(index*1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			cachedThreadPool.execute(new Runnable () {
				public void run () {
					System.out.println(index);
				}
			});
		}
	}

}
然後是定長執行緒池:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolDemo {
	public static void main (String[] args) {
		ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
		for(int i = 0; i < 10; i++) {
			final int index = i;
			fixedThreadPool.execute(new Runnable () {
				public void run () {
					System.out.println(index);
					try {
						Thread.sleep(2000);
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
			});
		}
	}
}

然後是定長執行緒池支援定時和週期性任務:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ThreadPoolDemo {
	public static void main (String[] args) {
		ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(5);
		scheduleThreadPool.schedule(new Runnable() {
			public void run() {
				System.out.println("delay 3 seconds");
			}
		}, 3, TimeUnit.SECONDS);
	}
}
最後是:單執行緒化執行緒池
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ThreadPoolDemo {
	public static void main (String[] args) {
		ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
		for(int i = 0; i < 10; i++) {
			final int index = i;
			singleThreadExecutor.execute(new Runnable() {
				public void run() {
					try {
						System.out.println(index);
						Thread.sleep(2000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					} 
				}
			});
		}
	}
}

好了四種執行緒池的使用上面已經介紹完了,現在來看看執行緒池的原理吧,其中最重要的就是ThreadPoolExecutor類的建構函式,你會有疑惑,上面程式壓根沒有出現這個ThreadPoolExecutor類啊,其實,如果你追程式碼到

Executors.newSingleThreadExecutor()
中檢視你會發現如下程式碼:
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

這四類執行緒池類底層都是ThreadPoolExecutor類進行初始化的,你不信你可以一個一個點進去看一下,而且我告訴你,四大執行緒池是通過使用ThreadPoolExecutor建構函式實現的;你看看下面實現就知道了

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);//這邊居然不是直接調ThreadPoolExecutor建構函式,但是我們追一下程式碼看看下面這個函式你就會明白

    }
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());//裡面使用了父類的建構函式,下面就是本類和父類的繼承關係,看看他的父類是什麼,你就明白了
    }
    public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService 
  OK。到這裡,你就應該知道我們今天的主角是誰了---------ThreadPoolExecutor
好了我們可以來看看這個類的建構函式原始碼了:
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
	看見有好多引數啊,現在主要對其中的構造引數進行解釋:

	corePoolSize:核心池大小,意思是當超過這個範圍的時候,就需要將新的執行緒放到等待佇列中了即workQueue;

	maximumPoolSize:執行緒池最大執行緒數量,表明執行緒池能建立的最大執行緒數

	keepAlivertime:當活躍執行緒數大於核心執行緒數,空閒的多餘執行緒最大存活時間。

	unit:存活時間的單位

	workQueue:存放任務的佇列---阻塞佇列

	handler:超出執行緒範圍(maximumPoolSize)和佇列容量的任務的處理程式

	
	我們執行執行緒時都會呼叫到ThreadPoolExecutor的execute()方法,現在我們來看看這個方法的原始碼(就是下面這段程式碼了,這裡面有一些註釋解析),我直接來解釋一下吧:在這段程式碼中我們至少要看懂一個邏輯:噹噹前執行緒數小於核心池執行緒數時,只需要新增一個執行緒並且啟動它,如果執行緒數數目大於核心執行緒池數目,我們將任務放到workQueue中,如果連workQueue滿了,那麼就要拒絕任務了。詳細的函式我就不介紹了

	
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }