Java Thread Pools: Principles and Worked Examples
Wiki
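Creating threads directly with new Thread can cause the following problems:
Creating and destroying threads is expensive, especially when some threads are short-lived;
Creating and destroying threads is accompanied by CPU context switches between threads, which are also costly;
The advantages of a thread pool are:
Threads are created and destroyed far less often, because the worker threads are reused over and over.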
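As a rough illustration of thread reuse, the sketch below runs ten short tasks on a pool of two threads and counts how many distinct worker threads executed them. It relies on `Executors.newFixedThreadPool` from `java.util.concurrent`; the class name `ReuseDemo` and the helper `distinctWorkers` are hypothetical names chosen only for this example.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ReuseDemo {
    // Hypothetical helper: runs taskCount short tasks on poolSize threads and
    // returns how many distinct worker threads actually executed them.
    static int distinctWorkers(int poolSize, int taskCount) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown(); // no new tasks; already queued tasks still run
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
        }
        return names.size();
    }

    public static void main(String[] args) {
        // Ten tasks, but never more than two worker threads.
        System.out.println("distinct worker threads: " + distinctWorkers(2, 10));
    }
}
```

With `new Thread(...)` per task, the same workload would have created ten threads.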
In Java, every thread belongs to a ThreadGroup, which manages it.
ThreadGroup source code analysis
First, look at the member variables.
/**
* {@code ThreadGroup} is a means of organizing threads into a hierarchical structure.
* This class is obsolete. See <i>Effective Java</i> Item 73, "Avoid thread groups" for details.
* @see Thread
*/
public class ThreadGroup implements Thread.UncaughtExceptionHandler {
// Name of this ThreadGroup
// VM needs this field name for debugging.
private String name;
// Maximum priority for Threads inside this ThreadGroup
private int maxPriority = Thread.MAX_PRIORITY;
// The ThreadGroup to which this ThreadGroup belongs
// VM needs this field name for debugging.
final ThreadGroup parent;
/**
* Weak references to the threads in this group.
* Access is guarded by synchronizing on this field.
*/
private final List<WeakReference<Thread>> threadRefs = new ArrayList<WeakReference<Thread>>(5 );
/**
* View of the threads.
* Access is guarded by synchronizing on threadRefs.
*/
private final Iterable<Thread> threads = CollectionUtils.dereferenceIterable(threadRefs, true);
/**
* Thread groups. Access is guarded by synchronizing on this field.
*/
private final List<ThreadGroup> groups = new ArrayList<ThreadGroup>(3);
// Whether this ThreadGroup is a daemon ThreadGroup or not
private boolean isDaemon;
// Whether this ThreadGroup has already been destroyed or not
private boolean isDestroyed;
/* the VM uses these directly; do not rename */
static final ThreadGroup systemThreadGroup = new ThreadGroup();
static final ThreadGroup mainThreadGroup = new ThreadGroup(systemThreadGroup, "main");
// ... code omitted ...
}
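ThreadGroup keeps its Threads in a list of weak references (an ArrayList with an initial capacity of 5). A ThreadGroup can also hold other ThreadGroups in a second list (an ArrayList with an initial capacity of 3).
In addition, a ThreadGroup named "main" is defined here (mainThreadGroup). In other words, the thread group of the main thread is named "main". For example: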
public class Main {
public static void main(String[] args) {
// TODO Auto-generated method stub
String name=Thread.currentThread().getThreadGroup().getName();
System.out.println(name);
}
}
Next, look at the three constructors of ThreadGroup.
/**
* {@code ThreadGroup} is a means of organizing threads into a hierarchical structure.
* This class is obsolete. See <i>Effective Java</i> Item 73, "Avoid thread groups" for details.
* @see Thread
*/
public class ThreadGroup implements Thread.UncaughtExceptionHandler {
// ... code omitted ...
/**
* Constructs a new {@code ThreadGroup} with the given name. The new {@code ThreadGroup}
* will be child of the {@code ThreadGroup} to which the calling thread belongs.
*
* @param name the name
* @see Thread#currentThread
*/
public ThreadGroup(String name) {
this(Thread.currentThread().getThreadGroup(), name);
}
/**
* Constructs a new {@code ThreadGroup} with the given name, as a child of the
* given {@code ThreadGroup}.
*
* @param parent the parent
* @param name the name
* @throws NullPointerException if {@code parent == null}
* @throws IllegalThreadStateException if {@code parent} has been
* destroyed already
*/
public ThreadGroup(ThreadGroup parent, String name) {
if (parent == null) {
throw new NullPointerException("parent == null");
}
this.name = name;
this.parent = parent;
if (parent != null) {
parent.add(this);
this.setMaxPriority(parent.getMaxPriority());
if (parent.isDaemon()) {
this.setDaemon(true);
}
}
}
/**
* Initialize the special "system" ThreadGroup. Was "main" in Harmony,
* but we have an additional group above that in Android.
*/
private ThreadGroup() {
this.name = "system";
this.parent = null;
}
// ... code omitted ...
}
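The no-argument constructor is private and creates the "system" group, which has no parent. In every other case the parent is either passed in explicitly or defaults to the thread group of the thread that creates the ThreadGroup. ThreadGroups therefore form a tree.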
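The tree structure can be observed through the public `getParent()` and `getName()` methods. The following is a minimal sketch; `GroupTreeDemo` and `pathToRoot` are hypothetical names, and the names of the groups above the caller's own group depend on the runtime, so no particular output is assumed.

```java
public class GroupTreeDemo {
    // Hypothetical helper: walks from a group up to the root of the tree,
    // joining the group names with " <- ".
    static String pathToRoot(ThreadGroup group) {
        StringBuilder sb = new StringBuilder(group.getName());
        for (ThreadGroup g = group.getParent(); g != null; g = g.getParent()) {
            sb.append(" <- ").append(g.getName());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // One-argument constructor: the parent is the caller's thread group.
        ThreadGroup parent = new ThreadGroup("parent");
        // Two-argument constructor: the parent is given explicitly.
        ThreadGroup child = new ThreadGroup(parent, "child");
        System.out.println(pathToRoot(child));
    }
}
```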
Next, look at the key methods of ThreadGroup.
/**
* {@code ThreadGroup} is a means of organizing threads into a hierarchical structure.
* This class is obsolete. See <i>Effective Java</i> Item 73, "Avoid thread groups" for details.
* @see Thread
*/
public class ThreadGroup implements Thread.UncaughtExceptionHandler {
// ... code omitted ...
/**
* Returns the number of running {@code Thread}s which are children of this thread group,
* directly or indirectly.
*
* @return the number of children
*/
public int activeCount() {
int count = 0;
synchronized (threadRefs) {
for (Thread thread : threads) {
if (thread.isAlive()) {
count++;
}
}
}
synchronized (groups) {
for (ThreadGroup group : groups) {
count += group.activeCount();
}
}
return count;
}
/**
* Interrupts every {@code Thread} in this group and recursively in all its
* subgroups.
*
* @see Thread#interrupt
*/
public final void interrupt() {
synchronized (threadRefs) {
for (Thread thread : threads) {
thread.interrupt();
}
}
synchronized (groups) {
for (ThreadGroup group : groups) {
group.interrupt();
}
}
}
// ... code omitted ...
}
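Because a ThreadGroup holds its Threads as a list of weak references, activeCount() counts only the Threads that are still alive, in this group and recursively in all its subgroups. interrupt() interrupts every Thread in the group and, recursively, in its subgroups.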
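The two methods can be exercised with a small experiment: start a few sleeping threads in one group, then interrupt the group as a whole. This is only a sketch; `GroupInterruptDemo` and `interruptAll` are hypothetical names, and the value printed by `activeCount()` should be treated as an estimate.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class GroupInterruptDemo {
    // Hypothetical helper: starts n sleeping threads in a fresh group,
    // interrupts the whole group, and returns how many threads saw the interrupt.
    static int interruptAll(int n) {
        ThreadGroup group = new ThreadGroup("workers");
        CountDownLatch started = new CountDownLatch(n);
        AtomicInteger interrupted = new AtomicInteger();
        Thread[] threads = new Thread[n];
        for (int i = 0; i < n; i++) {
            threads[i] = new Thread(group, () -> {
                started.countDown();
                try {
                    Thread.sleep(60_000); // sleep until interrupted
                } catch (InterruptedException e) {
                    interrupted.incrementAndGet();
                }
            }, "worker-" + i);
            threads[i].start();
        }
        try {
            started.await(); // make sure every worker is running
            System.out.println("activeCount: " + group.activeCount());
            group.interrupt(); // one call interrupts every thread in the group
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("interrupted threads: " + interruptAll(3));
    }
}
```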
Thread source code analysis
This part focuses on what relates to ThreadGroup.
First, look at the nine constructors of Thread.
/**
* Constructs a new {@code Thread} with no {@code Runnable} object and a
* newly generated name. The new {@code Thread} will belong to the same
* {@code ThreadGroup} as the {@code Thread} calling this constructor.
*
* @see java.lang.ThreadGroup
* @see java.lang.Runnable
*/
public Thread() {
create(null, null, null, 0);
}
/**
* Constructs a new {@code Thread} with a {@code Runnable} object and a
* newly generated name. The new {@code Thread} will belong to the same
* {@code ThreadGroup} as the {@code Thread} calling this constructor.
*
* @param runnable
* a {@code Runnable} whose method <code>run</code> will be
* executed by the new {@code Thread}
*
* @see java.lang.ThreadGroup
* @see java.lang.Runnable
*/
public Thread(Runnable runnable) {
create(null, runnable, null, 0);
}
/**
* Constructs a new {@code Thread} with a {@code Runnable} object and name
* provided. The new {@code Thread} will belong to the same {@code
* ThreadGroup} as the {@code Thread} calling this constructor.
*
* @param runnable
* a {@code Runnable} whose method <code>run</code> will be
* executed by the new {@code Thread}
* @param threadName
* the name for the {@code Thread} being created
*
* @see java.lang.ThreadGroup
* @see java.lang.Runnable
*/
public Thread(Runnable runnable, String threadName) {
if (threadName == null) {
throw new NullPointerException("threadName == null");
}
create(null, runnable, threadName, 0);
}
/**
* Constructs a new {@code Thread} with no {@code Runnable} object and the
* name provided. The new {@code Thread} will belong to the same {@code
* ThreadGroup} as the {@code Thread} calling this constructor.
*
* @param threadName
* the name for the {@code Thread} being created
*
* @see java.lang.ThreadGroup
* @see java.lang.Runnable
*
*/
public Thread(String threadName) {
if (threadName == null) {
throw new NullPointerException("threadName == null");
}
create(null, null, threadName, 0);
}
/**
* Constructs a new {@code Thread} with a {@code Runnable} object and a
* newly generated name. The new {@code Thread} will belong to the {@code
* ThreadGroup} passed as parameter.
*
* @param group
* {@code ThreadGroup} to which the new {@code Thread} will
* belong
* @param runnable
* a {@code Runnable} whose method <code>run</code> will be
* executed by the new {@code Thread}
* @throws IllegalThreadStateException
* if <code>group.destroy()</code> has already been done
* @see java.lang.ThreadGroup
* @see java.lang.Runnable
*/
public Thread(ThreadGroup group, Runnable runnable) {
create(group, runnable, null, 0);
}
/**
* Constructs a new {@code Thread} with a {@code Runnable} object, the given
* name and belonging to the {@code ThreadGroup} passed as parameter.
*
* @param group
* ThreadGroup to which the new {@code Thread} will belong
* @param runnable
* a {@code Runnable} whose method <code>run</code> will be
* executed by the new {@code Thread}
* @param threadName
* the name for the {@code Thread} being created
* @throws IllegalThreadStateException
* if <code>group.destroy()</code> has already been done
* @see java.lang.ThreadGroup
* @see java.lang.Runnable
*/
public Thread(ThreadGroup group, Runnable runnable, String threadName) {
if (threadName == null) {
throw new NullPointerException("threadName == null");
}
create(group, runnable, threadName, 0);
}
/**
* Constructs a new {@code Thread} with no {@code Runnable} object, the
* given name and belonging to the {@code ThreadGroup} passed as parameter.
*
* @param group
* {@code ThreadGroup} to which the new {@code Thread} will belong
* @param threadName
* the name for the {@code Thread} being created
* @throws IllegalThreadStateException
* if <code>group.destroy()</code> has already been done
* @see java.lang.ThreadGroup
* @see java.lang.Runnable
*/
public Thread(ThreadGroup group, String threadName) {
if (threadName == null) {
throw new NullPointerException("threadName == null");
}
create(group, null, threadName, 0);
}
/**
* Constructs a new {@code Thread} with a {@code Runnable} object, the given
* name and belonging to the {@code ThreadGroup} passed as parameter.
*
* @param group
* {@code ThreadGroup} to which the new {@code Thread} will
* belong
* @param runnable
* a {@code Runnable} whose method <code>run</code> will be
* executed by the new {@code Thread}
* @param threadName
* the name for the {@code Thread} being created
* @param stackSize
* a stack size for the new {@code Thread}. This has a highly
* platform-dependent interpretation. It may even be ignored
* completely.
* @throws IllegalThreadStateException
* if <code>group.destroy()</code> has already been done
* @see java.lang.ThreadGroup
* @see java.lang.Runnable
*/
public Thread(ThreadGroup group, Runnable runnable, String threadName, long stackSize) {
if (threadName == null) {
throw new NullPointerException("threadName == null");
}
create(group, runnable, threadName, stackSize);
}
/**
* Package-scope method invoked by Dalvik VM to create "internal"
* threads or attach threads created externally.
*
* Don't call Thread.currentThread(), since there may not be such
* a thing (e.g. for Main).
*/
Thread(ThreadGroup group, String name, int priority, boolean daemon) {
synchronized (Thread.class) {
id = ++Thread.count;
}
if (name == null) {
this.name = "Thread-" + id;
} else {
this.name = name;
}
if (group == null) {
throw new InternalError("group == null");
}
this.group = group;
this.target = null;
this.stackSize = 0;
this.priority = priority;
this.daemon = daemon;
/* add ourselves to our ThreadGroup of choice */
this.group.addThread(this);
}
As shown, create(group, runnable, threadName, stackSize) is the key method.
/**
* Initializes a new, existing Thread object with a runnable object,
* the given name and belonging to the ThreadGroup passed as parameter.
* This is the method that the several public constructors delegate their
* work to.
*
* @param group ThreadGroup to which the new Thread will belong
* @param runnable a java.lang.Runnable whose method <code>run</code> will
* be executed by the new Thread
* @param threadName Name for the Thread being created
* @param stackSize Platform dependent stack size
* @throws IllegalThreadStateException if <code>group.destroy()</code> has
* already been done
* @see java.lang.ThreadGroup
* @see java.lang.Runnable
*/
private void create(ThreadGroup group, Runnable runnable, String threadName, long stackSize) {
Thread currentThread = Thread.currentThread();
if (group == null) {
group = currentThread.getThreadGroup(); // fall back to the current thread's group
}
if (group.isDestroyed()) {
throw new IllegalThreadStateException("Group already destroyed");
}
this.group = group;
synchronized (Thread.class) {
id = ++Thread.count;
}
if (threadName == null) {
this.name = "Thread-" + id;
} else {
this.name = threadName;
}
this.target = runnable;
this.stackSize = stackSize;
this.priority = currentThread.getPriority();
this.contextClassLoader = currentThread.contextClassLoader;
// Transfer over InheritableThreadLocals.
if (currentThread.inheritableValues != null) {
inheritableValues = new ThreadLocal.Values(currentThread.inheritableValues);
}
// add ourselves to our ThreadGroup of choice
this.group.addThread(this);
}
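As shown, when the ThreadGroup passed to a Thread constructor is null, or the constructor takes no ThreadGroup parameter at all, the Thread is added to the ThreadGroup of the current thread.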
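This default can be checked from application code with the public `getThreadGroup()` method. The sketch below assumes a standard JVM with no security manager installed (a security manager may supply a different default group); `DefaultGroupDemo` and its two helper methods are hypothetical names.

```java
public class DefaultGroupDemo {
    // A thread created without a ThreadGroup argument joins the creator's group.
    static boolean inheritsCallerGroup() {
        Thread t = new Thread(() -> { }); // never started; only inspected
        return t.getThreadGroup() == Thread.currentThread().getThreadGroup();
    }

    // A thread created with an explicit ThreadGroup belongs to that group.
    static boolean usesExplicitGroup() {
        ThreadGroup custom = new ThreadGroup("custom");
        Thread t = new Thread(custom, () -> { });
        return t.getThreadGroup() == custom;
    }

    public static void main(String[] args) {
        System.out.println("inherits caller group: " + inheritsCallerGroup());
        System.out.println("uses explicit group:   " + usesExplicitGroup());
    }
}
```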
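A custom ThreadPool
A thread pool can be thought of as a fixed set of worker threads that keep taking tasks from one shared task queue and running them. If a worker finds the queue empty, it blocks in wait(); when a task is added, notify() wakes one of the threads blocked in wait() (which one is not specified). This matches the model of one producer and multiple consumers.
The custom ThreadPool below follows page 67 of 《Java網路程式設計精解》 (孫衛琴).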
public class ThreadPool extends ThreadGroup{
private boolean isClosed=false;
private LinkedList<Runnable> workQueue; // task queue
private static int threadPoolID;
private int threadID;
public ThreadPool(int poolSize) {
// TODO Auto-generated constructor stub
super("ThreadPool"+(threadPoolID++));
setDaemon(true);
workQueue=new LinkedList<Runnable>();
for (int i = 0; i < poolSize; i++) {
new WorkThread().start(); // start a fixed number of worker threads
}
}
public synchronized void execute(Runnable task){
if (isClosed) {
throw new IllegalStateException();
}
if (task!=null) {
workQueue.add(task);
notify(); // wake one thread blocked in wait()
}
}
/**
* Closes the pool immediately: drops queued tasks and uses interrupt()
* to interrupt every thread in the thread group.
*/
public synchronized void close(){
if (!isClosed) {
isClosed=true;
workQueue.clear();
interrupt(); // interrupt all threads in this group
}
}
public void join(){
synchronized (this) {
isClosed=true; // getTask() now returns null once the queue is empty
notifyAll(); // wake every waiting worker so it can observe isClosed
}
Thread[] threads=new Thread[activeCount()];
int count=enumerate(threads);
for (int i = 0; i < count; i++) {
try {
threads[i].join(); // wait for this worker to finish
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
protected synchronized Runnable getTask() throws InterruptedException{
while (workQueue.size()==0) {
if (isClosed) return null;
wait();
}
return workQueue.removeFirst();
}
private class WorkThread extends Thread{
public WorkThread() {
// TODO Auto-generated constructor stub
super(ThreadPool.this,"WorkThread-"+(threadID++));
}
@Override
public void run() {
// TODO Auto-generated method stub
super.run();
while (!isInterrupted()) {
Runnable task=null;
try {
task=getTask();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if (task==null) {
return;
}
task.run();
}
}
}
}
The pool can be tested as follows.
public class Main {
public static void main(String[] args) {
// TODO Auto-generated method stub
ThreadPool pool=new ThreadPool(3);
for (int i = 0; i < 10; i++) {
pool.execute(createTask(i));
}
pool.join(); // run all queued tasks to completion, then terminate
// pool.close(); // terminate immediately
}
private static Runnable createTask(final int taskID){
return new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println("Task"+taskID+":start");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// print "end" after the task body, not only when the sleep is interrupted
System.out.println("Task"+taskID+":end");
}
};
}
}
Thread pools in the JDK
Java thread pool classes
There are three concrete thread pool implementations.
(1) ThreadPoolExecutor
Executes submitted tasks using a pool of threads. One of its constructors is:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
The parameters, as described in the API documentation:
corePoolSize :the number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is set
maximumPoolSize :the maximum number of threads to allow in the pool
keepAliveTime :when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.
unit :the time unit for the keepAliveTime argument
workQueue :the queue to use for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method.
Most often it is constructed through the factory methods of the Executors class, including newCachedThreadPool(), newFixedThreadPool(int), and newSingleThreadExecutor(); see the next section for details.
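For comparison with the factory methods, the constructor can also be called directly. The sketch below uses the five-argument constructor quoted above with a bounded `ArrayBlockingQueue`; the concrete sizes (2 core threads, 4 maximum, capacity 16) and the names `DirectPoolDemo` and `runTasks` are assumptions made for this example. With these numbers, submitting more than 20 tasks at once could trigger the default rejection policy, so the example stays well below that.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DirectPoolDemo {
    // Hypothetical helper: runs taskCount trivial tasks and returns how many completed.
    static int runTasks(int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                 // corePoolSize
                4,                 // maximumPoolSize
                30L,               // keepAliveTime for threads beyond the core
                TimeUnit.SECONDS,  // unit of keepAliveTime
                new ArrayBlockingQueue<Runnable>(16)); // bounded work queue
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> done.incrementAndGet());
        }
        pool.shutdown(); // queued tasks still run after shutdown()
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("completed tasks: " + runTasks(10));
    }
}
```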
(2) ScheduledThreadPoolExecutor
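Tasks can run after a given delay or be scheduled to run periodically. The constructor is simple: only the number of threads has to be specified. The main functionality lies in the schedule-related methods, for example: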
public ScheduledFuture<?> scheduleWithFixedDelay (Runnable command, long initialDelay, long delay, TimeUnit unit)
The parameters, as described in the API documentation:
command:the task to execute
initialDelay :the time to delay first execution
delay :the delay between the termination of one execution and the commencement of the next
unit :the time unit of the initialDelay and delay parameters
Return value:
a ScheduledFuture representing pending completion of the task, and whose get() method will throw an exception upon cancellation
Most often it is constructed through the factory methods of the Executors class, here newSingleThreadScheduledExecutor() and newScheduledThreadPool(int); see the next section for details.
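A minimal sketch of `scheduleWithFixedDelay` follows. It schedules a counter task, waits until it has run a given number of times, then cancels it through the returned `ScheduledFuture`. The delays (10 ms initial, 20 ms between runs) and the names `FixedDelayDemo` and `runPeriodically` are assumptions for illustration only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedDelayDemo {
    // Hypothetical helper: lets a periodic task run at least `runs` times,
    // then cancels it and returns the number of executions observed.
    static int runPeriodically(int runs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(runs);
        AtomicInteger count = new AtomicInteger();
        ScheduledFuture<?> handle = scheduler.scheduleWithFixedDelay(() -> {
            count.incrementAndGet();
            latch.countDown();
        }, 10, 20, TimeUnit.MILLISECONDS); // initialDelay, delay, unit
        try {
            latch.await(); // block until the task has run `runs` times
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        handle.cancel(false); // stop further executions without interrupting
        scheduler.shutdown();
        return count.get();   // may exceed `runs` if one more run slipped in
    }

    public static void main(String[] args) {
        System.out.println("executions: " + runPeriodically(3));
    }
}
```

Because the delay is measured from the end of one execution to the start of the next, a slow task pushes later executions back rather than causing them to overlap.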
(3) ForkJoinPool
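A thread pool for the Fork/Join model, used to support parallel computation. For more information, see the article "JDK 7 中的 Fork/Join 模式" (the Fork/Join model in JDK 7).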
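To give a feel for the Fork/Join model, the sketch below sums an array by splitting it recursively with `RecursiveTask` and running the root task on a `ForkJoinPool`. The threshold of 1,000 elements and the names `ForkJoinSumDemo`, `SumTask`, and `sum` are assumptions chosen for this example, not tuned values.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinSumDemo {
    // Hypothetical task: sums data[from, to) by divide and conquer.
    static class SumTask extends RecursiveTask<Long> {
        private static final int THRESHOLD = 1_000; // below this, sum sequentially
        private final int[] data;
        private final int from;
        private final int to;

        SumTask(int[] data, int from, int to) {
            this.data = data;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += data[i];
                }
                return sum;
            }
            int mid = (from + to) >>> 1;
            SumTask left = new SumTask(data, from, mid);
            SumTask right = new SumTask(data, mid, to);
            left.fork();                          // run the left half asynchronously
            return right.compute() + left.join(); // compute right here, then join left
        }
    }

    static long sum(int[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] data = new int[10_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1; // 1..10000
        }
        System.out.println(sum(data)); // prints 50005000
    }
}
```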
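Initializing thread pools with the Executors factory methods
In general, thread pool objects are not instantiated directly. Several static methods in Executors create them instead.
(1) A thread pool with a single worker thread (ThreadPoolExecutor)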
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newFixedThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue, and uses the provided ThreadFactory to
* create a new thread when needed. Unlike the otherwise
* equivalent {@code newFixedThreadPool(1, threadFactory)} the
* returned executor is guaranteed not to be reconfigurable to use
* additional threads.
*
* @param threadFactory the factory to use when creating new
* threads
*
* @return the newly created single-threaded Executor
* @throws NullPointerException if threadFactory is null
*/
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
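The sequential guarantee described in the Javadoc above can be observed with a short experiment: tasks submitted to a single-thread executor finish in submission order. This is a sketch only; `SingleThreadOrderDemo` and `runInOrder` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SingleThreadOrderDemo {
    // Hypothetical helper: submits tasks 0..n-1 and returns the order they ran in.
    static List<Integer> runInOrder(int n) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<Integer> order = Collections.synchronizedList(new ArrayList<Integer>());
        for (int i = 0; i < n; i++) {
            final int id = i;
            executor.execute(() -> order.add(id)); // record the execution order
        }
        executor.shutdown(); // queued tasks still run
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5)); // prints [0, 1, 2, 3, 4]
    }
}
```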
(2) A thread pool with a fixed number of threads (ThreadPoolExecutor)
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue, using the provided
* ThreadFactory to create new threads when needed. At any point,
* at most {@code nThreads} threads will be active processing
* tasks. If additional tasks are submitted when all threads are
* active, they will wait in the queue until a thread is
* available. If any thread terminates due to a failure during
* execution prior to shutdown, a new one will take its place if
* needed to execute subsequent tasks. The threads in the pool will
* exist until it is explicitly {@link ExecutorService#shutdown
* shutdown}.
*
* @param nThreads the number of threads in the pool
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
(3) A thread pool with a variable number of threads (ThreadPoolExecutor)
A thread that stays idle is kept alive for 60 s before it is terminated.
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to {@code execute} will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available, and uses the provided
* ThreadFactory to create new threads when needed.
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
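Because the cached pool pairs a core size of 0 with a SynchronousQueue, every task submitted while all workers are busy gets a new thread. The sketch below makes this visible by holding several tasks on a latch and reading `getPoolSize()`. The cast to `ThreadPoolExecutor` relies on the factory returning that class directly, as in the listing above; other implementations could differ. `CachedPoolDemo` and `poolSizeUnderLoad` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class CachedPoolDemo {
    // Hypothetical helper: submits n tasks that all block at the same time
    // and returns the pool size observed while they are blocked.
    static int poolSizeUnderLoad(int n) {
        // Assumption: newCachedThreadPool() returns a plain ThreadPoolExecutor.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < n; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep this worker busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int size = pool.getPoolSize(); // one thread per blocked task
        release.countDown();           // let all tasks finish
        pool.shutdown();
        return size;
    }

    public static void main(String[] args) {
        System.out.println("pool size under load: " + poolSizeUnderLoad(5));
    }
}
```

This growth is unbounded in principle (the maximum is Integer.MAX_VALUE), which is why a cached pool suits many short-lived tasks better than long-blocking ones.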
(4) A Fork/Join thread pool (ForkJoinPool)
/**
* Creates a thread pool that maintains enough threads to support
* the given parallelism level, and may use multiple queues to
* reduce contention. The parallelism level corresponds to the
* maximum number of threads actively engaged in, or available to
* engage in, task processing. The actual number of threads may
* grow and shrink dynamically. A work-stealing pool makes no
* guarantees about the order in which submitted tasks are
* executed.
*
* @param parallelism the targeted parallelism level
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code parallelism <= 0}
* @since 1.8
* @hide
*/
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
/**
* Creates a work-stealing thread pool using all
* {@link Runtime#availableProcessors available processors}
* as its target parallelism level.
* @return the newly created thread pool
* @since 1.8
* @hide
*/
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
(5) A single-threaded scheduled thread pool (ScheduledThreadPoolExecutor)
/**
* Creates a single-threaded executor that can schedule commands
* to run after a given delay, or to execute periodically.
* (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newScheduledThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
* @return the newly created scheduled executor
*/
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
/**
* Creates a single-threaded executor that can schedule commands
* to run after a given delay, or to execute periodically. (Note
* however that if this single thread terminates due to a failure
* during execution prior to shutdown, a new one will take its
* place if needed to execute subsequent tasks.) Tasks are
* guaranteed to execute sequentially, and no more than one task
* will be active at any given time. Unlike the otherwise
* equivalent {@code newScheduledThreadPool(1, threadFactory)}
* the returned executor is guaranteed not to be reconfigurable to
* use additional threads.
* @param threadFactory the factory to use when creating new
* threads
* @return a newly created scheduled executor
* @throws NullPointerException if threadFactory is null
*/
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
(6) A multi-threaded scheduled thread pool (ScheduledThreadPoolExecutor)
/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @param threadFactory the factory to use when the executor
* creates a new thread
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @throws NullPointerException if threadFactory is null
*/
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
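Besides periodic execution, a scheduled pool can run a one-shot task after a delay and hand back its result. The sketch below uses `schedule(Callable, long, TimeUnit)` from `ScheduledExecutorService`; the 50 ms delay, the pool size of 2, and the names `DelayedResultDemo` and `delayedResult` are assumptions for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class DelayedResultDemo {
    // Hypothetical helper: schedules a Callable after a short delay
    // and blocks until its result is available.
    static String delayedResult() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        try {
            Callable<String> task = () -> "done";
            ScheduledFuture<String> future =
                    scheduler.schedule(task, 50, TimeUnit.MILLISECONDS);
            return future.get(); // waits for the delayed task to complete
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(delayedResult()); // prints done
    }
}
```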
Example 1: Network Service
Here is a network server built on a thread pool.
import java.io.BufferedInputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Scanner;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* created by CaoYanfeng
* A simple server program that uses a thread pool to handle sockets concurrently.
* */
public class NetworkService {
private final ServerSocket serverSocket;
private final ExecutorService pool;
public NetworkService(int port,int poolSize) throws IOException {
// TODO Auto-generated constructor stub
serverSocket=new ServerSocket(port);
pool=Executors.newFixedThreadPool(poolSize); // use the poolSize argument instead of ignoring it
}
public static void main(String[] args) {
// TODO Auto-generated method stub
try {
final NetworkService service=new NetworkService(8000, 3);
service.run();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
// shut down the thread pool
public void shutdownAndAwaitTermination(ExecutorService pool){
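pool.shutdown(); // stop accepting new tasks
try {
// give running tasks a chance to finish before cancelling them
if (!pool.awaitTermination(60, TimeUnit.MILLISECONDS)) {
pool.shutdownNow(); // cancel tasks that are still running
if (!pool.awaitTermination(60, TimeUnit.MILLISECONDS))
System.err.println("Pool did not terminate");
}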
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
pool.shutdownNow();
Thread.currentThread().interrupt();
}
}
public void run(){
System.out.println("Service started!");
while (true) {
try {
pool.execute(new Handler(serverSocket.accept()));
} catch (IOException e) {
// TODO Auto-generated catch block
pool.shutdown();
e.printStackTrace();
return; // leave the loop: a pool that has been shut down rejects further tasks
}
}
}
// handler for one client socket
private class Handler implements Runnable{
private static final int BUFFER_SIZE=1024;
private final