1. 程式人生 > >Java多執行緒總結之執行緒安全佇列Queue

Java多執行緒總結之執行緒安全佇列Queue

在Java多執行緒應用中,佇列的使用率很高,多數生產消費模型的首選資料結構就是佇列。Java提供的執行緒安全的Queue可以分為阻塞佇列和非阻塞佇列,其中阻塞佇列的典型例子是BlockingQueue,非阻塞佇列的典型例子是ConcurrentLinkedQueue,在實際應用中要根據實際需要選用阻塞佇列或者非阻塞佇列。
注:什麼叫執行緒安全?這個首先要明確。執行緒安全的類 ,指的是類內共享的全域性變數的訪問必須保證是不受多執行緒形式影響的如果由於多執行緒的訪問(比如修改、遍歷、檢視)而使這些變數結構被破壞或者針對這些變數操作的原子性被破壞,則這個類就不是執行緒安全的。
今天就聊聊這兩種Queue,本文分為以下兩個部分,用分割線分開: 

  • BlockingQueue  阻塞演算法
  • ConcurrentLinkedQueue,非阻塞演算法


首先來看看BlockingQueue: 
Queue是什麼就不需要多說了吧,一句話:佇列是先進先出相對的,棧是後進先出。如果不熟悉的話先找本基礎的資料結構的書看看吧。 

BlockingQueue,顧名思義,“阻塞佇列”:可以提供阻塞功能的佇列。 
首先,看看BlockingQueue提供的常用方法: 
可能報異常 返回布林值 可能阻塞 設定等待時間
入隊 add(e) offer(e) put(e) offer(e, timeout, unit)
出隊 remove() poll() take()
poll(timeout, unit)
檢視 element() peek()

從上表可以很明顯看出每個方法的作用,這個不用多說。我想說的是: 
  • add(e) remove() element() 方法不會阻塞執行緒。當不滿足約束條件時,會丟擲IllegalStateException 異常。例如:當佇列被元素填滿後,再呼叫add(e),則會丟擲異常。
  • offer(e) poll() peek() 方法即不會阻塞執行緒,也不會丟擲異常。例如:當佇列被元素填滿後,再呼叫offer(e),則不會插入元素,函式返回false。
  • 要想要實現阻塞功能,需要呼叫put(e) take() 方法。當不滿足約束條件時,會阻塞執行緒。
  • BlockingQueue  阻塞演算法

BlockingQueue作為執行緒容器,可以為執行緒同步提供有力的保障。


二、BlockingQueue定義的常用方法
1.BlockingQueue定義的常用方法如下:

  丟擲異常 特殊值 阻塞 超時
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
檢查 element() peek() 不可用 不可用

1. ArrayBlockingQueue

基於陣列的阻塞佇列實現,在ArrayBlockingQueue內部,維護了一個定長陣列,以便快取佇列中的資料物件,這是一個常用的阻塞佇列,除了一個定長陣列外,ArrayBlockingQueue內部還儲存著兩個整形變數,分別標識著佇列的頭部和尾部在陣列中的位置。
  ArrayBlockingQueue在生產者放入資料和消費者獲取資料,都是共用同一個鎖物件,由此也意味著兩者無法真正並行執行,這點尤其不同於LinkedBlockingQueue;按照實現原理來分析,ArrayBlockingQueue完全可以採用分離鎖,從而實現生產者和消費者操作的完全並行執行。Doug Lea之所以沒這樣去做,也許是因為ArrayBlockingQueue的資料寫入和獲取操作已經足夠輕巧,以至於引入獨立的鎖機制,除了給程式碼帶來額外的複雜性外,其在效能上完全佔不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在於,前者在插入或刪除元素時不會產生或銷燬任何額外的物件例項,而後者則會生成一個額外的Node物件。這在長時間內需要高效併發地處理大批量資料的系統中,其對於GC的影響還是存在一定的區別。而在建立ArrayBlockingQueue時,我們還可以控制物件的內部鎖是否採用公平鎖,預設採用非公平鎖。

2. LinkedBlockingQueue

基於連結串列的阻塞佇列,同ArrayListBlockingQueue類似,其內部也維持著一個數據緩衝佇列(該佇列由一個連結串列構成),當生產者往佇列中放入一個數據時,佇列會從生產者手中獲取資料,並快取在佇列內部,而生產者立即返回;只有當佇列緩衝區達到最大值快取容量時(LinkedBlockingQueue可以通過建構函式指定該值),才會阻塞生產者佇列,直到消費者從佇列中消費掉一份資料,生產者執行緒會被喚醒,反之對於消費者這端的處理也基於同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理併發資料,還因為其對於生產者端和消費者端分別採用了獨立的鎖來控制資料同步,這也意味著在高併發的情況下生產者和消費者可以並行地操作佇列中的資料,以此來提高整個佇列的併發效能。
作為開發者,我們需要注意的是,如果構造一個LinkedBlockingQueue物件,而沒有指定其容量大小,LinkedBlockingQueue會預設一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產者的速度一旦大於消費者的速度,也許還沒有等到佇列滿阻塞產生,系統記憶體就有可能已被消耗殆盡了。

阻塞佇列:執行緒安全

按 FIFO(先進先出)排序元素。佇列的頭部 是在佇列中時間最長的元素。佇列的尾部 是在佇列中時間最短的元素。新元素插入到佇列的尾部,並且佇列檢索操作會獲得位於佇列頭部的元素。連結佇列的吞吐量通常要高於基於陣列的佇列,但是在大多數併發應用程式中,其可預知的效能要低。

注意:

1、必須要使用take()方法在獲取的時候達成阻塞結果
2、使用poll()方法將產生非阻塞效果

複製程式碼 複製程式碼
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

 

public class BlockingDeque {
    //阻塞佇列,FIFO
    private static LinkedBlockingQueue<Integer> concurrentLinkedQueue = new LinkedBlockingQueue<Integer>(); 

          
 public static void main(String[] args) {  
     ExecutorService executorService = Executors.newFixedThreadPool(2);  

     executorService.submit(new Producer("producer1"));  
     executorService.submit(new Producer("producer2"));  
     executorService.submit(new Producer("producer3"));  
     executorService.submit(new Consumer("consumer1"));  
     executorService.submit(new Consumer("consumer2"));  
     executorService.submit(new Consumer("consumer3"));  

 }  

 static class Producer implements Runnable {  
     private String name;  

     public Producer(String name) {  
         this.name = name;  
     }  

     public void run() {  
         for (int i = 1; i < 10; ++i) {  
             System.out.println(name+ "  生產: " + i);  
             //concurrentLinkedQueue.add(i);  
             try {
                concurrentLinkedQueue.put(i);
                Thread.sleep(200); //模擬慢速的生產,產生阻塞的效果
            } catch (InterruptedException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
             
         }  
     }  
 }  

 static class Consumer implements Runnable {  
     private String name;  

     public Consumer(String name) {  
         this.name = name;  
     }  
     public void run() {  
         for (int i = 1; i < 10; ++i) {  
             try {          
                    //必須要使用take()方法在獲取的時候阻塞
                      System.out.println(name+"消費: " +  concurrentLinkedQueue.take());  
                      //使用poll()方法 將產生非阻塞效果
                      //System.out.println(name+"消費: " +  concurrentLinkedQueue.poll());  
                     
                     //還有一個超時的用法,佇列空時,指定阻塞時間後返回,不會一直阻塞
                     //但有一個疑問,既然可以不阻塞,為啥還叫阻塞佇列?
                    //System.out.println(name+" Consumer " +  concurrentLinkedQueue.poll(300, TimeUnit.MILLISECONDS));                    
                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }  

         }  
     }  
 }  
}
複製程式碼 複製程式碼
  • ConcurrentLinkedQueue,非阻塞演算法

非阻塞佇列

基於連結節點的、無界的、執行緒安全。此佇列按照 FIFO(先進先出)原則對元素進行排序。佇列的頭部 是佇列中時間最長的元素。佇列的尾部 是佇列中時間最短的元素。新的元素插入到佇列的尾部,佇列檢索操作從佇列頭部獲得元素。當許多執行緒共享訪問一個公共 collection 時,ConcurrentLinkedQueue 是一個恰當的選擇。此佇列不允許 null 元素。

例子

複製程式碼
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;


public class NoBlockQueue {  
       private static ConcurrentLinkedQueue<Integer> concurrentLinkedQueue = new ConcurrentLinkedQueue<Integer>();   
          
    public static void main(String[] args) {  
        ExecutorService executorService = Executors.newFixedThreadPool(2);  

        executorService.submit(new Producer("producer1"));  
        executorService.submit(new Producer("producer2"));  
        executorService.submit(new Producer("producer3"));  
        executorService.submit(new Consumer("consumer1"));  
        executorService.submit(new Consumer("consumer2"));  
        executorService.submit(new Consumer("consumer3"));  

    }  
  
    static class Producer implements Runnable {  
        private String name;  
  
        public Producer(String name) {  
            this.name = name;  
        }  
  
        public void run() {  
            for (int i = 1; i < 10; ++i) {  
                System.out.println(name+ " start producer " + i);  
                concurrentLinkedQueue.add(i);  
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                //System.out.println(name+"end producer " + i);  
            }  
        }  
    }  
  
    static class Consumer implements Runnable {  
        private String name;  
  
        public Consumer(String name) {  
            this.name = name;  
        }  
        public void run() {  
            for (int i = 1; i < 10; ++i) {  
                try {
 
                    System.out.println(name+" Consumer " +  concurrentLinkedQueue.poll());

                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }  
//                System.out.println();  
//                System.out.println(name+" end Consumer " + i);  
            }  
        }  
    }  
} 
複製程式碼

在併發程式設計中,一般推薦使用阻塞佇列,這樣實現可以儘量地避免程式出現意外的錯誤。阻塞佇列使用最經典的場景就是socket客戶端資料的讀取和解析,讀取資料的執行緒不斷將資料放入佇列,然後解析執行緒不斷從佇列取資料解析。還有其他類似的場景,只要符合生產者-消費者模型的都可以使用阻塞佇列。

使用非阻塞佇列,雖然能即時返回結果(消費結果),但必須自行編碼解決返回為空的情況處理(以及消費重試等問題)。

另外他們都是執行緒安全的,不用考慮執行緒同步問題。