1. 程式人生 > >【Java】多執行緒系列(三)之阻塞執行緒的多種方法

【Java】多執行緒系列(三)之阻塞執行緒的多種方法

前言:

在某些應用場景下,我們可能需要等待某個執行緒執行完畢,然後才能進行後續的操作。也就是說,主執行緒需要等待子執行緒都執行完畢才能執行後續的任務。
例如,當你在計算利用多執行緒執行幾個比較耗時的任務的時候,主執行緒需要利用這幾個執行緒計算的結果,才能進行後續的操作。那麼我們其實就需要等待所有執行緒執行完畢。

這裡,介紹幾個常用的方法

執行緒執行單次的場景下

1,利用Thread類的join()方法

package concurrent;

import java.util.ArrayList;
import java.util.List;

public class JoinTest {
    public
static void main(String[] args) { List<Thread> list=new ArrayList<>(); for(int i=0;i<5;i++){ Thread t=new Thread(){ @Override public void run() { System.out.println(Thread.currentThread().getName()+" is running!"
); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }; list.add(t); t.start(); } for(int i=0
;i<5;i++){ try { list.get(i).join(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } System.out.println(Thread.currentThread()+" is running!"); } }

程式執行結果如下:

Thread-2 is running!
Thread-0 is running!
Thread-1 is running!
Thread-3 is running!
Thread-4 is running!
Thread[main,5,main] is running!

注:

對於上面的join()進行執行緒阻塞的時候,需要注意一下另外一種情況: 上面的5個執行緒,每次執行緒start()之後立即呼叫join()
這樣會產生類似於單執行緒模式,即5個執行緒序列化形式組織,順序執行。

2,利用ExecutorService的invokeAll()方法

package concurrent;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorServiceInvokeAllTest {
    public static void main(String[] args) {
        ExecutorService exec=Executors.newFixedThreadPool(5);
        List<Callable<String>> list=new ArrayList<>();
        for(int i=0;i<5;i++){
            callableTest cl=new callableTest();
            list.add(cl);
        }
        try {
            exec.invokeAll(list);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread()+" is running!");
        exec.shutdown();
    }
}
class callableTest implements Callable<String>{

    @Override
    public String call() throws Exception {
        System.out.println(Thread.currentThread()+" is running!");
        Thread.sleep(1000);//模擬執行緒執行的耗時過程
        return null;
    }

}

程式執行結果:

Thread[pool-1-thread-1,5,main] is running!
Thread[pool-1-thread-3,5,main] is running!
Thread[pool-1-thread-2,5,main] is running!
Thread[pool-1-thread-5,5,main] is running!
Thread[pool-1-thread-4,5,main] is running!
Thread[main,5,main] is running!

invokeAll是一個阻塞方法,會等待任務列表中的所有任務都執行完成。等待所有的任務完成之後,就會返回一個Future的列表,裡面記錄了每個執行緒執行之後的結果。一旦ExecutorService.invokeAll()方法產生了異常,執行緒池中還沒有完成的任務會被取消執行。

3,利用Future的get()方法

package concurrent;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureTest {
    public static void main(String[] args) {
        ExecutorService exc=Executors.newFixedThreadPool(5);
        Set<Future<Integer>> set = new HashSet<Future<Integer>>();
        for(int i=0;i<5;i++){
            callTest cl=new callTest();
            Future<Integer> f=exc.submit(cl);
            set.add(f);
        }
        int num=0;
        for(Future<Integer> f:set){
            try {
                num+=f.get();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        System.out.println(Thread.currentThread()+" is running!");
        System.out.println("The value num="+num);
        exc.shutdown();
    }
}
class callTest implements Callable<Integer>{

    @Override
    public Integer call() throws Exception {
        System.out.println(Thread.currentThread()+" is running!");
        Thread.sleep(1000);
        return Integer.valueOf(1);
    }
}

輸出結果如下:

Thread[pool-1-thread-1,5,main] is running!
Thread[pool-1-thread-3,5,main] is running!
Thread[pool-1-thread-2,5,main] is running!
Thread[pool-1-thread-4,5,main] is running!
Thread[pool-1-thread-5,5,main] is running!
Thread[main,5,main] is running!
The value num=5

Runnable是執行工作的獨立任務,不返回任何結果。如果希望在任務完成之後,能夠返回一個值,那麼可以實現Callable介面而不是Runnable介面。在Java SE5中引入的Callable是一種具有型別引數的泛型,它的型別引數表示的是從方法call()中返回的值,並且必須使用ExecutorService.submit()方法呼叫它。
submit()方法會產生Future物件,它用Callable返回的結果的特定型別進行了引數化。你可以使用isDone()方法來檢視Future是否已經完成。當任務完成時,就會返回結果,可以呼叫get()方法來獲取返回的結果。當然,也可以不用isDone()方法進行檢查就可以直接呼叫get(),在這種情況下,get()方法就會被阻塞,直至返回結果就緒。

4,利用ExecutorService的awaitTermination()方法

package concurrent;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ESawaitTerminationTest {
    public static void main(String[] args) {
        ExecutorService exc=Executors.newFixedThreadPool(5);
        for(int i=0;i<5;i++){
            Thread thread=new Thread(){
                @Override
                public void run() {
                    System.out.println(Thread.currentThread()+" is running!");
                    try {
                        int temp=new Random().nextInt(5000);
                        System.out.println("sleep duration: "+temp+"ms");
                        sleep(temp);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            };
            exc.execute(thread);
        }
        exc.shutdown();
        try {
            while(!exc.awaitTermination(1, TimeUnit.SECONDS)){
                System.out.println("Executors is not terminated!");
            }
            System.out.println("Executors is terminated!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

輸出結果如下:

Thread[pool-1-thread-2,5,main] is running!
Thread[pool-1-thread-1,5,main] is running!
Thread[pool-1-thread-3,5,main] is running!
sleep duration: 80ms
sleep duration: 468ms
sleep duration: 1200ms
Thread[pool-1-thread-5,5,main] is running!
sleep duration: 195ms
Thread[pool-1-thread-4,5,main] is running!
sleep duration: 2209ms
Executors is not terminated!
Executors is not terminated!
Executors is terminated!

上面的awaitTermination()方法也可以使用isTerminated()方法進行判斷,同時在while()迴圈體內加一個Thread.sleep()方法,即每隔一段時間檢查一遍執行緒池是否終止。這兩個方法是同樣的效果!

5,利用CountDownLatch

CountDownLatch類可以生成指定數目的鎖,每把鎖對應一個執行緒,每個執行緒執行完畢之後,就可以相應的減少一把鎖。當鎖的數目減為0之後,相當於所有的鎖都已經”歸還“,所有執行緒執行完畢。

package concurrent;

import java.util.Random;
import java.util.concurrent.CountDownLatch;

public class CountDownLatchTest {
    public static void main(String[] args) {
        CountDownLatch cl=new CountDownLatch(5);
        for(int i=0;i<5;i++){
            Thread t=new Thread(){
                public void run() {
                    System.out.println(Thread.currentThread()+" is running!");
                    int temp=new Random().nextInt(5000);
                    System.out.println("sleep duration: "+temp+"ms");
                    try {
                        sleep(temp);
                        cl.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                };
            };
            t.start();
        }
        try {
            cl.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread()+" is running!");
    }
}

輸出結果如下:

Thread[Thread-0,5,main] is running!
Thread[Thread-3,5,main] is running!
Thread[Thread-2,5,main] is running!
Thread[Thread-1,5,main] is running!
sleep duration: 1105ms
sleep duration: 1169ms
sleep duration: 4168ms
sleep duration: 642ms
Thread[Thread-4,5,main] is running!
sleep duration: 1949ms
Thread[main,5,main] is running!

迴圈業務場景下的使用

很多時候,一些執行緒為了實現定時更新的功能,就會每隔一段時間,迴圈執行某個任務。

例如:

web開發中,主執行緒M,子執行緒A、B。M需要等待子執行緒A、B至少各自完成一次任務執行才能進行後面的任務執行。之後子執行緒A、B就每隔一段時間迴圈執行任務去更新資料。

那麼這種場景下如何實現需求呢?

參見筆者之前的多執行緒系列中的一篇文章: