1. 程式人生 > >Java多執行緒程式設計---java5阻塞佇列

Java多執行緒程式設計---java5阻塞佇列

java5阻塞佇列的應用

        佇列包含固定長度的佇列和不固定長度的佇列,先進先出。

        固定長度的佇列往裡放資料,如果放滿了還要放,阻塞式佇列就會等待,直到有資料取出,空出位置後才繼續放;非阻塞式佇列不能等待就只能報錯了。

        講Condition時提到了阻塞佇列的原理,Java中已經實現了阻塞佇列ArrayBlockingQueue、BlockingQueue<E>。

public interface BlockingQueue<E> extendsQueue<E>

        支援兩個附加操作的Queue,這兩個操作是:獲取元素時等待佇列變為非空,以及儲存元素時等待空間變得可用。

        BlockingQueue方法以四種形式出現,對於不能立即滿足但可能在將來某一時刻可以滿足的操作,這四種形式的處理方式不同:第一種是丟擲一個異常,第二種是返回一個特殊值(null或 false,具體取決於操作),第三種是在操作可以成功前,無限期地阻塞當前執行緒,第四種是在放棄前只在給定的最大時間限制內阻塞。下表中總結了這些方法:

        BlockingQueue 不接受 null 元素。試圖add、put 或 offer 一個null 元素時,某些實現會丟擲 NullPointerException。null被用作指示 poll 操作失敗的警戒值。

        BlockingQueue可以是限定容量的。它在任意給定時間都可以有一個 remainingCapacity,超出此容量,便無法無阻塞地put 附加元素。沒有任何內部容量約束的 BlockingQueue 總是報告Integer.MAX_VALUE 的剩餘容量。

        BlockingQueue實現主要用於生產者-使用者佇列,但它另外還支援Collection 介面。因此,舉例來說,使用remove(x) 從佇列中移除任意一個元素是有可能的。然而,這種操作通常不會有效執行,只能有計劃地偶爾使用,比如在取消排隊資訊時。

        BlockingQueue 實現是執行緒安全的。所有排隊方法都可以使用內部鎖或其他形式的併發控制來自動達到它們的目的。然而,大量的Collection 操作(addAll、containsAll、retainAll和 removeAll)沒有必要自動執行,除非在實現中特別說明。因此,舉例來說,在只添加了c 中的一些元素後,addAll(c) 有可能失敗(丟擲一個異常)。

java.util.concurrent.ArrayBlockingQueue<E>

        E- 在此 collection 中保持的元素型別

        extends AbstractQueue<E>implements BlockingQueue<E>, Serializable

        一個由陣列支援的有界阻塞佇列。此佇列按FIFO(先進先出)原則對元素進行排序。佇列的頭部是在佇列中存在時間最長的元素。佇列的尾部是在佇列中存在時間最短的元素。新元素插入到佇列的尾部,佇列獲取操作則是從佇列頭部開始獲得元素。

        這是一個典型的“有界快取區”,固定大小的陣列在其中保持生產者插入的元素和使用者提取的元素。一旦建立了這樣的快取區,就不能再增加其容量。試圖向已滿佇列中放入元素會導致操作受阻塞;試圖從空佇列中提取元素將導致類似阻塞。

        此類支援對等待的生產者執行緒和使用者執行緒進行排序的可選公平策略。預設情況下,不保證是這種排序。然而,通過將公平性(fairness) 設定為 true 而構造的佇列允許按照FIFO順序訪問執行緒。公平性通常會降低吞吐量,但也減少了可變性和避免了“不平衡性”。

        此類及其迭代器實現了Collection 和 Iterator 介面的所有可選方法。此類是Java Collections Framework 的成員。

構造方法摘要

        1、ArrayBlockingQueue(intcapacity):建立一個帶有給定的(固定)容量和預設訪問策略的ArrayBlockingQueue。

        2、ArrayBlockingQueue(intcapacity, boolean fair):建立一個具有給定的(固定)容量和指定訪問策略的ArrayBlockingQueue。

        3、ArrayBlockingQueue(intcapacity, boolean fair, Collection<? extends E> c):建立一個具有給定的(固定)容量和指定訪問策略的ArrayBlockingQueue,它最初包含給定collection 的元素,並以 collection 迭代器的遍歷順序新增元素。

方法摘要

        1、boolean add(E e):將指定的元素插入到此佇列的尾部(如果立即可行且不會超過該佇列的容量),在成功時返回 true,如果此佇列已滿,則丟擲 IllegalStateException。

        2、voidclear():自動移除此佇列中的所有元素。

        3、boolean contains(Object o):如果此佇列包含指定的元素,則返回true。

        4、int drainTo(Collection<? super E> c):移除此佇列中所有可用的元素,並將它們新增到給定collection 中。

        5、int drainTo(Collection<? super E> c, intmaxElements):最多從此佇列中移除給定數量的可用元素,並將這些元素新增到給定collection 中。

        6、Iterator<E>iterator():返回在此佇列中的元素上按適當順序進行迭代的迭代器。

        7、boolean offer(E e):將指定的元素插入到此佇列的尾部(如果立即可行且不會超過該佇列的容量),在成功時返回 true,如果此佇列已滿,則返回 false。

        8、boolean offer(E e, long timeout, TimeUnit unit):將指定的元素插入此佇列的尾部,如果該佇列已滿,則在到達指定的等待時間之前等待可用的空間。

        9、E peek():獲取但不移除此佇列的頭;如果此佇列為空,則返回 null。

        10、E poll():獲取並移除此佇列的頭,如果此佇列為空,則返回 null。

        11、Epoll(long timeout, TimeUnit unit):獲取並移除此佇列的頭部,在指定的等待時間前等待可用的元素(如果有必要)。

        12、void   put(E e):將指定的元素插入此佇列的尾部,如果該佇列已滿,則等待可用的空間。

        13、intremainingCapacity():返回在無阻塞的理想情況下(不存在記憶體或資源約束)此佇列能接受的其他元素數量。

        14、boolean   remove(Object o):從此佇列中移除指定元素的單個例項(如果存在)。

        15、intsize():返回此佇列中元素的數量。

        16、Etake():獲取並移除此佇列的頭部,在元素變得可用之前一直等待(如果有必要)。

        17、Object[]toArray():返回一個按適當順序包含此佇列中所有元素的陣列。

        18、<T>T[]   toArray(T[] a):返回一個按適當順序包含此佇列中所有元素的陣列;返回陣列的執行時型別是指定陣列的執行時型別。

        19、StringtoString():返回此 collection 的字串表示形式。

程式例項

1、阻塞佇列的簡單使用

public class BlockingQueueTest {
	
	public static void main(String[] args) {
		
		// 固定大小為3的阻塞佇列
		final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(3);
		
		for (int i = 0; i < 2; i++) {
			// 往佇列中放資料的佇列
			new Thread() {
				public void run() {
					while (true) {
						try {
							Thread.sleep((long) (Math.random() * 1000));
							System.out.println(Thread.currentThread().getName() + " 準備放資料!");
							// 若佇列滿了則會阻塞
							queue.put(1);
							
							System.out.println(
									Thread.currentThread().getName() + " 已經放了資料," + "佇列目前有 " + queue.size() + " 個數據");
							
						} catch (InterruptedException e) {
							e.printStackTrace();
						}

					}
				}

			}.start();
		}
		
		// 從佇列中取資料的佇列
		new Thread() {
			public void run() {
				while (true) {
					try {
						// 將此處的睡眠時間分別改為100和1000,觀察執行結果
						Thread.sleep(1000);
						System.out.println(Thread.currentThread().getName() + " 準備取資料!");
						// 佇列為空則阻塞
						queue.take();
						
						System.out
								.println(Thread.currentThread().getName() + " 已經取走資料," + "佇列目前有 " + queue.size() + " 個數據");
						
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}

		}.start();
	}
}

2、用阻塞佇列實現執行緒間的通訊

阻塞佇列的實現原理(Condition鎖中有提到awaitsignal)

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueCommunication {

	public static void main(String[] args) {
		final Business business = new Business();
		
		// 子執行緒
		new Thread(new Runnable() {

			@Override
			public void run() {

				for (int i = 1; i <= 50; i++) {
					business.sub(i);
				}

			}
		}).start();
		
		// 主執行緒
		for (int i = 1; i <= 50; i++) {
			business.main(i);
		}

	}

	static class Business {
		// 每次執行之前都先在對於的佇列中放值,用滿佇列來阻塞執行緒,操作完成之後,清空另一個佇列,相當於一個通知
		BlockingQueue<Integer> queue1 = new ArrayBlockingQueue<Integer>(1);
		BlockingQueue<Integer> queue2 = new ArrayBlockingQueue<Integer>(1);
		
		// 在每次新建物件,呼叫構造方法之前執行
		{
			try {
				// 初始時  queue2 滿了,此時再往 queue2 中放資料則會阻塞,只有當另一個執行緒將 queue2 的值取走,相當於給它一個通知
				queue2.put(1);
				
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}

		public void sub(int i) {
			try {
				queue1.put(1);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			
			for (int j = 1; j <= 10; j++) {
				System.out.println("sub thread sequece of " + j + ",loop of " + i);
			}
			
			try {
				queue2.take();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}

		public void main(int i) {
			try {
				queue2.put(1);
			} catch (InterruptedException e1) {
				e1.printStackTrace();
			}
			
			for (int j = 1; j <= 100; j++) {
				System.out.println("main thread sequece of " + j + ",loop of " + i);
			}
			
			try {
				queue1.take();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

java5同步集合類的應用

        傳統集合實現同步的問題,舉一個例子:Map集合執行緒不同步導致的問題。

        解決辦法:使用同步的Map集合,使用集合工具類中的方法將不同步的集合轉為同步的Collections.synchronizedMap(newMap())這個方法返回一個同步的集合:

        publicstatic<K, V> Map<K, V> synchronizedMap(Map<K, V> m) {

            return new SynchronizedMap<K, V>(m);

        }

        SynchronizedMap類相當於一個代理類,通過檢視原始碼發現:該類中的所有方法都是直接返回:原Map集合方法呼叫後的結果,只是將返回結果的程式碼放在了同步程式碼塊中以實現同步,構造是將同步鎖預設置為當前物件。

        HashSet與HashMap的關係與區別:

        HashSet是單列的,HashMap是雙列的(鍵值對)

        關係:HashSet內部使用的是HashMap中的鍵,不考慮值。

        檢視HashSet的原始碼發現其內部就是用HashMap實現的,只是沒有使用HashMap的V,只使用了它的K。

        JDK1.5中提供了併發Collection:提供了設計用於多執行緒上下文中的Collection 實現:ConcurrentHashMap、    ConcurrentSkipListMap、ConcurrentSkipListSet、CopyOnWriteArrayList和 CopyOnWriteArraySet。當期望許多執行緒訪問一個給定collection 時,ConcurrentHashMap 通常優於同步的HashMap,ConcurrentSkipListMap 通常優於同步的TreeMap。當期望的讀數和遍歷遠遠大於列表的更新數時,CopyOnWriteArrayList優於同步的 ArrayList。

        ConcurrentSkipListMap<K,V>對映可以根據鍵的自然順序進行排序,也可以根據建立對映時所提供的Comparator 進行排序,具體取決於使用的構造方法。

        ConcurrentSkipListSet<E>一個基於ConcurrentSkipListMap 的可縮放併發 NavigableSet 實現。set的元素可以根據它們的自然順序進行排序,也可以根據建立 set 時所提供的Comparator 進行排序,具體取決於使用的構造方法。

        CopyOnWriteArrayList<E>ArrayList 的一個執行緒安全的變體,其中所有可變操作(add、set等等)都是通過對底層陣列進行一次新的複製來實現的。這一般需要很大的開銷,但是當遍歷操作的數量大大超過可變操作的數量時,這種方法可能比其他替代方法更有效。在不能或不想進行同步遍歷,但又需要從併發執行緒中排除衝突時,它也很有用。

        CopyOnWriteArraySet<E>對其所有操作使用內部CopyOnWriteArrayList 的 Set。因此,它共享以下相同的基本屬性:

        它最適合於具有以下特徵的應用程式:set大小通常保持很小,只讀操作遠多於可變操作,需要在遍歷期間防止執行緒間的衝突。它是執行緒安全的。因為通常需要複製整個基礎陣列,所以可變操作(add、set和 remove 等等)的開銷很大。迭代器不支援可變remove操作。 使用迭代器進行遍歷的速度很快,並且不會與其他執行緒發生衝突。在構造迭代器時,迭代器依賴於不變的陣列快照。

        傳統集合中存在的其它問題:對集合迭代時,不能對集合中的元素進行修改(新增、刪除……),Java5中提供的併發集合就解決了這個問題。

程式例項

CopyOnWriteArrayList使用示例:

public class User implements Cloneable {
	private String name;
	private int age;

	public User(String name, int age) {
		this.name = name;
		this.age = age;
	}

	public boolean equals(Object obj) {
		if (this == obj) {
			return true;
		}
		if (!(obj instanceof User)) {
			return false;
		}
		
		User user = (User) obj;
		if (this.name.equals(user.name) && this.age == user.age) {
			return true;
		} else {
			return false;
		}
	}

	public int hashCode() {
		return name.hashCode() + age;
	}

	public String toString() {
		return "{name:'" + name + "',age:" + age + "}";
	}

	public Object clone() {
		Object object = null;
		
		try {
			object = super.clone();
			
		} catch (CloneNotSupportedException e) {
			
		}
		
		return object;
	}

	public void setAge(int age) {
		this.age = age;
	}

	public String getName() {
		return name;
	}
}

public class CollectionModifyExceptionTest {
	
	public static void main(String[] args) {
		// ArraysList的迭代器,在迭代的過程中如果修改了List的結構,則會異常
		// Collection<User> users = new ArrayList<User>();
		
		Collection<User> users = new CopyOnWriteArrayList<User>();

		users.add(new User("張三", 28));
		users.add(new User("李四", 25));
		users.add(new User("王五", 31));
		
		Iterator<User> itrUsers = users.iterator();
		while (itrUsers.hasNext()) {
			User user = itrUsers.next();
			if ("張三".equals(user.getName())) {
				users.remove(user);
				
			} else {
				System.out.println(user);
			}
		}
	}
}