1. 程式人生 > >關於執行緒池中的阻塞佇列BlockingQueue

關於執行緒池中的阻塞佇列BlockingQueue

       接上篇文章https://blog.csdn.net/GoSaint/article/details/84345210

       對於BlockingQueue阻塞佇列而言,常用在多執行緒生產者和消費者模型上。首先我們需要明確的是阻塞佇列是執行緒安全的。或者可以稱之為併發佇列,是併發容器的一種規範。

                  

上圖是BlockingQueue的實現類:我們介紹下這幾個相關的實現。

1 ArrayBlockingQueue

ArrayBlockingQueue是基於陣列的佇列實現。

存在三個構造器:

(1)public ArrayBlockingQueue(int capacity, boolean fair) {}

(2) public ArrayBlockingQueue(int capacity) {}

(3)public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) {}

        從上面的三個構造器可以看出,要建立一個ArrayBlockingQueue,最起碼需要一個引數capacity;從而說明                  ArrayBlockingQueue是有界的阻塞佇列。當然這裡的有界無界是針對Integer.MAX。如果說我們的指定的容量超過Integer.MAX或者說沒有容量大小的限制。那麼就可以說這個佇列是無界的。boolean fair這個引數是指定是否是公平鎖還是非公平鎖。對於非公平鎖而言,吞吐量要高。最後一個引數Collection,可以指定用一個集合來初始化,將此集合中的元素在構造方法期間就先新增到佇列中。

  ArrayBlockingQueue在生產者放入資料和消費者獲取資料,都是共用同一個鎖物件,由此也意味著兩者無法真正並行執行,這點尤其不同於LinkedBlockingQueue;按照實現原理來分析,ArrayBlockingQueue完全可以採用分離鎖,從而實現生產者和消費者操作的完全並行執行。Doug Lea之所以沒這樣去做,也許是因為ArrayBlockingQueue的資料寫入和獲取操作已經足夠輕巧,以至於引入獨立的鎖機制,除了給程式碼帶來額外的複雜性外,其在效能上完全佔不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在於,前者在插入或刪除元素時不會產生或銷燬任何額外的物件例項,而後者則會生成一個額外的Node物件。這在長時間內需要高效併發地處理大批量資料的系統中,其對於GC的影響還是存在一定的區別。而在建立ArrayBlockingQueue時,我們還可以控制物件的內部鎖是否採用公平鎖,預設採用非公平鎖。

        總結:ArrayBlockingQueue是有界的,並且生產者和消費者使用的是同一個鎖。因此無法實現真正的並行。

2 DelayQueue(延遲工作佇列)

        首先要明確的是:延遲佇列是無界的工作佇列。只有當指定的時間到了之後才能從佇列中取到資料。這就是延遲佇列。關於使用demo,我給出下面的一種。首先延遲佇列,我們也可以初始化一個容器去儲存資料。我們定義物件message.去實現Delayed介面。

package com.cmos.communication;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * 訊息體定義 實現Delayed介面就是實現兩個方法即compareTo 和 getDelay
 * 
 * @author gosaint
 *
 */
public class Message implements Delayed {

	private int id;
	private String body; // 訊息內容
	private long excuteTime;// 延遲時長,這個是必須的屬性因為要按照這個判斷延時時長。

	public Message(int id, String body, long delayTime) {
		this.id = id;
		this.body = body;
		this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime();
	}

	public int getId() {
		return id;
	}

	public void setId(int id) {
		this.id = id;
	}

	public String getBody() {
		return body;
	}

	public void setBody(String body) {
		this.body = body;
	}

	public long getExcuteTime() {
		return excuteTime;
	}

	public void setExcuteTime(long excuteTime) {
		this.excuteTime = excuteTime;
	}

	@Override
	public int compareTo(Delayed delayed) {
		Message msg = (Message) delayed;
		return Integer.valueOf(this.id) > Integer.valueOf(msg.id) ? 1
				: (Integer.valueOf(this.id) < Integer.valueOf(msg.id) ? -1 : 0);
	}

	// 最重要的就是getDelay方法,這個方法用來判斷是否到期……
	@Override
	public long getDelay(TimeUnit unit) {
		return unit.convert(this.excuteTime - System.nanoTime(), TimeUnit.NANOSECONDS);
	}

}

下面的程式碼是消費者以及測試例項:

    // 延時佇列 ,消費者從其中獲取訊息進行消費  
    private DelayQueue<Message> queue;  
  
    public Consumer2(DelayQueue<Message> queue) {  
        this.queue = queue;  
    }  
  
    @Override  
    public void run() {  
        while (true) {  
            try {  
                Message take = queue.take();  
                System.out.println("消費訊息id:" + take.getId() + " 訊息體:" + take.getBody());  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }  
}
package com.cmos.procon.partten;  
  
import java.util.concurrent.DelayQueue;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  
  
public class DelayQueueTest {  
     public static void main(String[] args) {    
            // 建立延時佇列    
            DelayQueue<Message> queue = new DelayQueue<Message>();    
            // 新增延時訊息,m1 延時3s    
            Message m1 = new Message(1, "world", 3000);    
            // 新增延時訊息,m2 延時10s    
            Message m2 = new Message(2, "hello", 10000);    
            //將延時訊息放到延時佇列中  
            queue.offer(m2);    
            queue.offer(m1);    
            // 啟動消費執行緒 消費新增到延時佇列中的訊息,前提是任務到了延期時間   
            ExecutorService exec = Executors.newFixedThreadPool(1);  
            exec.execute(new Consumer2(queue));  
            exec.shutdown();  
        }    
}

3 LinkedBlockingQueue

基於連結串列實現的阻塞佇列。

public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
}

        從構造器來看,LinkedBlockingQueue是無界的阻塞佇列。和ArrayBlockingQueue相比較,它是連結串列的實現,並且生產者和消費者各自使用自己的鎖。因此就併發能力來看,它應該更加的高效。對於LinkedBlockingQueue的使用值得注意的是由於LinkedBlockingQueue是無界的,因此如果一旦生產者生產的速度大於消費者消費的速度,那麼就很有可能產生記憶體溢位的。

4 PriorityBlockingQueue優先順序佇列

public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) {}

        從構造器可以看出,可以通過某個欄位或者其他屬性進行比較排序,確定優先順序。

5 SynchronousQueue(同步佇列)

        它是一個特殊的佇列,它的名字其實就蘊含了它的特徵 – - 同步的佇列。為什麼說是同步的呢?這裡說的並不是多執行緒的併發問題,而是因為當一個執行緒往佇列中寫入一個元素時,寫入操作不會立即返回,需要等待另一個執行緒來將這個元素拿走;同理,當一個讀執行緒做讀操作的時候,同樣需要一個相匹配的寫執行緒的寫操作。這裡的 Synchronous 指的就是讀執行緒和寫執行緒需要同步,一個讀執行緒匹配一個寫執行緒。

總結:生產者生產一個,消費者消費一個。

參考:http://www.importnew.com/28053.html