1. 程式人生 > >生產者消費者三種併發模式實現方法

生產者消費者三種併發模式實現方法

package cn.luxh.app.test;

import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


/** * Lock實現的生產者和消費者 * */ public class ProducerCustomerWithLock { Executor pool = Executors.newFixedThreadPool(10); //倉庫 private List<String> storageList = new LinkedList<String>(); //倉庫容量 private int MAX_SIZE = 3; //倉庫為空 private int ZERO = 0;
//獲取鎖物件 private Lock lock = new ReentrantLock(); //倉庫滿了,繫結生產者執行緒 private Condition full = lock.newCondition(); //倉庫為空,繫結消費者執行緒 private Condition empty = lock.newCondition(); //生產者執行緒 private class Producer implements Runnable{ //生產方法,需同步 private
void produce(){ if(lock.tryLock()) { System.out.println(Thread.currentThread().getName()+"進入倉庫,準備生產!"); try { if(storageList.size()==MAX_SIZE) { System.out.println("倉庫已滿!等待消費者消費"); Thread.sleep(1000); full.await();//生產者執行緒加入執行緒等待池 } if(storageList.size()<MAX_SIZE){ String name = "產品"+new Random().nextInt(); storageList.add(name); System.out.println(Thread.currentThread().getName()+"往倉庫中生產了一個產品!"); } Thread.sleep(1000); empty.signalAll();//喚醒消費者執行緒 }catch(InterruptedException ie) { System.out.println("中斷異常"); ie.printStackTrace(); }finally{ lock.unlock(); } } } @Override public void run() { while(true) { produce(); } } } //消費者執行緒 private class Customer implements Runnable{ //消費方法,需同步 private void consume() { if(lock.tryLock()) { System.out.println(Thread.currentThread().getName()+"進入倉庫,準備消費!"); try { if(storageList.size()==ZERO) { System.out.println("倉庫已空!等待生產者生產"); Thread.sleep(1000); empty.await();//消費者執行緒加入執行緒等待池 } if(storageList.size()!=ZERO) { System.out.println(Thread.currentThread().getName()+"從倉庫取得產品:"+storageList.remove(0)); } Thread.sleep(1000); full.signalAll();//喚醒生產者執行緒 }catch(InterruptedException ie) { System.out.println("中斷異常"); ie.printStackTrace(); }finally{ lock.unlock(); } } } @Override public void run() { while(true) { consume(); } } } //啟動生產者和消費者執行緒 public void start() { for(int i=1;i<5;i++) { //new Thread(new Producer()).start(); //new Thread(new Customer()).start(); pool.execute(new Producer()); pool.execute(new Customer()); } } public static void main(String[] args) { ProducerCustomerWithLock pc = new ProducerCustomerWithLock(); pc.start(); } }
package cn.luxh.app.test;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;


/**
 * BlockingQueue實現的生產者和消費者
 *
 */
public class ProducerCustomerWithBlockingQueue {
    
    Executor pool = Executors.newFixedThreadPool(10);
    
    //倉庫
    private BlockingQueue<String> storageQueue = new LinkedBlockingQueue<String>(5);
    
    //倉庫容量
    private int MAX_SIZE = 3;
    
    //倉庫為空
    private int ZERO = 0;
    
    
    
    //生產者執行緒
    private class Producer implements Runnable{
        
        //生產方法,需同步
        private void produce(){
            try {
                System.out.println(Thread.currentThread().getName()+"進入倉庫,準備生產!");
                if(storageQueue.size()==MAX_SIZE) {
                    System.out.println("倉庫已滿!等待消費者消費");
                    Thread.sleep(1000);
                }
                if(storageQueue.size()<=MAX_SIZE) {
                    String product = "產品"+new Random().nextInt();
                    storageQueue.put(product);
                    System.out.println(Thread.currentThread().getName()+"往倉庫中生產了一個產品!");
                }
                Thread.sleep(1000);
            }catch(InterruptedException ie) {
                System.out.println("中斷異常");
                ie.printStackTrace();
            }
        }

        @Override
        public void run() {
            while(true) {
                produce();
            }
        }
    }
    
    //消費者執行緒
    private class Customer implements Runnable{
        
        //消費方法,需同步
        private void consume() {
            try {
                System.out.println(Thread.currentThread().getName()+"進入倉庫,準備消費!");
                if(storageQueue.size()==ZERO) {
                    System.out.println("倉庫已空!等待生產者生產");
                    Thread.sleep(1000);
                }
                if(storageQueue.size()!=ZERO) {
                    System.out.println(Thread.currentThread().getName()+"從倉庫取得產品:"+storageQueue.take());
                }
                Thread.sleep(1000);
            }catch(InterruptedException ie) {
                System.out.println("中斷異常");
                ie.printStackTrace();
            }
        }

        @Override
        public void run() {
            while(true) {
                consume();
            }
        }
        
    }
    
    //啟動生產者和消費者執行緒
    public void start() {
        for(int i=1;i<5;i++) {
            //new Thread(new Producer()).start();
            ///new Thread(new Customer()).start();
            pool.execute(new Producer());
            pool.execute(new Customer());
        }
        
    }
    
    public static void main(String[] args) {
        ProducerCustomerWithBlockingQueue pc = new ProducerCustomerWithBlockingQueue();
        pc.start();
    }
}