Java併發程式設計系列之十四:阻塞佇列
阻塞佇列(BlockingQueue)是一個支援兩個附加操作的佇列。這兩個附加操作支援阻塞地插入和移除方法。支援阻塞插入的方法是指當佇列滿時會阻塞插入元素的執行緒,直到佇列不滿;支援阻塞移除的方法是指當佇列為空時獲取元素的執行緒無法繼續獲取元素直到佇列不空。
可以發現阻塞佇列非常適合消費者和生產者場景下進行使用,生產者生產資料就是向阻塞佇列中插入元素,消費者消費資料就是從阻塞佇列中移除元素。
Java提供了阻塞佇列支援如下方法:
插入方法:add(e)(新增失敗會丟擲異常)、offer(e)(新增失敗返回特殊值)、put(e)(新增失敗會一直阻塞)
移除方法:remove(e)(移除失敗會丟擲異常)、poll(e)(移除失敗會返回特殊值)、take(e)(移除失敗會一直阻塞)
在Java中提供了無界佇列,這種情況下佇列不可能出現滿的情況(除非發生記憶體溢位),所以使用put和take方法永遠不會被阻塞,offer返回的永遠是true。
在Java中提供了7種阻塞佇列,使用較多有四種:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue和DelayQueue。ArrayBlockingQueue是一個由陣列結構組成的有界阻塞佇列,LinkedBlockingQueue是一個由連結串列結構組成的有界阻塞佇列,PriorityBlockingQueue是一個支援優先順序排序的無界阻塞佇列,DelayQueue是一個使用優先順序佇列實現的支援延時獲取元素的無界阻塞佇列。DelayQueue適用於快取系統的設計以及定時任務排程等場景。
那麼阻塞佇列是如何實現執行緒的同步的呢?使用通知模式實現。
通知模式是指當生產者往滿的佇列新增元素的時候會阻塞生產者,當消費者消費了一個佇列中的元素後,會通知生產者當前佇列已經不滿了,這時生產者可以繼續往佇列中新增元素。就ArrayBlockingQueue而言,是使用Condition條件變數實現通知模式的。
public ArrayBlockingQueue(int capacity, boolean fair) {
//省略部分程式碼
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
//新增元素的方法
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
//如果佇列不滿就入隊
enqueue(e);
} finally {
lock.unlock();
}
}
//入隊的方法
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
//移除元素的方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
//出隊的方法
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
從原始碼可以看出,阻塞佇列的實現仍然是使用了經典的等待/通知模式實現的。使用阻塞佇列的好處在於使用者不用關心什麼時候等待,什麼時候進行通知,什麼時候新增元素什麼時候取元素都由使用者實現,讓使用者可以更多關注業務的實現。那麼對於上一篇文章提到的生產者消費者模式,如何使用阻塞佇列實現呢?
下面程式碼演示了使用阻塞佇列實現生產者消費者模式:
package com.rhwayfun.concurrency;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* Created by rhwayfun on 16-4-4.
*/
public class ProducerConsumerModeWithBlockQueueTest {
static class Info{
//內容
private String content;
public Info(String content) {
this.content = content;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
@Override
public String toString() {
return this.getContent();
}
}
static class Producer implements Runnable{
private final BlockingQueue<Info> blockingQueue;
public Producer(BlockingQueue<Info> blockingQueue) {
this.blockingQueue = blockingQueue;
}
public void run() {
boolean flag = true;
for (int i = 0; i < 5; i++){
if (flag){
try {
blockingQueue.put(new Info("contentA"));
System.out.println("[生產者]:contentA");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
flag = false;
}else {
try {
blockingQueue.put(new Info("contentB"));
System.out.println("[生產者]:contentB");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
flag = true;
}
}
}
}
static class Consumer implements Runnable{
private final BlockingQueue<Info> blockingQueue;
public Consumer(BlockingQueue<Info> blockingQueue) {
this.blockingQueue = blockingQueue;
}
public void run() {
while (true){
try {
System.out.println("[消費者]:" + blockingQueue.take());
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args){
BlockingQueue<Info> blockingQueue = new LinkedBlockingQueue<Info>();
new Thread(new Producer(blockingQueue)).start();
new Thread(new Consumer(blockingQueue)).start();
}
}
可以發現,相比之前使用等待/通知模式實現的生產者消費者模式,使用阻塞佇列實現的程式碼更加簡潔,Info類無需新增任何同步方法,程式的可擴充套件性提高了提高,耦合度也降低了。