1. 程式人生 > >【多執行緒高併發】 同步容器和佇列

【多執行緒高併發】 同步容器和佇列

關鍵字: 同步容器,佇列,ConcurrentMap, Copy-On-Write容器,併發Queue, ConcurrentLinkedQueue, BlockQueue介面, ArrayBlockingQueue , LinkedBlockingQueue, PriorityBlockingQueue, DelayQueue, SynchronousQueue
github地址: https://github.com/zhaikaishun/concurrent_programming
程式碼主要在 Multi_003 , Multi_003\Multi_003\src\com\kaishun\base\coll013

1.1 同步容器

同步容器都是執行緒安全的,但是在某些場景下可能需要加鎖來保護複合操作,複合類操作如 迭代,跳轉,條件運算等,這些複合操作在多執行緒併發執行的時候。可能會出現意外行為,最經典的便是ConcurrentModificationException,原因是當容器迭代的過程中,被併發的修改了內容。
同步類容器: 如古老的Vector,HashTable。這些容器的同步功能其實都是由JDK的Collections.synchronized等工廠方法去實現的,其底層的機制無非就是用傳統的synchronized關鍵字對每個公用的方法都進行同步,使得每次只能有一個執行緒訪問容器的狀態。這很明顯不符合今天網際網路時代高併發的需求,在保證執行緒安全的同時,也必須要有足夠好的效能。

JDK5.0以後提供了多種併發容器來替代同步容器從而改善效能。併發容器是專門針對併發設計的,使用ConcurrentHashMap來代替給予雜湊的HashTable,而且在ConcurrentHashMap中,添加了一些常見覆合操作的支援,以及使用CopyOnWriteArrayList代替Voctor, 併發的CopyOnWriteArraySet, 以及併發的Queue, ConcurrentLinkedQueue和LinkedBlockingQueue, 前者是高效能的佇列,後者是以阻塞形式的佇列,具體實現Queue還有很多,例如ArrayBlockingQueue, PriorityBlockingQueue, SynchronousQueue等。

1.2 ConcurrentMap

ConcurrentMap介面有兩個重要的實現
- ConcurrentHashMap
- ConcurrentSkipListMap(支援併發排序功能,彌補ConcurrnetHashMap)
ConcurrentHashMap主要是利用了Segment(段)的方式,來減小鎖的粒度,從而實現提高併發效能的機制, 最大可以分成16段。並且程式碼中大多共享變數使用Volatile關鍵字宣告,目的是第一時間獲取修改的內容,效能非常好
如圖,傳統的HashTable,只有一段,對整個map進行加鎖,鎖的粒度比較大。而CorrentHashMap, 對這個map的某一個小Segment來進行加鎖,在哪一段操作,只鎖定哪一個段,其他段不影響,鎖的粒度比較小,從而提高併發的效能
correntHashMap

具體如何使用,和之前的HashMap幾乎是一模一樣的,還是隨便看個例子吧

        ConcurrentHashMap<String, Object> chm = new ConcurrentHashMap<String, Object>();
        chm.put("k1", "v1");
        chm.put("k2", "v2");
        chm.put("k3", "v3");
        chm.putIfAbsent("k4", "vvvv"); // 如果key不存在,就加進去

1.3 Copy-On-Write容器

Copy-On-Write簡稱COW, 是一種用於程式設計中的優化策略。
Copy容器即寫時複製的容器,先將當前容器進行Copy,複製出一個新的容器,然後想信的容器裡面新增元素,新增完後,在講原容器的引用指向新的容器。這樣的好處是我們可以對CopyOnWrite容器進行併發的讀而不用加鎖,因為當前容器不會新增任何元素,所以CopyOnWrite也是一種讀寫分離的思想,讀和寫不同的容器,適用於讀多寫少的場景。
JDK裡的COW容器有兩種:CopyOnWriteArrayList和CopyOnWriteArraySet, COW容器非常有用,可以在非常多的併發容器場景中使用到。
使用方法也和原始的ArrayList, set一樣

        CopyOnWriteArrayList<String> cwal = new CopyOnWriteArrayList<String>();
        CopyOnWriteArraySet<String> cwas = new CopyOnWriteArraySet<String>();

2.1 併發Queue

在併發佇列上JDK提供了兩套實現,一個是以ConcurrentLinkedQueue為代表的高效能佇列,一個是以BlockingQueue介面為代表的阻塞佇列,無論哪種都繼承自Queue
Queue的實現類
Queue的實現類1

2.2 ConcurrentLinkedQueue

ConcurrentLinkedQueue: 是一個適用於高併發場景下的佇列,通過無鎖的方式,實現了高併發狀態下的高效能,通常ConcurrentLinkedQueue效能好於BlockingQueue,他是一個基於連結節點的無界執行緒安全佇列,該佇列的元素遵循先進先出的原則,頭是最新加入,尾是最近加入。該佇列不允許null元素。

ConcurrentLinkedQueue重要方法:
add()和offer都是加入元素的方法(在ConcurrentLinkedQueue中,這兩個方法沒有任何區別)
poll()和peek()都是取頭元素節點,區別在於前者會刪除元素,後者不會。
示例:
com.kaishun.base.coll013.UseQueue
高效能無阻塞無界佇列:ConcurrentLinkedQueue

//高效能無阻塞無界佇列:ConcurrentLinkedQueue

ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<String>();
q.offer("a");
q.offer("b");
q.offer("c");
q.offer("d");
q.add("e");

System.out.println(q.poll());   //a 從頭部取出元素,並從佇列裡刪除
System.out.println(q.size());   //4
System.out.println(q.peek());   //b
System.out.println(q.size());   //4

----輸出-----
a
4
b
4

2.3 BlockQueue介面

有5種queue的實現。

ArrayBlockingQueue

基於陣列的阻塞佇列實現,在ArrayBlockingQueue內部,維護了一個定常陣列,一邊快取佇列中的資料物件,其內部沒有實現讀寫分離,也就意味著生產和消費不能完全並行,長度是需要定義的,可以指定先進先出或者先進後出。也叫有界佇列,在很多場合下非常適用。

        ArrayBlockingQueue<String> array = new ArrayBlockingQueue<String>(5);
        array.put("a");
        array.put("b");
        array.add("c");
        array.add("d");
        array.add("e");
//      array.add("f");
        System.out.println(array.offer("a", 3, TimeUnit.SECONDS));

由於指定的是5個長度,前面已經加了5個了,後面再次新增的時候,3秒內都加不進去,3秒後返回一個false,輸出

false

若超過了還是用add方法,就會拋異常 IllegalStateException: Queue full ,例如

        ArrayBlockingQueue<String> array = new ArrayBlockingQueue<String>(5);
        array.put("a");
        array.put("b");
        array.add("c");
        array.add("d");
        array.add("e");
        array.add("f");  //這裡接回拋異常
        System.out.println(array.offer("a", 3, TimeUnit.SECONDS));

LinkedBlockingQueue:

基於連結串列的阻塞佇列,同ArrayBlockingQueue類似,其內部也維護者一個數據緩衝佇列(該佇列是由一個連結串列構成),LinkBlockingQueue之所以能夠搞笑的處理併發資料,是因為其內部實現了讀寫分離鎖,從而實現了生產者和消費者的完全並行執行,他是一個無界佇列

        //阻塞佇列
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<String>();
        q.offer("a");
        q.offer("b");
        q.offer("c");
        q.offer("d");
        q.offer("e");
        q.add("f");
        System.out.println(q.size());

        for (Iterator iterator = q.iterator(); iterator.hasNext();) {
            String string = (String) iterator.next();
            System.out.println(string);
        }
-----------輸出-----------
6
a
b
c
d
e
f

drainTo一次取多個元素

        //阻塞佇列
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<String>();
        q.offer("a");
        q.offer("b");
        q.offer("c");
        q.offer("d");
        q.offer("e");
        q.add("f");
        System.out.println(q.size());

        List<String> list = new ArrayList<String>();
        //取3個元素,放入到 list 集合中去
        System.out.println(q.drainTo(list, 3));
        System.out.println(list.size());
        for (String string : list) {
            System.out.println(string);
        }
-----------輸出-----------
6
3
3
a
b
c

PriorityBlockingQueue:

基於優先順序的阻塞佇列(優先順序的判斷通過建構函式傳入的Compator物件來決定,也就是說傳入佇列的物件必須實現Comparable介面),在實現PriorityBlockingQueue時,內部控制執行緒同步的鎖採用的是公平鎖,他也是一個無界的佇列。
示例:
Task類,實現了Comparable的方法,重寫compareTo方法

public class Task implements Comparable<Task>{

    private int id ;
    private String name;
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }

    @Override
    public int compareTo(Task task) {
        return this.id > task.id ? 1 : (this.id < task.id ? -1 : 0);  
    }

    public String toString(){
        return this.id + "," + this.name;
    }

}

UsePriorityBlockingQueue
測試是否是有序佇列

public class UsePriorityBlockingQueue {


    public static void main(String[] args) throws Exception{


        PriorityBlockingQueue<Task> q = new PriorityBlockingQueue<Task>();

        Task t1 = new Task();
        t1.setId(3);
        t1.setName("id為3");
        Task t2 = new Task();
        t2.setId(4);
        t2.setName("id為4");
        Task t3 = new Task();
        t3.setId(1);
        t3.setName("id為1");

        //return this.id > task.id ? 1 : 0;
        q.add(t1);  //3
        q.add(t2);  //4
        q.add(t3);  //1

        // 1 3 4
        System.out.println("容器:" + q);
        System.out.println(q.take().getId());
        System.out.println("容器:" + q);
        System.out.println(q.take().getId());
        System.out.println(q.take().getId());
    }
}

輸出

容器:[1,id為1, 4,id為4, 3,id為3]
1
容器:[3,id為3, 4,id為4]
3
4

說明,這種佇列在沒有take的時候,還不是排序的,take()時,才利用了排序,比較的方法

DelayQueue:

帶有延遲時間的Queue, 其中的元素只有當其指定的延遲時間到了,才能夠從佇列中獲取到該元素, DelayQueue的元素必須實現Delayed 介面, DelayQueue是一個沒有大小限制的佇列,應用場景很多,比如對快取超時的資料進行一處,任務超時處理,空閒連線的關閉等等。
經典的網咖上機案例:
Wangmin 實現了Delayed介面

public class Wangmin implements Delayed {  

    private String name;  
    //身份證  
    private String id;  
    //截止時間  
    private long endTime;  
    //定義時間工具類
    private TimeUnit timeUnit = TimeUnit.SECONDS;

    public Wangmin(String name,String id,long endTime){  
        this.name=name;  
        this.id=id;  
        this.endTime = endTime;  
    }  

    public String getName(){  
        return this.name;  
    }  

    public String getId(){  
        return this.id;  
    }  

    /** 
     * 用來判斷是否到了截止時間 
     */  
    @Override  
    public long getDelay(TimeUnit unit) { 
        //return unit.convert(endTime, TimeUnit.MILLISECONDS) - unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        return endTime - System.currentTimeMillis();
    }  

    /** 
     * 相互批較排序用 
     */  
    @Override  
    public int compareTo(Delayed delayed) {  
        Wangmin w = (Wangmin)delayed;  
        return this.getDelay(this.timeUnit) - w.getDelay(this.timeUnit) > 0 ? 1:0;  
    } 

WangBa類

public class WangBa implements Runnable {  

    private DelayQueue<Wangmin> queue = new DelayQueue<Wangmin>();  

    public boolean yinye =true;  

    public void shangji(String name,String id,int money){  
        Wangmin man = new Wangmin(name, id, 1000 * money + System.currentTimeMillis());  
        System.out.println("網名"+man.getName()+" 身份證"+man.getId()+"交錢"+money+"塊,開始上機...");  
        this.queue.add(man);  
    }  

    public void xiaji(Wangmin man){  
        System.out.println("網名"+man.getName()+" 身份證"+man.getId()+"時間到下機...");  
    }  

    @Override  
    public void run() {  
        while(yinye){  
            try {  
                Wangmin man = queue.take();  
                xiaji(man);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }  

    public static void main(String args[]){  
        try{  
            System.out.println("網咖開始營業");  
            WangBa siyu = new WangBa();  
            Thread shangwang = new Thread(siyu);  
            shangwang.start();  

            siyu.shangji("路人甲", "123", 1);  
            siyu.shangji("路人乙", "234", 10);  
            siyu.shangji("路人丙", "345", 5);  
        }  
        catch(Exception e){  
            e.printStackTrace();
        }  

    }  
}  

輸出

網咖開始營業
網名路人甲 身份證123交錢1塊,開始上機...
網名路人乙 身份證234交錢10塊,開始上機...
網名路人丙 身份證345交錢5塊,開始上機...
網名路人甲 身份證123時間到下機...
網名路人丙 身份證345時間到下機...
網名路人乙 身份證234時間到下機...

SynchronousQueue:

一種沒有緩衝的佇列,生產者產生的資料直接會被消費者獲取並消費
個人理解為虛擬佇列,這個佇列不存元素,生產與消費相互扔而已

        final SynchronousQueue<String> q = new SynchronousQueue<String>();
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(q.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        t1.start();
        Thread t2 = new Thread(new Runnable() {

            @Override
            public void run() {
                q.add("asdasd");
            }
        });
        t2.start();
---------輸出-------
asdasd

– 本文總結自前人經驗,總結得挺累的,特別感謝網際網路架構師白鶴翔老師