1. 程式人生 > >Java學習筆記—多線程(並發工具類,java.util.concurrent.atomic包)

Java學習筆記—多線程(並發工具類,java.util.concurrent.atomic包)

配對 初始 訪問 接收 iter nco .get 執行 string

在JDK的並發包裏提供了幾個非常有用的並發工具類。CountDownLatch、CyclicBarrier和Semaphore工具類提供了一種並發流程控制的手段,Exchanger工具類則提供了在線程間交換數據的一種手段。本章會配合一些應用場景來介紹如何使用這些工具類。

CountDownLatch

CountDownLatch允許一個或多個線程等待其他線程完成操作。
假如有這樣一個需求:我們需要解析一個Excel裏多個sheet的數據,此時可以考慮使用多線程,每個線程解析一個sheet裏的數據,等到所有的sheet都解析完之後,程序需要提示解析完成(或者匯總結果)。在這個需求中,要實現主線程等待所有線程完成sheet的解析操作,最簡單的做法是使用join()方法。

import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

public class JoinCountDownLatchTest {
    private static Random sr=new Random(47); 
    private static AtomicInteger result=new AtomicInteger(0);
    private static int threadCount=10;
    private static class Parser implements
Runnable{ String name; public Parser(String name){ this.name=name; } @Override public void run() { int sum=0; int seed=Math.abs(sr.nextInt()) ; Random r=new Random(47); for(int i=0;i<100;i++){ sum
+=r.nextInt(seed); } result.addAndGet(sum); System.out.println(name+"線程的解析結果:"+sum); } } public static void main(String[] args) throws InterruptedException { Thread[] threads=new Thread[threadCount]; for(int i=0;i<threadCount;i++){ threads[i]=new Thread(new Parser("Parser-"+i)); } for(int i=0;i<threadCount;i++){ threads[i].start(); } for(int i=0;i<threadCount;i++){ threads[i].join(); } System.out.println("所有線程解析結束!"); System.out.println("所有線程的解析結果:"+result); } }

輸出:

Parser-1線程的解析結果:-2013585201
Parser-0線程的解析結果:1336321192
Parser-2線程的解析結果:908136818
Parser-5線程的解析結果:-1675827227
Parser-3線程的解析結果:1638121055
Parser-4線程的解析結果:1513365118
Parser-6線程的解析結果:489607354
Parser-8線程的解析結果:1513365118
Parser-7線程的解析結果:-1191966831
Parser-9線程的解析結果:-912399159
所有線程解析結束!
所有線程的解析結果:1605138237

join用於讓當前執行線程等待join線程執行結束。其實現原理是不停檢查join線程是否存活,如果join線程存活則讓當前線程永遠等待。

在JDK 1.5之後的並發包中提供的CountDownLatch也可以實現join的功能,並且比join的功能更多。

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class CountDownLatchTest {
    private static Random sr=new Random(47); 
    private static AtomicInteger result=new AtomicInteger(0);
    private static int threadCount=10;//線程數量
    private static CountDownLatch countDown=new CountDownLatch(threadCount);//CountDownLatch
    private static class Parser implements Runnable{ 
        String name;
        public Parser(String name){
            this.name=name;
        }
        @Override
        public void run() {
            int sum=0;
            int seed=Math.abs(sr.nextInt()) ;
            Random r=new Random(47); 
            for(int i=0;i<100;i++){  
                sum+=r.nextInt(seed);
            }  
            result.addAndGet(sum);
            System.out.println(name+"線程的解析結果:"+sum);
            countDown.countDown();//註意這裏
        } 
    }
    public static void main(String[] args) throws InterruptedException {
        Thread[] threads=new Thread[threadCount];
        for(int i=0;i<threadCount;i++){
            threads[i]=new Thread(new Parser("Parser-"+i));
        } 
        for(int i=0;i<threadCount;i++){
            threads[i].start();
        } 
        /*
        for(int i=0;i<threadCount;i++){
            threads[i].join();
        }*/
        countDown.await();//將join改為使用CountDownLatch
        System.out.println("所有線程解析結束!");
        System.out.println("所有線程的解析結果:"+result);
    } 
}

輸出:

Parser-0線程的解析結果:1336321192
Parser-1線程的解析結果:-2013585201
Parser-2線程的解析結果:-1675827227
Parser-4線程的解析結果:1638121055
Parser-3線程的解析結果:908136818
Parser-5線程的解析結果:1513365118
Parser-7線程的解析結果:489607354
Parser-6線程的解析結果:1513365118
Parser-8線程的解析結果:-1191966831
Parser-9線程的解析結果:-912399159
所有線程解析結束!
所有線程的解析結果:1605138237

CountDownLatch的構造函數接收一個int類型的參數作為計數器,如果你想等待N個點完成,這裏就傳入N。

當我們調用CountDownLatch的countDown方法時,N就會減1,CountDownLatch的await方法會阻塞當前線程,直到N變成零。由於countDown方法可以用在任何地方,所以這裏說的N個點,可以是N個線程,也可以是1個線程裏的N個執行步驟。用在多個線程時,只需要把這個CountDownLatch的引用傳遞到線程裏即可。
如果有某個解析sheet的線程處理得比較慢,我們不可能讓主線程一直等待,所以可以使用另外一個帶指定時間的await方法——await(long time,TimeUnit unit),這個方法等待特定時間後,就會不再阻塞當前線程。join也有類似的方法。
註意:計數器必須大於等於0,只是等於0時候,計數器就是零,調用await方法時不會阻塞當前線程。CountDownLatch不可能重新初始化或者修改CountDownLatch對象的內部計數器的值。一個線程調用countDown方法happen-before,另外一個線程調用await方法。

CyclicBarrier

CyclicBarrier的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障才會
開門,所有被屏障攔截的線程才會繼續運行。當所有等待線程都被釋放以後,CyclicBarrier可以被重用。CyclicBarrier類位於java.util.concurrent包下,CyclicBarrier提供2個構造器:

public CyclicBarrier(int parties, Runnable barrierAction) {
}
 
public CyclicBarrier(int parties) {
}

參數parties指讓多少個線程或者任務等待至barrier狀態;參數barrierAction為當這些線程都達到barrier狀態時會執行的內容

public class Test {
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N);
        for(int i=0;i<N;i++)
            new Writer(barrier).start();
    }
    static class Writer extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
 
        @Override
        public void run() {
            System.out.println("線程"+Thread.currentThread().getName()+"正在寫入數據...");
            try {
                Thread.sleep(5000);      //以睡眠來模擬寫入數據操作
                System.out.println("線程"+Thread.currentThread().getName()+"寫入數據完畢,等待其他線程寫入完畢");
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
            System.out.println("所有線程寫入完畢,繼續處理其他任務...");
        }
    }
}

執行結果:

線程Thread-0正在寫入數據...
線程Thread-3正在寫入數據...
線程Thread-2正在寫入數據...
線程Thread-1正在寫入數據...
線程Thread-2寫入數據完畢,等待其他線程寫入完畢
線程Thread-0寫入數據完畢,等待其他線程寫入完畢
線程Thread-3寫入數據完畢,等待其他線程寫入完畢
線程Thread-1寫入數據完畢,等待其他線程寫入完畢
所有線程寫入完畢,繼續處理其他任務...
所有線程寫入完畢,繼續處理其他任務...
所有線程寫入完畢,繼續處理其他任務...
所有線程寫入完畢,繼續處理其他任務...

CyclicBarrier和CountDownLatch的區別

CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可以使用reset()方法重置。所以CyclicBarrier能處理更為復雜的業務場景。例如,如果計算發生錯誤,可以重置計數器,並讓線程重新執行一次。
CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得Cyclic-Barrier阻塞的線程數量。isBroken()方法用來了解阻塞的線程是否被中斷。

Semaphore

Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,它通過協調各個線程,以保證合理的使用公共資源。Semaphore可以控同時訪問的線程個數,通過acquire()獲取一個許可,如果沒有就等待,而 release() 釋放一個許可。

假若一個工廠有5臺機器,但是有8個工人,一臺機器同時只能被一個工人使用,只有使用完了,其他工人才能繼續使用。那麽我們就可以通過Semaphore來實現:

public class Test {
    public static void main(String[] args) {
        int N = 8;            //工人數
        Semaphore semaphore = new Semaphore(5); //機器數目
        for(int i=0;i<N;i++)
            new Worker(i,semaphore).start();
    }
     
    static class Worker extends Thread{
        private int num;
        private Semaphore semaphore;
        public Worker(int num,Semaphore semaphore){
            this.num = num;
            this.semaphore = semaphore;
        }
         
        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println("工人"+this.num+"占用一個機器在生產...");
                Thread.sleep(2000);
                System.out.println("工人"+this.num+"釋放出機器");
                semaphore.release();           
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

執行結果:

工人0占用一個機器在生產...
工人1占用一個機器在生產...
工人2占用一個機器在生產...
工人4占用一個機器在生產...
工人5占用一個機器在生產...
工人0釋放出機器
工人2釋放出機器
工人3占用一個機器在生產...
工人7占用一個機器在生產...
工人4釋放出機器
工人5釋放出機器
工人1釋放出機器
工人6占用一個機器在生產...
工人3釋放出機器
工人7釋放出機器
工人6釋放出機器

Exchanger

Exchanger(交換者)是一個用於線程間協作的工具類。Exchanger用於進行線程間的數據交換。它提供一個同步點,在這個同步點,兩個線程可以交換彼此的數據。這兩個線程通過exchange方法交換數據,如果第一個線程先執行exchange()方法,它會一直等待第二個線程也執行exchange方法,當兩個線程都到達同步點時,這兩個線程就可以交換數據,將本線程生產出來的數據傳遞給對方。
下面來看一下Exchanger的應用場景。
1、Exchanger可以用於遺傳算法,遺傳算法裏需要選出兩個人作為交配對象,這時候會交換兩人的數據,並使用交叉規則得出2個交配結果。

2、Exchanger也可以用於校對工作,比如我們需要將紙制銀行流水通過人工的方式錄入成電子銀行流水,為了避免錯誤,采用AB崗兩人進行錄入,錄入到Excel之後,系統需要加載這兩個Excel,並對兩個Excel數據進行校對,看看是否錄入一致.

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExchangerTest {

    private static final Exchanger<String> exgr = new Exchanger<String>();
    private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String A = "銀行流水100";// A錄入銀行流水數據
                    String B=exgr.exchange(A);
                    System.out.println("A的視角:A和B數據是否一致:" + A.equals(B) + 
",A錄入的是:" + A + ",B錄入是:" + B);
                } catch (InterruptedException e) {
                }
            }
        });
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String B = "銀行流水200";// B錄入銀行流水數據
                    String A = exgr.exchange(B);
                    System.out.println("B的視角:A和B數據是否一致:" + A.equals(B) + 
",A錄入的是:" + A + ",B錄入是:" + B);
                } catch (InterruptedException e) {
                }
            }
        });
        threadPool.shutdown();
    }
}

輸出:

B的視角:A和B數據是否一致:false,A錄入的是:銀行流水100,B錄入是:銀行流水200
A的視角:A和B數據是否一致:false,A錄入的是:銀行流水100,B錄入是:銀行流水200

如果兩個線程有一個沒有執行exchange()方法,則會一直等待,如果擔心有特殊情況發生,避免一直等待,可以使用exchange(V x,longtimeout,TimeUnit unit)設置最大等待時長。

參考:

Java並發工具類詳解

Java並發編程-原子類及並發工具類

Java學習筆記—多線程(並發工具類,java.util.concurrent.atomic包)