基於線程實現的生產者消費者模型(Object.wait(),Object.notify()方法)
需求背景
利用線程來模擬生產者和消費者模型
系統建模
這個系統涉及到三個角色,生產者,消費者,任務隊列,三個角色之間的關系非常簡單,生產者和消費者擁有一個任務隊列的引用,生產者負責往隊列中放置對象(id),消費者負責從隊列中獲取對象(id),其關聯關系如下
方案1
因為是多線程操作,所以對任務的存取都要使用線程同步加鎖機制,看一下我們的TaskQueue類,兩個主方法都加了synchronized修飾,這就意味著,一個時間點只可能有一個線程對這個方法進行操作
TaskQueue類代碼
[java] view plain copy print?
- package com.crazycoder2010.thread;
- public class TaskQueue {
- private int id;
- public synchronized void put(int id){
- this.id = id;
- System.out.println("Put:"+id);
- }
- public synchronized int get(){
- System.out.println("Got:"+this.id);
- return this.id;
- }
- }
再來看一下生產者,這個主要不停的往TaskQueue中添加新對象,這裏我們搞了個死循環,運行時不斷的對當前數字做加一操作如下
Producer類代碼
[java] view plain copy print?
- package com.crazycoder2010.thread;
- public class Producer implements Runnable{
- private TaskQueue taskQuery;
- public Producer(TaskQueue taskQuery){
- this.taskQuery = taskQuery;
- new Thread(this).start();
- }
- @Override
- public void run() {
- int i = 0;
- while(true){
- taskQuery.put(i++);
- }
- }
- }
消費者對象也是基於線程實現,在循環中不停的輪訓TaskQuery.get()來獲取當前對象
Consumer類
[java] view plain copy print?
- package com.crazycoder2010.thread;
- public class Consumer implements Runnable {
- private TaskQueue taskQuery;
- public Consumer(TaskQueue taskQuery){
- this.taskQuery = taskQuery;
- new Thread(this).start();
- }
- @Override
- public void run() {
- while(true){
- taskQuery.get();
- }
- }
- }
運行一下,看看結果是否是我們所期望的那樣
Launcher類代碼
[java] view plain copy print?
- package com.crazycoder2010.thread;
- public class Launcher {
- public static void main(String[] args) {
- TaskQueue taskQuery = new TaskQueue();
- new Producer(taskQuery);
- new Consumer(taskQuery);
- System.out.println("Press Control-C to stop.");
- }
- }
輸出結果:
...
Put:58
Put:59
Put:60
Put:61
Put:62
Put:63
Put:64
Got:64
Got:64
Got:64
Put:65
Put:66
...
問題出現在哪裏呢?
從結果輸出我們可以看出,生產者的對象放入順序是按次序進行的,但是消費者讀取對象的數序很奇怪,一段時間內讀取同一個數值,這個是怎麽造成的呢?
我們知道,當啟動線程後線程什麽時候被jvm所調度是不確定的,上面的結果在不同的機器上運行很可能得到不同的結果,當Producer線程被調度運行一段時間後,線程Consumer獲取到運行資格,然後從TaskQueue中取對象,這個時候由於Producer處於等待被調度狀態,所以TaskQueue中的id一直都是個固定值,所以這個時候Consumer獲取到的一直都是Producer在被jvm設置為等待狀態那一刻的值,運行一段時間,Producer又被jvm調度器調度,獲取運行資格,這個時候id繼續從上次暫定時的值開始累加,依次循環,然後就得到了上面的運行結果
方案2
這個方案裏我們要對方案1做一些改造,當有Producer生產出一個id時,直到有ConSumer來將他拿走,然後再生產下一個id,Consumer也是類似,直到Producer生產出id後才來取,否則一直等待下去
解決:
Object類裏有個wait()方法和notify()/notifyAll(),一直很神秘,先前沒怎麽用過,看了一下原來這個東東是與線程同步操作密接相關的,
比如我們在應用中如果要對某一個方法或某個代碼段做線程同步控制,那麽需要為這個方法添加synchronized修飾或者是synchronized(obj){},這個obj可以理解成我們實際生活中房間的一把鎖,默認情況下,當一個線程擁有了一個方法或代碼段的鎖後,就可以進入房間(方法或代碼塊)任意幹壞事,而其他的線程只能在門外等待直到當前線程執行完畢打開房間(釋放鎖),而Object的wait和notify則是給這個鎖提供了一些更先進的功能,這個鎖可以自己開鎖wait()(讓同步方法或代碼塊暫時放棄占用的鎖),進而讓別的線程有機會進入運行,而當實際成熟時(滿足運行的條件)又可以把自身鎖住notify(),進而又繼續進入上次被暫停的操作
改造後的TaskQueue2
[java] view plain copy print?
- package com.crazycoder2010.thread;
- public class TaskQuery2 {
- private int id;
- private boolean valueSet;
- public synchronized int get(){
- if(!valueSet){
- try {
- wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- valueSet = false;
- notify();
- System.out.println("Got:"+this.id);
- return this.id;
- }
- public synchronized void put(int id){
- if(valueSet){
- try {
- wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- this.id = id;
- valueSet = true;
- System.out.println("Put:"+id);
- notify();
- }
- }
Producer,Consumer和Launcher類的代碼不改動
運行結果如下
Put:0
Got:0
Put:1
Got:1
Put:2
Got:2
Put:3
Got:3
Put:4
Got:4
Put:5
Got:5
Put:6
Got:6
基於線程實現的生產者消費者模型(Object.wait(),Object.notify()方法)