1. 程式人生 > >多線程生產者/消費者模式實現

多線程生產者/消費者模式實現

als true not eat bre creat 測試類 void group

參考書籍《java多線程編程核心技術》

都是基於wait/notify實現的

一個生產者和一個消費者:操作值

1 package com.qf.test10.pojo;
2 
3 /**
4  * @author qf
5  * @create 2018-09-18 15:59
6  */
7 public class Entity {
8     public static String value = "";
9 }
 1 package com.qf.test10;
 2 
 3 import com.qf.test10.pojo.Entity;
 4 
 5
/** 6 * @author qf 7 * @create 2018-09-18 15:52 8 * 生產者類 9 */ 10 public class Producer { 11 private String lock; 12 13 public Producer(String lock) { 14 this.lock = lock; 15 } 16 17 public void setValue(){ 18 try { 19 synchronized (lock){
20 if(!Entity.value.equals("")){ 21 lock.wait(); 22 } 23 String value = System.currentTimeMillis()+"_"+System.nanoTime(); 24 System.out.println("set的值是"+value); 25 Entity.value = value; 26 lock.notify();
27 } 28 } catch (InterruptedException e) { 29 e.printStackTrace(); 30 } 31 } 32 }
 1 package com.qf.test10;
 2 
 3 import com.qf.test10.pojo.Entity;
 4 
 5 /**
 6  * @author qf
 7  * @create 2018-09-18 15:52
 8  * 消費者類
 9  */
10 public class Consumer {
11     private String lock;
12 
13     public Consumer(String lock) {
14         this.lock = lock;
15     }
16 
17     public void getValue(){
18         try {
19             synchronized (lock){
20                 if(Entity.value.equals("")){
21                     lock.wait();
22                 }
23                 System.out.println("get的值"+Entity.value);
24                 Entity.value = "";
25                 lock.notify();
26             }
27         } catch (InterruptedException e) {
28             e.printStackTrace();
29         }
30     }
31 }

線程類

 1 package com.qf.test10.thread;
 2 
 3 import com.qf.test10.Producer;
 4 
 5 /**
 6  * @author qf
 7  * @create 2018-09-18 16:08
 8  */
 9 public class ThreadP extends Thread {
10     private Producer producer;
11 
12     public ThreadP(Producer producer) {
13         this.producer = producer;
14     }
15 
16     @Override
17     public void run() {
18         while(true) {
19             producer.setValue();
20         }
21     }
22 }
 1 package com.qf.test10.thread;
 2 
 3 import com.qf.test10.Consumer;
 4 
 5 /**
 6  * @author qf
 7  * @create 2018-09-18 16:11
 8  */
 9 public class ThreadC extends Thread {
10     private Consumer consumer;
11 
12     public ThreadC(Consumer consumer) {
13         this.consumer = consumer;
14     }
15 
16     @Override
17     public void run() {
18         while (true) {
19             consumer.getValue();
20         }
21     }
22 }

測試運行

 1 package com.qf.test10;
 2 
 3 import com.qf.test10.thread.ThreadC;
 4 import com.qf.test10.thread.ThreadP;
 5 
 6 /**
 7  * @author qf
 8  * @create 2018-09-18 16:12
 9  */
10 public class Run {
11     public static void main(String[] args) {
12         String lock = new String("");
13         Producer p = new Producer(lock);
14         Consumer c = new Consumer(lock);
15         ThreadP tp = new ThreadP(p);
16         tp.start();
17         ThreadC tc = new ThreadC(c);
18         tc.start();
19     }
20 }

打印輸出

set的值是1537259244097_800479975994656
get的值1537259244097_800479975994656
set的值是1537259244097_800479976020503
get的值1537259244097_800479976020503
set的值是1537259244097_800479976042246
get的值1537259244097_800479976042246
set的值是1537259244097_800479976062349
get的值1537259244097_800479976062349
set的值是1537259244097_800479976083272
get的值1537259244097_800479976083272
set的值是1537259244097_800479976103785
get的值1537259244097_800479976103785
set的值是1537259244097_800479976124298
get的值1537259244097_800479976124298
set的值是1537259244097_800479976144400
get的值1537259244097_800479976144400
.............

如果以此為基礎,設計多個生產者和多個消費者,那麽運行過程中很可能會發生假死的情況,也就是所有線程都呈現等待的狀態

多個生產者與多個消費者:操作值

修改Producer.java,Consumer.java以及測試類

 1 package com.qf.test10;
 2 
 3 import com.qf.test10.pojo.Entity;
 4 
 5 /**
 6  * @author qf
 7  * @create 2018-09-18 15:52
 8  * 生產者類
 9  */
10 public class Producer {
11     private String lock;
12 
13     public Producer(String lock) {
14         this.lock = lock;
15     }
16 
17     public void setValue(){
18         try {
19             synchronized (lock){
20                 while (!Entity.value.equals("")){
21                     System.out.println("生產者 "+Thread.currentThread().getName()+" WAITING了★");
22                     lock.wait();
23                 }
24                 System.out.println("生產者 "+Thread.currentThread().getName()+" RUNNABLE了");
25                 String value = System.currentTimeMillis()+"_"+System.nanoTime();
26                 //System.out.println("set的值是"+value);
27                 Entity.value = value;
28                 lock.notify();
29             }
30         } catch (InterruptedException e) {
31             e.printStackTrace();
32         }
33     }
34 }
 1 package com.qf.test10;
 2 
 3 import com.qf.test10.pojo.Entity;
 4 
 5 /**
 6  * @author qf
 7  * @create 2018-09-18 15:52
 8  * 消費者類
 9  */
10 public class Consumer {
11     private String lock;
12 
13     public Consumer(String lock) {
14         this.lock = lock;
15     }
16 
17     public void getValue(){
18         try {
19             synchronized (lock){
20                 if(Entity.value.equals("")){
21                     System.out.println("消費者 "+Thread.currentThread().getName()+" WAITING了☆");
22                     lock.wait();
23                 }
24                 System.out.println("消費者 "+Thread.currentThread().getName()+" RUNNABLE了");
25                 //System.out.println("get的值"+Entity.value);
26                 Entity.value = "";
27                 lock.notify();
28             }
29         } catch (InterruptedException e) {
30             e.printStackTrace();
31         }
32     }
33 }
 1 package com.qf.test10;
 2 
 3 import com.qf.test10.thread.ThreadC;
 4 import com.qf.test10.thread.ThreadP;
 5 
 6 /**
 7  * @author qf
 8  * @create 2018-09-18 16:12
 9  */
10 public class Run {
11     public static void main(String[] args) throws InterruptedException {
12         String lock = new String("");
13         Producer p = new Producer(lock);
14         Consumer c = new Consumer(lock);
15         /*ThreadP tp = new ThreadP(p);
16         tp.start();
17         ThreadC tc = new ThreadC(c);
18         tc.start();*/
19         ThreadP[] threadPS = new ThreadP[2];
20         ThreadC[] threadCS = new ThreadC[2];
21         for (int i = 0; i < 2; i++) {
22             threadPS[i] = new ThreadP(p);
23             threadPS[i].setName("生產者"+(i+1));
24             threadPS[i].start();
25             threadCS[i] = new ThreadC(c);
26             threadCS[i].setName("消費者"+(i+1));
27             threadCS[i].start();
28         }
29 
30         Thread.sleep(5000);
31         Thread[] threads = new Thread[Thread.currentThread().getThreadGroup().activeCount()];
32         Thread.currentThread().getThreadGroup().enumerate(threads);
33         for (int i = 0; i < threads.length; i++) {
34             System.out.println(threads[i].getName()+" "+threads[i].getState());
35         }
36     }
37 }

打印結果

生產者 生產者1 RUNNABLE了
生產者 生產者1 WAITING了★
生產者 生產者2 WAITING了★
消費者 消費者1 RUNNABLE了
消費者 消費者1 WAITING了☆
生產者 生產者1 RUNNABLE了
生產者 生產者1 WAITING了★
生產者 生產者2 WAITING了★
消費者 消費者2 RUNNABLE了
消費者 消費者2 WAITING了☆
消費者 消費者1 RUNNABLE了
消費者 消費者1 WAITING了☆
生產者 生產者1 RUNNABLE了
生產者 生產者1 WAITING了★
生產者 生產者2 WAITING了★
main RUNNABLE
Monitor Ctrl-Break RUNNABLE
生產者1 WAITING
消費者1 WAITING
生產者2 WAITING
消費者2 WAITING

主要原因是因為notify可能喚醒的是同類(生產者喚醒生產者,消費者喚醒消費者)。最終導致所有線程都處於WAITING狀態,程序進而呈現假死狀態

只要將Producer和Consumer中的notify修改為notifyAll即可,這樣就不至於出現假死狀態

一個生產者和一個消費者:操作棧

 1 package com.qf.test11.pojo;
 2 
 3 import java.util.ArrayList;
 4 import java.util.List;
 5 
 6 /**
 7  * @author qf
 8  * @create 2018-09-18 17:14
 9  */
10 public class MyStack {
11     private List list = new ArrayList();
12     synchronized public void push(){
13         try {
14             if (list.size() == 1){
15                 this.wait();
16             }
17             list.add("test"+Math.random());
18             this.notify();
19             System.out.println("push = "+list.size());
20         } catch (InterruptedException e) {
21             e.printStackTrace();
22         }
23     }
24     public synchronized void pop(){
25         try {
26             if(list.size() == 0){
27                 //System.out.println("pop操作: "+Thread.currentThread().getName()+"線程wait狀態");
28                 this.wait();
29             }
30             System.out.println("pop操作: "+Thread.currentThread().getName()+"線程,獲取值="+list.get(0));
31             list.remove(0);
32             this.notify();
33             System.out.println("pop = "+list.size());
34         } catch (InterruptedException e) {
35             e.printStackTrace();
36         }
37     }
38 }

生產者/消費者

 1 package com.qf.test11;
 2 
 3 import com.qf.test11.pojo.MyStack;
 4 
 5 /**
 6  * @author qf
 7  * @create 2018-09-18 17:13
 8  * 生產者
 9  */
10 public class Producer {
11     private MyStack myStack;
12 
13     public Producer(MyStack myStack) {
14         this.myStack = myStack;
15     }
16 
17     public void pushService(){
18         myStack.push();
19     }
20 }
 1 package com.qf.test11;
 2 
 3 import com.qf.test11.pojo.MyStack;
 4 
 5 /**
 6  * @author qf
 7  * @create 2018-09-18 17:14
 8  */
 9 public class Consumer {
10     private MyStack myStack;
11 
12     public Consumer(MyStack myStack) {
13         this.myStack = myStack;
14     }
15     public void popService(){
16         myStack.pop();
17     }
18 }

線程類

 1 package com.qf.test11.thread;
 2 
 3 import com.qf.test11.Producer;
 4 
 5 /**
 6  * @author qf
 7  * @create 2018-09-18 17:13
 8  */
 9 public class ThreadP extends Thread {
10     private Producer producer;
11 
12     public ThreadP(Producer producer) {
13         this.producer = producer;
14     }
15 
16     @Override
17     public void run() {
18         while (true){
19             producer.pushService();
20         }
21     }
22 }
 1 package com.qf.test11.thread;
 2 
 3 import com.qf.test11.Consumer;
 4 
 5 /**
 6  * @author qf
 7  * @create 2018-09-18 17:14
 8  */
 9 public class ThreadC extends Thread {
10     private Consumer consumer;
11 
12     public ThreadC(Consumer consumer) {
13         this.consumer = consumer;
14     }
15 
16     @Override
17     public void run() {
18         while (true){
19             consumer.popService();
20         }
21     }
22 }

測試運行

 1 package com.qf.test11;
 2 
 3 import com.qf.test11.pojo.MyStack;
 4 import com.qf.test11.thread.ThreadC;
 5 import com.qf.test11.thread.ThreadP;
 6 
 7 /**
 8  * @author qf
 9  * @create 2018-09-18 17:34
10  */
11 public class Run {
12     public static void main(String[] args) {
13         MyStack myStack = new MyStack();
14         Producer p = new Producer(myStack);
15         Consumer c = new Consumer(myStack);
16         ThreadP tp = new ThreadP(p);
17         ThreadC tc = new ThreadC(c);
18         tp.setName("tp");
19         tc.setName("tc");
20         tp.start();
21         tc.start();
22     }
23 }

打印結果

push = 1
pop操作: tc線程,獲取值=test0.8957260024057878
pop = 0
push = 1
pop操作: tc線程,獲取值=test0.9236606274738514
pop = 0
push = 1
pop操作: tc線程,獲取值=test0.7661156573296891
pop = 0
push = 1
pop操作: tc線程,獲取值=test0.6523634151650343
pop = 0
push = 1
pop操作: tc線程,獲取值=test0.08728918553111287
pop = 0
push = 1
pop操作: tc線程,獲取值=test0.472483808512989
pop = 0
push = 1
pop操作: tc線程,獲取值=test0.17456918848050884
pop = 0
push = 1
pop操作: tc線程,獲取值=test0.1785536700399648
pop = 0
............

多線程生產者/消費者模式實現