1. 程式人生 > >java中併發常用工具類

java中併發常用工具類

前言:在你無聊的時候,想想比你優秀還努力的人,也許就不覺的無聊了

今天下午沒事幹把買的java併發程式設計藝術這本書拿出來看了看,看了下也記不住,還是好記性不如爛筆頭,今天講四個併發中可能會用到的工具類,分別是:

CountDownLatch

CyclicBarrier

Semaphore

Exchanger

CountDownLatch

countDownLatch允許一個或多個執行緒等待其他執行緒完成操作.比如說有三個執行緒分別是老二,老大,老爸,這三個執行緒必須是老二吃好了,老大吃,老大吃完了,老爸吃,在20年錢,農村家裡窮,一般有好吃的都是先留個最小的,然後才給其他兄弟姐妹吃,都不吃了,才由我們的父母吃,所以父母都不容易了,為了兒女,雖然是多個執行緒但是確實線性的,


我同事面試別人就問過好幾次這個問題,在java或者android中常用的有2個方式實現

第一種方式:

使用jdk中Thread自帶的函式join實現,join()用於當前執行執行緒等待join執行緒執行結束,其實實現原理是不停檢查join執行緒是否存活,如果join執行緒存活則讓當前執行緒永遠等待,其中,wait(0)表示永遠等待下去,join在jdk中的是實現方式如下:

/**
     * Waits for this thread to die.
     *
     * <p> An invocation of this method behaves in exactly the same
     * way as the invocation
     *
     * <blockquote>
     * {@linkplain #join(long) join}{@code (0)}
     * </blockquote>
     *
     * @throws  InterruptedException
     *          if any thread has interrupted the current thread. The
     *          <i>interrupted status</i> of the current thread is
     *          cleared when this exception is thrown.
     */
    public final void join() throws InterruptedException {
        join(0);
    }
直到join執行緒中止後,執行緒的this.notifyAll()方法會被呼叫,呼叫notifyAll()方法是在JVM裡實現的,現在把上面的例子用程式碼實現下:
package com.thread;
public class RunnableJob {
   public static void main(String[] args) throws InterruptedException {
	   Worker runnableJob = new Worker();
       Thread t1 = new Thread(runnableJob, "老二");
       Thread t2 = new Thread(runnableJob, "老大");
       Thread t3 = new Thread(runnableJob, "老爸");
       t1.start();  
       t1.join();  
       t2.start();  
       t2.join();  
       t3.start();  
       t3.join();  
       System.out.println("主執行緒執行完畢----");  
  }
}
class Worker implements Runnable{
	public void run() {
        Thread thread = Thread.currentThread();
        try {
            Thread.sleep(5000);
            System.out.println(thread.getName()+"吃完了");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
log:



CountDownLatch是一個同步計數器,現在有個需求,在古代是男人先吃飯,特別是有客人在的時候,女人不能上桌,等待男人吃完,女人才能上桌吃飯,

package com.thread;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchTest {
	public static void main(String[] args) {
		 final CountDownLatch countDownLatch = new CountDownLatch(3); 
		 new Thread("老二"){
             public void run() {
                 try {
                    Thread.sleep(3000);
                    System.out.println(Thread.currentThread().getName()+"吃完了");
                    countDownLatch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
             };
         }.start();
         new Thread("老大"){
             public void run() {
                 try {
                    Thread.sleep(3000);
                    System.out.println(Thread.currentThread().getName()+"吃完了");
                    countDownLatch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
             };
         }.start();
         new Thread("老爸"){
             public void run() {
                 try {
                    Thread.sleep(3000);
                    System.out.println(Thread.currentThread().getName()+"吃完了");
                    countDownLatch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
             };
         }.start();
         System.out.println("等待三個男人吃完,女人才能上桌吃飯,等....");
         try {
			countDownLatch.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
         System.out.println("女人們可以上桌吃飯了");
	}
}
執行結果:


你會發現CountDownLatch和join還是有區別:

相同點:都能等待一個或者多個執行緒執行完成操作,比如等待三個執行緒執行完畢後,第四個執行緒才能執行

不同點:join能讓執行緒按我們預想的的順序執行,比如執行緒1執行完了,執行緒2才能執行,執行緒2執行完,執行緒3才能執行,但是CountDownLatch就做不到.

如果檢視CountDownLatch原始碼,發現這原始碼還是很少的,沒幾行程式碼,方法也就幾個:CountDownLatch的建構函式接受一個int型別的引數作為計時器,如果你想等待N個點完成,就在這裡傳入N

當我們呼叫CountDownLatch的countDown方法時,N就會減1,CountDownLatch的await方法會阻塞當前執行緒,直到N變為零(也就是執行緒都執行完了),由於countDown方法可以用在任何地方,所以這裡說的N個點,可以是N個執行緒,也可以是1個執行緒裡的N個執行步驟。用在多執行緒時,只需把這個CountDownLatch的引用傳遞到執行緒中即可,

當然阻塞還有一個過載的方法就是await(long timeout, TimeUnit unit) 阻塞的時間,超過這個時間就不等待了.

注意點:

計時器必須大於等於0,只是等於0的時候,計時器就是0,呼叫await()方法時,不會阻塞當前執行緒,CountDownLatch不可能重新初始化或者修改CountDownLatch物件的內部 計數器的值,不可逆性.

Semaphore

semaphore(訊號量)是用來控制同時訪問特定資源的執行緒數量,它通過協調各個執行緒,以保證合理的使用公共資源.這個跟佇列有點像,畫圖理解更直觀,


程式碼:

package com.thread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreDemo {
	public static void main(String[] args) {  
        ExecutorService service = Executors.newCachedThreadPool();  
        final Semaphore semaphore = new Semaphore(2);  
        final int count = 5;  
        for (int i = 0; i < count; i++) {  
            Runnable runnable = new Runnable() {  
                public void run() {  
                    try {  
                        semaphore.acquire();  
                        System.out.println("執行緒:" + Thread.currentThread().getName()+"開始下載");  
                        Thread.sleep(5000);  
                    } catch (InterruptedException e) {  
                        e.printStackTrace();  
                    } finally {  
                        System.out.println("執行緒:" + Thread.currentThread().getName() + "下載完畢");  
                        semaphore.release();  
                    }  
                }  
            };  
            service.execute(runnable);//放線上程池中去維護
        }  
        service.shutdown();  
    }  
}
執行結果:


發現我們併發下載的數量是隻允許2個執行緒,當訪問匯流排程大於2時,其他執行緒就要等待,Semaphore類常用的方法:

acquire():獲取一個許可證,

release():釋放許可證,這時候有多餘的執行緒就加入到執行緒池中

tryAcquire():嘗試獲取許可證

intavailblePermits():返回此訊號量當前可用的許可證數

intgetQueueLength():返回正在等待獲取許可證的執行緒數

hasQueuedThreads():是否有執行緒正在等待獲取許可證

reducePermits(int reduction):減少reduction個許可證,

getQueueThreads():返回所有等待獲取許可證的執行緒集合

Semaphore類的建構函式中傳入的數,表示同時併發訪問控制在多少個執行緒.


Exchanger

exchanger是一個用於執行緒間協作的工具類,Exchanger用於進行執行緒間的資料交換,它提供一個同步點,在這個同步點,二個執行緒可以交換彼此的資料.在這二個執行緒通過exchange方法交換資料,如果第一個執行緒先執行exchange()方法,它會一直等待第二個執行緒也執行exchange方法,當二個執行緒都達到同步點時,這二個執行緒就可以交換資料,將本執行緒生產出來的資料傳遞給對方.

打個比方吧,發現我們那農村結婚是需要30,40萬,這對一般的家庭確實是一個很大的錢,因為農村沒讀書的孩子多是 21~23之間就要討老婆了,這麼年輕自己打工賺到30萬還是很少很少的,哪女孩一般會說等你賺到了30萬馬上給你結婚,於是只好苦逼的年復一年的工作,每個月把打工的錢給女孩,到了30萬馬上通知男方說可以結婚了,


現在用程式碼實現下:

package com.thread;
import java.util.Random;
import java.util.concurrent.Exchanger;
public class ExchangerDemo {
	 public static void main(String[] args) {  
	        Exchanger<Integer> exchanger = new Exchanger<>();  
	        new Girl(exchanger).start();  
	        new Man(exchanger).start();  
	    }  
}
/**
 * 男人
 * @author admin
 */
class Man extends Thread {  
    Exchanger<Integer> exchanger = null;  
    public Man(Exchanger<Integer> exchanger) {  
        super();  
        this.exchanger = exchanger;  
    }  
    @Override  
    public void run() {  
        Random rand = new Random();
        int money = 0;
        for(int i=0;i<4;i++){
        	money+=100000;//年薪在10萬以內
        	try {
				exchanger.exchange(money);//存錢
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
        }
    }  
}  
/**
 * 女人
 */
class Girl extends Thread {  
    Exchanger<Integer> exchanger = null;  
    int money = 0;
    public Girl(Exchanger<Integer> exchanger) {  
        super();  
        this.exchanger = exchanger;  
    }  
    @Override  
    public void run() {  
    	for(int i=0;i<4;i++){
    		try {
				money = exchanger.exchange(money) ;
				System.out.println(money>300000?"親愛的"+money+"萬我們可以結婚了":money+"塊這麼少,"+"臭屌絲活該單身,還不去賺錢娶老婆");
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
    	}
    }  
} 

執行結果:


Exchanger主要用於二個執行緒之間交換資料,注意,只能是2個執行緒,如果有一個執行緒沒執行exchange()方法,則會一直等待,執行緒就處於阻塞狀態了!如果怕一直等待,可以設定時間:exchange()有一個過載的方法.

exchange(V x, long timeout, TimeUnit unit)

CyclicBarrier

CyclicBarrierr如果你在翻譯的話是關卡,柵欄的意思,也就是被卡住在哪裡,還可以把這個詞扯開成cyclic和barrier,cyclic是迴圈的意思,barrier是障礙的意思,從字面意思連起來讀就是可迴圈的障礙,翻譯成java語言就是當有幾個執行緒同時去訪問,要阻塞,直到最後一個執行緒達到屏障時,程式才會繼續執行,

CyclicBarrier使用也很簡單,首先它有2個建構函式:

public CyclicBarrier(int parties):parties一個int型別引數,表示屏障攔截的執行緒數量,

public CyclicBarrier(int parties,Runnable barrierAction),意思是線上程到達屏障時,優先執行barrierAction執行緒.

打個比方吧,你們主管帶領小明,小王經過2個月的日夜工作,完成了一個重要的專案,領導準備獎勵你們專案組一些錢,讓你們出去玩,主管,小明,小王呢?經過思考,去xxx天上人間玩下吧,他們約好在萬達商場見面,車子已經提前安排好了,所以只有三個人都到了,才上車.

來個圖片賞析下:


程式碼如下:

package thread;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
class Employee implements Runnable {
	 private String name;
	    private CyclicBarrier barrier;
	    public Employee(String name, final CyclicBarrier barrier) {
	        this.name = name;
	        this.barrier = barrier;
	    }
	    public void run() {
	        try {
	            System.out.println(name+"已經到達萬達商場等待");
	            barrier.await(); //等待最後一個執行緒到達(底層會去判斷)
	        } catch (InterruptedException | BrokenBarrierException e) {
	            e.printStackTrace();
	        }
	        System.out.println(name+"上車");
	    }
	}
	 public class TestCyclicBarrier {
	    public static void main(String[] args) {
	        int num = 3;
	        String[] strs = {"主管","小明","小王"};
	        CyclicBarrier barrier = new CyclicBarrier(num);
	        for (int i = 0; i <num; i++) {
	            new Thread(new Employee(strs[i], barrier)).start();
	        }
	    }
	}
執行結果:


假如在中途經理打電話來說也想去,那麼三個人在等,但是到最後經理突然有急事,電話也沒電了,沒通知這三個哥們,那麼這三個哥們也出去了,因為沒有第四個執行緒去執行await()方法,那麼之前的執行緒達到屏障時都不會往下執行了,也就是各自上車的行為不會執行,那還怎麼去天上人間.

CyclicBarrier barrier = new CyclicBarrier(num+1);
只要把這個建構函式傳入的引數+1

執行結果:


這就是這三個人被經理放了鴿子了,一直等待,等到天荒地老也到不了天上人間!直到天上人間被查了,呵呵.
CyclicBarrier有2個建構函式,第二個建構函式第二個引數傳遞的一個Runnable,意思這個執行緒先執行

現在有個生活場景,就是聚餐,一般聚餐之前領導都要先說幾句然後才能開始吃,

程式碼如下:

package thread;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class TestCyclicBarrierDemo {
	public static void main(String[] args) {
		int num = 5;
		CyclicBarrier barrier = new CyclicBarrier(num,new Leader());
		for (int i = 0; i <num; i++) {
			new Thread(new Employee( barrier)).start();
		}
	}
}
class Leader implements Runnable{
	@Override
	public void run() {
		System.out.println("吃飯前我先說幾句");
		try {
			Thread.sleep(5000);//說了半個小時  比如吧
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}
class Employee implements Runnable {
	    private CyclicBarrier barrier;
	    public Employee(final CyclicBarrier barrier) {
	        this.barrier = barrier;
	    }
	    public void run() {
	        try {
	            System.out.println("都在等領導說完話準備吃");
	            barrier.await(); 
	        } catch (InterruptedException | BrokenBarrierException e) {
	            e.printStackTrace();
	        }
	        System.out.println("說了30分鐘準備用筷子開始吃了");
	    }
	}

執行結果:


現在講下CyclicBarrier和CountDownLatch的區別:

CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可以使用reset()方法重置,所以CyclicBarrier能處理的業務場景相對CountDownLatch更多,