1. 程式人生 > >多執行緒消費阻塞佇列(生產者消費者模型)

多執行緒消費阻塞佇列(生產者消費者模型)

一.幾種主要的阻塞佇列

ArrayBlockingQueue:基於陣列實現的一個阻塞佇列,在建立ArrayBlockingQueue物件時必須制定容量大小。並且可以指定公平性與非公平性,預設情況下為非公平的,即不保證等待時間最長的佇列最優先能夠訪問佇列。

LinkedBlockingQueue:基於連結串列實現的一個阻塞佇列,在建立LinkedBlockingQueue物件時如果不指定容量大小,則預設大小為Integer.MAX_VALUE。

PriorityBlockingQueue:以上2種佇列都是先進先出佇列,而PriorityBlockingQueue卻不是,它會按照元素的優先順序對元素進行排序,按照優先順序順序出隊,每次出隊的元素都是優先順序最高的元素。注意,此阻塞佇列為無界阻塞佇列,即容量沒有上限(通過原始碼就可以知道,它沒有容器滿的訊號標誌),前面2種都是有界佇列。

DelayQueue:基於PriorityQueue,一種延時阻塞佇列,DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從佇列中獲取到該元素。DelayQueue也是一個無界佇列,因此往佇列中插入資料的操作(生產者)永遠不會被阻塞,而只有獲取資料的操作(消費者)才會被阻塞。

  • LinkedTransferQueue:一個由連結串列結構組成的無界阻塞佇列。
  • LinkedBlockingDeque:一個由連結串列結構組成的雙向阻塞佇列

阻塞佇列(BlockingQueue)是一個支援兩個附加操作的佇列。這兩個附加的操作是:在佇列為空時,獲取元素的執行緒會等待佇列變為非空。當佇列滿時,儲存元素的執行緒會等待佇列可用。阻塞佇列常用於生產者和消費者的場景,生產者是往佇列裡新增元素的執行緒,消費者是從佇列裡拿元素的執行緒。阻塞佇列就是生產者存放元素的容器,而消費者也只從容器裡拿元素。

阻塞佇列提供了四種處理方法:

方法\處理方式丟擲異常返回特殊值一直阻塞超時退出
插入方法add(e)offer(e)put(e)offer(e,time,unit)
移除方法remove()poll()take()poll(time,unit)
檢查方法element()peek()不可用不可用
  • 丟擲異常:是指當阻塞佇列滿時候,再往佇列裡插入元素,會丟擲IllegalStateException(“Queue full”)異常。當佇列為空時,從佇列裡獲取元素時會丟擲NoSuchElementException異常 。
  • 返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從佇列裡拿出一個元素,如果沒有則返回null
  • 一直阻塞:當阻塞佇列滿時,如果生產者執行緒往佇列裡put元素,佇列會一直阻塞生產者執行緒,直到拿到資料,或者響應中斷退出。當佇列空時,消費者執行緒試圖從佇列裡take元素,佇列也會阻塞消費者執行緒,直到佇列可用。
  • 超時退出:當阻塞佇列滿時,佇列會阻塞生產者執行緒一段時間,如果超過一定的時間,生產者執行緒就會退出。

1、訊息生產者

package com.es.queue;

/**
 * Created by Administrator on 2018/7/1 0001.
 */
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 生產者執行緒
 *
 * @author jackyuj
 */
public class Producer implements Runnable {

    private volatile boolean  isRunning = true;//是否在執行標誌
    private BlockingQueue queue;//阻塞佇列
    private static AtomicInteger count = new AtomicInteger();//自動更新的值
    private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;

    //建構函式
    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }

    public void run() {
        String data = null;
        Random r = new Random();

        System.out.println("啟動生產者執行緒!");
        try {
            while (isRunning) {
                System.out.println("正在生產資料...");
                Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));//取0~DEFAULT_RANGE_FOR_SLEEP值的一個隨機數

                data = "data:" + count.incrementAndGet();//以原子方式將count當前值加1
                System.out.println("將資料:" + data + "放入佇列...");
                if (!queue.offer(data, 2, TimeUnit.SECONDS)) {//設定的等待時間為2s,如果超過2s還沒加進去返回true
                    System.out.println("放入資料失敗:" + data);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("退出生產者執行緒!");
        }
    }

    public void stop() {
        isRunning = false;
    }
}

2、訊息消費者

package com.es.queue;

/**
 * Created by Administrator on 2018/7/1 0001.
 */
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * 消費者執行緒
 *
 * @author jackyuj
 */
public class Consumer implements Runnable {

    private BlockingQueue<String> queue;
    private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;

    //建構函式
    public Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    public void run() {
        System.out.println("啟動消費者執行緒!");
        Random r = new Random();
        boolean isRunning = true;
        try {
            while (isRunning) {
                System.out.println("正從佇列獲取資料...");
                String data = queue.poll(2, TimeUnit.SECONDS);//有資料時直接從佇列的隊首取走,無資料時阻塞,在2s內有資料,取走,超過2s還沒資料,返回失敗
                if (null != data) {
                    System.out.println("拿到資料:" + data);
                    System.out.println("正在消費資料:" + data);
                    Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
                } else {
                    // 超過2s還沒資料,認為所有生產執行緒都已經退出,自動退出消費執行緒。
                    isRunning = false;
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("退出消費者執行緒!");
        }
    }


}

3、測試程式入口

package com.es.queue;

/**
 * Created by Administrator on 2018/7/1 0001.
 */
import com.es.queue.Consumer;
import com.es.queue.Producer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueTest {

    public static void main(String[] args) throws InterruptedException {
        // 宣告一個容量為10的快取佇列
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);

        //new了三個生產者和一個消費者
        Producer producer1 = new Producer(queue);
        Producer producer2 = new Producer(queue);
        Producer producer3 = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        // 藉助Executors
        ExecutorService service = Executors.newCachedThreadPool();
        // 啟動執行緒
        service.execute(producer1);
        service.execute(producer2);
        service.execute(producer3);
        service.execute(consumer);

        // 執行10s
        Thread.sleep(10 * 1000);
        producer1.stop();
        producer2.stop();
        producer3.stop();

        Thread.sleep(2000);
        // 退出Executor
        service.shutdown();
    }
}