1. 程式人生 > >深入學習理解(7):java:高效的解決死鎖問題的執行緒通訊方式:Semaphore 和 BlockingQueue

深入學習理解(7):java:高效的解決死鎖問題的執行緒通訊方式:Semaphore 和 BlockingQueue

經典原始問題:生產者和消費者的問題,其實在實際專案中很容易遇到這樣的無奈的問題,但是面對這樣的問題的時候我們首先想到的就是多執行緒批處理,通過notify()…的處理,但只這樣的處理只能給我們貼上對java多執行緒不熟悉的標籤。比較讚的辦法是用Semaphore 或者 BlockingQueue來實現生產者消費者模型。下面我們就來看看吧!
Semaphore
一個計數訊號量。從概念上講,訊號量維護了一個許可集。如有必要,在許可可用前會阻塞每一個 acquire(),然後再獲取該許可。每個 release() 新增一個許可,從而可能釋放一個正在阻塞的獲取者。但是,不使用實際的許可物件,Semaphore 只對可用許可的號碼進行計數,並採取相應的行動。拿到訊號量的執行緒可以進入程式碼,否則就等待。通過acquire()和release()獲取和釋放訪問許可。
Semaphore分為單值和多值兩種,前者只能被一個執行緒獲得,後者可以被若干個執行緒獲得。
以一個停車場運作為例。為了簡單起見,假設停車場只有三個車位,一開始三個車位都是空的。這時如果同時來了五輛車,看門人允許其中三輛不受阻礙的進入,然後放下車攔,剩下的車則必須在入口等待,此後來的車也都不得不在入口處等待。這時,有一輛車離開停車場,看門人得知後,開啟車攔,放入一輛,如果又離開兩輛,則又可以放入兩輛,如此往復。

在這個停車場系統中,車位是公共資源,每輛車好比一個執行緒,看門人起的就是訊號量的作用。
更進一步,訊號量的特性如下:訊號量是一個非負整數(車位數),所有通過它的執行緒(車輛)都會將該整數減一(通過它當然是為了使用資源),當該整數值為零時,所有試圖通過它的執行緒都將處於等待狀態。在訊號量上我們定義兩種操作: Wait(等待) 和 Release(釋放)。 當一個執行緒呼叫Wait(等待)操作時,它要麼通過然後將訊號量減一,要麼一直等下去,直到訊號量大於一或超時。Release(釋放)實際上是在訊號量上執行加操作,對應於車輛離開停車場,該操作之所以叫做“釋放”是因為加操作實際上是釋放了由訊號量守護的資源。

在java中,還可以設定該訊號量是否採用公平模式,如果以公平方式執行,則執行緒將會按到達的順序(FIFO)執行,如果是非公平,則可以後請求的有可能排在佇列的頭部。
JDK中定義如下:

     Semaphore(int permits, boolean fair)
     //建立具有給定的許可數和給定的公平設定的Semaphore。

Semaphore當前在多執行緒環境下被擴放使用,作業系統的訊號量是個很重要的概念,在程序控制方面都有應用。Java併發庫Semaphore 可以很輕鬆完成訊號量控制,Semaphore可以控制某個資源可被同時訪問的個數,通過 acquire() 獲取一個許可,如果沒有就等待,而 release() 釋放一個許可。比如在Windows下可以設定共享檔案的最大客戶端訪問個數。

Semaphore實現的功能就類似廁所有5個坑,假如有10個人要上廁所,那麼同時只能有多少個人去上廁所呢?同時只能有5個人能夠佔用,當5個人中 的任何一個人讓開後,其中等待的另外5個人中又有一個人可以佔用了。另外等待的5個人中可以是隨機獲得優先機會,也可以是按照先來後到的順序獲得機會,這取決於構造Semaphore物件時傳入的引數選項。單個訊號量的Semaphore物件可以實現互斥鎖的功能,並且可以是由一個執行緒獲得了“鎖”,再由另一個執行緒釋放“鎖”,這可應用於死鎖恢復的一些場合

package concurrent;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
	Semaphore semaphore=new Semaphore(1);
	List<String>list=new ArrayList<>();
public static void main(String[] args) {
	SemaphoreDemo s=new SemaphoreDemo();
	//建立執行緒管理
	ExecutorService executorService=Executors.newCachedThreadPool();
	executorService.execute(s.new ThreadTestA());//啟動執行緒
	executorService.execute(s.new ThreadTestB());//
	executorService.shutdown();//結束服務
}
private class ThreadTestA implements Runnable{
	@Override
	public  void  run() {
		try {
			System.out.println("這個是執行緒A獲取許可證之前");
			semaphore.acquire();//獲得許可證
			System.out.println("這個是執行緒A獲取許可證之後");
			list.add("test");
			Thread.sleep(1000);//模擬延遲
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		finally{
			semaphore.release();//吊銷
			System.out.println("這個是執行緒A釋放許可證之後");
			}	
	}
}
private class ThreadTestB implements Runnable{
	@Override
	public void run() {
		try {
			System.out.println("這個是執行緒B獲取許可證之前");
			semaphore.acquire();
			System.out.println("這個是執行緒B獲取許可證之後");
			if(list.size()!=0)
			System.out.println(list.get(0));
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		finally{
			semaphore.release();
			System.out.println("這個是執行緒B釋放許可證之後");
		}
	}
}

}

執行結果可能會出現幾種情況
1

這個是執行緒A獲取許可證之前
這個是執行緒A獲取許可證之後
這個是執行緒B獲取許可證之前
這個是執行緒A釋放許可證之後
這個是執行緒B獲取許可證之後
test
這個是執行緒B釋放許可證之後

2

這個是執行緒A獲取許可證之前
這個是執行緒A獲取許可證之後
這個是執行緒B獲取許可證之前
這個是執行緒B獲取許可證之後
test
這個是執行緒B釋放許可證之後
這個是執行緒A釋放許可證之後

3

這個是執行緒B獲取許可證之前
這個是執行緒B獲取許可證之後
這個是執行緒B釋放許可證之後
這個是執行緒A獲取許可證之前
這個是執行緒A獲取許可證之後
這個是執行緒A釋放許可證之後

BlockingQueue
首先是一個介面。有很多子介面和實現類:

BlockingQueue也是java.util.concurrent下的主要用來控制執行緒同步的工具。

BlockingQueue有四個具體的實現類,根據不同需求,選擇不同的實現類
1、ArrayBlockingQueue:一個由陣列支援的有界阻塞佇列,規定大小的BlockingQueue,其建構函式必須帶一個int引數來指明其大小.其所含的物件是以FIFO(先入先出)順序排序的。

2、LinkedBlockingQueue:大小不定的BlockingQueue,若其建構函式帶一個規定大小的引數,生成的BlockingQueue有大小限制,若不帶大小引數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定.其所含的物件是以FIFO(先入先出)順序排序的。

3、PriorityBlockingQueue:類似於LinkedBlockQueue,但其所含物件的排序不是FIFO,而是依據物件的自然排序順序或者是建構函式的Comparator決定的順序。

4、SynchronousQueue:特殊的BlockingQueue,對其的操作必須是放和取交替完成的。

LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的話,預設最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在佇列滿的時候會阻塞直到有佇列成員被消費,take方法在佇列空的時候會阻塞,直到有佇列成員被放進來。
下面我們通過一個例子來說明:
Bread

package concurrent;

public class Bread {
private String name;
private String color;
public String getName() {
	return name;
}
public void setName(String name) {
	this.name = name;
}
public String getColor() {
	return color;
}
public void setColor(String color) {
	this.color = color;
}
}

生產者機器 BreadMachine

package concurrent;

import java.awt.Color;
import java.util.Random;
import java.util.concurrent.BlockingQueue;

public class BreadMachine implements Runnable{
	 private BlockingQueue<Bread> queue;
     public BreadMachine(BlockingQueue<Bread> queue){
   	  this.queue=queue;
     }
		@Override
		public void run() {
			System.out.println("開始生產麵包");
			Bread bread=new Bread();
			bread.setColor(Color.BLACK+"");
			Random random=new Random();
			bread.setName(random.nextInt(100)+"麵包");
			try {
				queue.put(bread);
				System.out.println(bread.getName()+"已經生產完畢!去吃吧");
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			
		}
   	
}

消費者People

package concurrent;

import java.util.concurrent.BlockingQueue;

public class People implements Runnable{

	BlockingQueue<Bread> bread;
	public People(BlockingQueue<Bread> bread ) {
		this.bread=bread;
	}
	@Override
	public void run() {
		try {
			System.out.println("我需要麵包");
		System.out.println("這個二貨吃了"+bread.take().getName());
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

}

測試程式

package concurrent;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueenDemo {
    static BlockingQueue<Bread> blockingQueue=new ArrayBlockingQueue<>(5);
    public static void main(String[] args) {
		BreadMachine breadMachine=new BreadMachine(blockingQueue);
		People people=new People(blockingQueue);
		for(int i=0;i<5;i++){
			new Thread(people, "people").start();
			new Thread(breadMachine, "breadMachine").start();
			
		}
	}
    
}

執行效果;

開始生產麵包
開始生產麵包
我需要麵包
我需要麵包
開始生產麵包
我需要麵包
開始生產麵包
我需要麵包
我需要麵包
開始生產麵包
29麵包已經生產完畢!去吃吧
23麵包已經生產完畢!去吃吧
這個二貨吃了29麵包
66麵包已經生產完畢!去吃吧
31麵包已經生產完畢!去吃吧
36麵包已經生產完畢!去吃吧
這個二貨吃了23麵包
這個二貨吃了66麵包
這個二貨吃了31麵包
這個二貨吃了36麵包

由於佇列的大小限定成了5,所以最多隻有兩個產品被加入到隊列當中,而且消費者取到產品的順序也是按照生產的先後順序,原因就是LinkedBlockingQueue和ArrayBlockingQueue都是按照FIFO的順序存取元素的。