1. 程式人生 > >Java之集合(二十一)LinkedTransferQueue

Java之集合(二十一)LinkedTransferQueue

rac tco 詳細介紹 off 目前 splay runnable 異步 沒有

  轉載請註明源出處:http://www.cnblogs.com/lighten/p/7505355.html

1.前言

  本章介紹無界的阻塞隊列LinkedTransferQueue,JDK7才提供了這個類,所以這個類具備了一些一般隊列不具有的特性。此隊列也是基於鏈表的,對於所有給定的生產者都是先入先出的。註意,該隊列的size方法和ConcurrentLinkedQueue一樣不是常量時間。由於隊列的實現,其需要遍歷隊列才能計算出隊列的大小,這期間隊列發生的改變,遍歷的結果會不正確。bulk操作並不保證原子性,比如叠代器叠代的時候執行addAll()方法,叠代器可能只能看到部分新加的元素。

2.LinkedTransferQueue

2.1 TransferQueue接口

  這個是JDK7才定義的一個節點,LinkedTransferQueue實現了這個接口,其新特性也就與之有關。通常阻塞隊裏中,生產者放入元素,消費者使用元素,這兩個部分是分離的。這裏的分離意思如下:廚師做好了菜放在櫃臺上,服務員端走,廚師是不需要管有沒有人取走做的菜,服務員也不需要管廚師有沒有做好菜,沒做好菜阻塞就行了。上面就是個人所說的分離的意思。TransferQueue接口定義的相關內容就是廚師會知道做好的菜有沒有被取走。

技術分享

  1.tryTransfer(E):將元素立刻給消費者。準確的說就是立刻給一個等待接收元素的線程,如果沒有消費者就會返回false,而不將元素放入隊列。

  2.transfer(E):將元素給消費者,如果沒有消費者就會等待。

  3.tryTransfer(E,long,TimeUnit):將元素立刻給消費者,如果沒有就等待指定時間。給失敗返回false。

  4.hasWaitingConsumer():返回當前是否有消費者在等待元素。

  5.getWaitingConsumerCount():返回等待元素的消費者個數。

2.2 設計原理

  Dual Queues是該隊列的基礎理論。此隊列不進存放數據節點,也會存放請求節點。當一個線程試圖放入一個數據節點,正好遇到一個請求數據的結點,會立刻匹配並移除該數據節點,對於請求節點入隊列也是一樣的。Blocking Dual Queues阻塞所有未匹配的線程,直到有匹配的線程出現。

  一個先入先出的dual queue實現是無鎖隊列算法M&S的變體。其包含兩個指向字段:head指向一個匹配的結點,然後依次指向未匹配的結點,如果不為空。tail指向最後一個節點,或者null,如果隊列為空。例如下圖是一個包含四個元素的隊列結構:

技術分享

  M&S算法易於擴展和保持(通過CAS)這些頭部和尾指針。在dual隊列中,節點需要自動維護匹配狀態。所以這裏需要一些必要的變量:對於數據模式,匹配需要將一個item字段通過CAS從非null的數據轉成null,反之對於請求模式,需要從null變成data。一旦一個節點匹配了,其狀態將不再改變。因此通常安排元素鏈表的前綴是0個或多個匹配節點,而後跟隨0個或多個未匹配節點。如果不關心時間或空間的效率,通過從頭指針開始遍歷隊列放入取出操作都是對的。CAS操作第一個未匹配節點匹配時的item,在下一個字段追加後一個節點。然而這是一個糟糕的想法,雖然其確實有好處,不需對head或tail進行原子更新。

  LinkedTransferQueue采取了一種折中的方案,介於實時更新head/tail和不更新head/tail之間的方法。該方法對有時候需要額外的遍歷去定位第一個或最後一個未匹配的結點和減少開銷及隊列結點的競爭更新這兩個方面進行了權衡。例如,一個可能出現的隊列快照如下圖:

技術分享

  slack(head位置和第一個未匹配的結點的最大距離,尾結點類似)的最佳值是一個經驗問題,發現在1~3之間在大部分平臺是最佳的值。更大的值會增加內存命中開銷和長遍歷鏈表的風險,更小的值則會增加CAS的競爭開銷。

  具體實現:使用一個基礎的threshold來更新,slack為2。所以在當前位置超過第一個或最後一個節點2個距離以上的時候就會更新head/tail。出入隊列操作都是通過xfer方法完成的,只需要不同的參數來表示操作。

  其它的內容通過代碼詳細介紹。

2.3 數據結構

技術分享

  上圖是一個基本的數據結構:

    MP表示是否是多核處理器;

    FRONT_SPINS當一個節點目前是隊列的第一個等待者,在多核處理器上自旋的次數2n

    CHAINED_SPINS當一個節點先於另一個明顯自旋的結點阻塞時自旋的次數。

技術分享

  上圖是Node節點的基本結構,next就是下一個節點了,isData表示是請求還是數據節點,另外兩個字段就是對應不同模式要存儲的值了。Node的基本操作如下:

    casNext:CAS更新當前結點next的字段

    casItem:CAS更新當前結點的item字段

    forgetNext:CAS設置當前結點的next字段為自身

    forgetContents:CAS設置item字段為自身,waiter為null

    isMatched:是否是匹配了的結點

    isUnmatchedRequest:是否是未匹配的請求節點

    cannotPrecede:當該節點是未匹配節點卻與當前的結點類型不符的時候,返回true。意思就是當前都是請求節點,數據節點應該立刻被消耗,未匹配的結點應該是同一種節點。

    tryMatchData:數據節點嘗試匹配  

2.4 基本操作

  存入一個元素:該隊列的put、offer和offer(E,timeout,unit)方法所調用的都是同一個方法。

技術分享

技術分享

  不允許放入的數據為空,放入操作的模式是ASYNC。從頭指針處開始死循環,當前結點p沒有被匹配,數據節點不能匹配直接跳出循環,不進行匹配,後面會進入how!=NOW的判斷,創建新節點,嘗試追加到隊列尾。如果可以匹配就替換P節點的值,失敗意味著被其它線程搶先了,繼續循環,成功了意味著這兩個匹配成功,可能需要更新頭結點。q=p且p!=h的循環意味著已經跳過了一個元素,n又取了q.next,p又是當前被匹配了的結點,這就意味著前面有2個match的結點:head和p。達到slack為2的條件,更新頭結點,並遺棄之前的head。不需要更新頭結點的時候直接跳出循環。匹配完成之後就是喚醒p結點的waiter(如果p是請求節點的話)返回item。

技術分享

  隊列尾追加節點操作如上圖,從尾結點開始,

    如果當前結點p為null,頭結點也是null,初始化隊列,設置頭結點,返回s追加節點。

    如果該節點不能放入隊列,返回null。

    如果p.next不為空,意味著當前結點不是尾結點,重新找到尾結點,繼續循環。

    如果p節點在設置的時候被插隊了,繼續找其下一個循環

    如果成功了,p!=t,且tail也不等於t。意味著有尾結點後面又追加了2個節點,slack>=2更新尾結點。返回p節點。

技術分享

  取出都是消費者data為null,poll的模式是NOW,有時間限制就是TIMED,take方法使用的是SYNC。回到xfer方法,我們可以知道:其先找到第一個未匹配的元素進行匹配,匹配了不管什麽模式都是直接返回,沒匹配就要根據模式來了,先是how!=NOW才會有額外操作,所以poll取出就是NOW,取不到那就是沒準備好,直接返回就可以了。其它三種模式沒匹配到都會嘗試追加該節點,沒追加上肯定是模式不匹配,意味著可以匹配的,重新循環。如果不是ASYNC模式,那就是帶有時間或異步的模式,需要等待。

  以上就是整個類的設計思路了,分成四種模式:NOW就是立刻返回不追加元素到末尾,ASYNC就是同步需要添加元素到隊列尾,TIMED用於有時間限制的操作,SYNC用於無時間限制無限等待的操作。awaitMatch方法不再進行介紹,就是等到指定時間。size方法和getWaitingConsumerCount方法都是遍歷鏈表,超過Integer.MAX_VALUE就返回這個值,區別就是該鏈表是處於什麽模式而已。其它的方法不再描述,上面是基本的操作。

3.使用例子

    @Test
	public void testTransfer() {
		LinkedTransferQueue<Integer> queue = new LinkedTransferQueue<>();
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					Thread.sleep(500);	// 再改成1500
					System.out.println(Thread.currentThread().getName()+"-"+queue.take());
					System.out.println(Thread.currentThread().getName()+"-"+queue.take());
					System.out.println(Thread.currentThread().getName()+"-"+queue.take());
				} catch (InterruptedException e1) {
					e1.printStackTrace();
				}
			}
		},"consumer").start();
		new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println(Thread.currentThread().getName()+"-"+queue.tryTransfer(1));
				try {
					System.out.println(Thread.currentThread().getName()+"-等待2被消耗:"+queue.tryTransfer(2, 1, TimeUnit.SECONDS));
					queue.transfer(3);
					System.out.println(Thread.currentThread().getName()+"-"+"等到3被消費:true");
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		},"prodcuer").start();
		try {
			System.in.read();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

技術分享

  更改1500毫秒之後:

技術分享

Java之集合(二十一)LinkedTransferQueue