LinkedBlockingQueue原始碼解析(1)
此文已由作者趙計剛授權網易雲社群釋出。
歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。
1、對於LinkedBlockingQueue需要掌握以下幾點
建立
入隊(新增元素)
出隊(刪除元素)
2、建立
Node節點內部類與LinkedBlockingQueue的一些屬性
static class Node<E> { E item;//節點封裝的資料 /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node<E> next;//下一個節點 Node(E x) { item = x; } } /** 指定連結串列容量 */ private final int capacity; /** 當前的元素個數 */ private final AtomicInteger count = new AtomicInteger(0); /** 連結串列頭節點 */ private transient Node<E> head; /** 連結串列尾節點 */ private transient Node<E> last; /** 出隊鎖 */ private final ReentrantLock takeLock = new ReentrantLock(); /** 出隊等待條件 */ private final Condition notEmpty = takeLock.newCondition(); /** 入隊鎖 */ private final ReentrantLock putLock = new ReentrantLock(); /** 入隊等待條件 */ private final Condition notFull = putLock.newCondition();
2.1、public LinkedBlockingQueue(int capacity)
使用方法:
Queue<String> abq = new LinkedBlockingQueue<String>(1000);
原始碼:
/** * 建立一個 LinkedBlockingQueue,容量為指定容量 */ public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null);//初始化頭節點和尾節點,均為封裝了null資料的節點 }
注意點:
LinkedBlockingQueue的組成一個連結串列+兩把鎖+兩個條件
2.2、public LinkedBlockingQueue()
使用方法:
Queue<String> abq = new LinkedBlockingQueue<String>();
原始碼:
/** * 建立一個LinkedBlockingQueue,容量為整數最大值 */ public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }
注意點:預設容量為整數最大值,可以看做沒有容量限制
3、入隊:
3.1、public boolean offer(E e)
原理:
在隊尾插入一個元素, 如果佇列沒滿,立即返回true; 如果佇列滿了,立即返回false
使用方法:
abq.offer("hello1");
原始碼:
/** * 在隊尾插入一個元素, 容量沒滿,可以立即插入,返回true; 佇列滿了,直接返回false * 注:如果使用了限制了容量的佇列,這個方法比add()好,因為add()插入失敗就會丟擲異常 */ public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count;// 獲取佇列中的元素個數 if (count.get() == capacity)// 佇列滿了 return false; int c = -1; final ReentrantLock putLock = this.putLock; putLock.lock();// 獲取入隊鎖 try { if (count.get() < capacity) {// 容量沒滿 enqueue(e);// 入隊 c = count.getAndIncrement();// 容量+1,返回舊值(注意) if (c + 1 < capacity)// 如果新增元素後的容量,還小於指定容量(說明在插入當前元素後,至少還可以再插一個元素) notFull.signal();// 喚醒等待notFull條件的其中一個執行緒 } } finally { putLock.unlock();// 釋放入隊鎖 } if (c == 0)// 如果c==0,這是什麼情況?一開始如果是個空佇列,就會是這樣的值,要注意的是,上邊的c返回的是舊值 signalNotEmpty(); return c >= 0; }
/** * 建立一個節點,並加入連結串列尾部 * @param x */ private void enqueue(E x) { /* * 封裝新節點,並賦給當前的最後一個節點的下一個節點,然後在將這個節點設為最後一個節點 */ last = last.next = new Node<E>(x); }
private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock();//獲取出隊鎖 try { notEmpty.signal();//喚醒等待notEmpty條件的執行緒中的一個 } finally { takeLock.unlock();//釋放出隊鎖 } }
如果,入隊邏輯不懂,檢視最後總結部分入隊邏輯的圖,程式碼非常簡單,流程看註釋即可。
3.2、public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
原理:
在隊尾插入一個元素,,如果佇列已滿,則進入等待,直到出現以下三種情況:
被喚醒
等待時間超時
當前執行緒被中斷
使用方法:
try { abq.offer("hello2",1000,TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); }
原始碼:
/** * 在隊尾插入一個元素,,如果佇列已滿,則進入等待,直到出現以下三種情況: * 1、被喚醒 * 2、等待時間超時 * 3、當前執行緒被中斷 */ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout);// 轉換為納秒 int c = -1; final ReentrantLock putLock = this.putLock;// 入隊鎖 final AtomicInteger count = this.count;// 總數量 putLock.lockInterruptibly(); try { while (count.get() == capacity) {// 容量已滿 if (nanos <= 0)// 已經超時 return false; /* * 進行等待: 在這個過程中可能發生三件事: * 1、被喚醒-->繼續當前這個while迴圈 * 2、超時-->繼續當前這個while迴圈 * 3、被中斷-->丟擲中斷異常InterruptedException */ nanos = notFull.awaitNanos(nanos); } enqueue(e);// 入隊 c = count.getAndIncrement();// 入隊元素數量+1 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; }
注意:
awaitNanos(nanos)是AQS中的一個方法,這裡就不詳細說了,有興趣的自己去檢視AQS的原始碼。
免費領取驗證碼、內容安全、簡訊傳送、直播點播體驗包及雲伺服器等套餐
更多網易技術、產品、運營經驗分享請點選。
相關文章:
【推薦】 異常測試之Socket網路異常
【推薦】 分散式儲存系統可靠性系列三:設計模式