1. 程式人生 > >java threadPool 執行緒池簡單分析

java threadPool 執行緒池簡單分析

java 1.5 concurrent 工具包中提供了五類執行緒池的建立:

		ExecutorService executor=Executors.newCachedThreadPool();
		ExecutorService cacheExecutor=Executors.newCachedThreadPool(new TestThreadFactory());
		
		ExecutorService fixExecutor=Executors.newFixedThreadPool(10);
		ExecutorService fixedExecutor=Executors.newFixedThreadPool(10, new TestThreadFactory());
		
		ExecutorService sigExecutor=Executors.newSingleThreadExecutor();
		ExecutorService singleExecutor=Executors.newSingleThreadExecutor(new TestThreadFactory());
		
		ScheduledExecutorService schExecutor=Executors.newScheduledThreadPool(10);
		ScheduledExecutorService scheduledExecutor=Executors.newScheduledThreadPool(10,new TestThreadFactory());
		
		ScheduledExecutorService ssExecutor=Executors.newSingleThreadScheduledExecutor();
		ScheduledExecutorService sigSchExcutor=Executors.newSingleThreadScheduledExecutor(new TestThreadFactory());

底層的實現原理基本一樣: new執行緒池的時候生成一個任務佇列(blockQueue<Runnable>),第一次執行execute()或者submit()方法時會建立一個迴圈的執行緒,用於反覆讀取佇列中的任務並執行之(ps:第一次提交的任務是不用進入任務佇列,由剛建立的執行緒直接執行 ),後續的 execute()或者submit()操作則直接提交Runnable任務到佇列裡.佇列為空時,迴圈執行緒就會被blockQueue的take()方法阻塞住.

SingleThreadExecutor其實是FixedThreadPool的一個特例,SingleThreadExecutor指定對於同一個佇列只有一個執行緒去迴圈讀取佇列任務並執行, FiexedThreadPool則可以為同一佇列指定多個執行緒去迴圈讀取佇列任務並執行.

newFixedThreadPool(10)會產生10個執行緒去讀取同一個任務佇列,但這10個執行緒不是同時產生,而是提交一個任務(即執行一次execute()或者submit()方法)產生一個,當提交的任務數量超過10個,第11個任務直接提交到blockQueue<Runnable>佇列裡,然後由這10個執行緒中的某個執行緒去獲取並執行該任務.FixedThreadPool產生的10個執行緒以後也不會被回收成9個,更不可能增加到11個.

CacheThreadPool不指定具體數量的執行緒去讀取並只執行任務佇列中的任務,但是它有個最大執行緒數(Integer.MAX_VALUE=2的32次-1),  當 任務佇列飽和無法插入新任務時,會自動生成一個新的執行緒去執行新插入的任務,並參與讀取飽和的任務佇列並執行.如果高峰期生成了10個執行緒,低谷期只需要一個執行緒來執行,其餘的9個執行緒在存活一段時間後就會被終止.存活時間預設是一分鐘.這一點要和FixedThreadPool區分.

ScheduledThreadPool執行緒池執行緒數量也需要預先指定,它的主要特點是按計劃延時讀取並執行佇列任務

無論何種執行緒,當任務佇列增加任務的速度大於佇列讀取執行的速度時,就可能產生任務丟失的情況,丟失的概率由低到高依次是

CacheThreadPool > newFixedThreadPool > SingleThreadExecutor,這個很好理解.這種情況下,程式預設都會向外丟擲RejectedExecutionException異常

new 執行緒池的時候另一個構造引數 ThreadFactory,主要用途就是對提交的任務做個簡單的封裝.

附上幾個核心的程式碼片段

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
    }

    private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
    }

    private Thread addThread(Runnable firstTask) {
        Worker w = new Worker(firstTask);
        Thread t = threadFactory.newThread(w);
        if (t != null) {
            w.thread = t;
            workers.add(w);
            int nt = ++poolSize;
            if (nt > largestPoolSize)
                largestPoolSize = nt;
        }
        return t;
    }

/**
         * Runs a single task between before/after methods.
         */
        private void runTask(Runnable task) {
            final ReentrantLock runLock = this.runLock;
            runLock.lock();
            try {
                /*
                 * Ensure that unless pool is stopping, this thread
                 * does not have its interrupt set. This requires a
                 * double-check of state in case the interrupt was
                 * cleared concurrently with a shutdownNow -- if so,
                 * the interrupt is re-enabled.
                 */
                if (runState < STOP &&
                    Thread.interrupted() &&
                    runState >= STOP)
                    thread.interrupt();
                /*
                 * Track execution state to ensure that afterExecute
                 * is called only if task completed or threw
                 * exception. Otherwise, the caught runtime exception
                 * will have been thrown by afterExecute itself, in
                 * which case we don't want to call it again.
                 */
                boolean ran = false;
                beforeExecute(thread, task);
                try {
                    task.run();
                    ran = true;
                    afterExecute(task, null);
                    ++completedTasks;
                } catch (RuntimeException ex) {
                    if (!ran)
                        afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                runLock.unlock();
            }
        }

        /**
         * Main run loop
         */
        public void run() {
            try {
                Runnable task = firstTask;
                firstTask = null;
                while (task != null || (task = getTask()) != null) {
                    runTask(task);
                    task = null;
                }
            } finally {
                workerDone(this);
            }
        }