1. 程式人生 > >java執行緒安全之併發Queue(十三)

java執行緒安全之併發Queue(十三)

併發Queue

       在併發的佇列上jdk提供了兩套實現,一個是以ConcurrentLinkedQueue為代表的高效能佇列,一個是以BlockingQueue介面為代表的阻塞佇列,無論在那種都繼承自Queue。
如圖繼承Queue共有二十四個:
這裡寫圖片描述
這裡寫圖片描述

ConcurrentLinkedQueue

概念理解

       ConcurrentLinkedQueue:是一個適用於高併發場景下的佇列,通過無鎖的方式,實現了高併發狀態下的高效能,通常ConcurrentLinkedQueue效能好於BlockingQueueo它是一個基於連結節點的無界執行緒安全佇列。該佇列的元素遵循先講先出的原則。頭是最先加入的,尾是最近加入的,該佇列不允許null元素。
ConcurrentLinkedQueue重要方法:

Add()和offer()都是加入元素的方法(在ConcurrentLinkedQueue中,這兩個方法投有任何區別)

Poll()和peek()都是取頭元素節點,區別在於前者會刪除元素,後者不會。

案例
public class UseQueue_ConcurrentLinkedQueue {


    public static void main(String[] args) throws Exception {

        //高效能無阻塞無界佇列:ConcurrentLinkedQueue

        ConcurrentLinkedQueue<String> q = new
ConcurrentLinkedQueue<String>(); q.offer("a"); q.offer("b"); q.offer("c"); q.offer("d"); q.add("e"); System.out.println("從頭部取出元素,並從佇列裡刪除 >> "+q.poll()); //a 從頭部取出元素,並從佇列裡刪除 System.out.println("刪除後的長度 >> "+q.size()); //4 System.out.println("取出頭部元素 >> "
+q.peek()); //b System.out.println("長度 >> "+q.size()); //4 } }

列印結果:

從頭部取出元素,並從佇列裡刪除 >> a
刪除後的長度 >> 4
取出頭部元素 >> b
長度 >> 4

BlockingQueue介面

       ArrayBlockingQueue:基於陣列的阻塞佇列實現,在ArrayBlockingQueue內部,維護了一個定長陣列,以便快取佇列中的資料物件,其內部沒實現讀寫分離,也就意味著生產和消費不能完全並行,長度是需要定義的,可以指定先講先出或者先講後出,也叫有界佇列,在很多場合非常適合使用。

       LinkedBlockingQueue:基於連結串列的阻塞佇列,同ArrayBlockingQueue類似,其內部也維持著一個數據緩衝佇列〈該佇列由一個連結串列構成),LinkedBlockingQueue之所以能夠高效的處理併發資料,是因為其內部實現採用分離鎖(讀寫分離兩個鎖),從而實現生產者和消費者操作的完全並行執行,他是一個無界佇列。

       SynchronousQueue:一種沒有緩衝的佇列,生產者產生的資料直接會被消費者獲取並消費。

       PriorityBlockingQueue:基於優先順序的阻塞佇列(優先順序的判斷通過建構函式傳入的Compator物件來決定,也就是說傳入佇列的物件必須實現Comparable介面),在實現PriorityBlockingQueue時,內部控制執行緒同步的鎖採用的是公平鎖,他也是一個無界的佇列。

       DelayQueue:帶有延遲時間的Queue,其中的元素只有當其指定的延遲時間到了,才能夠從佇列中獲取到該元素。DelayQueue中的元素必須實現Delayed介面,DelayQueue是一個沒有大小限制的佇列,應用場景很多,比如對快取超時的資料進行移除、任務超時處理、空閒連線的關閉等等。

ArrayBlockingQueue、LinkedBlockingQueue、synchronousQueue案例
public class UseQueue_ConcurrentLinkedQueue {

    public static void main(String[] args) throws Exception {


        System.out.println("--------------- ArrayBlockingQueue --------------");
        //阻塞佇列    有長度的佇列
        ArrayBlockingQueue<String> array = new ArrayBlockingQueue<String>(5);
        array.put("a");
        array.put("b");
        array.add("c");
        array.add("d");
        array.add("e");
        //array.add("f");
        //返回一個布林型別   在3秒之內能不能加入  不能返回false
        System.out.println(array.offer("a", 3, TimeUnit.SECONDS));
        System.out.println("所有資料  >>  " + array.toString());


        System.out.println("--------------- LinkedBlockingQueue --------------");
        //阻塞佇列   無長度限制佇列
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<String>();
        q.offer("a");
        q.offer("b");
        q.offer("c");
        q.offer("d");
        q.offer("e");
        q.add("f");
        System.out.println("總長度  >>  "+q.size());

        for (Iterator iterator = q.iterator(); iterator.hasNext(); ) {
            String string = (String) iterator.next();
            System.out.print(string+" -- ");
        }
        System.out.println();
        List<String> list = new ArrayList<String>();
        //在 q 的佇列中取三個元素放到list 佇列裡
        System.out.println(q.drainTo(list, 3));
        System.out.println("取出LinkedBlockingQueue資料放到list列表的長度為   >>  "+list.size());
        for (String string : list) {
            System.out.print(string + " -- ");
        }
        System.out.println();
        System.out.println("--------------- SynchronousQueue --------------");

        final SynchronousQueue<String> q1 = new SynchronousQueue<String>();
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {

                    System.out.println(Thread.currentThread().getName()+"取資料  "+ q1.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        t1.start();
        Thread t2 = new Thread(new Runnable() {

            @Override
            public void run() {
                q1.add("b");
                System.out.println(Thread.currentThread().getName() +"加入資料  b");
            }
        });
        t2.start();


    }
}

列印結果

--------------- ArrayBlockingQueue --------------
false
所有資料  >>  [a, b, c, d, e]
--------------- LinkedBlockingQueue --------------
總長度  >>  6
a -- b -- c -- d -- e -- f -- 
3
取出LinkedBlockingQueue資料放到list列表的長度為   >>  3
a -- b -- c -- 
--------------- SynchronousQueue --------------
Thread-1加入資料  b
Thread-0取資料  b
PriorityBlockingQueue 案例

Task.java

public class Task implements Comparable<Task>{

    private int id ;
    private String name;
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }

    @Override
    public int compareTo(Task task) {
        return this.id > task.id ? 1 : (this.id < task.id ? -1 : 0);  
    }

    public String toString(){
        return this.id + "," + this.name;
    }

}

UsePriorityBlockingQueue.java

public class UsePriorityBlockingQueue {


    public static void main(String[] args) throws Exception{


        PriorityBlockingQueue<Task> q2 = new PriorityBlockingQueue<Task>();

        Task t1 = new Task();
        t1.setId(3);
        t1.setName("id為3");
        Task t2 = new Task();
        t2.setId(4);
        t2.setName("id為4");
        Task t3 = new Task();
        t3.setId(1);
        t3.setName("id為1");
        Task t4 = new Task();
        t4.setId(2);
        t4.setName("id為2");

        //return this.id > task.id ? 1 : 0;
        q2.add(t1); //3
        q2.add(t2); //4
        q2.add(t3);  //1
        q2.add(t4);

        // 1 3 4
        //第一次取值時候是取最小的後面不做排序
        System.out.println("容器:" + q2);  //[1,id為1, 2,id為2, 3,id為3, 4,id為4]
        //拿出一個元素後  又會取一個最小的出來 放在第一個
        System.out.println(q2.take().getId());
        System.out.println("容器:" + q2);    //[2,id為2, 4,id為4, 3,id為3]
        System.out.println(q2.take().getId());
        System.out.println("容器:" + q2);  //[3,id為3, 4,id為4]



    }
}

列印結果

容器:[1,id1, 2,id2, 3,id3, 4,id4]
1
容器:[2,id2, 4,id4, 3,id3]
2
容器:[3,id3, 4,id4]
DelayQueue 案例

Wangmin.java

public class Wangmin implements Delayed {  

    private String name;  
    //身份證  
    private String id;  
    //截止時間  
    private long endTime;  
    //定義時間工具類
    private TimeUnit timeUnit = TimeUnit.SECONDS;

    public Wangmin(String name,String id,long endTime){  
        this.name=name;  
        this.id=id;  
        this.endTime = endTime;  
    }  

    public String getName(){  
        return this.name;  
    }  

    public String getId(){  
        return this.id;  
    }  

    /** 
     * 用來判斷是否到了截止時間 
     */  
    @Override  
    public long getDelay(TimeUnit unit) { 
        //return unit.convert(endTime, TimeUnit.MILLISECONDS) - unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        return endTime - System.currentTimeMillis();
    }  

    /** 
     * 相互批較排序用 
     */  
    @Override  
    public int compareTo(Delayed delayed) {  
        Wangmin w = (Wangmin)delayed;  
        return this.getDelay(this.timeUnit) - w.getDelay(this.timeUnit) > 0 ? 1:0;  
    }  

}  

WangBa.java

public class WangBa implements Runnable {  

    private DelayQueue<Wangmin> queue = new DelayQueue<Wangmin>();  

    public boolean yinye =true;  

    public void shangji(String name,String id,int money){  
        Wangmin man = new Wangmin(name, id, 1000 * money + System.currentTimeMillis());  
        System.out.println("網名"+man.getName()+" 身份證"+man.getId()+"交錢"+money+"塊,開始上機...");  
        this.queue.add(man);  
    }  

    public void xiaji(Wangmin man){  
        System.out.println("網名"+man.getName()+" 身份證"+man.getId()+"時間到下機...");  
    }  

    @Override  
    public void run() {  
        while(yinye){  
            try {  
                Wangmin man = queue.take();  
                xiaji(man);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }  

    public static void main(String args[]){  
        try{  
            System.out.println("網咖開始營業");  
            WangBa siyu = new WangBa();  
            Thread shangwang = new Thread(siyu);  
            shangwang.start();  

            siyu.shangji("路人甲", "123", 1);  
            siyu.shangji("路人乙", "234", 10);  
            siyu.shangji("路人丙", "345", 5);  
        }  
        catch(Exception e){  
            e.printStackTrace();
        }  

    }  
}  

列印結果:

網咖開始營業
網名路人甲 身份證123交錢1塊,開始上機...
網名路人乙 身份證234交錢10塊,開始上機...
網名路人丙 身份證345交錢5塊,開始上機...
網名路人甲 身份證123時間到下機...
網名路人丙 身份證345時間到下機...
網名路人乙 身份證234時間到下機...

BlockingQueue 介面的重要方法

放入資料:

offer(anObject):表示如果可能的話,將anObject加到BlockingQueue裡,即如果BlockingQueue 可以容納,則返回true,否則返回false.(本方法不阻蹇當前執行方法的執行緒)

offer(E 0,long timeout, TimeUnit unit),可以設定等待的時間,如果在指定的時間內,還不能往佇列中加入BlockingQueue,則返回失敗。

put(anObject):把anObject加到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻斷直到BlokingQue裡面有空間再繼續,

獲取資料:

poll(time):取走BlokingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間,取不到時返回null

poll(long timeout, Timeunit unit):從blockingQueue取出一個隊首的物件,如果在指定時間內,佇列一旦有資料可取,則立即返回佇列中的資料。否則知道時間超時還沒有資料可取,返回失敗。

take():取走引BlockinQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到BlckingQueue有新的資料被加入;

drainTo():一次性從BlockingQueue獲取所有可用的資料物件(還可以指定獲取資料的個數),通過該方法,可以提升獲取資料效率:不需要多次分批加鎖或釋放鎖。

Deque 雙端佇列

Deque允許在佇列的頭部活尾部進行出隊和入隊操作。

LinkedBlockingDeque是一個執行緒安全的雙端佇列實現,可以說他是最為複雜的一種佇列,在內部實現維護了前端和後端節點,但是其沒有實現讀寫分離,因此同一時間只能有一個執行緒對其講行操作。在高併發中效能要遠低於其他引。BlockingQueue。更要低於ConcurrentLinkedQueue,布jdk早期有一個非執行緒安全的Deque就是ArryDeque了, java6裡添加了LinkBlockingDeque來彌補多執行緒場景下執行緒安全的問題。

案例
public class UseDeque {

    public static void main(String[] args) {


        LinkedBlockingDeque<String> dq = new LinkedBlockingDeque<String>(10);
        dq.addFirst("a");
        dq.addFirst("b");
        dq.addFirst("c");
        dq.addFirst("d");
        dq.addFirst("e");
        dq.addLast("f");
        dq.addLast("g");
        dq.addLast("h");
        dq.addLast("i");
        dq.addLast("j");
        //dq.offerFirst("k");
        System.out.println("檢視頭元素:" + dq.peekFirst());
        System.out.println("獲取尾元素:" + dq.pollLast());
        Object [] objs = dq.toArray();
        for (int i = 0; i < objs.length; i++) {
            System.out.print(objs[i] + " -- ");
        }

    }
}

列印結果:

檢視頭元素:e
獲取尾元素:j
e -- d -- c -- b -- a -- f -- g -- h -- i -- 
LinkedBlockingDeque 方法說明
// 建立一個容量為 Integer.MAX_VALUE 的 LinkedBlockingDeque。
LinkedBlockingDeque()
// 建立一個容量為 Integer.MAX_VALUE 的 LinkedBlockingDeque,最初包含給定 collection 的元素,以該 collection 迭代器的遍歷順序新增。
LinkedBlockingDeque(Collection<? extends E> c)
// 建立一個具有給定(固定)容量的 LinkedBlockingDeque。
LinkedBlockingDeque(int capacity)
// 在不違反容量限制的情況下,將指定的元素插入此雙端佇列的末尾。
boolean add(E e)
// 如果立即可行且不違反容量限制,則將指定的元素插入此雙端佇列的開頭;如果當前沒有空間可用,則丟擲 IllegalStateException。
void addFirst(E e)
// 如果立即可行且不違反容量限制,則將指定的元素插入此雙端佇列的末尾;如果當前沒有空間可用,則丟擲 IllegalStateException。
void addLast(E e)
// 以原子方式 (atomically) 從此雙端佇列移除所有元素。
void clear()
// 如果此雙端佇列包含指定的元素,則返回 true。
boolean contains(Object o)
// 返回在此雙端佇列的元素上以逆向連續順序進行迭代的迭代器。
Iterator<E> descendingIterator()
// 移除此佇列中所有可用的元素,並將它們新增到給定 collection 中。
int drainTo(Collection<? super E> c)
// 最多從此佇列中移除給定數量的可用元素,並將這些元素新增到給定 collection 中。
int drainTo(Collection<? super E> c, int maxElements)
// 獲取但不移除此雙端隊列表示的佇列的頭部。
E element()
// 獲取,但不移除此雙端佇列的第一個元素。
E getFirst()
// 獲取,但不移除此雙端佇列的最後一個元素。
E getLast()
// 返回在此雙端佇列元素上以恰當順序進行迭代的迭代器。
Iterator<E> iterator()
// 如果立即可行且不違反容量限制,則將指定的元素插入此雙端隊列表示的佇列中(即此雙端佇列的尾部),並在成功時返回 true;如果當前沒有空間可用,則返回 false。
boolean offer(E e)
// 將指定的元素插入此雙端隊列表示的佇列中(即此雙端佇列的尾部),必要時將在指定的等待時間內一直等待可用空間。
boolean offer(E e, long timeout, TimeUnit unit)
// 如果立即可行且不違反容量限制,則將指定的元素插入此雙端佇列的開頭,並在成功時返回 true;如果當前沒有空間可用,則返回 false。
boolean offerFirst(E e)
// 將指定的元素插入此雙端佇列的開頭,必要時將在指定的等待時間內等待可用空間。
boolean offerFirst(E e, long timeout, TimeUnit unit)
// 如果立即可行且不違反容量限制,則將指定的元素插入此雙端佇列的末尾,並在成功時返回 true;如果當前沒有空間可用,則返回 false。
boolean offerLast(E e)
// 將指定的元素插入此雙端佇列的末尾,必要時將在指定的等待時間內等待可用空間。
boolean offerLast(E e, long timeout, TimeUnit unit)
// 獲取但不移除此雙端隊列表示的佇列的頭部(即此雙端佇列的第一個元素);如果此雙端佇列為空,則返回 null。
E peek()
// 獲取,但不移除此雙端佇列的第一個元素;如果此雙端佇列為空,則返回 null。
E peekFirst()
// 獲取,但不移除此雙端佇列的最後一個元素;如果此雙端佇列為空,則返回 null。
E peekLast()
// 獲取並移除此雙端隊列表示的佇列的頭部(即此雙端佇列的第一個元素);如果此雙端佇列為空,則返回 null。
E poll()
// 獲取並移除此雙端隊列表示的佇列的頭部(即此雙端佇列的第一個元素),如有必要將在指定的等待時間內等待可用元素。
E poll(long timeout, TimeUnit unit)
// 獲取並移除此雙端佇列的第一個元素;如果此雙端佇列為空,則返回 null。
E pollFirst()
// 獲取並移除此雙端佇列的第一個元素,必要時將在指定的等待時間等待可用元素。
E pollFirst(long timeout, TimeUnit unit)
// 獲取並移除此雙端佇列的最後一個元素;如果此雙端佇列為空,則返回 null。
E pollLast()
// 獲取並移除此雙端佇列的最後一個元素,必要時將在指定的等待時間內等待可用元素。
E pollLast(long timeout, TimeUnit unit)
// 從此雙端佇列所表示的堆疊中彈出一個元素。
E pop()
// 將元素推入此雙端隊列表示的棧。
void push(E e)
// 將指定的元素插入此雙端隊列表示的佇列中(即此雙端佇列的尾部),必要時將一直等待可用空間。
void put(E e)
// 將指定的元素插入此雙端佇列的開頭,必要時將一直等待可用空間。
void putFirst(E e)
// 將指定的元素插入此雙端佇列的末尾,必要時將一直等待可用空間。
void putLast(E e)
// 返回理想情況下(沒有記憶體和資源約束)此雙端佇列可不受阻塞地接受的額外元素數。
int remainingCapacity()
// 獲取並移除此雙端隊列表示的佇列的頭部。
E remove()
// 從此雙端佇列移除第一次出現的指定元素。
boolean remove(Object o)
// 獲取並移除此雙端佇列第一個元素。
E removeFirst()
// 從此雙端佇列移除第一次出現的指定元素。
boolean removeFirstOccurrence(Object o)
// 獲取並移除此雙端佇列的最後一個元素。
E removeLast()
// 從此雙端佇列移除最後一次出現的指定元素。
boolean removeLastOccurrence(Object o)
// 返回此雙端佇列中的元素數。
int size()
// 獲取並移除此雙端隊列表示的佇列的頭部(即此雙端佇列的第一個元素),必要時將一直等待可用元素。
E take()
// 獲取並移除此雙端佇列的第一個元素,必要時將一直等待可用元素。
E takeFirst()
// 獲取並移除此雙端佇列的最後一個元素,必要時將一直等待可用元素。
E takeLast()
// 返回以恰當順序(從第一個元素到最後一個元素)包含此雙端佇列所有元素的陣列。
Object[] toArray()
// 返回以恰當順序包含此雙端佇列所有元素的陣列;返回陣列的執行時型別是指定陣列的執行時型別。
<T> T[] toArray(T[] a)
// 返回此 collection 的字串表示形式。
String toString()