Java生產者消費者的三種實現
阿新 • • 發佈:2018-11-27
Java生產者消費者是最基礎的執行緒同步問題,java崗面試中還是很容易遇到的,之前沒寫過多執行緒的程式碼,面試中被問到很尬啊,面完回來惡補下。在網上查到大概有5種生產者消費者的寫法,分別如下。
- 用synchronized對儲存加鎖,然後用object原生的wait() 和 notify()做同步。
- 用concurrent.locks.Lock,然後用condition的await() 和signal()做同步。
- 直接使用concurrent.BlockingQueue。
- 使用PipedInputStream/PipedOutputStream。
- 使用訊號量semaphore。
我的理解,生產者消費者模式,其實只要保證在儲存端同一時刻只有一個執行緒讀或寫就不會有問題,然後再去考慮執行緒同步。方法1 2 5都比較類似,都是加鎖來限制同一時刻只能有一個讀或寫。而方法3 4其實是在儲存內部去保證讀和寫的唯一的,最低層肯定還是通過鎖機制來實現的,java底層程式碼都封裝好了而已。
我自己嘗試寫了下前三種,程式碼如下:
synchronized版本
import java.util.LinkedList;
import java.util.Queue;
public class ProducerAndConsumer {
private final int MAX_LEN = 10;
private Queue<Integer> queue = new LinkedList<Integer>();
class Producer extends Thread {
@Override
public void run() {
producer();
}
private void producer() {
while(true) {
synchronized (queue) {
while (queue.size() == MAX_LEN) {
queue.notify();
System.out.println("當前佇列滿");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.add(1);
queue.notify();
System.out.println("生產者生產一條任務,當前佇列長度為" + queue.size());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
class Consumer extends Thread {
@Override
public void run() {
consumer();
}
private void consumer() {
while (true) {
synchronized (queue) {
while (queue.size() == 0) {
queue.notify();
System.out.println("當前佇列為空");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.poll();
queue.notify();
System.out.println("消費者消費一條任務,當前佇列長度為" + queue.size());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
public static void main(String[] args) {
ProducerAndConsumer pc = new ProducerAndConsumer();
Producer producer = pc.new Producer();
Consumer consumer = pc.new Consumer();
producer.start();
consumer.start();
}
}
lock版實現,使用了condition做執行緒之間的同步。
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* version 1 doesn't use synchronized to improve performance
*/
public class ProducerAndConsumer1 {
private final int MAX_LEN = 10;
private Queue<Integer> queue = new LinkedList<Integer>();
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
class Producer extends Thread {
@Override
public void run() {
producer();
}
private void producer() {
while(true) {
lock.lock();
try {
while (queue.size() == MAX_LEN) {
System.out.println("當前佇列滿");
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.add(1);
condition.signal();
System.out.println("生產者生產一條任務,當前佇列長度為" + queue.size());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}
}
}
class Consumer extends Thread {
@Override
public void run() {
consumer();
}
private void consumer() {
while (true) {
lock.lock();
try {
while (queue.size() == 0) {
System.out.println("當前佇列為空");
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.poll();
condition.signal();
System.out.println("消費者消費一條任務,當前佇列長度為" + queue.size());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}
}
}
public static void main(String[] args) {
ProducerAndConsumer pc = new ProducerAndConsumer();
Producer producer = pc.new Producer();
Consumer consumer = pc.new Consumer();
producer.start();
consumer.start();
}
}
BlockingQueue版實現
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class ProducerAndConsumer {
private BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(10);
class Producer extends Thread {
@Override
public void run() {
producer();
}
private void producer() {
while(true) {
try {
queue.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("生產者生產一條任務,當前佇列長度為" + queue.size());
try {
Thread.sleep(new Random().nextInt(1000)+500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer extends Thread {
@Override
public void run() {
consumer();
}
private void consumer() {
while (true) {
try {
queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消費者消費一條任務,當前佇列長度為" + queue.size());
try {
Thread.sleep(new Random().nextInt(1000)+500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
ProducerAndConsumer pc = new ProducerAndConsumer();
Producer producer = pc.new Producer();
Consumer consumer = pc.new Consumer();
producer.start();
consumer.start();
}
}