Java多執行緒-生產者消費者模式
阿新 • • 發佈:2018-12-12
問題描述
有一群生產者程序在生產產品,並將這些產品提供給消費者程序去消費。為使生產者程序與消費者程序能併發執行,在兩者之間設定了一個具有n個緩衝區的緩衝池,生產者程序將它所產生的產品放入緩衝區中,消費者程序可以從一個緩衝區中取走產品去消費。儘管所有的生產者和消費者程序都是以非同步的方式執行的,但他們之間必須保持同步,即不允許消費者程序到一個空的緩衝區中去取產品,也不允許生產者程序向一個已裝滿產品且尚未被取走的緩衝區投放產品。
程式碼
package time20180914; import java.util.ArrayList; public class Two { public static void main(String[] args) { //緩衝池 ArrayList<Integer> a = new ArrayList<Integer>(5); //啟動生產者 new Thread(new producter(a)).start() //啟動消費者; new Thread(new Consumer(a)).start(); } } class producter implements Runnable { ArrayList<Integer> a; public producter(ArrayList<Integer> a) { this.a = a; } @Override public void run() { while (true) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } //對緩衝池加鎖 synchronized (a) { //進行判斷 if(a.size()<5){ a.add(1); System.out.println("正在生產,當前剩餘"+a.size()); //當生產過後要通知一下消費者可以取了 a.notifyAll(); }else { System.out.println("產量太多進入等待"); try { //當滿了以後要注意等待 a.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } } class Consumer implements Runnable { ArrayList<Integer> a; public Consumer(ArrayList<Integer> a) { this.a = a; } @Override public void run() { while (true) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (a) { if(a.size()>0){ a.remove(0); System.out.println("消費了,當前剩餘"+a.size()); //通知生產者生產 a.notifyAll(); }else { System.out.println("產量不足,進入等待"); try { //當緩衝池空的時候要等待 a.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } }
第二種實現方式
直接通過一個資源池來進行資源的調配
package time20180914; //wait 和 notify public class ProducerConsumerWithWaitNofity { public static void main(String[] args) { Resource resource = new Resource(); //生產者執行緒 ProducerThread p1 = new ProducerThread(resource); // ProducerThread p2 = new ProducerThread(resource); // ProducerThread p3 = new ProducerThread(resource); //消費者執行緒 ConsumerThread c1 = new ConsumerThread(resource); //ConsumerThread c2 = new ConsumerThread(resource); //ConsumerThread c3 = new ConsumerThread(resource); p1.start(); // p2.start(); // p3.start(); c1.start(); //c2.start(); //c3.start(); } } /** * 公共資源類 * @author * */ class Resource{//重要 //當前資源數量 private int num = 0; //資源池中允許存放的資源數目 private int size = 10; /** * 從資源池中取走資源 */ public synchronized void remove(){ if(num > 0){ num--; System.out.println("消費者" + Thread.currentThread().getName() + "消耗一件資源," + "當前執行緒池有" + num + "個"); notifyAll();//通知生產者生產資源 }else{ try { //如果沒有資源,則消費者進入等待狀態 wait(); System.out.println("消費者" + Thread.currentThread().getName() + "執行緒進入等待狀態"); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 向資源池中新增資源 */ public synchronized void add(){ if(num < size){ num++; System.out.println(Thread.currentThread().getName() + "生產一件資源,當前資源池有" + num + "個"); //通知等待的消費者 notifyAll(); }else{ //如果當前資源池中有10件資源 try{ wait();//生產者進入等待狀態,並釋放鎖 System.out.println(Thread.currentThread().getName()+"執行緒進入等待"); }catch(InterruptedException e){ e.printStackTrace(); } } } } /** * 消費者執行緒 */ class ConsumerThread extends Thread{ private Resource resource; public ConsumerThread(Resource resource){ this.resource = resource; } @Override public void run() { while(true){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } resource.remove(); } } } /** * 生產者執行緒 */ class ProducerThread extends Thread{ private Resource resource; public ProducerThread(Resource resource){ this.resource = resource; } @Override public void run() { //不斷地生產資源 while(true){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } resource.add(); } } }