1. 程式人生 > >(八)java併發佇列

(八)java併發佇列

Java併發佇列

在併發佇列上JDK提供了兩套實現: 
一個是以ConcurrentLinkedQueue為代表的高效能佇列; 
一個是以BlockingQueue介面為代表的阻塞佇列; 
無論哪種都繼承自Queue。 
這裡寫圖片描述

一、ConcurrentLinkedQueue

定義

ConcurrentLinkedQueue : 是一個適用於高併發場景下的佇列,通過無鎖的方式,實現了高併發狀態下的高效能,通常ConcurrentLinkedQueue效能好於BlockingQueue。 
它是一個基於連結節點的無界執行緒安全佇列。該佇列的元素遵循先進先出的原則。 
頭是最先加入的,尾是最近加入的,該佇列不允許null元素。

ConcurrentLinkedQueue重要方法:

add 和offer() :都是加入元素的方法(在ConcurrentLinkedQueue中這倆個方法沒有任何區別) 
poll() 和peek() :都是取頭元素節點,區別在於前者會刪除元素,後者不會。

程式碼示例:

  •     ConcurrentLinkedQueue q = new ConcurrentLinkedQueue();
        q.offer("張三");
        q.offer("李四");
        q.offer("王五");
        q.offer("趙六");
        q.offer("大聖");
        //從頭獲取元素,刪除該元素
        System.out.println(q.poll());
        //從頭獲取元素,不刪除該元素
        System.out.println(q.peek());
        //獲取總長度
        System.out.println(q.size());

二、BlockingQueue

定義: 
阻塞佇列(BlockingQueue)是一個支援兩個附加操作的佇列。這兩個附加的操作是: 
1、在佇列為空時,獲取元素的執行緒會等待佇列變為非空。 
2、當佇列滿時,儲存元素的執行緒會等待佇列可用。 
阻塞佇列是執行緒安全的。 
用途: 
阻塞佇列常用於生產者和消費者的場景,生產者是往佇列裡新增元素的執行緒,消費者是從佇列裡拿元素的執行緒。阻塞佇列就是生產者存放元素的容器,而消費者也只從容器裡拿元素。

1、ArrayBlockingQueue

add(E e) 
將指定的元素插入到此佇列的尾部(如果立即可行且不會超過該佇列的容量),在成功時返回 true,如果此佇列已滿,則丟擲IllegalStateException; 
offer(E e)

 
將指定的元素插入到此佇列的尾部(如果立即可行且不會超過該佇列的容量),在成功時返回 true,如果此佇列已滿,則返回 false。 
offer(E e, long timeout, TimeUnit unit) 
將指定的元素插入此佇列的尾部,如果該佇列已滿,則在到達指定的等待時間之前等待可用的空間。

定義:

ArrayBlockingQueue是一個有邊界的阻塞佇列,它的內部實現是一個數組。 
有邊界的意思是它的容量是有限的,我們必須在其初始化的時候指定它的容量大小,容量大小一旦指定就不可改變。 
ArrayBlockingQueue是以先進先出的方式儲存資料,最新插入的物件是尾部,最新移出的物件是頭部。

下面是一個初始化和使用ArrayBlockingQueue的例子:

  • //初始化3個佇列
    ArrayBlockingQueue array = new ArrayBlockingQueue(3);
            array.add("張三");
            array.add("李四");
            array.add("大聖");
            // 新增阻塞佇列
            boolean a=array.offer("王五",1, TimeUnit.SECONDS);
            System.out.println(a);
            //執行結果:false


2、LinkedBlockingQueue

定義:

LinkedBlockingQueue阻塞佇列大小的配置是可選的, 
如果我們初始化時指定一個大小,它就是有邊界的,如果不指定,它就是無邊界的。 
說是無邊界,其實是採用了預設大小為Integer.MAX_VALUE的容量 。它的內部實現是一個連結串列。 
和ArrayBlockingQueue一樣,LinkedBlockingQueue 也是以先進先出的方式儲存資料,最新插入的物件是尾部,最新移出的物件是頭部。

下面是一個初始化和使LinkedBlockingQueue的例子:

  • //初始化
    LinkedBlockingQueue lbq = new LinkedBlockingQueue(3);
            lbq.add("張三");
            lbq.add("李四");
            lbq.add("李四");
            System.out.println(lbq.size());
            //執行結果:3

3、PriorityBlockingQueue

定義:

PriorityBlockingQueue是一個沒有邊界的佇列,它的排序規則和 java.util.PriorityQueue一樣。需要注意,PriorityBlockingQueue中允許插入null物件。 
所有插入PriorityBlockingQueue的物件必須實現 java.lang.Comparable介面,佇列優先順序的排序規則就是按照我們對這個介面的實現來定義的。 
另外,我們可以從PriorityBlockingQueue獲得一個迭代器Iterator,但這個迭代器並不保證按照優先順序順序進行迭代。

4、SynchronousQueue

定義:

SynchronousQueue佇列內部僅允許容納一個元素。當一個執行緒插入一個元素後會被阻塞,除非這個元素被另一個執行緒消費。

三、使用BlockingQueue模擬生產者與消費者

程式碼示例

生產者:

  • public class ProducerThread implements Runnable {
        private BlockingQueue queue;
        private volatile boolean flag = true;
        private static AtomicInteger count = new AtomicInteger();
    
        public ProducerThread(BlockingQueue queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            try {
                System.out.println("生產執行緒啟動...");
                while (flag) {
                    System.out.println("正在生產資料....");
                    String data = count.incrementAndGet()+"";
                    // 將資料存入佇列中
                    boolean offer = queue.offer(data, 2, TimeUnit.SECONDS);
                    if (offer) {
                        System.out.println("生產者,存入" + data + "到佇列中,成功.");
                    } else {
                        System.out.println("生產者,存入" + data + "到佇列中,失敗.");
                    }
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
    
            } finally {
                System.out.println("生產者退出執行緒");
            }
    
        }
        public void stopThread() {
            this.flag = false;
        }
    }

消費者:

  • class ConsumerThread implements Runnable {
        private BlockingQueue<String> queue;
        private volatile boolean flag = true;
    
        public ConsumerThread(BlockingQueue<String> queue) {
            this.queue = queue;
    
        }
    
        @Override
        public void run() {
            System.out.println("消費執行緒啟動...");
            try {
                while (flag) {
                    System.out.println("消費者,正在從佇列中獲取資料..");
                    String data = queue.poll(2, TimeUnit.SECONDS);
                    if (data != null) {
                        System.out.println("消費者,拿到佇列中的資料data:" + data);
                        Thread.sleep(1000);
                    } else {
                        System.out.println("消費者,超過2秒未獲取到資料..");
                        flag = false;
                    }
    
    
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                System.out.println("消費者退出執行緒...");
            }
    
        }
    
    }

執行:

  • public class ProducerAndConsumer {
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
            ProducerThread producerThread1 = new ProducerThread(queue);
            ProducerThread producerThread2 = new ProducerThread(queue);
            ConsumerThread consumerThread1 = new ConsumerThread(queue);
            Thread t1 = new Thread(producerThread1);
            Thread t2 = new Thread(producerThread2);
            Thread c1 = new Thread(consumerThread1);
            t1.start();
            t2.start();
            c1.start();
    
            // 執行2s後,生產者不再生產
            Thread.sleep(2* 1000);
            producerThread1.stopThread();
            producerThread2.stopThread();
    
        }
    }


執行結果: 
這裡寫圖片描述