Java Thread Pools: Principles and Worked Examples

Wiki

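Creating threads directly with new Thread can cause the following problems:
 Creating and destroying a thread is expensive, especially when the thread is short-lived;
 Creating and destroying threads also forces the CPU to switch between threads, which adds further overhead.
A thread pool has the following advantage:
 It reduces the number of threads that are created and destroyed, because worker threads are reused across tasks.
In Java, every thread belongs to a ThreadGroup that manages it.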

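ThreadGroup source code analysis

First, look at the member variables. The listings in this section come from the Android implementation, as their comments mentioning Android and Dalvik indicate; the OpenJDK implementation differs in detail.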

/**
 * {@code ThreadGroup} is a means of organizing threads into a hierarchical structure.
 * This class is obsolete. See <i>Effective Java</i> Item 73, "Avoid thread groups" for details.
 * @see Thread
 */
public class ThreadGroup implements Thread.UncaughtExceptionHandler {

    // Name of this ThreadGroup
    // VM needs this field name for debugging.
    private String name;

    // Maximum priority for Threads inside this ThreadGroup
    private int maxPriority = Thread.MAX_PRIORITY;

    // The ThreadGroup to which this ThreadGroup belongs
    // VM needs this field name for debugging.
    final ThreadGroup parent;

    /**
     * Weak references to the threads in this group.
     * Access is guarded by synchronizing on this field.
     */
    private final List<WeakReference<Thread>> threadRefs = new ArrayList<WeakReference<Thread>>(5);

    /**
     * View of the threads.
     * Access is guarded by synchronizing on threadRefs.
     */
    private final Iterable<Thread> threads = CollectionUtils.dereferenceIterable(threadRefs, true);

    /**
     * Thread groups. Access is guarded by synchronizing on this field.
     */
    private final List<ThreadGroup> groups = new ArrayList<ThreadGroup>(3);

    // Whether this ThreadGroup is a daemon ThreadGroup or not
    private boolean isDaemon;

    // Whether this ThreadGroup has already been destroyed or not
    private boolean isDestroyed;

    /* the VM uses these directly; do not rename */
    static final ThreadGroup systemThreadGroup = new ThreadGroup();
    static final ThreadGroup mainThreadGroup = new ThreadGroup(systemThreadGroup, "main");

    // ... code omitted ...
}

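ThreadGroup keeps its threads in a list of weak references (an ArrayList with an initial capacity of 5). A ThreadGroup can also hold other ThreadGroups in a second list (an ArrayList with an initial capacity of 3).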
In addition, a ThreadGroup named "main" is defined here. In other words, the thread group of the main thread is named "main". For example:

public class Main {
    public static void main(String[] args) {
        // Print the name of the thread group that owns the main thread.
        String name=Thread.currentThread().getThreadGroup().getName();
        System.out.println(name);
    }
}

Next, look at ThreadGroup's three constructors.

/**
 * {@code ThreadGroup} is a means of organizing threads into a hierarchical structure.
 * This class is obsolete. See <i>Effective Java</i> Item 73, "Avoid thread groups" for details.
 * @see Thread
 */
public class ThreadGroup implements Thread.UncaughtExceptionHandler {
    // ... code omitted ...

    /**
     * Constructs a new {@code ThreadGroup} with the given name. The new {@code ThreadGroup}
     * will be child of the {@code ThreadGroup} to which the calling thread belongs.
     *
     * @param name the name
     * @see Thread#currentThread
     */
    public ThreadGroup(String name) {
        this(Thread.currentThread().getThreadGroup(), name);
    }

    /**
     * Constructs a new {@code ThreadGroup} with the given name, as a child of the
     * given {@code ThreadGroup}.
     *
     * @param parent the parent
     * @param name the name
     * @throws NullPointerException if {@code parent == null}
     * @throws IllegalThreadStateException if {@code parent} has been
     *         destroyed already
     */
    public ThreadGroup(ThreadGroup parent, String name) {
        if (parent == null) {
            throw new NullPointerException("parent == null");
        }
        this.name = name;
        this.parent = parent;
        if (parent != null) {
            parent.add(this);
            this.setMaxPriority(parent.getMaxPriority());
            if (parent.isDaemon()) {
                this.setDaemon(true);
            }
        }
    }

    /**
     * Initialize the special "system" ThreadGroup. Was "main" in Harmony,
     * but we have an additional group above that in Android.
     */
    private ThreadGroup() {
        this.name = "system";
        this.parent = null;
    }
    // ... code omitted ...
}

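The no-argument constructor creates a ThreadGroup without a parent (the "system" group). Otherwise the parent is either the group passed in explicitly or, for the one-argument constructor, the thread group of the thread that creates the new ThreadGroup. In this way, ThreadGroups form a tree.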
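The tree structure can be observed from ordinary code by walking getParent() up to the root. The following is a minimal sketch, not part of the original article; the class name GroupTreeDemo, the helper pathToRoot and the group names are made up for illustration.

```java
class GroupTreeDemo {
    /** Builds the chain of group names from the given group up to the root. */
    static String pathToRoot(ThreadGroup group) {
        StringBuilder path = new StringBuilder();
        for (ThreadGroup g = group; g != null; g = g.getParent()) {
            if (path.length() > 0) {
                path.append(" -> ");
            }
            path.append(g.getName());
        }
        return path.toString();
    }

    public static void main(String[] args) {
        // One-argument constructor: the parent is the calling thread's group.
        ThreadGroup workers = new ThreadGroup("workers");
        // Two-argument constructor: the parent is given explicitly.
        ThreadGroup io = new ThreadGroup(workers, "io");
        // The path starts with "io -> workers" and continues up to the root group.
        System.out.println(pathToRoot(io));
    }
}
```

When run from the main thread, the remainder of the path depends on the runtime's own root groups, which is why the sketch does not hard-code it.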
Next, look at ThreadGroup's key methods.

/**
 * {@code ThreadGroup} is a means of organizing threads into a hierarchical structure.
 * This class is obsolete. See <i>Effective Java</i> Item 73, "Avoid thread groups" for details.
 * @see Thread
 */
public class ThreadGroup implements Thread.UncaughtExceptionHandler {
    // ... code omitted ...

    /**
     * Returns the number of running {@code Thread}s which are children of this thread group,
     * directly or indirectly.
     *
     * @return the number of children
     */
    public int activeCount() {
        int count = 0;
        synchronized (threadRefs) {
            for (Thread thread : threads) {
                if (thread.isAlive()) {
                    count++;
                }
            }
        }
        synchronized (groups) {
            for (ThreadGroup group : groups) {
                count += group.activeCount();
            }
        }
        return count;
    }

    /**
     * Interrupts every {@code Thread} in this group and recursively in all its
     * subgroups.
     *
     * @see Thread#interrupt
     */
    public final void interrupt() {
        synchronized (threadRefs) {
            for (Thread thread : threads) {
                thread.interrupt();
            }
        }
        synchronized (groups) {
            for (ThreadGroup group : groups) {
                group.interrupt();
            }
        }
    }
    // ... code omitted ...
}

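Because a ThreadGroup holds its threads as a list of weak references, activeCount() counts only the threads that are still alive, in this group and recursively in its subgroups. interrupt() interrupts every Thread in the ThreadGroup and in its subgroups.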
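To see both methods in action, the sketch below starts a few sleeping threads in one group and interrupts them with a single call. It is an illustration only; GroupInterruptDemo, interruptAll and the thread names are hypothetical, and activeCount() should be treated as an estimate rather than an exact figure.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class GroupInterruptDemo {
    /**
     * Starts n sleeping threads in a fresh group, interrupts the whole group,
     * and returns how many threads observed the interrupt.
     */
    static int interruptAll(int n) {
        ThreadGroup group = new ThreadGroup("sleepers");
        CountDownLatch started = new CountDownLatch(n);
        AtomicInteger interrupted = new AtomicInteger();
        Thread[] threads = new Thread[n];
        for (int i = 0; i < n; i++) {
            threads[i] = new Thread(group, () -> {
                started.countDown();
                try {
                    Thread.sleep(60_000);
                } catch (InterruptedException e) {
                    interrupted.incrementAndGet();
                }
            }, "sleeper-" + i);
            threads[i].start();
        }
        try {
            started.await();      // every thread is running now
            System.out.println("alive before interrupt: " + group.activeCount());
            group.interrupt();    // interrupts all threads in the group
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("interrupted: " + interruptAll(3));
    }
}
```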

Thread source code analysis

This section focuses on the parts related to ThreadGroup.
First, look at Thread's nine constructors.

/**
     * Constructs a new {@code Thread} with no {@code Runnable} object and a
     * newly generated name. The new {@code Thread} will belong to the same
     * {@code ThreadGroup} as the {@code Thread} calling this constructor.
     *
     * @see java.lang.ThreadGroup
     * @see java.lang.Runnable
     */
    public Thread() {
        create(null, null, null, 0);
    }

    /**
     * Constructs a new {@code Thread} with a {@code Runnable} object and a
     * newly generated name. The new {@code Thread} will belong to the same
     * {@code ThreadGroup} as the {@code Thread} calling this constructor.
     *
     * @param runnable
     *            a {@code Runnable} whose method <code>run</code> will be
     *            executed by the new {@code Thread}
     *
     * @see java.lang.ThreadGroup
     * @see java.lang.Runnable
     */
    public Thread(Runnable runnable) {
        create(null, runnable, null, 0);
    }

    /**
     * Constructs a new {@code Thread} with a {@code Runnable} object and name
     * provided. The new {@code Thread} will belong to the same {@code
     * ThreadGroup} as the {@code Thread} calling this constructor.
     *
     * @param runnable
     *            a {@code Runnable} whose method <code>run</code> will be
     *            executed by the new {@code Thread}
     * @param threadName
     *            the name for the {@code Thread} being created
     *
     * @see java.lang.ThreadGroup
     * @see java.lang.Runnable
     */
    public Thread(Runnable runnable, String threadName) {
        if (threadName == null) {
            throw new NullPointerException("threadName == null");
        }

        create(null, runnable, threadName, 0);
    }

    /**
     * Constructs a new {@code Thread} with no {@code Runnable} object and the
     * name provided. The new {@code Thread} will belong to the same {@code
     * ThreadGroup} as the {@code Thread} calling this constructor.
     *
     * @param threadName
     *            the name for the {@code Thread} being created
     *
     * @see java.lang.ThreadGroup
     * @see java.lang.Runnable
     *
     */
    public Thread(String threadName) {
        if (threadName == null) {
            throw new NullPointerException("threadName == null");
        }

        create(null, null, threadName, 0);
    }

    /**
     * Constructs a new {@code Thread} with a {@code Runnable} object and a
     * newly generated name. The new {@code Thread} will belong to the {@code
     * ThreadGroup} passed as parameter.
     *
     * @param group
     *            {@code ThreadGroup} to which the new {@code Thread} will
     *            belong
     * @param runnable
     *            a {@code Runnable} whose method <code>run</code> will be
     *            executed by the new {@code Thread}
     * @throws IllegalThreadStateException
     *             if <code>group.destroy()</code> has already been done
     * @see java.lang.ThreadGroup
     * @see java.lang.Runnable
     */
    public Thread(ThreadGroup group, Runnable runnable) {
        create(group, runnable, null, 0);
    }

    /**
     * Constructs a new {@code Thread} with a {@code Runnable} object, the given
     * name and belonging to the {@code ThreadGroup} passed as parameter.
     *
     * @param group
     *            ThreadGroup to which the new {@code Thread} will belong
     * @param runnable
     *            a {@code Runnable} whose method <code>run</code> will be
     *            executed by the new {@code Thread}
     * @param threadName
     *            the name for the {@code Thread} being created
     * @throws IllegalThreadStateException
     *             if <code>group.destroy()</code> has already been done
     * @see java.lang.ThreadGroup
     * @see java.lang.Runnable
     */
    public Thread(ThreadGroup group, Runnable runnable, String threadName) {
        if (threadName == null) {
            throw new NullPointerException("threadName == null");
        }

        create(group, runnable, threadName, 0);
    }

    /**
     * Constructs a new {@code Thread} with no {@code Runnable} object, the
     * given name and belonging to the {@code ThreadGroup} passed as parameter.
     *
     * @param group
     *            {@code ThreadGroup} to which the new {@code Thread} will belong
     * @param threadName
     *            the name for the {@code Thread} being created
     * @throws IllegalThreadStateException
     *             if <code>group.destroy()</code> has already been done
     * @see java.lang.ThreadGroup
     * @see java.lang.Runnable
     */
    public Thread(ThreadGroup group, String threadName) {
        if (threadName == null) {
            throw new NullPointerException("threadName == null");
        }

        create(group, null, threadName, 0);
    }

    /**
     * Constructs a new {@code Thread} with a {@code Runnable} object, the given
     * name and belonging to the {@code ThreadGroup} passed as parameter.
     *
     * @param group
     *            {@code ThreadGroup} to which the new {@code Thread} will
     *            belong
     * @param runnable
     *            a {@code Runnable} whose method <code>run</code> will be
     *            executed by the new {@code Thread}
     * @param threadName
     *            the name for the {@code Thread} being created
     * @param stackSize
     *            a stack size for the new {@code Thread}. This has a highly
     *            platform-dependent interpretation. It may even be ignored
     *            completely.
     * @throws IllegalThreadStateException
     *             if <code>group.destroy()</code> has already been done
     * @see java.lang.ThreadGroup
     * @see java.lang.Runnable
     */
    public Thread(ThreadGroup group, Runnable runnable, String threadName, long stackSize) {
        if (threadName == null) {
            throw new NullPointerException("threadName == null");
        }
        create(group, runnable, threadName, stackSize);
    }

    /**
     * Package-scope method invoked by Dalvik VM to create "internal"
     * threads or attach threads created externally.
     *
     * Don't call Thread.currentThread(), since there may not be such
     * a thing (e.g. for Main).
     */
    Thread(ThreadGroup group, String name, int priority, boolean daemon) {
        synchronized (Thread.class) {
            id = ++Thread.count;
        }

        if (name == null) {
            this.name = "Thread-" + id;
        } else {
            this.name = name;
        }

        if (group == null) {
            throw new InternalError("group == null");
        }

        this.group = group;

        this.target = null;
        this.stackSize = 0;
        this.priority = priority;
        this.daemon = daemon;

        /* add ourselves to our ThreadGroup of choice */
        this.group.addThread(this);
    }

As the code shows, create(group, runnable, threadName, stackSize) is the key method.

  /**
     * Initializes a new, existing Thread object with a runnable object,
     * the given name and belonging to the ThreadGroup passed as parameter.
     * This is the method that the several public constructors delegate their
     * work to.
     *
     * @param group ThreadGroup to which the new Thread will belong
     * @param runnable a java.lang.Runnable whose method <code>run</code> will
     *        be executed by the new Thread
     * @param threadName Name for the Thread being created
     * @param stackSize Platform dependent stack size
     * @throws IllegalThreadStateException if <code>group.destroy()</code> has
     *         already been done
     * @see java.lang.ThreadGroup
     * @see java.lang.Runnable
     */
    private void create(ThreadGroup group, Runnable runnable, String threadName, long stackSize) {
        Thread currentThread = Thread.currentThread();
        if (group == null) {
            group = currentThread.getThreadGroup(); // the current thread's thread group
        }

        if (group.isDestroyed()) {
            throw new IllegalThreadStateException("Group already destroyed");
        }

        this.group = group;

        synchronized (Thread.class) {
            id = ++Thread.count;
        }

        if (threadName == null) {
            this.name = "Thread-" + id;
        } else {
            this.name = threadName;
        }

        this.target = runnable;
        this.stackSize = stackSize;

        this.priority = currentThread.getPriority();

        this.contextClassLoader = currentThread.contextClassLoader;

        // Transfer over InheritableThreadLocals.
        if (currentThread.inheritableValues != null) {
            inheritableValues = new ThreadLocal.Values(currentThread.inheritableValues);
        }

        // add ourselves to our ThreadGroup of choice
        this.group.addThread(this);
    }

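As the code shows, when the ThreadGroup passed to a Thread constructor is null, or when the constructor takes no ThreadGroup parameter, the Thread is added to the ThreadGroup of the current thread.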
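This inheritance rule can be checked with a short experiment: a thread running inside a custom group creates a child thread without naming a group, and the child ends up in that same group. The sketch assumes a standard JVM with no security manager installed; DefaultGroupDemo and groupOfChildCreatedIn are names invented for this example.

```java
class DefaultGroupDemo {
    /**
     * Creates a thread inside parentGroup; that thread then creates a child
     * thread without passing any ThreadGroup. Returns the child's group name.
     */
    static String groupOfChildCreatedIn(ThreadGroup parentGroup) {
        String[] childGroupName = new String[1];
        Thread creator = new Thread(parentGroup, () -> {
            Thread child = new Thread(() -> { });   // no ThreadGroup argument
            childGroupName[0] = child.getThreadGroup().getName();
        }, "creator");
        creator.start();
        try {
            creator.join();   // join() also makes the write above visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return childGroupName[0];
    }

    public static void main(String[] args) {
        // Prints "custom": the child inherited the group of its creating thread.
        System.out.println(groupOfChildCreatedIn(new ThreadGroup("custom")));
    }
}
```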

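Custom ThreadPool

A thread pool can be thought of as a fixed set of worker threads that repeatedly take tasks from a shared task queue and execute them. If a worker finds the queue empty, it blocks in wait(); when a task is added, notify() wakes up one of the waiting threads (which one is not specified). This matches the model of one producer and multiple consumers.
The custom ThreadPool below is based on page 67 of 《Java網路程式設計精解》 by 孫衛琴 (Sun Weiqin).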

import java.util.LinkedList;

public class ThreadPool extends ThreadGroup {
    private boolean isClosed = false;
    private LinkedList<Runnable> workQueue;   // task queue
    private static int threadPoolID;
    private int threadID;

    public ThreadPool(int poolSize) {
        super("ThreadPool" + (threadPoolID++));
        setDaemon(true);
        workQueue = new LinkedList<Runnable>();
        for (int i = 0; i < poolSize; i++) {
            new WorkThread().start();   // start a fixed number of worker threads
        }
    }

    public synchronized void execute(Runnable task) {
        if (isClosed) {
            throw new IllegalStateException();
        }
        if (task != null) {
            workQueue.add(task);
            notify();   // wake up one worker blocked in wait()
        }
    }

    /**
     * Closes the pool immediately: drops the queued tasks and uses interrupt()
     * to interrupt every thread in this thread group.
     */
    public synchronized void close() {
        if (!isClosed) {
            isClosed = true;
            workQueue.clear();
            interrupt();   // interrupt all worker threads
        }
    }

    /**
     * Stops accepting new tasks, then waits for the workers to drain the queue and exit.
     */
    public void join() {
        synchronized (this) {
            isClosed = true;   // getTask() returns null once the queue is empty
            notifyAll();       // wake up all waiting workers
        }
        Thread[] threads = new Thread[activeCount()];
        int count = enumerate(threads);
        for (int i = 0; i < count; i++) {
            try {
                threads[i].join();   // wait for this worker to finish
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    protected synchronized Runnable getTask() throws InterruptedException {
        while (workQueue.size() == 0) {
            if (isClosed) return null;
            wait();   // block until execute(), join() or close() signals
        }
        return workQueue.removeFirst();
    }

    private class WorkThread extends Thread {
        public WorkThread() {
            super(ThreadPool.this, "WorkThread-" + (threadID++));
        }

        @Override
        public void run() {
            while (!isInterrupted()) {
                Runnable task = null;
                try {
                    task = getTask();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (task == null) {
                    return;   // pool closed, or interrupted while waiting
                }
                task.run();
            }
        }
    }
}

The following program tests it.

public class Main {
    public static void main(String[] args) {
        ThreadPool pool = new ThreadPool(3);
        for (int i = 0; i < 10; i++) {
            pool.execute(createTask(i));
        }
        pool.join();    // wait for queued and running tasks to finish, then stop
//      pool.close();   // stop immediately, discarding queued tasks
    }

    private static Runnable createTask(final int taskID) {
        return new Runnable() {
            @Override
            public void run() {
                System.out.println("Task" + taskID + ":start");
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // Report completion whether or not the sleep was interrupted.
                System.out.println("Task" + taskID + ":end");
            }
        };
    }
}

Thread pools in the JDK

Thread pool classes

There are three concrete thread pool classes.
(1) ThreadPoolExecutor
Executes submitted tasks using a pool of threads. One of its constructors is:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) 

The API parameters are:
corePoolSize: the number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is set
maximumPoolSize: the maximum number of threads to allow in the pool
keepAliveTime: when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating
unit: the time unit for the keepAliveTime argument
workQueue: the queue to use for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method.
The most common way to obtain one is through the factory methods of the Executors class, including newCachedThreadPool(), newFixedThreadPool(int) and newSingleThreadExecutor(); see the next section for details.
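(2) ScheduledThreadPoolExecutor
Tasks can run after a given delay or be scheduled to run periodically. The constructor is simple: only the number of threads needs to be specified. The main functionality is in the schedule-related member methods. For example: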

public ScheduledFuture<?>  scheduleWithFixedDelay (Runnable command, long initialDelay, long delay, TimeUnit unit)

The API parameters are:
command: the task to execute
initialDelay: the time to delay first execution
delay: the delay between the termination of one execution and the commencement of the next
unit: the time unit of the initialDelay and delay parameters
Return value:
a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation
The most common way to obtain one is through the factory methods of the Executors class, including newScheduledThreadPool(int) and newSingleThreadScheduledExecutor(); see the next section for details.
(3) ForkJoinPool
A thread pool for the Fork/Join pattern, used to support parallel computation. For more information, see the article "JDK 7 中的 Fork/Join 模式".
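How corePoolSize, maximumPoolSize and workQueue interact is easiest to see with a bounded queue. The sketch below is an illustration under chosen assumptions (core 2, maximum 4, queue capacity 2, and the hypothetical names PoolSizingDemo and submitSix): the first two tasks start core threads, the next two wait in the queue, and only when the queue is full does the pool grow towards its maximum.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolSizingDemo {
    /**
     * Submits 6 blocking tasks to a pool with core=2, max=4 and a queue of
     * capacity 2, and returns {pool size, queued task count} at that moment.
     */
    static int[] submitSix() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 6; i++) {
                pool.execute(() -> {
                    try {
                        release.await();   // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown();   // let all tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] state = submitSix();
        // Prints "threads=4 queued=2"
        System.out.println("threads=" + state[0] + " queued=" + state[1]);
    }
}
```

With this configuration a seventh task submitted while all six are still blocked would be rejected, because both the queue and the pool are at capacity.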

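Creating thread pools with the Executors factory methods

Generally, thread pool objects are not instantiated directly. Several static methods in Executors create them instead.
(1) A thread pool with a single worker thread (ThreadPoolExecutor)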

/**
     * Creates an Executor that uses a single worker thread operating
     * off an unbounded queue. (Note however that if this single
     * thread terminates due to a failure during execution prior to
     * shutdown, a new one will take its place if needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one task will be active at any
     * given time. Unlike the otherwise equivalent
     * {@code newFixedThreadPool(1)} the returned executor is
     * guaranteed not to be reconfigurable to use additional threads.
     *
     * @return the newly created single-threaded Executor
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    /**
     * Creates an Executor that uses a single worker thread operating
     * off an unbounded queue, and uses the provided ThreadFactory to
     * create a new thread when needed. Unlike the otherwise
     * equivalent {@code newFixedThreadPool(1, threadFactory)} the
     * returned executor is guaranteed not to be reconfigurable to use
     * additional threads.
     *
     * @param threadFactory the factory to use when creating new
     * threads
     *
     * @return the newly created single-threaded Executor
     * @throws NullPointerException if threadFactory is null
     */
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }
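The sequential-execution guarantee described in the Javadoc above can be observed directly. In this sketch (SingleThreadOrderDemo and runInOrder are illustrative names, not from the article) the tasks record their ids, and the recorded order matches the submission order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SingleThreadOrderDemo {
    /** Submits n tasks and returns the order in which they actually ran. */
    static List<Integer> runInOrder(int n) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<Integer> order = Collections.synchronizedList(new ArrayList<Integer>());
        for (int i = 0; i < n; i++) {
            final int id = i;
            executor.execute(() -> order.add(id));
        }
        executor.shutdown();   // already queued tasks still run
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5));   // prints [0, 1, 2, 3, 4]
    }
}
```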

(2) A fixed number of threads (ThreadPoolExecutor)

/**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * {@code nThreads} threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue, using the provided
     * ThreadFactory to create new threads when needed.  At any point,
     * at most {@code nThreads} threads will be active processing
     * tasks.  If additional tasks are submitted when all threads are
     * active, they will wait in the queue until a thread is
     * available.  If any thread terminates due to a failure during
     * execution prior to shutdown, a new one will take its place if
     * needed to execute subsequent tasks.  The threads in the pool will
     * exist until it is explicitly {@link ExecutorService#shutdown
     * shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @param threadFactory the factory to use when creating new threads
     * @return the newly created thread pool
     * @throws NullPointerException if threadFactory is null
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }
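A fixed pool reuses its worker threads, so many tasks run on only a few distinct threads. The following sketch collects the names of the threads that executed the tasks; FixedPoolReuseDemo and workerNames are hypothetical names chosen for this example.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class FixedPoolReuseDemo {
    /** Runs the given number of tasks and returns the names of the threads that ran them. */
    static Set<String> workerNames(int poolSize, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> names = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return names;
    }

    public static void main(String[] args) {
        // Ten tasks, but never more than three distinct worker threads.
        System.out.println(workerNames(3, 10));
    }
}
```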

(3) A thread pool with a variable number of threads (ThreadPoolExecutor)
A thread that stays idle is kept alive for 60 s before it is terminated.

/**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available.  These pools will typically improve the performance
     * of programs that execute many short-lived asynchronous tasks.
     * Calls to {@code execute} will reuse previously constructed
     * threads if available. If no existing thread is available, a new
     * thread will be created and added to the pool. Threads that have
     * not been used for sixty seconds are terminated and removed from
     * the cache. Thus, a pool that remains idle for long enough will
     * not consume any resources. Note that pools with similar
     * properties but different details (for example, timeout parameters)
     * may be created using {@link ThreadPoolExecutor} constructors.
     *
     * @return the newly created thread pool
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available, and uses the provided
     * ThreadFactory to create new threads when needed.
     * @param threadFactory the factory to use when creating new threads
     * @return the newly created thread pool
     * @throws NullPointerException if threadFactory is null
     */
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

(4) A Fork/Join thread pool (ForkJoinPool)

  /**
     * Creates a thread pool that maintains enough threads to support
     * the given parallelism level, and may use multiple queues to
     * reduce contention. The parallelism level corresponds to the
     * maximum number of threads actively engaged in, or available to
     * engage in, task processing. The actual number of threads may
     * grow and shrink dynamically. A work-stealing pool makes no
     * guarantees about the order in which submitted tasks are
     * executed.
     *
     * @param parallelism the targeted parallelism level
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code parallelism <= 0}
     * @since 1.8
     * @hide
     */
    public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

    /**
     * Creates a work-stealing thread pool using all
     * {@link Runtime#availableProcessors available processors}
     * as its target parallelism level.
     * @return the newly created thread pool
     * @since 1.8
     * @hide
     */
    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
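As a rough illustration of the Fork/Join pattern, the sketch below sums a range of integers by splitting it recursively. It constructs a ForkJoinPool directly rather than through newWorkStealingPool(), because a RecursiveTask is submitted through ForkJoinPool's own invoke method; the class name RangeSum and the threshold value are assumptions made for this example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class RangeSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000;   // below this size, sum directly
    private final long from;   // inclusive
    private final long to;     // exclusive

    RangeSum(long from, long to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (long i = from; i < to; i++) {
                sum += i;
            }
            return sum;
        }
        long mid = from + (to - from) / 2;
        RangeSum left = new RangeSum(from, mid);
        RangeSum right = new RangeSum(mid, to);
        left.fork();                              // run the left half asynchronously
        return right.compute() + left.join();     // compute the right half here, then combine
    }

    /** Sums the integers in [from, to) using a temporary ForkJoinPool. */
    static long sum(long from, long to) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new RangeSum(from, to));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sum(0, 1_000_001));   // prints 500000500000
    }
}
```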

(5) A single-threaded scheduled thread pool (ScheduledThreadPoolExecutor)

/**
     * Creates a single-threaded executor that can schedule commands
     * to run after a given delay, or to execute periodically.
     * (Note however that if this single
     * thread terminates due to a failure during execution prior to
     * shutdown, a new one will take its place if needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one task will be active at any
     * given time. Unlike the otherwise equivalent
     * {@code newScheduledThreadPool(1)} the returned executor is
     * guaranteed not to be reconfigurable to use additional threads.
     * @return the newly created scheduled executor
     */
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }

    /**
     * Creates a single-threaded executor that can schedule commands
     * to run after a given delay, or to execute periodically.  (Note
     * however that if this single thread terminates due to a failure
     * during execution prior to shutdown, a new one will take its
     * place if needed to execute subsequent tasks.)  Tasks are
     * guaranteed to execute sequentially, and no more than one task
     * will be active at any given time. Unlike the otherwise
     * equivalent {@code newScheduledThreadPool(1, threadFactory)}
     * the returned executor is guaranteed not to be reconfigurable to
     * use additional threads.
     * @param threadFactory the factory to use when creating new
     * threads
     * @return a newly created scheduled executor
     * @throws NullPointerException if threadFactory is null
     */
    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1, threadFactory));
    }

(6) A multi-threaded scheduled thread pool (ScheduledThreadPoolExecutor)

  /**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    /**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @param threadFactory the factory to use when the executor
     * creates a new thread
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if threadFactory is null
     */
    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }
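A scheduled executor is typically used together with the ScheduledFuture it returns. The sketch below schedules a task with scheduleWithFixedDelay, waits until it has run three times, and then cancels it; FixedDelayDemo, runThreeTimes and the delay values are illustrative choices, not taken from the article.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class FixedDelayDemo {
    /** Returns true if the periodic task ran three times and was then cancelled. */
    static boolean runThreeTimes() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch threeRuns = new CountDownLatch(3);
        // First run after 10 ms, then 20 ms between the end of one run and the start of the next.
        ScheduledFuture<?> handle = scheduler.scheduleWithFixedDelay(
                threeRuns::countDown, 10, 20, TimeUnit.MILLISECONDS);
        try {
            boolean done = threeRuns.await(5, TimeUnit.SECONDS);
            handle.cancel(false);   // stop further executions
            return done && handle.isCancelled();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran three times and cancelled: " + runThreeTimes());
    }
}
```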

Example 1: Network Service

Here is a network server built on a thread pool.

import java.io.BufferedInputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Scanner;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * created by CaoYanfeng
 * A simple server program that uses a thread pool to handle sockets concurrently.
 */
public class NetworkService {
    private final ServerSocket serverSocket;
    private final ExecutorService pool;
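    public NetworkService(int port, int poolSize) throws IOException {
        serverSocket = new ServerSocket(port);
        // Note: poolSize is not used here, because a cached pool grows on demand.
        // Executors.newFixedThreadPool(poolSize) would cap the number of threads instead.
        pool = Executors.newCachedThreadPool();
    }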

    public static void main(String[] args) {
        try {
            final NetworkService service = new NetworkService(8000, 3);
            service.run();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
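    // Shut down the thread pool in two phases
    public void shutdownAndAwaitTermination(ExecutorService pool) {
        pool.shutdown();   // stop accepting new tasks
        try {
            // Wait a while for running tasks to finish
            if (!pool.awaitTermination(60, TimeUnit.MILLISECONDS)) {
                pool.shutdownNow();   // cancel the tasks that are still running
                // Wait a while for the tasks to respond to cancellation
                if (!pool.awaitTermination(60, TimeUnit.MILLISECONDS))
                    System.err.println("Pool did not terminate");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Cancel again if the current thread was interrupted while waiting
            pool.shutdownNow();
            // Preserve the interrupt status
            Thread.currentThread().interrupt();
        }
    }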
    public void run() {
        System.out.println("Service started!");
        while (true) {
            try {
                pool.execute(new Handler(serverSocket.accept()));
            } catch (IOException e) {
                e.printStackTrace();
                pool.shutdown();
                break;   // stop accepting connections once the server socket fails
            }
        }
    }

    // Handler for a single socket connection
    private class Handler implements Runnable{
        private static final int BUFFER_SIZE=1024;
        private final