Java多執行緒——生產者消費者模型-演義
阿新 • • 發佈:2019-01-09
這裡主要解讀(亂侃)生產者消費者模型的工作方式以及JDK5.0新增實現和傳統同步關鍵字實現方式的區別。
在JDK5.0新增了Lock、Condition這兩個介面來處理多執行緒安全問題。
Lock:可替代synchronized實現的同步函式或同步程式碼塊。
Condition:封裝 Object 監視器方法(wait、notify 和 notifyAll)成為專用的監視器物件, Lock 可以繫結多組監視器物件.
這倆兄弟的出世帶來了什麼利好呢?
Lock可以繫結多組Condition最重要的好處應該是,通過對Condition物件的分組使用(一組監視消費者,一組監視生產者)可以達到只喚醒對方執行緒的目的,而不必使用
notifyAll不分青紅皁白喚醒所有執行緒。
API裡給出了一個很好的示例(在這裡)
class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } }
這裡實現了一個執行緒安全的緩衝區,下面對它進行一番演義。
/** * BoundedBuffer2 諸葛亮率領的蜀漢軍隊的糧倉(2.0版本) */ public class BoundedBuffer2 { // lock-糧草官 :主要職責 督查籌糧分配糧食;主要裝備:高音大喇叭 final Lock lock = new ReentrantLock(); /* * Condition:諸葛武侯引進的黑科技:兩路訊號手動切換多功能高音大喇叭 * * notFull (籌糧隊訊號):糧草官發現糧倉沒裝滿時 * 拿起他的大喇叭開啟通向籌糧兵這路訊號將響起(只有負責籌糧的士兵能接收到此路訊號,所以只喚醒負責籌糧的士兵): * “丫的糧倉還沒裝滿!去弄糧食,接丞相令是要渭水分兵屯田的,屯田不行去找劉禪要呀,要不來去曹魏那搶呀!” * * notEmpty(火頭軍訊號): * 糧草官發現糧倉已經有糧食的時候(不是空的了)拿起他的大喇叭向火頭軍喊話(只有他們能接收到):“喂,喂!有糧食了”。 */ final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100];// 糧倉總共能裝100萬石糧食,夠蜀漢十萬大軍吃十個月了。 int putptr, takeptr, count; public void put(Object x) throws InterruptedException {// 籌糧的具體運作方式 lock.lock();// 首先糧官就位,開始監工(入庫)嘛! try { while (count == items.length) // 一旦糧倉裝滿 notFull.await();// 糧官大喇叭響起(通向正在籌糧兵訊號 如果有多個籌糧隊伍(生產者執行緒)也只是通知正在工作的這一個 ): //"正在籌糧士兵全線 立刻 就地 停止工作 等待通知!"。 // 沒滿的話就籌糧嘛,putptr大糧倉內帶有編號的小糧倉一百個,從0到99按順序往裡裝嘛 items[putptr] = x; System.out.println(Thread.currentThread().getName()+"---"+putptr+"號小糧倉 運---進 糧食,現存糧食共計:"+(count+1)+"小倉!"); if (++putptr == items.length)// 裝到99號小糧倉了咋辦? // 從0號開始繼續裝嘛。列為看官可能有問題了 0號不是有糧食嗎? // 沒有了,籌糧的同時被火頭軍運走做飯消耗了。(迴圈佇列) putptr = 0; ++count;// 計數,新增了一小糧倉糧食 notEmpty.signal();// 糧官大喇叭響起(通向已經休息的火頭軍中的一支):糧倉已經有糧食了,可以來取了! } finally { lock.unlock();// 這一小倉入庫完畢 ,糧官休息去了! } } public Object take() throws InterruptedException { lock.lock();// 首先糧官就位,開始監工(出庫)嘛! try { while (count == 0) // 一旦糧倉一顆糧食都沒有 notEmpty.await();// 糧官大喇叭響起(通向來取糧的火頭軍):糧倉沒有糧食了,不要來取了! // 糧倉沒空的話就取出糧食,takeptr 大糧倉內帶有編號的小糧倉一百個,從0到99按順序往外取出糧食 Object x = items[takeptr]; System.out.println(Thread.currentThread().getName()+"---"+takeptr+"號小糧倉 運出 糧食,現存糧食共計:"+(count-1)+"小倉!"); if (++takeptr == items.length)// 取到99號小糧倉了 takeptr = 0;// 從0號開始繼續取 --count;// 計數,減少了一小糧倉糧食 notFull.signal();// 糧官大喇叭響起(通向已休息的籌糧兵中的一支):喂!喂!籌糧士兵開工,糧倉不滿了,有空位了,渣渣! return x;// 這一小倉運出庫 } finally { lock.unlock();// 這一小出入庫完畢 ,糧官休息去了! } } }
這裡緩衝區是一個阻塞的迴圈佇列,存取糧食的過程如下:
測試一下:
/**
* 封裝生產者任務:負責籌糧
*
*/
class Producer implements Runnable {
private BoundedBuffer2 buffer;
public Producer(BoundedBuffer2 buffer) {
this.buffer = buffer;
}
public void run() {
int i = 0;
try {
while (true) {
// Thread.sleep(1);
buffer.put(new Object());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 封裝消費者任務:負責取糧
*
*/
class Consumer implements Runnable {
private BoundedBuffer2 buffer;
public Consumer(BoundedBuffer2 buffer) {
this.buffer = buffer;
}
public void run() {
int i = 0;
try {
while (true) {
// Thread.sleep(1);
buffer.take();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class MultithreadTest {
public static void main(String[] args) {
BoundedBuffer2 buffer = new BoundedBuffer2();
Producer pdc = new Producer(buffer);
Consumer csm = new Consumer(buffer);
Thread pThread1 = new Thread(pdc);
Thread pThread2 = new Thread(pdc);
Thread cThread1 = new Thread(csm);
Thread cThread2 = new Thread(csm);
pThread1.start();
pThread2.start();
cThread1.start();
cThread2.start();
}
}
執行現象:
以上是JDK5.0後的Lock和Condition的實現方式,下面看一下synchronized 實現主要註釋不同之處,這裡用同步方法,用同步程式碼塊也是可以。
/**
* BoundedBuffer 諸葛亮率領的蜀漢軍隊的糧倉(1.0版本)
*/
public class BoundedBuffer {
// this-糧草官 :主要職責 督查籌糧分配糧食;主要裝備:無,靠吼!
final Object lock = new Object();
final Object[] items = new Object[100];// 糧倉總共能裝100萬石糧食,夠蜀漢十萬大軍吃十個月了。
int putptr, takeptr, count;
public synchronized void put(Object x) throws InterruptedException {
while (count == items.length)
this.wait();// 糧官扯著嗓子吼起(來到糧草的兵,不分什麼兵都會接收到):正在籌糧士兵全線 立刻 就地 停止工作 等待通知
items[putptr] = x;
System.out.println(Thread.currentThread().getName() + "---" + putptr
+ "號小糧倉 運---進 糧食,現存糧食共計:" + (count + 1) + "小倉!");
if (++putptr == items.length)
putptr = 0;
++count;
this.notifyAll();// 糧官吼起(通向所有已經休息的小部隊):糧倉已經有糧食了,可以來取了!
}
public synchronized Object take() throws InterruptedException {
while (count == 0)
this.wait();// 糧官扯著嗓子吼起(來到糧草的兵,不分什麼兵都會接收到):糧倉沒有糧食了,不要來取了!
Object x = items[takeptr];
System.out.println(Thread.currentThread().getName() + "---" + takeptr
+ "號小糧倉 運出 糧食,現存糧食共計:" + (count - 1) + "小倉!");
if (++takeptr == items.length)
takeptr = 0;
--count;
this.notifyAll();// 糧官大喇叭響起(通向籌糧兵):喂!喂!籌糧士兵全線開工,糧倉不滿了,有空位了,渣渣!
return x;
}
}
小結:通過2.0版本的糧草官持有高科技喇叭可以指定喚醒籌糧隊或者是火頭軍,而不像1.0版本那樣不能區分軍種在需要籌糧時把火頭軍也喚醒了。
這就是Lock、Condition這對組合相較與synchronized 實現方式的優勢。另外ArrayBlockingQueue這個類提供了執行緒安全有界快取區功能,有需要可以使用這個類,本篇中2.0版本就是其核心實現方式。