深入學習理解(7):java:高效的解決死鎖問題的執行緒通訊方式:Semaphore 和 BlockingQueue
經典原始問題:生產者和消費者的問題,其實在實際專案中很容易遇到這樣的無奈的問題,但是面對這樣的問題的時候我們首先想到的就是多執行緒批處理,通過notify()…的處理,但只這樣的處理只能給我們貼上對java多執行緒不熟悉的標籤。比較讚的辦法是用Semaphore 或者 BlockingQueue來實現生產者消費者模型。下面我們就來看看吧!
Semaphore
一個計數訊號量。從概念上講,訊號量維護了一個許可集。如有必要,在許可可用前會阻塞每一個 acquire(),然後再獲取該許可。每個 release() 新增一個許可,從而可能釋放一個正在阻塞的獲取者。但是,不使用實際的許可物件,Semaphore 只對可用許可的號碼進行計數,並採取相應的行動。拿到訊號量的執行緒可以進入程式碼,否則就等待。通過acquire()和release()獲取和釋放訪問許可。
Semaphore分為單值和多值兩種,前者只能被一個執行緒獲得,後者可以被若干個執行緒獲得。
以一個停車場運作為例。為了簡單起見,假設停車場只有三個車位,一開始三個車位都是空的。這時如果同時來了五輛車,看門人允許其中三輛不受阻礙的進入,然後放下車攔,剩下的車則必須在入口等待,此後來的車也都不得不在入口處等待。這時,有一輛車離開停車場,看門人得知後,開啟車攔,放入一輛,如果又離開兩輛,則又可以放入兩輛,如此往復。
在這個停車場系統中,車位是公共資源,每輛車好比一個執行緒,看門人起的就是訊號量的作用。
更進一步,訊號量的特性如下:訊號量是一個非負整數(車位數),所有通過它的執行緒(車輛)都會將該整數減一(通過它當然是為了使用資源),當該整數值為零時,所有試圖通過它的執行緒都將處於等待狀態。在訊號量上我們定義兩種操作: Wait(等待) 和 Release(釋放)。 當一個執行緒呼叫Wait(等待)操作時,它要麼通過然後將訊號量減一,要麼一直等下去,直到訊號量大於一或超時。Release(釋放)實際上是在訊號量上執行加操作,對應於車輛離開停車場,該操作之所以叫做“釋放”是因為加操作實際上是釋放了由訊號量守護的資源。
在java中,還可以設定該訊號量是否採用公平模式,如果以公平方式執行,則執行緒將會按到達的順序(FIFO)執行,如果是非公平,則可以後請求的有可能排在佇列的頭部。
JDK中定義如下:
Semaphore(int permits, boolean fair)
//建立具有給定的許可數和給定的公平設定的Semaphore。
Semaphore當前在多執行緒環境下被擴放使用,作業系統的訊號量是個很重要的概念,在程序控制方面都有應用。Java併發庫Semaphore 可以很輕鬆完成訊號量控制,Semaphore可以控制某個資源可被同時訪問的個數,通過 acquire() 獲取一個許可,如果沒有就等待,而 release() 釋放一個許可。比如在Windows下可以設定共享檔案的最大客戶端訪問個數。
Semaphore實現的功能就類似廁所有5個坑,假如有10個人要上廁所,那麼同時只能有多少個人去上廁所呢?同時只能有5個人能夠佔用,當5個人中 的任何一個人讓開後,其中等待的另外5個人中又有一個人可以佔用了。另外等待的5個人中可以是隨機獲得優先機會,也可以是按照先來後到的順序獲得機會,這取決於構造Semaphore物件時傳入的引數選項。單個訊號量的Semaphore物件可以實現互斥鎖的功能,並且可以是由一個執行緒獲得了“鎖”,再由另一個執行緒釋放“鎖”,這可應用於死鎖恢復的一些場合
package concurrent;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreDemo {
Semaphore semaphore=new Semaphore(1);
List<String>list=new ArrayList<>();
public static void main(String[] args) {
SemaphoreDemo s=new SemaphoreDemo();
//建立執行緒管理
ExecutorService executorService=Executors.newCachedThreadPool();
executorService.execute(s.new ThreadTestA());//啟動執行緒
executorService.execute(s.new ThreadTestB());//
executorService.shutdown();//結束服務
}
private class ThreadTestA implements Runnable{
@Override
public void run() {
try {
System.out.println("這個是執行緒A獲取許可證之前");
semaphore.acquire();//獲得許可證
System.out.println("這個是執行緒A獲取許可證之後");
list.add("test");
Thread.sleep(1000);//模擬延遲
} catch (InterruptedException e) {
e.printStackTrace();
}
finally{
semaphore.release();//吊銷
System.out.println("這個是執行緒A釋放許可證之後");
}
}
}
private class ThreadTestB implements Runnable{
@Override
public void run() {
try {
System.out.println("這個是執行緒B獲取許可證之前");
semaphore.acquire();
System.out.println("這個是執行緒B獲取許可證之後");
if(list.size()!=0)
System.out.println(list.get(0));
} catch (InterruptedException e) {
e.printStackTrace();
}
finally{
semaphore.release();
System.out.println("這個是執行緒B釋放許可證之後");
}
}
}
}
執行結果可能會出現幾種情況
1
這個是執行緒A獲取許可證之前
這個是執行緒A獲取許可證之後
這個是執行緒B獲取許可證之前
這個是執行緒A釋放許可證之後
這個是執行緒B獲取許可證之後
test
這個是執行緒B釋放許可證之後
2
這個是執行緒A獲取許可證之前
這個是執行緒A獲取許可證之後
這個是執行緒B獲取許可證之前
這個是執行緒B獲取許可證之後
test
這個是執行緒B釋放許可證之後
這個是執行緒A釋放許可證之後
3
這個是執行緒B獲取許可證之前
這個是執行緒B獲取許可證之後
這個是執行緒B釋放許可證之後
這個是執行緒A獲取許可證之前
這個是執行緒A獲取許可證之後
這個是執行緒A釋放許可證之後
BlockingQueue
首先是一個介面。有很多子介面和實現類:
BlockingQueue也是java.util.concurrent下的主要用來控制執行緒同步的工具。
BlockingQueue有四個具體的實現類,根據不同需求,選擇不同的實現類
1、ArrayBlockingQueue:一個由陣列支援的有界阻塞佇列,規定大小的BlockingQueue,其建構函式必須帶一個int引數來指明其大小.其所含的物件是以FIFO(先入先出)順序排序的。
2、LinkedBlockingQueue:大小不定的BlockingQueue,若其建構函式帶一個規定大小的引數,生成的BlockingQueue有大小限制,若不帶大小引數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定.其所含的物件是以FIFO(先入先出)順序排序的。
3、PriorityBlockingQueue:類似於LinkedBlockQueue,但其所含物件的排序不是FIFO,而是依據物件的自然排序順序或者是建構函式的Comparator決定的順序。
4、SynchronousQueue:特殊的BlockingQueue,對其的操作必須是放和取交替完成的。
LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的話,預設最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在佇列滿的時候會阻塞直到有佇列成員被消費,take方法在佇列空的時候會阻塞,直到有佇列成員被放進來。
下面我們通過一個例子來說明:
Bread
package concurrent;
public class Bread {
private String name;
private String color;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getColor() {
return color;
}
public void setColor(String color) {
this.color = color;
}
}
生產者機器 BreadMachine
package concurrent;
import java.awt.Color;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
public class BreadMachine implements Runnable{
private BlockingQueue<Bread> queue;
public BreadMachine(BlockingQueue<Bread> queue){
this.queue=queue;
}
@Override
public void run() {
System.out.println("開始生產麵包");
Bread bread=new Bread();
bread.setColor(Color.BLACK+"");
Random random=new Random();
bread.setName(random.nextInt(100)+"麵包");
try {
queue.put(bread);
System.out.println(bread.getName()+"已經生產完畢!去吃吧");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
消費者People
package concurrent;
import java.util.concurrent.BlockingQueue;
public class People implements Runnable{
BlockingQueue<Bread> bread;
public People(BlockingQueue<Bread> bread ) {
this.bread=bread;
}
@Override
public void run() {
try {
System.out.println("我需要麵包");
System.out.println("這個二貨吃了"+bread.take().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
測試程式
package concurrent;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueenDemo {
static BlockingQueue<Bread> blockingQueue=new ArrayBlockingQueue<>(5);
public static void main(String[] args) {
BreadMachine breadMachine=new BreadMachine(blockingQueue);
People people=new People(blockingQueue);
for(int i=0;i<5;i++){
new Thread(people, "people").start();
new Thread(breadMachine, "breadMachine").start();
}
}
}
執行效果;
開始生產麵包
開始生產麵包
我需要麵包
我需要麵包
開始生產麵包
我需要麵包
開始生產麵包
我需要麵包
我需要麵包
開始生產麵包
29麵包已經生產完畢!去吃吧
23麵包已經生產完畢!去吃吧
這個二貨吃了29麵包
66麵包已經生產完畢!去吃吧
31麵包已經生產完畢!去吃吧
36麵包已經生產完畢!去吃吧
這個二貨吃了23麵包
這個二貨吃了66麵包
這個二貨吃了31麵包
這個二貨吃了36麵包
由於佇列的大小限定成了5,所以最多隻有兩個產品被加入到隊列當中,而且消費者取到產品的順序也是按照生產的先後順序,原因就是LinkedBlockingQueue和ArrayBlockingQueue都是按照FIFO的順序存取元素的。