1. 程式人生 > >java併發-特大疑問-阻塞佇列(BlockingQueue)

java併發-特大疑問-阻塞佇列(BlockingQueue)

java併發-阻塞佇列(BlockingQueue)

何為阻塞佇列

A {@link java.util.Queue} that additionally supports operations
that wait for the queue to become non-empty when retrieving an
element, and wait for space to become available in the queue when
storing an element.\

即:

  • 在新增元素的時候,在佇列滿的時候會處於等待狀態,知道佇列有空加才去新增
  • 在移除元素的時候,當佇列為空即佇列長隊為0的時候,將等待知道佇列不為空才可移除佇列node

何處用到了阻塞佇列?

  • 執行緒池,ThreadPoolExecutor使用的是BlockingQueue

實現類

  • ArrayBlockingQueue:一個由陣列結構組成的有界阻塞佇列。
  • LinkedBlockingQueue:一個由連結串列結構組成的有界阻塞佇列。
  • PriorityBlockingQueue:一個支援優先順序排序的無界阻塞佇列。
  • DelayQueue:一個使用優先順序佇列實現的無界阻塞佇列。
  • SynchronousQueue:一個不儲存元素的阻塞佇列
  • LinkedTransferQueue 個由連結串列結構組成的無界阻塞佇列
  • LinkedBlockingQueue 一個由連結串列結構組成的雙向阻塞佇列

詳解LinkedBlockingQueue(tomcat執行緒池用的是LinkedBlockingQueue)

入隊出隊流程

init
在這裡插入圖片描述

     public LinkedBlockingQueue(int capacity) {
            if (capacity <= 0) throw new IllegalArgumentException();
            //queue的最大容量
            this.capacity = capacity;
            //last和head指向同一個引用
            last = head = new Node<E>(null);
        }

enqueue
在這裡插入圖片描述

  • 第一次入隊,那麼last和head就分道揚鑣了
  • 每一次入隊,last都指向最後一個node
   private void enqueue(Node<E> node) {
       // assert putLock.isHeldByCurrentThread();
       // assert last.next == null;
       last = last.next = node;
   }

dequeue

  • 每一次出隊操作,head.next的引用就指向佇列的下一個,head.item永遠都為空,head.next才是正真的隊首
    在這裡插入圖片描述
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

執行緒安全

採用的是AQS的ReentrantLock來實現

  • ReentrantLock(putLock)
    • notFull(Codition)
 public void put(E e) throws InterruptedException {
         if (e == null) throw new NullPointerException();
         // Note: convention in all put/take/etc is to preset local var
         // holding count negative to indicate failure unless set.
         //區域性變數保證 可重複讀,與外界隔離,有點事務的隔離性
         int c = -1;
         Node<E> node = new Node<E>(e);
         final ReentrantLock putLock = this.putLock;
         final AtomicInteger count = this.count;
         putLock.lockInterruptibly();
         try {
             /*
              * Note that count is used in wait guard even though it is
              * not protected by lock. This works because count can
              * only decrease at this point (all other puts are shut
              * out by lock), and we (or some other waiting put) are
              * signalled if it ever changes from capacity. Similarly
              * for all other uses of count in other wait guards.
              */
             while (count.get() == capacity) {
                 notFull.await();
             }
             //入隊操作
             enqueue(node);
             c = count.getAndIncrement();
             //這裡必須要通知其他put執行緒
             if (c + 1 < capacity)
                 notFull.signal();
         } finally {
             putLock.unlock();
         }
         //不懂
         if (c == 0)
             signalNotEmpty();
     }
  • ReentrantLock(takeLock)
    • notEmpty(Codition)
//同上
public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        //不懂
        if (c == capacity)
            signalNotFull();
        return x;
    }

入隊和出隊採用不同的鎖,通過AtomicInteger來保證佇列技術器的執行緒安全。不同操作使用不同的鎖,效率很高。通過Codition來進行執行緒間
通訊,及時喚醒。這裡採用了兩把鎖,相比一把鎖更加高效,take和put操作互不影響。

疑問

這裡的c在在put操作後,個人感覺不可能為0,不懂作者的意圖

         if (c == 0)
             signalNotEmpty();

這裡的c在在take操作後,個人感覺不可能為capacity,不懂作者的意圖

if (c == capacity)
            signalNotFull();
        return x;
        

GitHub主頁