1. 程式人生 > >併發Queue之BlockingQueue介面及其實現類

併發Queue之BlockingQueue介面及其實現類

1、下面先簡單介紹BlockingQueue介面的五個實現:

ArrayBlockingQueue:基於陣列的阻塞佇列實現,在ArrayBlockingQueue內部,維護了一個定長的陣列,以便快取佇列中的資料物件,其內部沒實現讀寫分離,也就意味著生產和消費者不能完全並行。長度是需要定義的,可以指定先進先出或者先進後出,因為長度是需要定義的,所以也叫有界佇列,在很多場合非常適合使用。

LinkedBlockingQueue:基於連結串列的阻塞佇列,同ArrayBlockingQueue類似,其內部也維持著一個數據緩衝佇列(該佇列由一個連結串列構成),LinkedBlockingQueue之所以能夠高效地處理併發資料,是因為其內部實現採用分離鎖(讀寫分離兩個鎖),從而實現生產者和消費者操作完全並行執行。需要注意一下,它是一個無界佇列

SynchronousQueue:一種沒有緩衝的佇列,生產者產生的資料直接會被消費者獲取並且立刻消費。

PriorityBlockingQueue:基於優先級別的阻塞佇列(優先順序的判斷通過建構函式傳入的Compator物件來決定,也就是說傳入佇列的物件必須實現Comparable介面),在實現PriorityBlockingQueue時,內部控制執行緒同步的鎖採用的是公平鎖,需要注意的是它也是一個無界的佇列

DelayQueue:帶有延遲時間的Queue,其中的元素只有當其指定的延遲時間到了,才能夠從佇列中獲取到該元素。DelayQueue中的元素必須先實現Delayed介面,DelayQueue是一個沒有大小限制的佇列,應用場景很多,比如對快取超時的資料進行移除、任務超時處理、空閒連線的關閉等等。

2、下面簡單用一下ArrayBlockingQueue:

因為是有界佇列,所以初始化的時候記得定義長度。下面只是簡單說一下新增方法,其他的看一下API即可。新增方法有三個,其中add和put方法效果是一樣的,而offer方法可以設定新增的延遲時間並且返回true/false表示新增成功還是失敗。

ArrayBlockingQueue<String> arrayQueue = new ArrayBlockingQueue<String>(5);  //初始化一定要有長度
arrayQueue.add("a");
arrayQueue.put("b");  //add和put方法其實是一樣效果的
arrayQueue.add("c"); 
arrayQueue.add("d");
arrayQueue.add("e");
boolean flag = arrayQueue.offer("f", 2, TimeUnit.SECONDS);  //TimeUnit裡面有時分秒等等,意思是多少時間後新增
System.out.println(flag);  //會返回false,因為長度為5,f是新增的第六個,所以會新增失敗

控制檯列印結果:

false

上面的測試我們可以看到,offer方法就算超長度新增控制檯也不會報錯的,我們嘗試一下add和put的超長新增的結果。

ArrayBlockingQueue<String> arrayQueue = new ArrayBlockingQueue<String>(5);  //初始化一定要有長度
arrayQueue.add("a");
arrayQueue.put("b");  //add和put方法其實是一樣效果的
arrayQueue.add("c");  
arrayQueue.add("d");
arrayQueue.add("e");
//arrayQueue.offer("f");  //用offer超長新增不會報錯
//arrayQueue.put("f");   //不會報錯,但是執行緒一直執行不停止
arrayQueue.add("f");

下面看一下add方法超長新增的報錯:

Exception in thread "main" java.lang.IllegalStateException: Queue full
	at java.util.AbstractQueue.add(Unknown Source)
	at java.util.concurrent.ArrayBlockingQueue.add(Unknown Source)
	at com.demo.testQueue.TestQueue.main(TestQueue.java:22)

3、下面介紹的是LinkedBlockingQueue:

上面說到LinkedBlockingQueue是無界佇列,它的初始化可以不定義長度,當然,也是可以定義長度的,當初始化定義長度時,超長新增也是像ArrayBlockingQueue一樣,add方法會報錯,put方法會不停止執行緒,offer方法會返回false。

下面主要測試新增方法和drainTo方法。新增方法和上面的ArrayBlockingQueue是一樣的,而drainTo方法是將佇列的一定長度的資料liush到集合中。

//LinkedBlockingQueue<String> linkQueue = new LinkedBlockingQueue<String>(2);  //初始化可帶長度也可不帶
LinkedBlockingQueue<String> linkQueue = new LinkedBlockingQueue<String>();
linkQueue.add("a");
linkQueue.put("b");
linkQueue.add("c");
linkQueue.offer("f");
boolean flag = linkQueue.offer("f", 2, TimeUnit.SECONDS);
List<String> list = new ArrayList<String>();
linkQueue.drainTo(list, 3);  //將佇列的第一到第三個資料流失到list中,流失後的資料將不在佇列裡
System.out.println("被drainTo的列表的長度"+list.size());
System.out.println("被drainTo的列表的資料:");
for(String str : list){
	System.out.println(str);
}
System.out.println("drainTo方法後佇列的資料:");
Iterator<String> ite = linkQueue.iterator();
while(ite.hasNext()){
	System.out.println(ite.next());
}

控制檯結果:

被drainTo後的列表的長度3
被drainTo後的列表的資料:
a
b
c
drainTo方法後佇列的資料:
f
f

4、接下來接續介紹的是SynchronousQueue。

下面是api的講解,就是不能進行新增刪除任務操作,只是生產了就等待消費者直接消費。一個 blocking queue其中每個插入操作必須等待相應的刪除操作由另一個執行緒,反之亦然。同步佇列沒有任何內部容量,甚至沒有一個容量。你不能 peek在同步佇列因為元素只存在當你試圖刪除它;你不能插入一個元素(使用任何方法)除非另一個執行緒試圖刪除它;你不能因為沒有迭代迭代。佇列的頭部是第一個排隊的插入執行緒試圖新增到佇列的元素;如果沒有這樣的排隊線然後沒有元素可用於去除和 poll()將返回 null。對於其他 Collection方法的目的(例如 contains),一個 SynchronousQueue作為空集合。這個佇列不允許 null元素。

因為是沒有緩衝的佇列,生產後即刻消費,所以只是單純的新增元素是會報錯的。

SynchronousQueue<String> queue = new SynchronousQueue<String>();
queue.add("a");

控制檯報錯:

Exception in thread "main" java.lang.IllegalStateException: Queue full
	at java.util.AbstractQueue.add(Unknown Source)
	at com.demo.testQueue.TestQueue.main(TestQueue.java:49)

但是我們可以這麼測試,執行緒1是獲取佇列的資料的,執行緒2是生產資料放進佇列中的。

SynchronousQueue<String> queue = new SynchronousQueue<String>();//初始化不能帶長度
Thread t1 = new Thread(new Runnable() {		
    @Override
    public void run() {
	try {
		String str = queue.take();   //執行緒1在獲取,這是阻塞的,當執行緒2一新增,執行緒1就獲取,因為SynchronousQueue是沒有容量的
		System.out.println(str);
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
    }
});
t1.start();
		
Thread t2 = new Thread(new Runnable() {	
    @Override
    public void run() {
	queue.add("abcd");  //執行緒2往佇列裡新增元素
    }
});
t2.start();

控制檯結果:

獲取的資料:abcd

要注意一下的就是,執行緒1必定要線上程2前啟動,不然會報下面的錯。

Exception in thread "Thread-1" java.lang.IllegalStateException: Queue full
	at java.util.AbstractQueue.add(Unknown Source)
	at com.demo.testQueue.TestQueue$2.run(TestQueue.java:67)
	at java.lang.Thread.run(Unknown Source)

因為上面的三種BlockingQueue介面的實現是比較常用的,下面將會舉例說一下使用場景:

簡單點講一下上面三個佇列的使用場景:假如每天的早上7點到8點是任務的高峰期,那麼我們這時候使用的當然是ArrayBlockingQueue,因為他是有界限的,當任務數滿了的話就不能再新增任務了,這時候可以通知說請8點後再來。如果使用的是LinkedBlockingQueue,因為他是無界限的,只要來了任務就新增到佇列裡面,這會造成存放著大量的未完成任務,這個是不和實際的。但是到了8點後任務不是高峰期,就可以使用LinkedBlockingQueue了。到了10點後,基本沒什麼任務,來就是來一個兩個,這時候可以使用SynchronousQueue。

5、再將一下基於優先順序的阻塞佇列PriorityBlockingQueue

因為傳入佇列的物件必須實現Comparable介面來實現優先順序判斷,所以我們看一下程式碼,我將會建立一個叫Task的類來實現comparable介面,有簡單的id和name屬性,會利用id來進行優先順序的判斷,id大的將會排列在後面。

public class Task implements Comparable<Task>{ //放進PriorityBlockingQueue佇列的物件必須實現Comparable介面
	private int id;
	private String name;
	public int getId() {
		return id;
	}
	public void setId(int id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	@Override
	public int compareTo(Task task) { //重寫的compareTo方法就是進行優先順序的排序的
		return (this.id>task.id?1:this.id<task.id?-1:0); //這裡是將id大的排在佇列的後面。id小的先被取出來
	}
	@Override
	public String toString() {
		return "Task [id=" + id + ", name=" + name + "]";
	}
}

再看一下測試的程式碼:

PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<Task>(); //因為是無界佇列,初始化可以不定義長度
Task t1 = new Task();
t1.setId(1);
t1.setName("任務 1");
Task t2 = new Task();
t2.setId(4);
t2.setName("任務 2");
Task t3 = new Task();
t3.setId(3);
t3.setName("任務 3");
Task t4 = new Task();
t4.setId(6);
t4.setName("任務4");
queue.add(t1);
queue.add(t2);
queue.add(t4);
queue.add(t3);
		
System.out.println("新增元素後的佇列:"+queue);
queue.take();  //取出元素
System.out.println("取出一個元素後的佇列:"+queue);

控制檯的列印:

新增元素後的佇列:[Task [id=1, name=任務 1], Task [id=3, name=任務 3], Task [id=6, name=任務4], Task [id=4, name=任務 2]]
取出一個元素後的佇列:[Task [id=3, name=任務 3], Task [id=4, name=任務 2], Task [id=6, name=任務4]]

我們可以看到:新增進去的元素都是沒有進行優先順序排序的,只要等取出一個元素後才會進行排序,因為是先排序了才能進行提取操作,這算是一個優化的設計吧,不然每次新增都進行優先順序排序的話會影響效能。

6、終於到最後一個DelayQueue了。

上面也說到這是一個帶延遲時間的Queue,其中的元素只有到了延遲時間才能被取出來。

DelayQueue中的元素必須實現Delayed介面,重寫介面下的兩個方法。第一個是getDelay方法:是用來判斷任務是否到了時間需要取出來了,這裡是最後時間減去當前時間來判斷第二個是compareTo是用來相互比較排序用的,像ProorityBlockingQueue一樣。這裡的排序一定要寫對。排序用的是上面的getDelay方法,因為返回的是延遲的時間,然後時間長的一定要排到後面,不然就是最長時間的到了從佇列中取出來,因為他是佇列的第一個,他到時間了其他的肯定也一早就到時間了,就會全部一個一個取出來了,除了第一個的(時間最長的)延遲時間是對的,其他的都和第一個一樣了。所以排序判斷時,一定是時間長的放在後面。

元素Task的程式碼:

public class Task2 implements Delayed{
	
	private String name; //姓名
	private Integer id;  
	private long endTime; //截止時間
	private TimeUnit timeUnit = TimeUnit.SECONDS; 
	//相互比較排序用
	@Override
	public int compareTo(Delayed delayed) {
		Task2 t2 = (Task2)delayed;
		return this.getDelay(timeUnit)>t2.getDelay(timeUnit)?1:this.getDelay(timeUnit)<t2.getDelay(timeUnit)?-1:0;
	}
	
	//判斷是否到了截止時間
	@Override
	public long getDelay(TimeUnit unit) {
		return endTime - System.currentTimeMillis();  //延遲時間的毫秒數減去當前時間毫秒數判斷是否到了延遲時間
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public Integer getId() {
		return id;
	}

	public void setId(Integer id) {
		this.id = id;
	}

	public long getEndTime() {
		return endTime;
	}

	public void setEndTime(long endTime) {
		this.endTime = endTime;
	}

	@Override
	public String toString() {
		return "Task2 [name=" + name + ", id=" + id + ", endTime=" + endTime + "]";
	}
	
	

}

測試的程式碼:

DelayQueue<Task2> queue = new DelayQueue<Task2>();
Task2 t1 = new Task2();
t1.setId(1);
t1.setName("張三");
t1.setEndTime(1*1000+System.currentTimeMillis()); //記得設定的延遲時間是要延遲的毫秒數加上當前時間毫秒數
		
Task2 t2 = new Task2();
t2.setId(2);
t2.setName("李四");
t2.setEndTime(10*1000+System.currentTimeMillis());
		
Task2 t3 = new Task2();
t3.setId(3);
t3.setName("王五");
t3.setEndTime(5*1000+System.currentTimeMillis());
		
queue.add(t1);
queue.add(t2);
queue.add(t3);
		
Thread thread1 = new Thread(new Runnable() {
			
	@Override
	public void run() {
		while(true){  //死迴圈,將佇列的任務全部取出來
		    Task2 t2;
			try {
				t2 = queue.take();
				System.out.println(t2);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
});
thread1.start();
}

控制檯結果:可以看到確實是延遲時間短的先被取出來了

Task2 [name=張三, id=1, endTime=1529478869353]
Task2 [name=王五, id=3, endTime=1529478873353]
Task2 [name=李四, id=2, endTime=1529478878353]