1. 程式人生 > >多執行緒與併發----阻塞佇列的應用

多執行緒與併發----阻塞佇列的應用

一、佇列

    1、佇列分為固定長度的佇列和不固定長度的佇列;

    2、固定長度的佇列,若放滿了還要放,阻塞式佇列就會等待,直到有資料取出,空出位置後才繼續放;

    3、固定長度的佇列,若放滿了還要放,非阻塞式佇列不能等待就只能報錯了。

二、阻塞佇列(BlockingQueue)

public interface BlockingQueue<E>   
extends Queue<E>

    支援兩個附加操作的 Queue,這兩個操作是:獲取元素時等待佇列變為非空,以及儲存元素時等待空間變得可用。


BlockingQueue 方法以四種形式出現,對於不能立即滿足但可能在將來某一時刻可以滿足的操作,這四種形式的處理方式不同:第一種是丟擲一個異常,第二種是返回一個特殊值(null

 或 false,具體取決於操作),第三種是在操作可以成功前,無限期地阻塞當前執行緒,第四種是在放棄前只在給定的最大時間限制內阻塞。下表中總結了這些方法:


注意:

 1、BlockingQueue 不接受 null 元素。試圖 addput 或 offer 一個 null 元素時,某些實現會丟擲 NullPointerExceptionnull 被用作指示 poll 操作失敗的警戒值。

   2、BlockingQueue 可以是限定容量的。它在任意給定時間都可以有一個 remainingCapacity,超出此容量,便無法無阻塞地 put 附加元素。沒有任何內部容量約束的 BlockingQueue

 總是報告 Integer.MAX_VALUE 的剩餘容量。

    3、BlockingQueue實現主要用於生產者-使用者佇列,但它另外還支援 Collection 介面。因此,舉例來說,使用 remove(x) 從佇列中移除任意一個元素是有可能的。然而,這種操作通常會有效執行,只能有計劃地偶爾使用,比如在取消排隊資訊時。

    4、BlockingQueue 實現是執行緒安全的。所有排隊方法都可以使用內部鎖或其他形式的併發控制來自動達到它們的目的。然而,大量的 Collection 操作(addAllcontainsAllretainAll 和 removeAll沒有必要自動執行,除非在實現中特別說明。因此,舉例來說,在只添加了 

c 中的一些元素後,addAll(c) 有可能失敗(丟擲一個異常)。

    5、BlockingQueue 實質上 支援使用任何一種“close”或“shutdown”操作來指示不再新增任何項。這種功能的需求和使用有依賴於實現的傾向。例如,一種常用的策略是:對於生產者,插入特殊的 end-of-stream 或 poison 物件,並根據使用者獲取這些物件的時間來對它們進行解釋。

三、例項

    例項1:用長度為4的佇列演示阻塞佇列的功能和效果

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueTest {

	public static void main(String[] args) {
		final BlockingQueue queue = new ArrayBlockingQueue(4);
		for(int i=0;i<2;i++){
			new Thread(){
				public void run(){
					while(true){
						try {
							Thread.sleep((long)(Math.random()*1000));
							System.out.println(Thread.currentThread().getName() + "準備放資料!");							
							queue.put(1);
							System.out.println(Thread.currentThread().getName() + "已經放了資料," + 							
										"佇列目前有" + queue.size() + "個數據");
						} catch (InterruptedException e) {
							e.printStackTrace();
						}

					}
				}
				
			}.start();
		}
		
		new Thread(){
			public void run(){
				while(true){
					try {
						//將此處的睡眠時間分別改為100和1000,觀察執行結果
						Thread.sleep(1000);
						System.out.println(Thread.currentThread().getName() + "準備取資料!");
						queue.take();
						System.out.println(Thread.currentThread().getName() + "已經取走資料," + 							
								"佇列目前有" + queue.size() + "個數據");					
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
			
		}.start();			
	}
}

執行部分結果為:

Thread-1準備放資料!
Thread-1已經放了資料,佇列目前有1個數據
Thread-0準備放資料!
Thread-0已經放了資料,佇列目前有2個數據
Thread-1準備放資料!
Thread-1已經放了資料,佇列目前有3個數據
Thread-0準備放資料!
Thread-0已經放了資料,佇列目前有4個數據
Thread-2準備取資料!
Thread-2已經取走資料,佇列目前有3個數據

    例項2:用兩個具有 1 個空間的佇列來實現同步通知的功能(也即是主執行緒執行一會,子執行緒執行一會,模擬一個佇列已滿,另一個佇列是空的)
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueCommunication {

	public static void main(String[] args) {
		
		final Business business=new Business();
		
		new Thread(new Runnable() {
			
			public void run() {
				for(int i=0;i<5;i++){	
						business.sub(i);
				}
				
			}
			
		}).start();
		
		for(int i=0;i<5;i++){
			business.main(i);
		}
	}
	
	static class Business{
		//構造兩個阻塞佇列,長度均為1
		BlockingQueue<Integer> queue1=new ArrayBlockingQueue<Integer>(1);
		BlockingQueue<Integer> queue2=new ArrayBlockingQueue<Integer>(1);
		
		//用構造程式碼塊給阻塞佇列2賦初值,賦值後阻塞佇列2已滿
		{
			try {
				queue2.put(1);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		
		public void sub(int i){
			try {
				queue1.put(1);//向阻塞佇列1中新增元素
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			for(int j=0;j<10;j++){
				System.out.println("sub thread sequence of"+j+",loop of"+i);
			}
			
			try {
				queue2.take();//從阻塞佇列2中取出元素
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		
		public void main(int i){
			try {
				queue2.put(1);//向阻塞佇列2中新增元素
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			for(int j=0;j<10;j++){
				System.out.println("main thread sequence of"+j+",loop of"+i);
			}
			
			try {
				queue1.take();//從阻塞佇列1中取出元素
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

執行部分結果為:

sub thread sequence of0,loop of0
sub thread sequence of1,loop of0
sub thread sequence of2,loop of0
sub thread sequence of3,loop of0
sub thread sequence of4,loop of0
sub thread sequence of5,loop of0
sub thread sequence of6,loop of0
sub thread sequence of7,loop of0
sub thread sequence of8,loop of0
sub thread sequence of9,loop of0
main thread sequence of0,loop of0
main thread sequence of1,loop of0
main thread sequence of2,loop of0
main thread sequence of3,loop of0
main thread sequence of4,loop of0
main thread sequence of5,loop of0
main thread sequence of6,loop of0
main thread sequence of7,loop of0
main thread sequence of8,loop of0
main thread sequence of9,loop of0