1. 程式人生 > >Java高併發程式設計(八):Java併發容器和框架

Java高併發程式設計(八):Java併發容器和框架

1. ConcurrentHashMap

1.1 ConcurrentHashMap的優勢

在併發程式設計中使用HashMap可能導致程式死迴圈。而使用執行緒安全的HashTable效率又非
常低下,基於以上兩個原因,便有了ConcurrentHashMap的登場機會。

  1. 執行緒不安全的HashMap: 在多執行緒環境下,使用HashMap進行put操作會引起死迴圈,導致CPU利用率接近100%,所以在併發情況下不能使用HashMap。HashMap在併發執行put操作時會引起死迴圈,是因為多執行緒會導致HashMap的Entry連結串列形成環形資料結構
  2. 效率低下的HashTable。HashTable容器使用synchronized來保證執行緒安全,但線上程競爭激烈的情況下HashTable的效率非常低下。
  3. ConcurrentHashMap的鎖分段技術可有效提升併發訪問率。假如容器裡有多把鎖,每一把鎖用於鎖容器其中一部分資料,那麼當多執行緒訪問容器裡不同資料段的資料時,執行緒間就不會存在鎖競爭,從而可以有效提高併發訪問效率。

1.2 ConcurrentHashMap的結構

ConcurrentHashMap是由Segment陣列結構和HashEntry陣列結構組成。

  1. Segment:是一種可重入鎖(ReentrantLock),在ConcurrentHashMap裡扮演鎖的角色。
  2. HashEntry:用於儲存資料。

ConcurrentHashMap類圖:
在這裡插入圖片描述

ConcurrentHashMap結構圖:
在這裡插入圖片描述

1.3 ConcurrentHashMap初始化

ConcurrentHashMap初始化方法是通過initialCapacity、loadFactor、concurrencyLevel等幾個引數來初始化segment陣列、段偏移量segmentShift、段掩碼segmentMask和每個segment裡的HashEntry陣列來實現的。

1.3.1 初始化segments陣列

if (concurrencyLevel > MAX_SEGMENTS)
    concurrencyLevel = MAX_SEGMENTS;
int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } segmentShift = 32 - sshift; segmentMask = ssize - 1; this.segments = Segment.newArray(ssize);

由上面的程式碼可知,segments陣列的長度ssize是通過concurrencyLevel計算得出的。為了能通過按位與的雜湊演算法來定位segments陣列的索引,必須保證segments陣列的長度是2的N次方(power-of-two size),所以必須計算出一個大於或等於concurrencyLevel的最小的2的N次方值來作為segments陣列的長度。

1.3.2 初始化segmentShift和segmentMask

這兩個全域性變數需要在定位segment時的雜湊演算法裡使用,sshift等於ssize從1向左移位的
次數,在預設情況下concurrencyLevel等於16,1需要向左移位移動4次,所以sshift等於4。segmentShift用於定位參與雜湊運算的位數,segmentShift等於32減sshift,所以等於28.

segmentMask是雜湊運算的掩碼,等於ssize減1,即15,掩碼的二進位制各個位的值都是1。

1.3.3 初始化每個segment

初始化每個segment輸入引數initialCapacity是ConcurrentHashMap的初始化容量,loadfactor是每個segment的負載因子。負載因子=尺寸/容量 Size/Capacity

if (initialCapacity > MAXIMUM_CAPACITY)
    initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = 1;
    while (cap < c)
        cap <<= 1;
    for (int i = 0; i < this.segments.length; ++i)
        this.segments[i] = new Segment<K,V>(cap, loadFactor);

上面程式碼中的變數cap就是segment裡HashEntry陣列的長度,它等於initialCapacity除以ssize的倍數c,如果c大於1,就會取大於等於c的2的N次方值,所以cap不是1,就是2的N次方。segment的容量threshold=(int)cap*loadFactor

1.3.4 定位Segment

既然ConcurrentHashMap使用分段鎖Segment來保護不同段的資料,那麼在插入和獲取元素
的時候,必須先通過雜湊演算法定位到Segment。可以看到ConcurrentHashMap會首先使用
Wang/Jenkins hash的變種演算法對元素的hashCode進行一次再雜湊。

1.4 ConcurrentHashMap的操作

1.4.1 get操作

Segment的get操作實現非常簡單和高效。先經過一次再雜湊,然後使用這個雜湊值通過散
列運算定位到Segment,再通過雜湊演算法定位到元素。

public V get(Object key) {
    int hash = hash(key.hashCode());
    return segmentFor(hash).get(key, hash);
}

get操作的高效之處在於整個get過程不需要加鎖。原因是它的get方法裡將要使用的共享變數都定義成volatile型別,如用於統計當前Segement大小的count欄位和用於儲存值的HashEntry的value。定義成volatile的變數,能夠線上程之間保持可見性,能夠被多執行緒同時讀,並且保證不會讀到過期的值,但是隻能被單執行緒寫(有一種情況可以被多執行緒寫,就是寫入的值不依賴於原值),在get操作裡只需要讀不需要寫共享變數count和value,所以可以不用加鎖。並且get操作還不會獲取過期的值,是因為根據Java記憶體模型的happen before原則,對volatile欄位的寫入操作先於讀操作,即使兩個執行緒同時修改和獲取volatile變數,get操作也能拿到最新的值,

1.4.2 put操作

由於put方法裡需要對共享變數進行寫入操作,所以為了執行緒安全,在操作共享變數時必
須加鎖。put方法首先定位到Segment,然後在Segment裡進行插入操作。插入操作需要經歷兩個步驟,第一步判斷是否需要對Segment裡的HashEntry陣列進行擴容,第二步定位新增元素的位置,然後將其放在HashEntry數組裡。

  1. 是否需要擴容:在插入元素前會先判斷Segment裡的HashEntry陣列是否超過容量(threshold),如果超過閾值,則對陣列進行擴容。
  2. 如何擴容:在擴容的時候,首先會建立一個容量是原來容量兩倍的陣列,然後將原數組裡的元素進行再雜湊後插入到新的數組裡。為了高效,ConcurrentHashMap不會對整個容器進行擴容,而只對某個segment進行擴容。

1.4.3 size操作

雖然相加時可以獲取每個Segment的count的最新值,但是可能累加前使用的count發生了變化,那麼統計結果就不準了。所以,最安全的做法是在統計size的時候把所有Segment的put、remove和clean方法全部鎖住,但是這種做法顯然非常低效。

解決方案:

在累加count操作過程中,之前累加過的count發生變化的機率非常小,所以ConcurrentHashMap的做法是先嚐試2次通過不鎖住Segment的方式來統計各個Segment大小,如果統計的過程中,容器的count發生了變化,則再採用加鎖的方式來統計所有Segment的大小。ConcurrentHashMap使用modCount變數,在put、remove和clean方法裡操作元素前都會將變數modCount進行加1,那麼在統計size前後比較modCount是否發生變化,從而得知容器的大小是否發生變化。

2. ConcurrentLinkedQueue

如果要實現一個執行緒安全的佇列有兩種方式:

  1. 阻塞方式:可以用一個鎖(入隊和出隊用同一把鎖)或兩個鎖(入隊和出隊用不同的鎖)等方式來實現。
  2. 非阻塞方式:可以使用迴圈CAS的方式來實現

ConcurrentLinkedQueue是一個基於連結節點的無界執行緒安全佇列,它採用先進先出的規則對節點進行排序,當我們新增一個元素的時候,它會新增到佇列的尾部;當我們獲取一個元素時,它會返回佇列頭部的元素。它採用了“wait-free”演算法(即CAS演算法)來實現。

2.1 ConcurrentLinkedQueue結構圖

在這裡插入圖片描述

2.2 入佇列

入佇列就是將入隊節點新增到佇列的尾部。

ConcurrentLinkedQueue如佇列主要是做兩件事情:

  1. 第一是將入隊節點設定成當前佇列尾節點的下一個節點;
  2. 第二是更新tail節點,如果tail節點的next節點不為空,則將入隊節點設定成tail節點,如果tail節點的next節點為空,則將入隊節點設定成tail的next節點
public boolean offer(E e) {
        //判斷元素e是否為null
        checkNotNull(e);
        //// 入隊前,建立一個入隊節點
        final Node<E> newNode = new Node<E>(e);

        // 建立一個指向tail節點的引用
        // p用來表示佇列的尾節點,預設情況下等於tail節點。
        for (Node<E> t = tail, p = t;;) {
            //獲取p節點的next節點 q
            Node<E> q = p.next;
            if (q == null) {
                //如q為null,說明p是尾節點。這個時候找到了尾節點,現在新增入隊節點
                // p is last node
                //使用casNext將入隊節點設定為尾節點的next節點。首先判斷p的next節點是否為null。如果為null則設定newNode為p的next節點。如果不為null則返回false。
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    // 如果尾節點p和t不相等,說明同一時刻有其他執行緒更新了尾節點,則需要將當前節點設定為tail節點。
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

從原始碼角度來看,整個入隊過程主要做兩件事情:第一是定位出尾節點;第二是使用
CAS演算法將入隊節點設定成尾節點的next節點,如不成功則重試。

2.3 出佇列

出佇列的就是從佇列裡返回一個節點元素,並清空該節點對元素的引用。

從圖中可知,並不是每次出隊時都更新head節點,當head節點裡有元素時,直接彈出head節點裡的元素,而不會更新head節點。只有當head節點裡沒有元素時,出隊操作才會更新head節點。這種做法也是通過hops變數來減少使用CAS更新head節點的消耗,從而提高出隊效率。

在這裡插入圖片描述

 public E poll() {
        restartFromHead:
        for (;;) {
            //p表示head節點,需要出對的節點
            for (Node<E> h = head, p = h, q;;) {
                //獲取p節點的元素
                E item = p.item;

                // 如果p節點的元素不為空,使用CAS設定p節點引用的元素為null,
                // 如果成功則返回p節點的元素。
                if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    //// 將p節點下一個節點設定成head節點
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                //// 如果p的下一個節點也為空,說明這個佇列已經空了
                else if ((q = p.next) == null) {
                    //更新頭節點
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

首先獲取頭節點的元素,然後判斷頭節點元素是否為空,如果為空,表示另外一個執行緒已
經進行了一次出隊操作將該節點的元素取走,如果不為空,則使用CAS的方式將頭節點的引
用設定成null,如果CAS成功,則直接返回頭節點的元素,如果不成功,表示另外一個執行緒已經進行了一次出隊操作更新了head節點,導致元素髮生了變化,需要重新獲取頭節點。

3. Java中阻塞佇列

3.1 阻塞佇列

阻塞佇列(BlockingQueue)是一個支援兩個附加操作的佇列。這兩個附加的操作支援阻塞
的插入和移除方法。

  1. 支援阻塞的插入方法:意思是當佇列滿時,佇列會阻塞插入元素的執行緒,直到佇列不
    滿。
  2. 支援阻塞的移除方法:意思是在佇列為空時,獲取元素的執行緒會等待佇列變為非空。

Java中的阻塞佇列提供了以下四種操作方式:

在這裡插入圖片描述

  • 丟擲異常:當佇列滿時,如果再往佇列裡插入元素,會丟擲IllegalStateException(“Queuefull”)異常。當佇列空時,從佇列裡獲取元素會丟擲NoSuchElementException異常。

  • 返回特殊值:當往佇列插入元素時,會返回元素是否插入成功,成功返回true。如果是移
    除方法,則是從佇列裡取出一個元素,如果沒有則返回null。

  • 一直阻塞:當阻塞佇列滿時,如果生產者執行緒往佇列裡put元素,佇列會一直阻塞生產者
    執行緒,直到佇列可用或者響應中斷退出。當佇列空時,如果消費者執行緒從佇列裡take元素,佇列會阻塞住消費者執行緒,直到佇列不為空。

  • 超時退出:當阻塞佇列滿時,如果生產者執行緒往佇列裡插入元素,佇列會阻塞生產者執行緒
    一段時間,如果超過了指定的時間,生產者執行緒就會退出。

3.2 Java中的阻塞佇列

3.2.1 ArrayBlockingQueue

ArrayBlockingQueue是一個用陣列實現的有界阻塞佇列。此佇列按照先進先出(FIFO)的原則對元素進行排序。預設情況下不保證執行緒公平的訪問佇列,所謂公平訪問佇列是指阻塞的執行緒,可以按照阻塞的先後順序訪問佇列,即先阻塞執行緒先訪問佇列。

public class ArrayblockingQueueTest {
    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> queue=new ArrayBlockingQueue<Integer>(5);
        new Thread(new Runnable() {
            @Override
            public void run() {
                int count=1;
                while (true){
                    try {
                        queue.put(count++);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("新增元素成功:"+count);
                }
            }
        }).start();
        
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true){
                    Integer remove = null;
                    try {
                        remove = queue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("remove元素成功:"+remove);
                }
            }
        }).start();
    }
}

3.2.2 LinkedBlockingQueue

LinkedBlockingQueue是一個用連結串列實現的有界阻塞佇列。此佇列的預設和最大長度為
Integer.MAX_VALUE。此佇列按照先進先出的原則對元素進行排序。

public class LinkedblockingQueueTest {
    public static void main(String[] args) {
        LinkedBlockingQueue<Integer> queue=new LinkedBlockingQueue<Integer>(5);
        new Thread(new Runnable() {
            @Override
            public void run() {
                int count=1;
                while (true){
                    try {
                        queue.put(count++);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("新增元素成功:"+count);
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true){
                    Integer remove = null;
                    try {
                        remove = queue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("remove元素成功:"+remove);
                }
            }
        }).start();
    }
}

3.3.3 PriorityBlockingQueue

PriorityBlockingQueue是一個支援優先順序的無界阻塞佇列。預設情況下元素採取自然順序
升序排列。也可以自定義類實現compareTo()方法來指定元素排序規則,或者初始化
PriorityBlockingQueue時,指定構造引數Comparator來對元素進行排序。需要注意的是不能保證同優先順序元素的順序。

public class PriorityBlockingQueueTest {
    //支援優先順序的無界佇列,可以不斷的put因為支援擴容。但是當佇列元素為空時,get操作就會阻塞。
    public static PriorityBlockingQueue<User> queue = new PriorityBlockingQueue<User>(3);

    public static void main(String[] args) {
        queue.put(new User(1,"wu"));
        queue.put(new User(5,"wu5"));
        queue.put(new User(23,"wu23"));
        queue.put(new User(55,"wu55"));
        queue.put(new User(9,"wu9"));
        queue.put(new User(3,"wu3"));
        for (User user : queue) {
            try {
                System.out.println(queue.take().name);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        try {
            User user = queue.take();
            System.out.println(user);
        } catch (InterruptedException e) {

        }
    }

    //靜態內部類
    static class User implements Comparable<User>{

        public User(int age,String name) {
            this.age = age;
            this.name = name;
        }

        int age;
        String name;

        /**
         * 排序 o this
         * this compareTo o
         * @param o
         * @return
         */
        @Override
        public int compareTo(User o) {
            return this.age > o.age ? 1 : -1;
        }
    }
}

3.3.4 DelayQueue

DelayQueue是一個支援延時獲取元素的無界阻塞佇列。佇列使用PriorityQueue來實現。隊
列中的元素必須實現Delayed介面,在建立元素時可以指定多久才能從佇列中獲取當前元素。只有在延遲期滿時才能從佇列中提取元素。

DelayQueue運用場景:

  1. 快取系統的設計:可以用DelayQueue儲存快取元素的有效期,使用一個執行緒迴圈查詢
    DelayQueue,一旦能從DelayQueue中獲取元素時,表示快取有效期到了。
  2. 定時任務排程:使用DelayQueue儲存當天將會執行的任務和執行時間,一旦從
    DelayQueue中獲取到任務就開始執行,比如TimerQueue就是使用DelayQueue實現的。
public class DelayedQueueTest {
    public static void main(String[] args) {
        DelayQueue<DelayTask> queue = new DelayQueue<>();
        queue.add(new DelayTask("1", new Date()));
        queue.add(new DelayTask("2", new Date(System.currentTimeMillis()+5000)));
        queue.add(new DelayTask("3", new Date(System.currentTimeMillis()+1000)));
        queue.add(new DelayTask("4", new Date(System.currentTimeMillis()+2000)));

        System.out.println("queue put done");

        while(!queue.isEmpty()) {
            try {
                DelayTask task = queue.take();
                System.out.println(task.name + ":" + System.currentTimeMillis());

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class DelayTask implements Delayed {
        private String name;

        private Date taskTime;


        public DelayTask(String name, Date  taskTime) {
            this.name = name;
            this.taskTime = taskTime;
        }

        @Override
        public int compareTo( Delayed o) {
            DelayTask delayTask = (DelayTask) o;
            long diff = taskTime.getTime() - delayTask.getTaskTime().getTime();
            if (diff > 0) {
                return 1;
            } else if (diff == 0) {
                return 0;
            } else {
                return -1;
            }
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(<