1. 程式人生 > >java執行緒安全之併發Queue

java執行緒安全之併發Queue

java執行緒安全之併發Queue(十三)

2017年11月19日 23:40:23  閱讀數:12092 所屬專欄: 執行緒安全  

併發Queue

       在併發的佇列上jdk提供了兩套實現,一個是以ConcurrentLinkedQueue為代表的高效能佇列,一個是以BlockingQueue介面為代表的阻塞佇列,無論在那種都繼承自Queue。 
如圖繼承Queue共有二十四個: 
這裡寫圖片描述


這裡寫圖片描述

ConcurrentLinkedQueue

概念理解

       ConcurrentLinkedQueue:是一個適用於高併發場景下的佇列,通過無鎖的方式,實現了高併發狀態下的高效能,通常ConcurrentLinkedQueue效能好於BlockingQueueo它是一個基於連結節點的無界執行緒安全佇列。該佇列的元素遵循先講先出的原則。頭是最先加入的,尾是最近加入的,該佇列不允許null元素。 
ConcurrentLinkedQueue重要方法:

Add()和offer()都是加入元素的方法(在ConcurrentLinkedQueue中,這兩個方法投有任何區別)

Poll()和peek()都是取頭元素節點,區別在於前者會刪除元素,後者不會。

案例
public class UseQueue_ConcurrentLinkedQueue {


    public static void main(String[] args) throws Exception { //高效能無阻塞無界佇列:ConcurrentLinkedQueue ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<String>(); q.offer("a"); q.offer("b"); q.offer("c"); q.offer("d"); q.add("e"); System.out.println("從頭部取出元素,並從佇列裡刪除 >> "+q.poll()); //a 從頭部取出元素,並從佇列裡刪除 System.out.println("刪除後的長度 >> "+q.size()); //4 System.out.println("取出頭部元素 >> "+q.peek()); //b System.out.println("長度 >> "+q.size()); //4 } }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

列印結果:

從頭部取出元素,並從佇列裡刪除 >> a
刪除後的長度 >> 4
取出頭部元素 >> b
長度 >> 4
  • 1
  • 2
  • 3
  • 4

BlockingQueue介面

       ArrayBlockingQueue:基於陣列的阻塞佇列實現,在ArrayBlockingQueue內部,維護了一個定長陣列,以便快取佇列中的資料物件,其內部沒實現讀寫分離,也就意味著生產和消費不能完全並行,長度是需要定義的,可以指定先講先出或者先講後出,也叫有界佇列,在很多場合非常適合使用。

       LinkedBlockingQueue:基於連結串列的阻塞佇列,同ArrayBlockingQueue類似,其內部也維持著一個數據緩衝佇列〈該佇列由一個連結串列構成),LinkedBlockingQueue之所以能夠高效的處理併發資料,是因為其內部實現採用分離鎖(讀寫分離兩個鎖),從而實現生產者和消費者操作的完全並行執行,他是一個無界佇列。

       SynchronousQueue:一種沒有緩衝的佇列,生產者產生的資料直接會被消費者獲取並消費。

       PriorityBlockingQueue:基於優先順序的阻塞佇列(優先順序的判斷通過建構函式傳入的Compator物件來決定,也就是說傳入佇列的物件必須實現Comparable介面),在實現PriorityBlockingQueue時,內部控制執行緒同步的鎖採用的是公平鎖,他也是一個無界的佇列。

       DelayQueue:帶有延遲時間的Queue,其中的元素只有當其指定的延遲時間到了,才能夠從佇列中獲取到該元素。DelayQueue中的元素必須實現Delayed介面,DelayQueue是一個沒有大小限制的佇列,應用場景很多,比如對快取超時的資料進行移除、任務超時處理、空閒連線的關閉等等。

ArrayBlockingQueue、LinkedBlockingQueue、synchronousQueue案例
public class UseQueue_ConcurrentLinkedQueue {

    public static void main(String[] args) throws Exception { System.out.println("--------------- ArrayBlockingQueue --------------"); //阻塞佇列 有長度的佇列 ArrayBlockingQueue<String> array = new ArrayBlockingQueue<String>(5); array.put("a"); array.put("b"); array.add("c"); array.add("d"); array.add("e"); //array.add("f"); //返回一個布林型別 在3秒之內能不能加入 不能返回false System.out.println(array.offer("a", 3, TimeUnit.SECONDS)); System.out.println("所有資料 >> " + array.toString()); System.out.println("--------------- LinkedBlockingQueue --------------"); //阻塞佇列 無長度限制佇列 LinkedBlockingQueue<String> q = new LinkedBlockingQueue<String>(); q.offer("a"); q.offer("b"); q.offer("c"); q.offer("d"); q.offer("e"); q.add("f"); System.out.println("總長度 >> "+q.size()); for (Iterator iterator = q.iterator(); iterator.hasNext(); ) { String string = (String) iterator.next(); System.out.print(string+" -- "); } System.out.println(); List<String> list = new ArrayList<String>(); //在 q 的佇列中取三個元素放到list 佇列裡 System.out.println(q.drainTo(list, 3)); System.out.println("取出LinkedBlockingQueue資料放到list列表的長度為 >> "+list.size()); for (String string : list) { System.out.print(string + " -- "); } System.out.println(); System.out.println("--------------- SynchronousQueue --------------"); final SynchronousQueue<String> q1 = new SynchronousQueue<String>(); Thread t1 = new Thread(new Runnable() { @Override public void run() { try { System.out.println(Thread.currentThread().getName()+"取資料 "+ q1.take()); } catch (InterruptedException e) { e.printStackTrace(); } } }); t1.start(); Thread t2 = new Thread(new Runnable() { @Override public void run() { q1.add("b"); System.out.println(Thread.currentThread().getName() +"加入資料 b"); } }); t2.start(); } }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71

列印結果

--------------- ArrayBlockingQueue -------------- false 所有資料 >> [a, b, c, d, e] --------------- LinkedBlockingQueue -------------- 總長度 >> 6 a -- b -- c -- d -- e -- f -- 3 取出LinkedBlockingQueue資料放到list列表的長度為 >> 3 a -- b -- c -- --------------- SynchronousQueue -------------- Thread-1加入資料 b Thread-0取資料 b
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
PriorityBlockingQueue 案例

Task.java

public class Task implements Comparable<Task>{ private int id ; private String name; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public int compareTo(Task task) { return this.id > task.id ? 1 : (this.id < task.id ? -1 : 0); } public String toString(){ return this.id + "," + this.name; } }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

UsePriorityBlockingQueue.java

public class UsePriorityBlockingQueue {


    public static void main(String[] args) throws Exception{ PriorityBlockingQueue<Task> q2 = new PriorityBlockingQueue<Task>(); Task t1 = new Task(); t1.setId(3); t1.setName("id為3"); Task t2 = new Task(); t2.setId(4); t2.setName("id為4"); Task t3 = new Task(); t3.setId(1); t3.setName("id為1"); Task t4 = new Task(); t4.setId(2); t4.setName("id為2"); //return this.id > task.id ? 1 : 0; q2.add(t1); //3 q2.add(t2); //4 q2.add(t3); //1 q2.add(t4); // 1 3 4 //第一次取值時候是取最小的後面不做排序 System.out.println("容器:" + q2); //[1,id為1, 2,id為2, 3,id為3, 4,id為4] //拿出一個元素後 又會取一個最小的出來 放在第一個 System.out.println(q2.take().getId()); System.out.println("容器:" + q2); //[2,id為2, 4,id為4, 3,id為3] System.out.println(q2.take().getId()); System.out.println("容器:" + q2); //[3,id為3, 4,id為4] } } 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

列印結果

容器:[1,id為1, 2,id為2, 3,id為3, 4,id為4] 1 容器:[2,id為2, 4,id為4, 3,id為3] 2 容器:[3,id為3, 4,id為4]
  • 1
  • 2
  • 3
  • 4
  • 5
DelayQueue 案例

Wangmin.java

public class Wangmin implements Delayed { private String name; //身份證 private String id; //截止時間 private long endTime; //定義時間工具類 private TimeUnit timeUnit = TimeUnit.SECONDS; public Wangmin(String name,String id,long endTime){ this.name=name; this.id=id; this.endTime = endTime; } public String getName(){ return this.name; } public String getId(){ return this.id; } /** * 用來判斷是否到了截止時間 */ @Override public long getDelay(TimeUnit unit) { //return unit.convert(endTime, TimeUnit.MILLISECONDS) - unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS); return endTime - System.currentTimeMillis(); } /** * 相互批較排序用 */ @Override public int compareTo(Delayed delayed) { Wangmin w = (Wangmin)delayed; return this.getDelay(this.timeUnit) - w.getDelay(this.timeUnit) > 0 ? 1:0; } } 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

WangBa.java

public class WangBa implements Runnable { private DelayQueue<Wangmin> queue = new DelayQueue<Wangmin>(); public boolean yinye =true; public void shangji(String name,String id,int money){ Wangmin man = new Wangmin(name, id, 1000 * money + System.currentTimeMillis()); System.out.println("網名"+man.getName()+" 身份證"+man.getId()+"交錢"+money+"塊,開始上機..."); this.queue.add(man); } public void xiaji(Wangmin man){ System.out.println("網名"+man.getName()+" 身份證"+man.getId()+"時間到下機..."); } @Override public void run() { while(yinye){ try { Wangmin man = queue.take(); xiaji(man); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String args[]){ try{ System.out.println("網咖開始營業"); WangBa siyu = new WangBa(); Thread shangwang = new Thread(siyu); shangwang.start(); siyu.shangji("路人甲", "123", 1); siyu.shangji("路人乙", "234", 10); siyu.shangji("路人丙", "345", 5); } catch(Exception e){ e.printStackTrace(); } } } 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

列印結果:

網咖開始營業
網名路人甲 身份證123交錢1塊,開始上機...
網名路人乙 身份證234交錢10塊,開始上機... 網名路人丙 身份證345交錢5塊,開始上機... 網名路人甲 身份證123時間到下機... 網名路人丙 身份證345時間到下機... 網名路人乙 身份證234時間到下機...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

BlockingQueue 介面的重要方法

放入資料:

offer(anObject):表示如果可能的話,將anObject加到BlockingQueue裡,即如果BlockingQueue 可以容納,則返回true,否則返回false.(本方法不阻蹇當前執行方法的執行緒)

offer(E 0,long timeout, TimeUnit unit),可以設定等待的時間,如果在指定的時間內,還不能往佇列中加入BlockingQueue,則返回失敗。

put(anObject):把anObject加到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻斷直到BlokingQue裡面有空間再繼續,

獲取資料:

poll(time):取走BlokingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間,取不到時返回null

poll(long timeout, Timeunit unit):從blockingQueue取出一個隊首的物件,如果在指定時間內,佇列一旦有資料可取,則立即返回佇列中的資料。否則知道時間超時還沒有資料可取,返回失敗。

take():取走引BlockinQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到BlckingQueue有新的資料被加入;

drainTo():一次性從BlockingQueue獲取所有可用的資料物件(還可以指定獲取資料的個數),通過該方法,可以提升獲取資料效率:不需要多次分批加鎖或釋放鎖。

Deque 雙端佇列

Deque允許在佇列的頭部活尾部進行出隊和入隊操作。

LinkedBlockingDeque是一個執行緒安全的雙端佇列實現,可以說他是最為複雜的一種佇列,在內部實現維護了前端和後端節點,但是其沒有實現讀寫分離,因此同一時間只能有一個執行緒對其講行操作。在高併發中效能要遠低於其他引。BlockingQueue。更要低於ConcurrentLinkedQueue,布jdk早期有一個非執行緒安全的Deque就是ArryDeque了, java6裡添加了LinkBlockingDeque來彌補多執行緒場景下執行緒安全的問題。

案例
public class UseDeque {

    public static void main(String[] args) { LinkedBlockingDeque<String> dq = new LinkedBlockingDeque<String>(10); dq.addFirst("a"); dq.addFirst("b"); dq.addFirst("c"); dq.addFirst("d"); dq.addFirst("e"); dq.addLast("f"); dq.addLast("g"); dq.addLast("h"); dq.addLast("i"); dq.addLast("j"); //dq.offerFirst("k"); System.out.println("檢視頭元素:" + dq.peekFirst()); System.out.println("獲取尾元素:" + dq.pollLast()); Object [] objs = dq.toArray(); for (int i = 0; i < objs.length; i++) { System.out.print(objs[i] + " -- "); } } } 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

列印結果:

檢視頭元素:e
獲取尾元素:j
e -- d -- c -- b -- a -- f -- g -- h -- i -- 
  • 1
  • 2
  • 3
  • 4
LinkedBlockingDeque 方法說明
// 建立一個容量為 Integer.MAX_VALUE 的 LinkedBlockingDeque。
LinkedBlockingDeque()
// 建立一個容量為 Integer.MAX_VALUE 的 LinkedBlockingDeque,最初包含給定 collection 的元素,以該 collection 迭代器的遍歷順序新增。
LinkedBlockingDeque(Collection<? extends E> c)
// 建立一個具有給定(固定)容量的 LinkedBlockingDeque。
LinkedBlockingDeque(int capacity)
// 在不違反容量限制的情況下,將指定的元素插入此雙端佇列的末尾。 boolean add(E e) // 如果立即可行且不違反容量限制,則將指定的元素插入此雙端佇列的開頭;如果當前沒有空間可用,則丟擲 IllegalStateException。 void addFirst(E e) // 如果立即可行且不違反容量限制,則將指定的元素插入此雙端佇列的末尾;如果當前沒有空間可用,則丟擲 IllegalStateException。 void addLast(E e) // 以原子方式 (atomically) 從此雙端佇列移除所有元素。 void clear() // 如果此雙端佇列包含指定的元素,則返回 true。 boolean contains(Object o) // 返回在此雙端佇列的元素上以逆向連續順序進行迭代的迭代器。 Iterator<E> descendingIterator() // 移除此佇列中所有可用的元素,並將它們新增到給定 collection 中。 int drainTo(Collection<? super E> c) // 最多從此佇列中移除給定數量的可用元素,並將這些元素新增到給定 collection 中。 int drainTo(Collection<? super E> c, int maxElements) // 獲取但不移除此雙端隊列表示的佇列的頭部。 E element() // 獲取,但不移除此雙端佇列的第一個元素。 E getFirst() // 獲取,但不移除此雙端佇列的最後一個元素。 E getLast() // 返回在此雙端佇列元素上以恰當順序進行迭代的迭代器。 Iterator<E> iterator() // 如果立即可行且不違反容量限制,則將指定的元素插入此雙端隊列表示的佇列中(即此雙端佇列的尾部),並在成功時返回 true;如果當前沒有空間可用,則返回 false。 boolean offer(E e) // 將指定的元素插入此雙端隊列表示的佇列中(即此雙端佇列的尾部),必要時將在指定的等待時間內一直等待可用空間。 boolean offer(E e, long timeout, TimeUnit unit) // 如果立即可行且不違反容量限制,則將指定的元素插入此雙端佇列的開頭,並在成功時返回 true;如果當前沒有空間可用,則返回 false。 boolean offerFirst(E e) // 將指定的元素插入此雙端佇列的開頭,必要時將在指定的等待時間內等待可用空間。 boolean offerFirst(E e, long timeout, TimeUnit unit) // 如果立即可行且不違反容量限制,則將指定的元素插入此雙端佇列的末尾,並在成功時返回 true;如果當前沒有空間可用,則返回 false。 boolean offerLast(E e) // 將指定的元素插入此雙端佇列的末尾,必要時將在指定的等待時間內等待可用空間。 boolean offerLast(E e, long timeout, TimeUnit unit) // 獲取但不移除此雙端隊列表示的佇列的頭部(即此雙端佇列的第一個元素);如果此雙端佇列為空,則返回 null。 E peek() // 獲取,但不移除此雙端佇列的第一個元素;如果此雙端佇列為空,則返回 null。 E peekFirst() // 獲取,但不移除此雙端佇列的最後一個元素;如果此雙端佇列為空,則返回 null。 E peekLast() // 獲取並移除此雙端隊列表示的佇列的頭部(即此雙端佇列的第一個元素);如果此雙端佇列為空,則返回 null。 E poll() // 獲取並移除此雙端隊列表示的佇列的頭部(即此雙端佇列的第一個元素),如有必要將在指定的等待時間內等待可用元素。 E poll(long timeout, TimeUnit unit) // 獲取並移除此雙端佇列的第一個元素;如果此雙端佇列為空,則返回 null。 E pollFirst() // 獲取並移除此雙端佇列的第一個元素,必要時將在指定的等待時間等待可用元素。 E pollFirst(long timeout, TimeUnit unit) // 獲取並移除此雙端佇列的最後一個元素;如果此雙端佇列為空,則返回 null。 E pollLast() // 獲取並移除此雙端佇列的最後一個元素,必要時將在指定的等待時間內等待可用元素。 E pollLast(long timeout, TimeUnit unit) // 從此雙端佇列所表示的堆疊中彈出一個元素。 E pop() // 將元素推入此雙端隊列表示的棧。 void push(E e) // 將指定的元素插入此雙端隊列表示的佇列中(即此雙端佇列的尾部),必要時將一直等待可用空間。 void put(E e) // 將指定的元素插入此雙端佇列的開頭,必要時將一直等待可用空間。 void putFirst(E e) // 將指定的元素插入此雙端佇列的末尾,必要時將一直等待可用空間。 void putLast(E e) // 返回理想情況下(沒有記憶體和資源約束)此雙端佇列可不受阻塞地接受的額外元素數。 int remainingCapacity() // 獲取並移除此雙端隊列表示的佇列的頭部。 E remove() // 從此雙端佇列移除第一次出現的指定元素。 boolean remove(Object o) // 獲取並移除此雙端佇列第一個元素。 E removeFirst() // 從此雙端佇列移除第一次出現的指定元素。 boolean removeFirstOccurrence(Object o) // 獲取並移除此雙端佇列的最後一個元素。 E removeLast() // 從此雙端佇列移除最後一次出現的指定元素。 boolean removeLastOccurrence(Object o) // 返回此雙端佇列中的元素數。 int size() // 獲取並移除此雙端隊列表示的佇列的頭部(即此雙端佇列的第一個元素),必要時將一直等待可用元素。 E take() // 獲取並移除此雙端佇列的第一個元素,必要時將一直等待可用元素。 E takeFirst() // 獲取並移除此雙端佇列的最後一個元素,必要時將一直等待可用元素。 E takeLast() // 返回以恰當順序(從第一個元素到最後一個元素)包含此雙端佇列所有元素的陣列。 Object[] toArray() // 返回以恰當順序包含此雙端佇列所有元素的陣列;返回陣列的執行時型別是指定陣列的執行時型別。 <T> T[] toArray(T[] a) // 返回此 collection 的字串表示形式。 String toString()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98

原始碼:https://github.com/hfbin/Thread_Socket/tree/master/Thread/coll013

 

from: https://blog.csdn.net/qq_33524158/article/details/78578370