1. 程式人生 > >併發容器之ArrayBlockingQueue和LinkedBlockingQueue實現原理詳解

併發容器之ArrayBlockingQueue和LinkedBlockingQueue實現原理詳解

1. ArrayBlockingQueue簡介

在多執行緒程式設計過程中,為了業務解耦和架構設計,經常會使用併發容器用於儲存多執行緒間的共享資料,這樣不僅可以保證執行緒安全,還可以簡化各個執行緒操作。例如在“生產者-消費者”問題中,會使用阻塞佇列(BlockingQueue)作為資料容器,關於BlockingQueue可以看這篇文章。為了加深對阻塞佇列的理解,唯一的方式是對其實驗原理進行理解,這篇文章就主要來看看ArrayBlockingQueue和LinkedBlockingQueue的實現原理。

2. ArrayBlockingQueue實現原理

阻塞佇列最核心的功能是,能夠可阻塞式的插入和刪除佇列元素。當前佇列為空時,會阻塞消費資料的執行緒,直至佇列非空時,通知被阻塞的執行緒;當佇列滿時,會阻塞插入資料的執行緒,直至佇列未滿時,通知插入資料的執行緒(生產者執行緒)。那麼,多執行緒中訊息通知機制最常用的是lock的condition機制,關於condition可以

看這篇文章的詳細介紹。那麼ArrayBlockingQueue的實現是不是也會採用Condition的通知機制呢?下面來看看。

2.1 ArrayBlockingQueue的主要屬性

ArrayBlockingQueue的主要屬性如下:

/** The queued items */
final Object[] items;

/** items index for next take, poll, peek or remove */
int takeIndex;

/** items index for next put, offer, or add */
int putIndex;

/** Number of elements in the queue */
int count;

/*
 * Concurrency control uses the classic two-condition algorithm
 * found in any textbook.
 */

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

從原始碼中可以看出ArrayBlockingQueue內部是採用陣列進行資料儲存的(屬性items),為了保證執行緒安全,採用的是ReentrantLock lock,為了保證可阻塞式的插入刪除資料利用的是Condition,當獲取資料的消費者執行緒被阻塞時會將該執行緒放置到notEmpty等待佇列中,當插入資料的生產者執行緒被阻塞時,會將該執行緒放置到notFull等待佇列中。而notEmpty和notFull等中要屬性在構造方法中進行建立:

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

接下來,主要看看可阻塞式的put和take方法是怎樣實現的。

2.2 put方法詳解

put(E e)方法原始碼如下:

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        //如果當前佇列已滿,將執行緒移入到notFull等待佇列中
        while (count == items.length)
            notFull.await();
        //滿足插入資料的要求,直接進行入隊操作
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

該方法的邏輯很簡單,當佇列已滿時(count == items.length)將執行緒移入到notFull等待佇列中,如果當前滿足插入資料的條件,就可以直接呼叫enqueue(e)插入資料元素。enqueue方法原始碼為:

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    //插入資料
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    //通知消費者執行緒,當前佇列中有資料可供消費
    notEmpty.signal();
}

enqueue方法的邏輯同樣也很簡單,先完成插入資料,即往陣列中新增資料(items[putIndex] = x),然後通知被阻塞的消費者執行緒,當前佇列中有資料可供消費(notEmpty.signal())。

2.3 take方法詳解

take方法原始碼如下:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        //如果佇列為空,沒有資料,將消費者執行緒移入等待佇列中
        while (count == 0)
            notEmpty.await();
        //獲取資料
        return dequeue();
    } finally {
        lock.unlock();
    }
}

take方法也主要做了兩步:1. 如果當前佇列為空的話,則將獲取資料的消費者執行緒移入到等待佇列中;2. 若佇列不為空則獲取資料,即完成出隊操作dequeue。dequeue方法原始碼為:

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    //獲取資料
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    //通知被阻塞的生產者執行緒
    notFull.signal();
    return x;
}

dequeue方法也主要做了兩件事情:1. 獲取佇列中的資料,即獲取陣列中的資料元素((E) items[takeIndex]);2. 通知notFull等待佇列中的執行緒,使其由等待佇列移入到同步佇列中,使其能夠有機會獲得lock,並執行完成功退出。

從以上分析,可以看出put和take方法主要是通過condition的通知機制來完成可阻塞式的插入資料和獲取資料。在理解ArrayBlockingQueue後再去理解LinkedBlockingQueue就很容易了。

3. LinkedBlockingQueue實現原理

LinkedBlockingQueue是用連結串列實現的有界阻塞佇列,當構造物件時為指定佇列大小時,佇列預設大小為Integer.MAX_VALUE。從它的構造方法可以看出:

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

3.1 LinkedBlockingQueue的主要屬性

LinkedBlockingQueue的主要屬性有:

/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();

/**
 * Head of linked list.
 * Invariant: head.item == null
 */
transient Node<E> head;

/**
 * Tail of linked list.
 * Invariant: last.next == null
 */
private transient Node<E> last;

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

可以看出與ArrayBlockingQueue主要的區別是,LinkedBlockingQueue在插入資料和刪除資料時分別是由兩個不同的lock(takeLockputLock)來控制執行緒安全的,因此,也由這兩個lock生成了兩個對應的condition(notEmptynotFull)來實現可阻塞的插入和刪除資料。並且,採用了連結串列的資料結構來實現佇列,Node結點的定義為:

static class Node<E> {
    E item;

    /**
     * One of:
     * - the real successor Node
     * - this Node, meaning the successor is head.next
     * - null, meaning there is no successor (this is the last node)
     */
    Node<E> next;

    Node(E x) { item = x; }
}

接下來,我們也同樣來看看put方法和take方法的實現。

3.2 put方法詳解

put方法原始碼為:

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        /*
         * Note that count is used in wait guard even though it is
         * not protected by lock. This works because count can
         * only decrease at this point (all other puts are shut
         * out by lock), and we (or some other waiting put) are
         * signalled if it ever changes from capacity. Similarly
         * for all other uses of count in other wait guards.
         */
        //如果佇列已滿,則阻塞當前執行緒,將其移入等待佇列
        while (count.get() == capacity) {
            notFull.await();
        }
        //入隊操作,插入資料
        enqueue(node);
        c = count.getAndIncrement();
        //若佇列滿足插入資料的條件,則通知被阻塞的生產者執行緒
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

put方法的邏輯也同樣很容易理解,可見註釋。基本上和ArrayBlockingQueue的put方法一樣。take方法的原始碼如下:

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        //當前佇列為空,則阻塞當前執行緒,將其移入到等待佇列中,直至滿足條件
        while (count.get() == 0) {
            notEmpty.await();
        }
        //移除隊頭元素,獲取資料
        x = dequeue();
        c = count.getAndDecrement();
        //如果當前滿足移除元素的條件,則通知被阻塞的消費者執行緒
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

take方法的主要邏輯請見於註釋,也很容易理解。

4. ArrayBlockingQueue與LinkedBlockingQueue的比較

相同點:ArrayBlockingQueue和LinkedBlockingQueue都是通過condition通知機制來實現可阻塞式插入和刪除元素,並滿足執行緒安全的特性;

不同點:1. ArrayBlockingQueue底層是採用的陣列進行實現,而LinkedBlockingQueue則是採用連結串列資料結構;
2. ArrayBlockingQueue插入和刪除資料,只採用了一個lock,而LinkedBlockingQueue則是在插入和刪除分別採用了putLocktakeLock,這樣可以降低執行緒由於執行緒無法獲取到lock而進入WAITING狀態的可能性,從而提高了執行緒併發執行的效率。

相關推薦

併發容器ArrayBlockingQueueLinkedBlockingQueue實現原理

1. ArrayBlockingQueue簡介 在多執行緒程式設計過程中,為了業務解耦和架構設計,經常會使用併發容器用於儲存多執行緒間的共享資料,這樣不僅可以保證執行緒安全,還可以簡化各個執行緒操作。例如在“生產者-消費者”問題中,會使用阻塞佇列(Blocki

二十五、併發程式設計join應用與實現原理剖析

1、join有什麼用呢? 當一個執行緒正在進行中的時候,如果我們想呼叫另外一個執行緒的話,這時我們可以使用join。 2、join方法的底層原理,簡單來說就是,join方法能把所呼叫join方法的執行緒進入休眠狀態(wait()),等執行完joinThread執行緒之後,會自動

堵塞佇列ArrayBlockingQueueLinkedBlockingQueue解析

線上程池建立的時候,需要傳一個堵塞佇列來維護需要執行的執行緒任務,其中最常用的是ArrayBlockingQueue和LinkedBlockingQueue。他們都繼承了BlockingQueue介面。 ArrayBlockingQueue 一個有邊

JUCArrayBlockingQueueLinkedBlockingQueue

部分摘抄自 阻塞佇列 在JDK中,LinkedList或ArrayList就是佇列。但是實際使用者並不多。 阻塞佇列與我們平常接觸的普通佇列(LinkedList或ArrayList等)的最大不同點,在於阻塞佇列支出阻塞新增和阻塞刪除方法。 阻

java併發佇列ArrayBlockingQueueLinkedBlockingQueue

ArrayBlockingQueue和LinkedBlockingQueue用法上沒有什麼區別,所以就放在一起把。 特點:阻塞佇

反射多型實現原理

Table of Contents 反射和多型 多型 多型的定義和用法 多型的實現原理 反射 反射的實現原理 反射的應用 反射的弊端 反射和多型 這兩種技術並無直接聯絡,之所以把它們放在一起說,是因為Java提供讓我們在執行時識別物件和類的資訊,主要有

Python Web開發中,WSGI協議的作用實現原理

首先理解下面三個概念: WSGI:全稱是Web Server Gateway Interface,WSGI不是伺服器,python模組,框架,API或者任何軟體,只是一種規範,描述web server如何與web application通訊的規範。 uwsgi:與WSGI一樣是一種協議,是uWSGI伺服器

Spring AopCglib實現原理

Spring Aop實現對目標物件的代理,AOP的兩種實現方式:Jdk代理和Cglib代理。這兩種代理的區別在於,Jdk代理與目標類都會實現同一個介面,並且在代理類中會呼叫目標類中被代理的方法,呼叫者實際呼叫的則是代理類的方法,通過這種方式我們就可以在代理類中織入切面邏輯;Jdk代理存在的問題在於目標類被代

JVM源碼分析System.currentTimeMillis及nanoTime原理

atime status bin lease col void 奇怪 pro http JDK7

Java LinkedList的實現原理

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!        

Java HashSet的實現原理

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!        

HashMap底層實現原理(轉載)

本文轉自:https://blog.csdn.net/caihaijiang/article/details/6280251 java中HashMap詳解 HashMap 和 HashSet 是 Java Collection Framework 的兩個重要成員,其中 HashMap 是

golang net/http包部分實現原理

net/http包在編寫golang web應用中有很重要的作用,它主要提供了基於HTTP協議進行工作的client實現和server實現,可用於編寫HTTP服務端和客戶端。 其使用方法也跟其他面嚮物件語言很相似,我們可以先從它的一些基礎用法來感受一下: 以下是

Android中Canvas繪圖PorterDuffXfermode使用及工作原理

概述 類android.graphics.PorterDuffXfermode繼承自android.graphics.Xfermode。在用Android中的Canvas進行繪圖時,可以通過使用PorterDuffXfermode將所繪製的圖形的畫素與Canv

深度學習卷積神經網路原理(一)

初探CNN卷積神經網路 1、概述 典型的深度學習模型就是很深層的神經網路,包含多個隱含層,多隱層的神經網路很難直接使用BP演算法進行直接訓練,因為反向傳播誤差時往往會發散,很難收斂 CNN節省訓練開銷的方式是權共享weight sharing,讓一組神經元

幾種壓縮演算法實現原理

gzip 、zlib以及圖形格式png,使用的壓縮演算法都是deflate演算法。從gzip的原始碼中,我們瞭解到了defalte演算法的原理和實現。我閱讀的gzip版本為 gzip-1.2.4。下面我們將要對deflate演算法做一個分析和說明。首先簡單介紹一下基本原理,

幾種主流貼圖壓縮演算法的實現原理

ETC壓縮演算法採用將影象中的chromatic和luminance分開儲存的方式,而在解碼時使用luminance對chromatic進行調製進而重現原始影象資訊。   ETC也主要有兩種方法:ETC1和改進後的ETC2。 ETC1: 採用4x2的block進行分割(原始為4*2*24=192,壓

【深入Java虛擬機器】Java虛擬機器工作原理

轉自:https://blog.csdn.net/bingduanlbd/article/details/8363734 一、類載入器 首先來看一下java程式的執行過程。               &nbs

MVC之前的那點事兒系列(7):WebActivator的實現原理

文章內容 上篇文章,我們分析如何動態註冊HttpModule的實現,本篇我們來分析一下通過上篇程式碼原理實現的WebActivator類庫,WebActivator提供了3種功能,允許我們分別在HttpApplication初始化之前,之後以及ShutDown的時候分別執行指定的程式碼,示例如下: [

String類在記憶體中實現原理

(1) ==  比較引用型別比較的是地址值是否相同 equals:比較引用型別預設也是比較地址值是否相同,而String類重寫了equals()方法,比較的是內容是否相同。 (2) 區分下面兩種語句在記憶體中的實現: <span style="font-size:14