1. 程式人生 > >java實現生產者-消費者模型

java實現生產者-消費者模型

方式一:使用 synchronized 搭配 wait()、notify()/notifyAll()

package producercomsumer;

/**
 * Producer-consumer demo built on {@code synchronized} together with
 * {@code wait()} / {@code notifyAll()}.
 *
 * <p>Three {@link Consumer} threads and three {@link Producer} threads exchange
 * values through the single slot {@link ValueObject#value}; every thread
 * synchronizes on the same monitor object passed to its constructor.
 *
 * @author fangchangtan
 * @version created 2019-01-07 19:19:26
 */
public class ChangtanTest {

	/**
	 * Starts three consumers followed by three producers, all sharing one monitor.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		// NOTE(review): a string literal is interned, so any other code that
		// synchronizes on "lock" shares this monitor. A dedicated Object would be
		// safer, but the worker constructors take a String.
		String lock = "lock";
		for (int i = 0; i < 3; i++) {
			Thread thread1 = new Consumer(lock);
			thread1.start();
		}
		for (int i = 0; i < 3; i++) {
			Thread thread2 = new Producer(lock);
			thread2.start();
		}
	}
}

/**
 * Thread that repeatedly takes the value out of {@link ValueObject#value},
 * pausing one second between attempts. Terminates when interrupted.
 */
class Consumer extends Thread {

	private final String lock;

	/**
	 * @param lock monitor shared with the producers
	 */
	public Consumer(String lock) {
		this.lock = lock;
	}

	/**
	 * Blocks until the slot holds a value, prints it, empties the slot and wakes
	 * every thread waiting on the monitor. If the thread is interrupted while
	 * waiting, the interrupt flag is restored and the method returns without
	 * consuming.
	 */
	public void getValue() {
		try {
			synchronized (lock) {
				// Re-check after every wake-up: notifyAll() also wakes the other
				// consumers, and wait() may return spuriously.
				while (ValueObject.value == null) {
					lock.wait();
				}
				System.out.println("Consumer:" + ValueObject.value);
				ValueObject.value = null;
				lock.notifyAll();
			}
		} catch (InterruptedException e) {
			// Keep the interrupt visible so run() can stop.
			Thread.currentThread().interrupt();
		}
	}

	@Override
	public void run() {
		while (!Thread.currentThread().isInterrupted()) {
			getValue();
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				// Restore the flag; the loop condition then ends the thread.
				Thread.currentThread().interrupt();
			}
		}
	}
}

/**
 * Thread that repeatedly stores the current time in milliseconds into
 * {@link ValueObject#value}, pausing one second between attempts. Terminates
 * when interrupted.
 */
class Producer extends Thread {

	private final String lock;

	/**
	 * @param lock monitor shared with the consumers
	 */
	public Producer(String lock) {
		this.lock = lock;
	}

	/**
	 * Blocks until the slot is empty, fills it with the current time in
	 * milliseconds, prints it and wakes every thread waiting on the monitor. If
	 * the thread is interrupted while waiting, the interrupt flag is restored and
	 * the method returns without producing.
	 */
	public void setValue() {
		try {
			synchronized (lock) {
				// Re-check after every wake-up so an unconsumed value is never
				// overwritten by a second producer.
				while (ValueObject.value != null) {
					lock.wait();
				}
				ValueObject.value = System.currentTimeMillis() + "";
				System.out.println("Producer:" + ValueObject.value);
				lock.notifyAll();
			}
		} catch (InterruptedException e) {
			// Keep the interrupt visible so run() can stop.
			Thread.currentThread().interrupt();
		}
	}

	@Override
	public void run() {
		while (!Thread.currentThread().isInterrupted()) {
			setValue();
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				// Restore the flag; the loop condition then ends the thread.
				Thread.currentThread().interrupt();
			}
		}
	}
}

/**
 * Single-slot buffer shared by all producers and consumers; {@code null} is
 * treated as "empty" by the threads above.
 */
class ValueObject {
	public static String value;
}

方式二:使用 Lock 鎖,配合 await()、signal() 的方式

package producercomsumer;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Producer-consumer demo built on a {@link ReentrantLock} and one
 * {@link Condition} obtained from it.
 *
 * @author fangchangtan
 * @version created 2019-01-07 19:30:26
 */
public class TankTest {

	/**
	 * Creates one lock with one condition, hands both to a consumer and a
	 * producer thread, then starts the consumer followed by the producer.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		final ReentrantLock sharedLock = new ReentrantLock();
		final Condition slotChanged = sharedLock.newCondition();

		// Both workers are constructed before either one is started.
		Thread[] workers = {
			new Consumer1(sharedLock, slotChanged),
			new Producer1(sharedLock, slotChanged),
		};
		for (Thread worker : workers) {
			worker.start();
		}
	}

}

/**
 * Thread that repeatedly takes the value out of {@code ValueObject.value},
 * coordinating with the producer through a {@link ReentrantLock} and a
 * {@link Condition}. Pauses one second between attempts and terminates when
 * interrupted.
 */
class Consumer1 extends Thread {

	private final ReentrantLock lock;
	private final Condition condition;

	/**
	 * @param lock      lock guarding the shared slot
	 * @param condition condition (created from {@code lock}) used to wait for and
	 *                  announce slot changes
	 */
	public Consumer1(ReentrantLock lock, Condition condition) {
		this.lock = lock;
		this.condition = condition;
	}

	/**
	 * Blocks until the slot holds a value, prints it, empties the slot and
	 * signals the waiting threads. If the thread is interrupted while waiting,
	 * the interrupt flag is restored and the method returns without consuming.
	 */
	public void getValue() {
		// Acquire before the try so that finally only unlocks a lock we hold.
		lock.lock();
		try {
			// Loop instead of a single check: await() may return spuriously.
			while (ValueObject.value == null) {
				condition.await();
			}
			System.out.println("Consumer:" + ValueObject.value);
			ValueObject.value = null;
			// Producers and consumers wait on the same condition, so wake all
			// waiters; each one re-checks its own predicate.
			condition.signalAll();
		} catch (InterruptedException e) {
			// Keep the interrupt visible so run() can stop.
			Thread.currentThread().interrupt();
		} finally {
			lock.unlock();
		}
	}

	@Override
	public void run() {
		while (!Thread.currentThread().isInterrupted()) {
			getValue();
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				// Restore the flag; the loop condition then ends the thread.
				Thread.currentThread().interrupt();
			}
		}
	}
}

/**
 * Thread that repeatedly stores the current time in milliseconds into
 * {@code ValueObject.value}, coordinating with the consumer through a
 * {@link ReentrantLock} and a {@link Condition}. Pauses one second between
 * attempts and terminates when interrupted.
 */
class Producer1 extends Thread {

	private final ReentrantLock lock;
	private final Condition condition;

	/**
	 * @param lock      lock guarding the shared slot
	 * @param condition condition (created from {@code lock}) used to wait for and
	 *                  announce slot changes
	 */
	public Producer1(ReentrantLock lock, Condition condition) {
		this.lock = lock;
		this.condition = condition;
	}

	/**
	 * Blocks until the slot is empty, fills it with the current time in
	 * milliseconds, prints it and signals the waiting threads. If the thread is
	 * interrupted while waiting, the interrupt flag is restored and the method
	 * returns without producing.
	 */
	public void setValue() {
		// Acquire before the try so that finally only unlocks a lock we hold.
		lock.lock();
		try {
			// Loop instead of a single check: await() may return spuriously.
			while (ValueObject.value != null) {
				condition.await();
			}
			ValueObject.value = System.currentTimeMillis() + "";
			System.out.println("Producer:" + ValueObject.value);
			// Producers and consumers wait on the same condition, so wake all
			// waiters; each one re-checks its own predicate.
			condition.signalAll();
		} catch (InterruptedException e) {
			// Keep the interrupt visible so run() can stop.
			Thread.currentThread().interrupt();
		} finally {
			lock.unlock();
		}
	}

	@Override
	public void run() {
		while (!Thread.currentThread().isInterrupted()) {
			setValue();
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				// Restore the flag; the loop condition then ends the thread.
				Thread.currentThread().interrupt();
			}
		}
	}
}

方式三:使用java自身提供的同步阻塞佇列

略…(網上有示例,請自行參考)


自身在使用中遇到的問題:
1. 類檔案 MyMain.java:主函式實現多執行緒生產和消費

package producercomsumer;
/**
 * Entry point of the ProducerX / ComsumerX demo.
 *
 * @author fangchangtan
 * @version created 2019-01-07 18:57:48
 */
public class MyMain {

	// Monitor object handed to both worker threads.
	private static String lock = "lock";

	/**
	 * Builds one producer and one consumer on the shared monitor, then starts
	 * the producer followed by the consumer.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		// Both workers are constructed before either one is started.
		Thread[] workers = {
			new ProducerX(lock),
			new ComsumerX(lock),
		};
		for (Thread worker : workers) {
			worker.start();
		}
	}
}
/**
 * Holder for the one message slot shared by the producer and consumer threads.
 */
class ValueObject {

	/** The pending message; the worker threads treat {@code null} as "slot empty". */
	public static String value = null;

}

生產者ProducerX :

package producercomsumer;

import java.util.Random;

/**
 * Producer thread: keeps filling {@code ValueObject.value} with a random
 * number in {@code [0, 100)} whenever the slot is empty, synchronizing on the
 * monitor passed to the constructor. Terminates when interrupted.
 *
 * @author fangchangtan
 * @version created 2019-01-07 18:56:16
 */
public class ProducerX extends Thread{

	private final String lock;
	private final Random random = new Random();

	/**
	 * @param lock monitor shared with the consumer
	 */
	public ProducerX(String lock) {
		this.lock = lock;
	}

	@Override
	public void run() {
		try {
			while (true) {
				synchronized (lock) {
					// Loop instead of a single check: wait() may return
					// spuriously or after the slot was refilled.
					while (ValueObject.value != null) {
						lock.wait();
					}
					ValueObject.value = String.valueOf(random.nextInt(100));
					System.out.println("ProducerX: " + ValueObject.value);
					// Wake every waiter; each one re-checks its own predicate,
					// so this stays correct with more than two threads.
					lock.notifyAll();
				}
			}
		} catch (InterruptedException e) {
			// Interruption is the stop signal: restore the flag and end the thread.
			Thread.currentThread().interrupt();
		}
	}

}

消費者ComsumerX:

package producercomsumer;

import com.sun.org.apache.xalan.internal.lib.ExsltBase;

/**
 * Consumer thread: keeps printing and clearing {@code ValueObject.value}
 * whenever the slot holds a value, synchronizing on the monitor passed to the
 * constructor. Terminates when interrupted.
 *
 * @author fangchangtan
 * @version created 2019-01-07 18:57:17
 */
public class ComsumerX extends Thread{

	private final String lock;

	/**
	 * @param lock monitor shared with the producer
	 */
	public ComsumerX(String lock) {
		this.lock = lock;
	}

	@Override
	public void run() {
		try {
			while (true) {
				synchronized (lock) {
					// Loop instead of a single check: wait() may return
					// spuriously or after the slot was already emptied.
					while (ValueObject.value == null) {
						lock.wait();
					}
					System.out.println("ComsumerX: " + ValueObject.value);
					ValueObject.value = null;
					// Wake every waiter; each one re-checks its own predicate,
					// so this stays correct with more than two threads.
					lock.notifyAll();
				}
			}
		} catch (InterruptedException e) {
			// Interruption is the stop signal: restore the flag and end the thread.
			Thread.currentThread().interrupt();
		}
	}

}

(以上的程式碼是沒問題的,但是自身在之前的實現過程中出現很多異常情況)
自身在使用中存在的問題:
1. 一開始自身使用字串變數 String value 來儲存生產的訊息物件,導致的結果是:生產者生產完之後無法喚醒消費者消費資料。原因是消費者和生產者操作的不是同一個訊息容器。Java 傳遞的是物件參考的拷貝,而且 String 是不可變的,因此在某個執行緒中對 value 重新賦值,只會改變該執行緒自己持有的參考,另一個執行緒看不到這個變化,也就做不到多執行緒共享同一個訊息。後期改為以 ValueObject.value(靜態欄位)作為訊息快取的容器,即可正常地生產和消費。