【Java 併發筆記】Exchanger 相關整理
文前說明
作為碼農中的一員,需要不斷的學習,我工作之餘將一些分析總結和學習筆記寫成部落格與大家一起交流,也希望採用這種方式記錄自己的學習之旅。
本文僅供學習交流使用,侵權必刪。
不用於商業目的,轉載請註明出處。
1. 簡介
- Exchanger(交換者)是自 JDK 1.5 起開始提供的工具套件,源於 java.util.concurrent 包。
- 是一個用於執行緒間協作的工具類。
- Exchanger 用於進行執行緒間的資料交換。它提供一個同步點,在這個同步點兩個執行緒可以交換彼此的資料。
- 此類提供對外的操作是同步的。
- 用於 成對 出現的執行緒之間交換資料。
- 可以視作雙向的同步佇列。
- 可應用於基因演算法、流水線設計等場景。
2. Exchanger 的原理
- Exchanger 用於進行執行緒間的資料交換。
- 它提供一個同步點,在這個同步點兩個執行緒可以交換彼此的資料。
- 兩個執行緒通過
exchange()
方法交換資料, 如果第一個執行緒先執行exchange()
方法,會一直等待第二個執行緒也執行exchange()
,當兩個執行緒都到達同步點時,兩個執行緒交換資料,將本執行緒生產出來的資料傳遞給對方。 - 使用 Exchanger 的重點是成對的執行緒使用
exchange()
方法。
- 這個類提供一個無參建構函式,兩個過載的範型
exchange()
方法。
public V exchange(V x) throws InterruptedException public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
- 在 Exchanger 中,如果一個執行緒已經到達了
exchanger()
時,對於其夥伴結點的情況分為三種。exchanger()
- Exchanger 有單槽位和多槽位之分,單個槽位在同一時刻只能用於兩個執行緒交換資料,這樣在競爭比較激烈的時候,會影響到效能,多個槽位就是多個執行緒可以同時進行兩個的資料交換,彼此之間不受影響,這樣可以很好的提高吞吐量。
資料結構
@sun.misc.Contended static final class Node { int index;// arena的下標,多個槽位的時候利用 int bound;// 上一次記錄的Exchanger.bound; int collides;// 在當前bound下CAS失敗的次數; int hash;// 用於自旋; Object item;// 這個執行緒的當前項,也就是需要交換的資料; volatile Object match;// 交換的資料 volatile Thread parked; // 執行緒 } /** * Value representing null arguments/returns from public * methods. Needed because the API originally didn't disallow null * arguments, which it should have. * 如果交換的資料為 null,則用NULL_ITEM代替 */ private static final Object NULL_ITEM = new Object();
- Node 定義中,index,bound,collides 用於多槽位。
- item 是當前執行緒需要交換的資料。
- match 是和其它執行緒交換後的資料,初始為 null。
- parked 是記錄執行緒,用於阻塞和喚醒執行緒。
2.1 單槽 Exchanger
- Node 是每個執行緒自身用於資料交換的結點,每個 Node 就代表了每個執行緒,為了保證執行緒安全,把執行緒的 Node 結點放在ThreadLocal。
- slot 為單槽。
/** The number of CPUs, for sizing and spin control */ private static final int NCPU = Runtime.getRuntime().availableProcessors(); /** * The bound for spins while waiting for a match. The actual * number of iterations will on average be about twice this value * due to randomization. Note: Spinning is disabled when NCPU==1. */ private static final int SPINS = 1 << 10; // 自旋次數 /** * Slot used until contention detected. */ private volatile Node slot; // 用於交換資料的槽位 /** * Per-thread state每個執行緒的資料,ThreadLocal 子類 */ private final Participant participant; /** The corresponding thread local class */ static final class Participant extends ThreadLocal<Node> { // 初始值返回Node public Node initialValue() { return new Node(); } }
exchange 方法
- 等待另一個執行緒到達此交換點(除非當前執行緒被中斷),然後將給定的物件傳送給該執行緒,並接收該執行緒的物件。
沒有設定超時時間的 exchange 方法
public V exchange(V x) throws InterruptedException { Object v; Object item = (x == null) ? NULL_ITEM : x; // 轉換成空物件 // arena == null, 路由到slotExchange(單槽交換), 如果arena != null或者單槽交換失敗,且執行緒沒有被中斷,則路由到arenaExchange(多槽交換),返回null,則丟擲中斷異常 if ((arena != null || (v = slotExchange(item, false, 0L)) == null) && ((Thread.interrupted() || (v = arenaExchange(item, false, 0L)) == null))) throw new InterruptedException(); return (v == NULL_ITEM) ? null : (V) v; }
- arena 為多槽位,如果為 null,則執行
slotExchange()
單槽方法,否則判斷執行緒是否中斷,如果中斷值丟擲 InterruptedException 異常,沒有中斷則執行arenaExchange()
多槽方法,如果該方法返回 null,丟擲中斷異常,最後返回結果。
具有超時功能的 exchange 方法
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { Object v; Object item = (x == null) ? NULL_ITEM : x;// 轉換成空物件 long ns = unit.toNanos(timeout); // arena == null, 路由到slotExchange(單槽交換), 如果arena != null或者單槽交換失敗,且執行緒沒有被中斷,則路由到arenaExchange(多槽交換),返回null,則丟擲中斷異常 if ((arena != null || (v = slotExchange(item, true, ns)) == null) && ((Thread.interrupted() || (v = arenaExchange(item, true, ns)) == null))) throw new InterruptedException(); if (v == TIMED_OUT)// 超時 throw new TimeoutException(); return (v == NULL_ITEM) ? null : (V) v; }
- 增加超時的判斷。
slotExchange 方法
private final Object slotExchange(Object item, boolean timed, long ns) { Node p = participant.get(); // 獲取當前執行緒攜帶的Node Thread t = Thread.currentThread(); // 當前執行緒 if (t.isInterrupted()) // 保留中斷狀態,以便呼叫者可以重新檢查,Thread.interrupted() 會清除中斷狀態標記 return null; for (Node q;;) { if ((q = slot) != null) { // slot不為null, 說明已經有執行緒在這裡等待了 if (U.compareAndSwapObject(this, SLOT, q, null)) { // 將slot重新設定為null, CAS操作 Object v = q.item; // 取出等待執行緒攜帶的資料 q.match = item; // 將當前執行緒的攜帶的資料交給等待執行緒 Thread w = q.parked; // 可能存在的等待執行緒(可能中斷,不等了) if (w != null) U.unpark(w); // 喚醒等待執行緒 return v; // 返回結果,交易成功 } // CPU的個數多於1個,並且bound為0時建立 arena,並將bound設定為SEQ大小 if (NCPU > 1 && bound == 0 && U.compareAndSwapInt(this, BOUND, 0, SEQ)) arena = new Node[(FULL + 2) << ASHIFT]; // 根據CPU的個數估計Node的數量 } else if (arena != null) return null; // 如果slot為null, 但arena不為null, 則轉而路由到arenaExchange方法 else { // 最後一種情況,說明當前執行緒先到,則佔用此slot p.item = item; // 將攜帶的資料卸下,等待別的執行緒來交易 if (U.compareAndSwapObject(this, SLOT, null, p)) // 將slot的設為當前執行緒攜帶的Node break; // 成功則跳出迴圈 p.item = null; // 失敗,將資料清除,繼續迴圈 } } // 當前執行緒等待被釋放, spin -> yield -> block/cancel int h = p.hash; // 偽隨機,用於自旋 long end = timed ? System.nanoTime() + ns : 0L; // 如果timed為true,等待超時的時間點; 0表示沒有設定超時 int spins = (NCPU > 1) ? SPINS : 1; // 自旋次數 Object v; while ((v = p.match) == null) { // 一直迴圈,直到有執行緒來交易 if (spins > 0) { // 自旋,直至spins不大於0 h ^= h << 1; // 偽隨機演算法, 目的是等h小於0(隨機的) h ^= h >>> 3; h ^= h << 10; if (h == 0) // 初始值 h = SPINS | (int) t.getId(); else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); // 等到h < 0, 而spins的低9位也為0(防止spins過大,CPU空轉過久),讓出CPU時間片,每一次等待有兩次讓出CPU的時機(SPINS >>> 1) } else if (slot != p) // 別的執行緒已經到來,正在準備資料,自旋等待一會兒,馬上就好 spins = SPINS; // 如果執行緒沒被中斷,且arena還沒被建立,並且沒有超時 else if (!t.isInterrupted() && arena == null && (!timed || (ns = end - System.nanoTime()) > 0L)) { U.putObject(t, BLOCKER, this); // 設定當前執行緒將阻塞在當前物件上 p.parked = t; // 掛在此結點上的阻塞著的執行緒 if (slot == p) U.park(false, ns); // 阻塞, 等著被喚醒或中斷 p.parked = null; // 醒來後,解除與結點的聯絡 U.putObject(t, BLOCKER, null); // 解除阻塞物件 } else if (U.compareAndSwapObject(this, SLOT, p, null)) { // 超時或其它(取消),給其它執行緒騰出slot v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null; break; } } // 歸位 U.putOrderedObject(p, MATCH, null); p.item = null; p.hash = h; return v; }
- 執行流程。
- 檢查 slot 是否為空(null),不為空,說明已經有執行緒在此等待,嘗試佔領該槽位,如果佔領成功,與等待執行緒交換資料,並喚醒等待執行緒,交易結束,返回。
- 如果佔領槽位失敗,建立 arena,繼續步驟 1 嘗試搶佔 slot,直至 slot 為空,或者搶佔成功,交易結束返回。
- 如果 slot 為空,則判斷 arena 是否為空,如果 arena 不為空,返回 null,重新路由到 arenaExchange 方法。
- 如果 arena 為空,說明當前執行緒是先到達的,嘗試佔有 slot,如果成功,將 slot 標記為自己佔用,跳出迴圈,繼續步驟 5,如果失敗,則繼續步驟 1。
- 當前執行緒等待被釋放,等待的順序是先自旋(spin),不成功則讓出 CPU 時間片(yield),最後還不行就阻塞(block),spin -> yield -> block。
- 如果超時(設定超時的話)或被中斷,則退出迴圈。
- 最後,重置資料,下次重用,返回結果,結束。

slotExchange 流程圖
2.2 多槽 Exchanger
- 一個 Node 陣列 arena,代表了很多的槽位。
private static final int ASHIFT = 7; // 兩個有效槽(slot -> Node)之間的位元組地址長度(記憶體地址,以位元組為單位),1 << 7至少為快取行的大小,防止偽共享 private static final int MMASK = 0xff; // 場地(一排槽,arena -> Node[])的可支援的最大索引,可分配的大小為 MMASK + 1 private static final int SEQ = MMASK + 1; // bound的遞增單元,確立其唯一性 private static final int NCPU = Runtime.getRuntime().availableProcessors(); // CPU的個數,用於場地大小和自旋控制 static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1; // 最大的arena索引 private static final int SPINS = 1 << 10; // 自旋次數,NCPU = 1時,禁用 private static final Object NULL_ITEM = new Object();// 空物件,對應null private static final Object TIMED_OUT = new Object();// 超時物件,對應timeout // 多個執行緒交換/多槽位 private volatile Node[] arena;
arenaExchange 方法
private final Object arenaExchange(Object item, boolean timed, long ns) { Node[] a = arena; // 交換場地,一排slot Node p = participant.get(); // 獲取當前執行緒攜帶的Nodep.index 初始值為 0 for (int i = p.index;;) { // arena的索引,陣列下標 int b, m, c; long j; // 原陣列偏移量,包括填充值 // 從場地中選出偏移地址為(i << ASHIFT) + ABASE的記憶體值,也即真正可用的Node //如果i為0,j相當於是 "第一個"槽位 Node q = (Node) U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE); if (q != null && U.compareAndSwapObject(a, j, q, null)) { // 此槽位不為null, 說明已經有執行緒在這裡等了,重新將其設定為null, CAS操作 Object v = q.item; // 取出等待執行緒攜帶的資料 q.match = item; // 將當前執行緒攜帶的資料交給等待執行緒 Thread w = q.parked; // 可能存在的等待執行緒 if (w != null) U.unpark(w); // 喚醒等待執行緒 return v; // 返回結果, 交易成功 } else if (i <= (m = (b = bound) & MMASK) && q == null) { // 有效交換位置,且槽位為空 p.item = item; // 將攜帶的資料卸下,等待別的執行緒來交易 if (U.compareAndSwapObject(a, j, null, p)) { // 槽位佔領成功 long end = (timed && m == 0) ? System.nanoTime() + ns : 0L; // 計算出超時結束時間點 Thread t = Thread.currentThread(); // 當前執行緒 for (int h = p.hash, spins = SPINS;;) { // 一直迴圈,直到有別的執行緒來交易,或超時,或中斷 Object v = p.match; // 檢查是否有別的執行緒來交換資料 if (v != null) { // 有則返回 U.putOrderedObject(p, MATCH, null); // match重置,等著下次使用 p.item = null; // 清空,下次接著使用 p.hash = h; return v; // 返回結果,交易結束 } else if (spins > 0) { // 自旋 h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // 移位加異或,偽隨機 if (h == 0) // 初始值 h = SPINS | (int) t.getId(); else if (h < 0 && // SPINS >>> 1, 一半的概率 (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); // 每一次等待有兩次讓出CPU的時機 } else if (U.getObjectVolatile(a, j) != p) spins = SPINS; // 別的執行緒已經到來,正在準備資料,自旋等待一會兒,馬上就好 else if (!t.isInterrupted() && m == 0 && (!timed || (ns = end - System.nanoTime()) > 0L)) { U.putObject(t, BLOCKER, this); // 設定當前執行緒將阻塞在當前物件上 p.parked = t; // 掛在此結點上的阻塞著的執行緒 if (U.getObjectVolatile(a, j) == p) U.park(false, ns); // 阻塞, 等著被喚醒或中斷 p.parked = null; // 醒來後,解除與結點的聯絡 U.putObject(t, BLOCKER, null); // 解除阻塞物件 } else if (U.getObjectVolatile(a, j) == p && U.compareAndSwapObject(a, j, p, null)) { if (m != 0) // 嘗試縮減 U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1); // 更新bound, 高位遞增,低位 -1 p.item = null; // 重置 p.hash = h; i = p.index >>>= 1; // 索引減半,為的是快速找到匯合點(最左側) if (Thread.interrupted())// 保留中斷狀態,以便呼叫者可以重新檢查,Thread.interrupted() 會清除中斷狀態標記 return null; if (timed && m == 0 && ns <= 0L) // 超時 return TIMED_OUT; break; // 重新開始 } } } else p.item = null; // 重置 } else { if (p.bound != b) { // 別的執行緒更改了bound,重置collides為0, i的情況如下:當i != m, 或者m = 0時,i = m; 否則,i = m-1; 從右往左遍歷 p.bound = b; p.collides = 0; i = (i != m || m == 0) ? m : m - 1; // index 左移 } else if ((c = p.collides) < m || m == FULL || !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) { // 更新bound, 高位遞增,低位 +1 p.collides = c + 1; i = (i == 0) ? m : i - 1; // 左移,遍歷槽位,m == FULL時,i == 0(最左側),重置i = m, 重新從右往左迴圈遍歷 } else i = m + 1; // 槽位增長 p.index = i; } } }
- 執行流程。
- 從場地中選出偏移地址為(i << ASHIFT)+ ABASE 的記憶體值,也即第 i 個真正可用的 Node,判斷其槽位是否為空,為空,進入步驟 2。
- 不為空,說明有執行緒在此等待,嘗試搶佔該槽位,搶佔成功,交換資料,並喚醒等待執行緒,返回,結束。
- 沒有搶佔成功,進入步驟 9。
- 檢查索引是否越界,越界,進入步驟 9。沒有越界,進入步驟 3。
- 嘗試佔有該槽位,搶佔失敗,進入步驟 1。搶佔成功,進入步驟 4。
- 檢查 match,是否有執行緒來交換資料,如果有,交換資料,結束。如果沒有,進入步驟 5。
- 檢查 spin 是否大於 0,如果不大於 0,進入步驟 6。
- 如果大於 0,檢查 hash 是否小於 0,並且 spin 減半或為 0,如果不是,進入步驟 4。
- 如果是,讓出 CPU 時間,過一會兒,進入步驟 4。
- 檢查是否中斷,m 達到最小值,是否超時,如果沒有中斷,沒有超時,並且 m 達到最小值,阻塞,過一會兒進入步驟 4。否則,進入步驟 7。
- 沒有執行緒交換資料,嘗試丟棄原有的槽位重新開始,丟棄失敗,進入步驟 4。否則,進入步驟 8。
- bound 減 1(m>0),索引減半。
- 檢查是否中斷或超時,如果沒有,進入步驟 1。
- 否則,返回,結束。
- 檢查 bound 是否發生變化,如果變化,重置 collides,索引重置為 m 或左移,轉向步驟 1。否則,進入步驟 10。
- 檢查 collides 是否達到最大值,如果沒有,進入步驟13。否則,進入步驟 11。
- m 是否達到 FULL,是,進入步驟13。否則,進入步驟 12。
- CAS bound 加 1 是否成功,如果成功,i 置為 m+1,槽位增長,進入步驟 1。否則,進入步驟 13。
- collides 加 1,索引左移,進入步驟 1。
- 從場地中選出偏移地址為(i << ASHIFT)+ ABASE 的記憶體值,也即第 i 個真正可用的 Node,判斷其槽位是否為空,為空,進入步驟 2。
static final class Participant extends ThreadLocal<Node> { public Node initialValue() { return new Node(); } }
- 通過 participant 取得當前結點 Node,然後根據當前結點 Node 的 index 去取 arena 中相對應的結點。
偽隨機
h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
- xorshift 演算法。
- T = (I + L a )(I + R b )(I + L c )。
- L 代表左移。
- R 代表右移。
- a,b,c 分別為程式碼中的 1,3,10。
- I 代表矩陣 {0, 1} 共 32 位,即是 int 型別的二進位制。
- T 代表的是隨機演算法。
- T = (I + L a )(I + R b )(I + L c )。
- 偽隨機通過 xorshift 演算法模擬隨機,為了達到更好的隨機效果,週期自然是越大越好。
- 週期 指的是,當給定一個輸入,得到的輸出再作為下一次的輸入,如此反覆,直到某次輸出恰巧等於最初的輸入,這便是隨機演算法的一個週期。
- int 型別的最大週期應該是遍歷該型別所有的值(0 除外(奇異矩陣),如果是 0 ,輸出便一直是 0,不能隨機),即
max(2^31-1) - min(-2^31) = 2^32 - 1 = 4294967295
。
為什麼選用 1,3,10
2^32 - 1 = 4294967295
[4294967295] (1, 3, 10) (2, 7, 7) (2, 7, 9) (5, 9, 7) (7, 1, 9) (7, 7, 2) (7, 9, 5)
為什麼要有兩次左移和一次右移
- 雖然只一次左移+異或就能達到隨機的效果。
- 但是第一次左移(I + L a )可以讓高位多 1,右移(I + R b )可以讓低位多 1,高位低位都參與計算,可以增加隨機性,第二次左移(I + L c ),再進行真正的隨機計算。
自旋等待
private static final int SPINS = 1 << 10; else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) // h < 0,一半的概率 Thread.yield(); // 每一次等待有兩次讓出CPU的時機
- 等待其它執行緒交換資料時,會進行自旋等待,自旋的過程中,當前執行緒會有 2 次讓出 CPU 的時機。
- SPINS 為 1024,
((1024 >>>1) -1) = 511 = 0111111111
,spins 預設為 1024 迴圈遞減。 - 當 spins 的最高位為 0 或 1 並且其它位為 0 時(0 或 512)進行 與 (&) 計算的結果為 0。
- SPINS 為 1024,
arena 的建立
static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1; private static final int ASHIFT = 7; private static final int NCPU = Runtime.getRuntime().availableProcessors(); private static final int MMASK = 0xff;// 255 ...... if (NCPU > 1 && bound == 0 &&U.compareAndSwapInt(this, BOUND, 0, SEQ)) arena = new Node[(FULL + 2) << ASHIFT];
- 在
slotExchange()
方法中存在競爭時,會構建 arena。SEQ(SEQ=MMASK + 1) Runtime.getRuntime().availableProcessors()
private static final sun.misc.Unsafe U; private static final int ABASE; U = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Node[].class; s = U.arrayIndexScale(ak); ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);
FULL 和 ASHIFT 的定義
- arena 陣列很大,但裡面並不是每個位置都被使用了,還有一些是沒有使用的。
- 通過Unsafe 的
arrayBaseOffset(ak)
方法可以返回 arena 陣列中第一個元素的偏移地址。 - 通過
arrayIndexScale(ak)
方法可以返回 arena 陣列中每一個元素佔用的大小,也就是元素與元素之間的間隔,即1 << ASHIFT
為 128。ABASE = arrayBaseOffset + (1 << ASHIFT) arrayBaseOffset + N * arrayIndexScale
-
@sun.misc.Contended
註解 和1 << ASHIFT
主要是用於避免偽共享。1 << ASHIFT
可以避免兩個 Node 在同一個共享區(快取行)。- 主流快取行大小一般為 32 位元組到 256 位元組,128 個地址位基本覆蓋到了常見的處理器平臺。
- arena 陣列中元素(結點)的分佈間隔為 128 個整數倍地址位,也就是說最小相差 128 個地址位。
- 通過Unsafe 的

arena 陣列結構
Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE); if (q != null && U.compareAndSwapObject(a, j, q, null)) { Object v = q.item;// 獲取槽位中結點 q 的資料 q.match = item;// 把當前執行緒的資料交換給它 Thread w = q.parked; // 獲得槽位中結點 q 對應的執行緒物件 if (w != null) U.unpark(w);//喚醒該執行緒 return v; }
bound 和 collides
- bound 是上一次記錄的 Exchanger.bound。
- bound 會記錄 最大有效 的 arena 索引,是動態變化的,競爭激烈時(槽位全滿)增加, 槽位空曠時減小。
- bound + SEQ 確立其唯一性(版本),低 8 位記錄 有效索引 。
- collides 是在當前 bound 下 CAS 失敗的次數。
- 最大為 m,m(bound & MMASK)為當前 bound 下最大有效索引。
- 槽位最大值為 MMASK(255),bound 最大值也就是 255,m 和 i 的範圍為 [0,255]。
- 從右往左遍歷,等到 collides == m 時,有效索引的槽位已經遍歷完,這時需要增長槽位。
- 增長的方式是重置 bound(依賴 SEQ 更新其版本,低位 + 1),同時 collides 重置。
private static final int MMASK = 0xff; private static final int SEQ = MMASK + 1; ...... // MASK: 00000000000000000000000011111111 //SEQ: 00000000000000000000000100000000(MASK + 1) //1: 00000000000000000000000000000001 if (NCPU > 1 && bound == 0 && U.compareAndSwapInt(this, BOUND, 0, SEQ)) // 當 bound 為 0 時,bound 被更新為 SEQ //第一次更新 //b0: 00000000000000000000000100000000 U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1) //SEQ+1: 00000000000000000000000100000001 //b0+SEQ+1=b1: 00000000000000000000000200000001 //第二次更新 //b1+SEQ: 00000000000000000000000300000001 //第二次是 -1 的情況 U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1) //b1+SEQ-1=b2: 00000000000000000000000300000000
- bound + SEQ 是版本遞增的過程,
b + SEQ + 1
後再b + SEQ - 1
,實際經歷了兩個版本,並且會將 collides 重置。 - 下圖中去除了實際存在的未使用位置,只保留了陣列中被使用的位置。
- 其中被使用的位置數量最大值為 MMASK(255),FULL <= MMASK。
- 當前執行緒進入 " 第一個 " 槽位,發現有其它執行緒在交換資料,則增加 1 個槽位並且 bound 遞增,此時最大有效索引為 1。
- m 等於 1,i 範圍為 [0,1],p.index 等於 1。
- 當前執行緒進入後續槽位(包含之前增加的槽位),如果發現同樣有其它執行緒在交換資料,則繼續增加槽位,bound 遞增。
- 當前執行緒進入後續槽位(包含之前增加的槽位),沒有元素(結點),則嘗試佔據該槽位,佔據成功則等待其它執行緒。
- 當等待超時則刪除該槽位,再次從頭開始遍歷有效索引,尋找其它執行緒交換資料。

bound 操作
- bound 版本唯一性的作用主要用於更新索引,將有效索引更新到最右側位置,使得可以再次從右向左(從頭)遍歷。
- 如果沒有 bound 的版本唯一性,便沒有索引更新,就會一直往左遍歷競爭激烈的槽位。
- 如果沒有 bound 的版本唯一性,還會使得 bound 只增不減,影響效率。
3. 總結
- 當前執行緒 A 和其它執行緒 B(一個或多個)在槽中交換資料。
- 單槽方法(slotExchange)執行,A 發現 B 已經在槽中,則嘗試交換資料,如果成功,則進入第 2 步驟。如果失敗則說明有其它執行緒已經在和 B 進行資料交換,則進入第 5 步驟。
- 交換資料成功,則交換結束。也可能超時或者中斷,造成交換失敗,只能從頭開始。
- 到達槽位,未發現其它執行緒,則嘗試佔位,搶佔成功,則自旋等待其它執行緒交換資料,進入第 4 步驟。搶佔失敗,則說明被其它執行緒搶佔了槽位,則進入第 5 步驟。
- 其它執行緒來交換資料,成功則交換結束。如果等待超時則尋找其它執行緒進行交換,先刪除一個槽位,再從頭開始尋找其它執行緒交換資料。也有可能會被中斷。
- 轉為多槽方法(arenaExchange)執行,挨個尋找槽中是否有可交換資料的物件,如果發現交換物件且嘗試交換資料成功,則進入第 2 步驟。如果為空槽,則佔據並等待其它執行緒來交換資料,進入第 4 步驟。
- 嘗試多次交換都未成功,則增加槽位,然後再從頭開始。