實戰Java高併發程式設計(3.2 執行緒池)
阿新 • • 發佈:2018-11-21
1.Executor
jdk提供了一套Executor框架,本質上是一個執行緒池。
- newFixedThreadPool()方法:該方法返回一個固定數量的執行緒池。該執行緒池中的執行緒數量始終不變,當有一個新任務提交時,執行緒池中若有空閒執行緒,則立即執行,若沒有,則任務會暫存在一個任務佇列中,待有執行緒空閒時,便處理在任務佇列中的任務。
- newSingleThreadExecutor():該方法返回一個只有一個執行緒的執行緒池。若有多餘一個任務被提交到該執行緒池,任務會被儲存到一個任務佇列中,待執行緒空閒,按先入先出的順序執行佇列中的任務。
- newCachedThreadPool():該方法返回一個可根據實際情況調整執行緒數量的執行緒池。執行緒池的執行緒數量不確定,但若有空閒執行緒可以複用,則會優先使用可複用的執行緒。若所有執行緒均在工作,又有新的任務提交,則會建立新的執行緒處理任務。所有執行緒在使用完畢後,將返回執行緒池進行復用。
- newSingleThreadScheduledExecutor():該方法返回一個ScheduledExecutorService物件,執行緒池大小為1。ScheduledExecutorService介面在ExecutorService介面之上擴充套件了在給定時間執行某任務的功能,如在某個固定的延時之後執行,或者週期性的執行某個任務。
- newScheduledThreadPool():該方法也返回一個ScheduledExecutorService物件,但該執行緒池可以指定執行緒數量。
public class Test { public static class MyTask implements Runnable { @Override public void run() { System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { MyTask myTask = new MyTask(); ExecutorService es = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { es.submit(myTask); } } }
2.拒絕策略
當任務數量超過系統實際承載能力時,用拒絕策略進行補救。
- AbortPolicy():該策略會直接丟擲異常,阻止系統正常工作。
- CallerRunsPolicy():只要執行緒池未關閉,該策略直接在呼叫者的執行緒中,運行當前被丟棄的任務。
- DiscardOldestPolicy():該策略將丟棄最老的一個請求,也就是即將被執行的任務,並嘗試在次提交當前任務.
- DiscardPolicy():該策略默默地丟棄無法處理的任務,不予任何處理。
public class Test { public static class MyTask implements Runnable { @Override public void run() { System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException{ MyTask myTask = new MyTask(); ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<Runnable>(10), Executors.defaultThreadFactory(), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString()+" is discard"); } }); for (int i = 0; i < Integer.MAX_VALUE; i++) { es.submit(myTask); Thread.sleep(10); } } }
在執行幾個任務後,拒絕策略就開始生效了。在實際應用中,可以利用這一點來分析系統的負載和任務丟失情況。
3.擴充套件執行緒池
ThreadPoolExecutor是一個可擴充套件的執行緒池。
public class Test {
public static class MyTask implements Runnable {
public String name;
public MyTask(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println("正在執行" + ":Thread ID:" + Thread.currentThread().getId() + ",Task name = " + name);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService es = new ThreadPoolExecutor(5, 5, 0L,
TimeUnit.MICROSECONDS, new LinkedBlockingDeque<Runnable>()) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("準備執行:" + ((MyTask) r).name);
}
@Override
protected void afterExecute(Runnable r,Throwable t){
System.out.println("執行完成:"+((MyTask) r).name);
}
@Override
protected void terminated(){
System.out.println("執行緒退出");
}
};
for (int i = 0; i < 5; i++) {
MyTask myTask = new MyTask("TASK-GEYM-"+i);
es.execute(myTask);
Thread.sleep(10);
}
es.shutdown();
}
}
所有任務的前中後時間點以及任務名字都可以捕捉。便於應用程式除錯。
4.線上程池中捕獲異常
改造ThreadPoolExecutor方法,使其能丟擲異常,以便於查詢錯誤的地方。
public class Test extends ThreadPoolExecutor {
public Test(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
public void execute(Runnable task) {
super.execute(wrap(task, clientTrace(), Thread.currentThread().getName()));
}
@Override
public Future<?> submit(Runnable task) {
return super.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));
}
private Exception clientTrace() {
return new Exception("Client stack trace");
}
private Runnable wrap(final Runnable task, final Exception clientStack, String clientThreadName) {
return new Runnable() {
@Override
public void run() {
try {
task.run();
} catch (Exception e) {
e.printStackTrace();
throw e;
}
}
};
}
public static void main(String[] args) {
ThreadPoolExecutor pools = new Test(0, Integer.MAX_VALUE, 0L, TimeUnit.SECONDS,
new SynchronousQueue<>());
for (int i = 0; i < 5; i++) {
pools.execute(new DivTask(100,i));
}
}
private static class DivTask implements Runnable {
int a, b;
public DivTask(int a, int b) {
this.a = a;
this.b = b;
}
@Override
public void run() {
double re = a/b;
System.out.println(re);
}
}
}
5.Fork/Join框架
假如要處理1000個數據,但不具備同時處理的能力,先處理10個,在分階段處理100次,最後將結果合併。某一執行緒處理完後可以幫助其他未完成的執行緒處理。
public class Test extends RecursiveTask<Long> {
private static final int THRESHOLD = 10000;
private long start;
private long end;
public Test(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
boolean canConpute = (end - start) < THRESHOLD;
if (canConpute) {
for (long i = start; i < end; i++) {
sum += i;
}
}else {
// 分成100個任務
long step = (start+end)/100;
ArrayList<Test> subTasks = new ArrayList<>();
long pos = start;
for (int i = 0; i < 100; i++) {
long lastOne = pos+step;
if (lastOne>end){
lastOne = end;
}
Test subTask = new Test(pos,lastOne);
pos+=step+1;
subTasks.add(subTask);
subTask.fork();
}
for (Test t : subTasks){
sum+=t.join();
}
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
Test test = new Test(0,200000L);
ForkJoinTask<Long> result = forkJoinPool.submit(test);
try {
long res = result.get();
System.out.println("sum = "+res);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}