Java多執行緒簡單實現取消和進度
Java中簡單實現多執行緒排程時的可取消和顯示進度
一個簡單的多執行緒排程實現,統一開始,為了使得所有執行緒統一開始,類似運動員在聽到發令槍時一起進行,使用了CountDownLatch進行控制。
CountDownLatch beginLatch = new CountDownLatch(1);
CountDownLatch endLatch = new CountDownLatch(personCount);
主執行緒建立執行緒池,並進行排程,由於要在最後進行彙總結果,使用了FutureTask
List<FutureTask<String>> futureTaskList = new ArrayList<FutureTask<String>>(); for (int i = 0; i < personCount; i++) { futureTaskList.add(new FutureTask<String>(new ExecuteCallable(beginLatch, endLatch,i))); } ExecutorService execService = Executors.newFixedThreadPool(threadCount); for (FutureTask<String> futureTask : futureTaskList) { execService.execute(futureTask); } beginLatch.countDown();
這樣所有執行緒就會統一開始執行,執行完成後,彙總結果,並關閉執行緒池。
endLatch.await();
System.out.println("--------------");
for (FutureTask<String> futureTask : futureTaskList) {
System.out.println(futureTask.get());
}
execService.shutdown();
對於每個執行緒的執行,都需要共享變數beginLatch和endLatch,各執行緒程式碼:
public class ExecuteCallable implements Callable<String> { private int id; private CountDownLatch beginLatch; private CountDownLatch endLatch; public ExecuteCallable(CountDownLatch beginLatch, CountDownLatch endLatch, Exchanger<Integer> exchanger, int id, ConcurrentTaskExecutor concurrentTaskExecutor) { this.beginLatch = beginLatch; this.endLatch = endLatch; this.id = id; } @Override public String call() throws Exception { beginLatch.await(); long millis = (long) (Math.random() * 10 * 1000); String result = String.format("Player :%s arrived, use %s millis", id, millis); Thread.sleep(millis); System.out.println(result); endLatch.countDown(); return result; } }
每個執行緒在開始等待發令槍(beginLatch),隨機等待一段時間(模擬執行時間),最後通知endLatch減一(執行完畢通知),並返回結果。
到這裡只是一個簡單的實現,我們並不能在主執行緒中實時瞭解各執行緒的執行情況,除非到了所有執行緒執行完畢(endLatch解除阻塞狀態)。這時候我們使用Exchanger機制來進行執行緒之間資料的交換,在每個執行緒執行完成後,將其完成的資料量傳給主執行緒進行重新整理(模擬進度條工作)。
主執行緒ConcurrentTaskExecutor類中:
Exchanger<Integer> exchanger = new Exchanger<Integer>(); beginLatch.countDown(); Integer totalResult = Integer.valueOf(0); for (int i = 0; i < personCount; i++) { Integer partialResult = exchanger.exchange(Integer.valueOf(0)); if(partialResult != 0){ totalResult = totalResult + partialResult; System.out.println(String.format("Progress: %s/%s", totalResult, personCount)); } } endLatch.await();
執行緒類ExecuteCallable建構函式加入exchanger
@Override
public String call() throws Exception {
beginLatch.await();
long millis = (long) (Math.random() * 10 * 1000);
String result = String.format("Player :%s arrived, use %s millis", id, millis);
Thread.sleep(millis);
System.out.println(result);
exchanger.exchange(1);
endLatch.countDown();
return result;
}
在執行完成進行資料交換,返回本次執行進度給主執行緒(當前預設設定成1,可修改),主執行緒在所有執行緒執行完成前,endLatch.await()必定是阻塞狀態的,這樣主執行緒就能實時拿到子執行緒執行完成的進度資料。
下面我們再加入一個可以取消的功能,加入系統隨機在某個時間點進行取消操作,那麼開始執行的執行緒是無法進行實時響應了,只能等待當前操作執行完畢;如果執行緒還沒有開始執行,那麼就取消其行為。
更改的ExecuteCallable執行方法如下:
@Override
public String call() throws Exception {
beginLatch.await();
if(concurrentTaskExecutor.isCanceled()){
endLatch.countDown();
exchanger.exchange(0);
return String.format("Player :%s is given up", id);
}
long millis = (long) (Math.random() * 10 * 1000);
String result = String.format("Player :%s arrived, use %s millis", id, millis);
Thread.sleep(millis);
System.out.println(result);
exchanger.exchange(1);
endLatch.countDown();
return result;
}
其中concurrentTaskExecutor類中加入一個型別為boolean的canceled變數,注意這個變數必須是volatile的,以便能夠線上程間共享資料,並且該變數的setter和getter方法也是原子性的。
我們的取消操作不能放在主執行緒中操作,需要額外建立一個執行緒,並且這個執行緒也不能通過執行緒池進行排程,新建的InterruptRunnable類:
public class InterruptRunnable implements Runnable {
private CountDownLatch beginLatch;
private ConcurrentTaskExecutor concurrentTaskExecutor;
public InterruptRunnable(ConcurrentTaskExecutor currConcurrentTaskExecutor, CountDownLatch beginLatch) {
this.beginLatch = beginLatch;
this.concurrentTaskExecutor = currConcurrentTaskExecutor;
}
@Override
public void run() {
try {
beginLatch.await();
long millis = (long) (Math.random() * 10 * 1000);
System.out.println(String.format("System need sleep %s millis", millis));
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
concurrentTaskExecutor.setCanceled(true);
}
}
更改後的ConcurrentTaskExecutor,在執行發令前,先讓該中斷執行緒啟動,以便一起等待開始命令:
new Thread(new InterruptRunnable(this, beginLatch)).start();
beginLatch.countDown();
最後執行結果(取決於中斷執行緒的隨機時間長短):
System need sleep 2920 millis
Player :4 arrived, use 917 millis
Progress: 1/10
Player :5 arrived, use 1076 millis
Progress: 2/10
Player :3 arrived, use 2718 millis
Progress: 3/10
Player :1 arrived, use 4013 millis
Progress: 4/10
Player :0 arrived, use 8541 millis
Progress: 5/10
Player :2 arrived, use 8570 millis
Progress: 6/10
Player :6 arrived, use 7261 millis
Progress: 7/10
Player :7 arrived, use 7015 millis
Progress: 8/10
--------------
Player :0 arrived, use 8541 millis
Player :1 arrived, use 4013 millis
Player :2 arrived, use 8570 millis
Player :3 arrived, use 2718 millis
Player :4 arrived, use 917 millis
Player :5 arrived, use 1076 millis
Player :6 arrived, use 7261 millis
Player :7 arrived, use 7015 millis
Player :8 is given up
Player :9 is given up
最後,附上最終的程式程式碼
ConcurrentTaskExecutor:
public class ConcurrentTaskExecutor {
private volatile boolean canceled = false;
public void executeTask() throws Exception {
int personCount = 10;
int threadCount = 5;
CountDownLatch beginLatch = new CountDownLatch(1);
CountDownLatch endLatch = new CountDownLatch(personCount);
Exchanger<Integer> exchanger = new Exchanger<Integer>();
List<FutureTask<String>> futureTaskList = new ArrayList<FutureTask<String>>();
for (int i = 0; i < personCount; i++) {
futureTaskList.add(new FutureTask<String>(new ExecuteCallable(beginLatch, endLatch, exchanger, i, this)));
}
ExecutorService execService = Executors.newFixedThreadPool(threadCount);
for (FutureTask<String> futureTask : futureTaskList) {
execService.execute(futureTask);
}
new Thread(new InterruptRunnable(this, beginLatch)).start();
beginLatch.countDown();
Integer totalResult = Integer.valueOf(0);
for (int i = 0; i < personCount; i++) {
Integer partialResult = exchanger.exchange(Integer.valueOf(0));
if(partialResult != 0){
totalResult = totalResult + partialResult;
System.out.println(String.format("Progress: %s/%s", totalResult, personCount));
}
}
endLatch.await();
System.out.println("--------------");
for (FutureTask<String> futureTask : futureTaskList) {
System.out.println(futureTask.get());
}
execService.shutdown();
}
public boolean isCanceled() {
return canceled;
}
public void setCanceled(boolean canceled){
this.canceled = canceled;
}
public static void main(String[] args) throws Exception {
ConcurrentTaskExecutor executor = new ConcurrentTaskExecutor();
executor.executeTask();
}
}
ExecuteCallable
public class ExecuteCallable implements Callable<String> {
private int id;
private CountDownLatch beginLatch;
private CountDownLatch endLatch;
private Exchanger<Integer> exchanger;
private ConcurrentTaskExecutor concurrentTaskExecutor;
public ExecuteCallable(CountDownLatch beginLatch, CountDownLatch endLatch,
Exchanger<Integer> exchanger, int id,
ConcurrentTaskExecutor concurrentTaskExecutor) {
this.beginLatch = beginLatch;
this.endLatch = endLatch;
this.exchanger = exchanger;
this.id = id;
this.concurrentTaskExecutor = concurrentTaskExecutor;
}
@Override
public String call() throws Exception {
beginLatch.await();
if(concurrentTaskExecutor.isCanceled()){
endLatch.countDown();
exchanger.exchange(0);
return String.format("Player :%s is given up", id);
}
long millis = (long) (Math.random() * 10 * 1000);
String result = String.format("Player :%s arrived, use %s millis", id, millis);
Thread.sleep(millis);
System.out.println(result);
exchanger.exchange(1);
endLatch.countDown();
return result;
}
}
InterruptRunnable
public class InterruptRunnable implements Runnable {
private CountDownLatch beginLatch;
private ConcurrentTaskExecutor concurrentTaskExecutor;
public InterruptRunnable(ConcurrentTaskExecutor currConcurrentTaskExecutor, CountDownLatch beginLatch) {
this.beginLatch = beginLatch;
this.concurrentTaskExecutor = currConcurrentTaskExecutor;
}
@Override
public void run() {
try {
beginLatch.await();
long millis = (long) (Math.random() * 10 * 1000);
System.out.println(String.format("System need sleep %s millis", millis));
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
concurrentTaskExecutor.setCanceled(true);
}
}