1. 程式人生 > >Java多執行緒——生產者消費者模型-演義

Java多執行緒——生產者消費者模型-演義

這裡主要解讀(亂侃)生產者消費者模型的工作方式以及JDK5.0新增實現和傳統同步關鍵字實現方式的區別。

在JDK5.0新增了Lock、Condition這兩個介面來處理多執行緒安全問題。

Lock:可替代synchronized實現的同步函式或同步程式碼塊。

Condition:封裝 Object 監視器方法(wait、notify 和 notifyAll)成為專用的監視器物件, Lock 可以繫結多組監視器物件.

這倆兄弟的出世帶來了什麼利好呢?

Lock可以繫結多組Condition最重要的好處應該是,通過對Condition物件的分組使用(一組監視消費者,一組監視生產者)可以達到只喚醒對方執行緒的目的,而不必使用

notifyAll不分青紅皁白喚醒所有執行緒。

API裡給出了一個很好的示例(在這裡

class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition(); 
   final Condition notEmpty = lock.newCondition(); 

   final Object[] items = new Object[100];
   int putptr, takeptr, count;

   public void put(Object x) throws InterruptedException {
     lock.lock();
     try {
       while (count == items.length)
         notFull.await();
       items[putptr] = x;
       if (++putptr == items.length) putptr = 0;
       ++count;
       notEmpty.signal();
     } finally {
       lock.unlock();
     }
   }

   public Object take() throws InterruptedException {
     lock.lock();
     try {
       while (count == 0)
         notEmpty.await();
       Object x = items[takeptr];
       if (++takeptr == items.length) takeptr = 0;
       --count;
       notFull.signal();
       return x;
     } finally {
       lock.unlock();
     }
   }
 }

這裡實現了一個執行緒安全的緩衝區,下面對它進行一番演義。
/**
 * BoundedBuffer2 諸葛亮率領的蜀漢軍隊的糧倉(2.0版本)
 */
public class BoundedBuffer2 {
		// lock-糧草官 :主要職責 督查籌糧分配糧食;主要裝備:高音大喇叭
		final Lock lock = new ReentrantLock();

		/*
		 * Condition:諸葛武侯引進的黑科技:兩路訊號手動切換多功能高音大喇叭
		 * 
		 * notFull (籌糧隊訊號):糧草官發現糧倉沒裝滿時
		 * 拿起他的大喇叭開啟通向籌糧兵這路訊號將響起(只有負責籌糧的士兵能接收到此路訊號,所以只喚醒負責籌糧的士兵):
		 * “丫的糧倉還沒裝滿!去弄糧食,接丞相令是要渭水分兵屯田的,屯田不行去找劉禪要呀,要不來去曹魏那搶呀!”
		 * 
		 * notEmpty(火頭軍訊號):
		 * 糧草官發現糧倉已經有糧食的時候(不是空的了)拿起他的大喇叭向火頭軍喊話(只有他們能接收到):“喂,喂!有糧食了”。
		 */
		final Condition notFull = lock.newCondition();
		final Condition notEmpty = lock.newCondition();

		final Object[] items = new Object[100];// 糧倉總共能裝100萬石糧食,夠蜀漢十萬大軍吃十個月了。
		int putptr, takeptr, count;

		public void put(Object x) throws InterruptedException {// 籌糧的具體運作方式
			lock.lock();// 首先糧官就位,開始監工(入庫)嘛!
			try {
				while (count == items.length)
					// 一旦糧倉裝滿
					notFull.await();// 糧官大喇叭響起(通向正在籌糧兵訊號  如果有多個籌糧隊伍(生產者執行緒)也只是通知正在工作的這一個  ):
																		//"正在籌糧士兵全線 立刻 就地 停止工作 等待通知!"。
				// 沒滿的話就籌糧嘛,putptr大糧倉內帶有編號的小糧倉一百個,從0到99按順序往裡裝嘛
				items[putptr] = x;
				System.out.println(Thread.currentThread().getName()+"---"+putptr+"號小糧倉  運---進 糧食,現存糧食共計:"+(count+1)+"小倉!");
				if (++putptr == items.length)// 裝到99號小糧倉了咋辦?
					// 從0號開始繼續裝嘛。列為看官可能有問題了 0號不是有糧食嗎?
					// 沒有了,籌糧的同時被火頭軍運走做飯消耗了。(迴圈佇列)
					putptr = 0;
				++count;// 計數,新增了一小糧倉糧食
				notEmpty.signal();// 糧官大喇叭響起(通向已經休息的火頭軍中的一支):糧倉已經有糧食了,可以來取了!
			} finally {
				lock.unlock();// 這一小倉入庫完畢 ,糧官休息去了!
			}
		}

		public Object take() throws InterruptedException {
			lock.lock();// 首先糧官就位,開始監工(出庫)嘛!
			try {
				while (count == 0)
					// 一旦糧倉一顆糧食都沒有
					notEmpty.await();// 糧官大喇叭響起(通向來取糧的火頭軍):糧倉沒有糧食了,不要來取了!
				// 糧倉沒空的話就取出糧食,takeptr 大糧倉內帶有編號的小糧倉一百個,從0到99按順序往外取出糧食
				Object x = items[takeptr];
				System.out.println(Thread.currentThread().getName()+"---"+takeptr+"號小糧倉  運出  糧食,現存糧食共計:"+(count-1)+"小倉!");
				if (++takeptr == items.length)// 取到99號小糧倉了
					takeptr = 0;// 從0號開始繼續取
				--count;// 計數,減少了一小糧倉糧食
				notFull.signal();// 糧官大喇叭響起(通向已休息的籌糧兵中的一支):喂!喂!籌糧士兵開工,糧倉不滿了,有空位了,渣渣!
				return x;// 這一小倉運出庫
			} finally {
				lock.unlock();// 這一小出入庫完畢 ,糧官休息去了!
			}
		}
	}

這裡緩衝區是一個阻塞的迴圈佇列,存取糧食的過程如下:


測試一下:

/**
 * 封裝生產者任務:負責籌糧
 * 
 */
class Producer implements Runnable {
	private BoundedBuffer2 buffer;

	public Producer(BoundedBuffer2 buffer) {
		this.buffer = buffer;

	}

	public void run() {
		int i = 0;

		try {
			while (true) {
				// Thread.sleep(1);
				buffer.put(new Object());

			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

}

/**
 * 封裝消費者任務:負責取糧
 * 
 */
class Consumer implements Runnable {
	private BoundedBuffer2 buffer;

	public Consumer(BoundedBuffer2 buffer) {
		this.buffer = buffer;
	}

	public void run() {
		int i = 0;

		try {
			while (true) {
				// Thread.sleep(1);
				buffer.take();
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

}

public class MultithreadTest {
	public static void main(String[] args) {
		BoundedBuffer2 buffer = new BoundedBuffer2();
		Producer pdc = new Producer(buffer);
		Consumer csm = new Consumer(buffer);
		Thread pThread1 = new Thread(pdc);
		Thread pThread2 = new Thread(pdc);
		Thread cThread1 = new Thread(csm);
		Thread cThread2 = new Thread(csm);
		pThread1.start();
		pThread2.start();
		cThread1.start();
		cThread2.start();
	}
}

執行現象:



以上是JDK5.0後的Lock和Condition的實現方式,下面看一下synchronized 實現主要註釋不同之處,這裡用同步方法,用同步程式碼塊也是可以。
/**
 * BoundedBuffer 諸葛亮率領的蜀漢軍隊的糧倉(1.0版本)
 */
public class BoundedBuffer {
	// this-糧草官 :主要職責 督查籌糧分配糧食;主要裝備:無,靠吼!
	final Object lock = new Object();
	final Object[] items = new Object[100];// 糧倉總共能裝100萬石糧食,夠蜀漢十萬大軍吃十個月了。
	int putptr, takeptr, count;

	public synchronized void put(Object x) throws InterruptedException {
		while (count == items.length)
			this.wait();// 糧官扯著嗓子吼起(來到糧草的兵,不分什麼兵都會接收到):正在籌糧士兵全線 立刻 就地 停止工作 等待通知
		items[putptr] = x;
		System.out.println(Thread.currentThread().getName() + "---" + putptr
				+ "號小糧倉  運---進 糧食,現存糧食共計:" + (count + 1) + "小倉!");
		if (++putptr == items.length)
			putptr = 0;
		++count;
		this.notifyAll();// 糧官吼起(通向所有已經休息的小部隊):糧倉已經有糧食了,可以來取了!
	}

	public synchronized Object take() throws InterruptedException {
		while (count == 0)
			this.wait();// 糧官扯著嗓子吼起(來到糧草的兵,不分什麼兵都會接收到):糧倉沒有糧食了,不要來取了!
		Object x = items[takeptr];
		System.out.println(Thread.currentThread().getName() + "---" + takeptr
				+ "號小糧倉  運出  糧食,現存糧食共計:" + (count - 1) + "小倉!");
		if (++takeptr == items.length)
			takeptr = 0;
		--count;
		this.notifyAll();// 糧官大喇叭響起(通向籌糧兵):喂!喂!籌糧士兵全線開工,糧倉不滿了,有空位了,渣渣!
		return x;
	}
}

小結:通過2.0版本的糧草官持有高科技喇叭可以指定喚醒籌糧隊或者是火頭軍,而不像1.0版本那樣不能區分軍種在需要籌糧時把火頭軍也喚醒了。

這就是Lock、Condition這對組合相較與synchronized 實現方式的優勢。另外ArrayBlockingQueue這個類提供了執行緒安全有界快取區功能,有需要可以使用這個類,本篇中2.0版本就是其核心實現方式。