java執行緒安全之併發Queue(十三)
併發Queue
在併發的佇列上jdk提供了兩套實現,一個是以ConcurrentLinkedQueue為代表的高效能佇列,一個是以BlockingQueue介面為代表的阻塞佇列,無論在那種都繼承自Queue。
如圖繼承Queue共有二十四個:
ConcurrentLinkedQueue
概念理解
ConcurrentLinkedQueue:是一個適用於高併發場景下的佇列,通過無鎖的方式,實現了高併發狀態下的高效能,通常ConcurrentLinkedQueue效能好於BlockingQueueo它是一個基於連結節點的無界執行緒安全佇列。該佇列的元素遵循先講先出的原則。頭是最先加入的,尾是最近加入的,該佇列不允許null元素。
ConcurrentLinkedQueue重要方法:
Add()和offer()都是加入元素的方法(在ConcurrentLinkedQueue中,這兩個方法投有任何區別)
Poll()和peek()都是取頭元素節點,區別在於前者會刪除元素,後者不會。
案例
public class UseQueue_ConcurrentLinkedQueue {
public static void main(String[] args) throws Exception {
//高效能無阻塞無界佇列:ConcurrentLinkedQueue
ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<String>();
q.offer("a");
q.offer("b");
q.offer("c");
q.offer("d");
q.add("e");
System.out.println("從頭部取出元素,並從佇列裡刪除 >> "+q.poll()); //a 從頭部取出元素,並從佇列裡刪除
System.out.println("刪除後的長度 >> "+q.size()); //4
System.out.println("取出頭部元素 >> " +q.peek()); //b
System.out.println("長度 >> "+q.size()); //4
}
}
列印結果:
從頭部取出元素,並從佇列裡刪除 >> a
刪除後的長度 >> 4
取出頭部元素 >> b
長度 >> 4
BlockingQueue介面
ArrayBlockingQueue:基於陣列的阻塞佇列實現,在ArrayBlockingQueue內部,維護了一個定長陣列,以便快取佇列中的資料物件,其內部沒實現讀寫分離,也就意味著生產和消費不能完全並行,長度是需要定義的,可以指定先講先出或者先講後出,也叫有界佇列,在很多場合非常適合使用。
LinkedBlockingQueue:基於連結串列的阻塞佇列,同ArrayBlockingQueue類似,其內部也維持著一個數據緩衝佇列〈該佇列由一個連結串列構成),LinkedBlockingQueue之所以能夠高效的處理併發資料,是因為其內部實現採用分離鎖(讀寫分離兩個鎖),從而實現生產者和消費者操作的完全並行執行,他是一個無界佇列。
SynchronousQueue:一種沒有緩衝的佇列,生產者產生的資料直接會被消費者獲取並消費。
PriorityBlockingQueue:基於優先順序的阻塞佇列(優先順序的判斷通過建構函式傳入的Compator物件來決定,也就是說傳入佇列的物件必須實現Comparable介面),在實現PriorityBlockingQueue時,內部控制執行緒同步的鎖採用的是公平鎖,他也是一個無界的佇列。
DelayQueue:帶有延遲時間的Queue,其中的元素只有當其指定的延遲時間到了,才能夠從佇列中獲取到該元素。DelayQueue中的元素必須實現Delayed介面,DelayQueue是一個沒有大小限制的佇列,應用場景很多,比如對快取超時的資料進行移除、任務超時處理、空閒連線的關閉等等。
ArrayBlockingQueue、LinkedBlockingQueue、synchronousQueue案例
public class UseQueue_ConcurrentLinkedQueue {
public static void main(String[] args) throws Exception {
System.out.println("--------------- ArrayBlockingQueue --------------");
//阻塞佇列 有長度的佇列
ArrayBlockingQueue<String> array = new ArrayBlockingQueue<String>(5);
array.put("a");
array.put("b");
array.add("c");
array.add("d");
array.add("e");
//array.add("f");
//返回一個布林型別 在3秒之內能不能加入 不能返回false
System.out.println(array.offer("a", 3, TimeUnit.SECONDS));
System.out.println("所有資料 >> " + array.toString());
System.out.println("--------------- LinkedBlockingQueue --------------");
//阻塞佇列 無長度限制佇列
LinkedBlockingQueue<String> q = new LinkedBlockingQueue<String>();
q.offer("a");
q.offer("b");
q.offer("c");
q.offer("d");
q.offer("e");
q.add("f");
System.out.println("總長度 >> "+q.size());
for (Iterator iterator = q.iterator(); iterator.hasNext(); ) {
String string = (String) iterator.next();
System.out.print(string+" -- ");
}
System.out.println();
List<String> list = new ArrayList<String>();
//在 q 的佇列中取三個元素放到list 佇列裡
System.out.println(q.drainTo(list, 3));
System.out.println("取出LinkedBlockingQueue資料放到list列表的長度為 >> "+list.size());
for (String string : list) {
System.out.print(string + " -- ");
}
System.out.println();
System.out.println("--------------- SynchronousQueue --------------");
final SynchronousQueue<String> q1 = new SynchronousQueue<String>();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+"取資料 "+ q1.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
q1.add("b");
System.out.println(Thread.currentThread().getName() +"加入資料 b");
}
});
t2.start();
}
}
列印結果
--------------- ArrayBlockingQueue --------------
false
所有資料 >> [a, b, c, d, e]
--------------- LinkedBlockingQueue --------------
總長度 >> 6
a -- b -- c -- d -- e -- f --
3
取出LinkedBlockingQueue資料放到list列表的長度為 >> 3
a -- b -- c --
--------------- SynchronousQueue --------------
Thread-1加入資料 b
Thread-0取資料 b
PriorityBlockingQueue 案例
Task.java
public class Task implements Comparable<Task>{
private int id ;
private String name;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public int compareTo(Task task) {
return this.id > task.id ? 1 : (this.id < task.id ? -1 : 0);
}
public String toString(){
return this.id + "," + this.name;
}
}
UsePriorityBlockingQueue.java
public class UsePriorityBlockingQueue {
public static void main(String[] args) throws Exception{
PriorityBlockingQueue<Task> q2 = new PriorityBlockingQueue<Task>();
Task t1 = new Task();
t1.setId(3);
t1.setName("id為3");
Task t2 = new Task();
t2.setId(4);
t2.setName("id為4");
Task t3 = new Task();
t3.setId(1);
t3.setName("id為1");
Task t4 = new Task();
t4.setId(2);
t4.setName("id為2");
//return this.id > task.id ? 1 : 0;
q2.add(t1); //3
q2.add(t2); //4
q2.add(t3); //1
q2.add(t4);
// 1 3 4
//第一次取值時候是取最小的後面不做排序
System.out.println("容器:" + q2); //[1,id為1, 2,id為2, 3,id為3, 4,id為4]
//拿出一個元素後 又會取一個最小的出來 放在第一個
System.out.println(q2.take().getId());
System.out.println("容器:" + q2); //[2,id為2, 4,id為4, 3,id為3]
System.out.println(q2.take().getId());
System.out.println("容器:" + q2); //[3,id為3, 4,id為4]
}
}
列印結果
容器:[1,id為1, 2,id為2, 3,id為3, 4,id為4]
1
容器:[2,id為2, 4,id為4, 3,id為3]
2
容器:[3,id為3, 4,id為4]
DelayQueue 案例
Wangmin.java
public class Wangmin implements Delayed {
private String name;
//身份證
private String id;
//截止時間
private long endTime;
//定義時間工具類
private TimeUnit timeUnit = TimeUnit.SECONDS;
public Wangmin(String name,String id,long endTime){
this.name=name;
this.id=id;
this.endTime = endTime;
}
public String getName(){
return this.name;
}
public String getId(){
return this.id;
}
/**
* 用來判斷是否到了截止時間
*/
@Override
public long getDelay(TimeUnit unit) {
//return unit.convert(endTime, TimeUnit.MILLISECONDS) - unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
return endTime - System.currentTimeMillis();
}
/**
* 相互批較排序用
*/
@Override
public int compareTo(Delayed delayed) {
Wangmin w = (Wangmin)delayed;
return this.getDelay(this.timeUnit) - w.getDelay(this.timeUnit) > 0 ? 1:0;
}
}
WangBa.java
public class WangBa implements Runnable {
private DelayQueue<Wangmin> queue = new DelayQueue<Wangmin>();
public boolean yinye =true;
public void shangji(String name,String id,int money){
Wangmin man = new Wangmin(name, id, 1000 * money + System.currentTimeMillis());
System.out.println("網名"+man.getName()+" 身份證"+man.getId()+"交錢"+money+"塊,開始上機...");
this.queue.add(man);
}
public void xiaji(Wangmin man){
System.out.println("網名"+man.getName()+" 身份證"+man.getId()+"時間到下機...");
}
@Override
public void run() {
while(yinye){
try {
Wangmin man = queue.take();
xiaji(man);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String args[]){
try{
System.out.println("網咖開始營業");
WangBa siyu = new WangBa();
Thread shangwang = new Thread(siyu);
shangwang.start();
siyu.shangji("路人甲", "123", 1);
siyu.shangji("路人乙", "234", 10);
siyu.shangji("路人丙", "345", 5);
}
catch(Exception e){
e.printStackTrace();
}
}
}
列印結果:
網咖開始營業
網名路人甲 身份證123交錢1塊,開始上機...
網名路人乙 身份證234交錢10塊,開始上機...
網名路人丙 身份證345交錢5塊,開始上機...
網名路人甲 身份證123時間到下機...
網名路人丙 身份證345時間到下機...
網名路人乙 身份證234時間到下機...
BlockingQueue 介面的重要方法
放入資料:
offer(anObject):表示如果可能的話,將anObject加到BlockingQueue裡,即如果BlockingQueue 可以容納,則返回true,否則返回false.(本方法不阻蹇當前執行方法的執行緒)
offer(E 0,long timeout, TimeUnit unit),可以設定等待的時間,如果在指定的時間內,還不能往佇列中加入BlockingQueue,則返回失敗。
put(anObject):把anObject加到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻斷直到BlokingQue裡面有空間再繼續,
獲取資料:
poll(time):取走BlokingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間,取不到時返回null
poll(long timeout, Timeunit unit):從blockingQueue取出一個隊首的物件,如果在指定時間內,佇列一旦有資料可取,則立即返回佇列中的資料。否則知道時間超時還沒有資料可取,返回失敗。
take():取走引BlockinQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到BlckingQueue有新的資料被加入;
drainTo():一次性從BlockingQueue獲取所有可用的資料物件(還可以指定獲取資料的個數),通過該方法,可以提升獲取資料效率:不需要多次分批加鎖或釋放鎖。
Deque 雙端佇列
Deque允許在佇列的頭部活尾部進行出隊和入隊操作。
LinkedBlockingDeque是一個執行緒安全的雙端佇列實現,可以說他是最為複雜的一種佇列,在內部實現維護了前端和後端節點,但是其沒有實現讀寫分離,因此同一時間只能有一個執行緒對其講行操作。在高併發中效能要遠低於其他引。BlockingQueue。更要低於ConcurrentLinkedQueue,布jdk早期有一個非執行緒安全的Deque就是ArryDeque了, java6裡添加了LinkBlockingDeque來彌補多執行緒場景下執行緒安全的問題。
案例
public class UseDeque {
public static void main(String[] args) {
LinkedBlockingDeque<String> dq = new LinkedBlockingDeque<String>(10);
dq.addFirst("a");
dq.addFirst("b");
dq.addFirst("c");
dq.addFirst("d");
dq.addFirst("e");
dq.addLast("f");
dq.addLast("g");
dq.addLast("h");
dq.addLast("i");
dq.addLast("j");
//dq.offerFirst("k");
System.out.println("檢視頭元素:" + dq.peekFirst());
System.out.println("獲取尾元素:" + dq.pollLast());
Object [] objs = dq.toArray();
for (int i = 0; i < objs.length; i++) {
System.out.print(objs[i] + " -- ");
}
}
}
列印結果:
檢視頭元素:e
獲取尾元素:j
e -- d -- c -- b -- a -- f -- g -- h -- i --
LinkedBlockingDeque 方法說明
// 建立一個容量為 Integer.MAX_VALUE 的 LinkedBlockingDeque。
LinkedBlockingDeque()
// 建立一個容量為 Integer.MAX_VALUE 的 LinkedBlockingDeque,最初包含給定 collection 的元素,以該 collection 迭代器的遍歷順序新增。
LinkedBlockingDeque(Collection<? extends E> c)
// 建立一個具有給定(固定)容量的 LinkedBlockingDeque。
LinkedBlockingDeque(int capacity)
// 在不違反容量限制的情況下,將指定的元素插入此雙端佇列的末尾。
boolean add(E e)
// 如果立即可行且不違反容量限制,則將指定的元素插入此雙端佇列的開頭;如果當前沒有空間可用,則丟擲 IllegalStateException。
void addFirst(E e)
// 如果立即可行且不違反容量限制,則將指定的元素插入此雙端佇列的末尾;如果當前沒有空間可用,則丟擲 IllegalStateException。
void addLast(E e)
// 以原子方式 (atomically) 從此雙端佇列移除所有元素。
void clear()
// 如果此雙端佇列包含指定的元素,則返回 true。
boolean contains(Object o)
// 返回在此雙端佇列的元素上以逆向連續順序進行迭代的迭代器。
Iterator<E> descendingIterator()
// 移除此佇列中所有可用的元素,並將它們新增到給定 collection 中。
int drainTo(Collection<? super E> c)
// 最多從此佇列中移除給定數量的可用元素,並將這些元素新增到給定 collection 中。
int drainTo(Collection<? super E> c, int maxElements)
// 獲取但不移除此雙端隊列表示的佇列的頭部。
E element()
// 獲取,但不移除此雙端佇列的第一個元素。
E getFirst()
// 獲取,但不移除此雙端佇列的最後一個元素。
E getLast()
// 返回在此雙端佇列元素上以恰當順序進行迭代的迭代器。
Iterator<E> iterator()
// 如果立即可行且不違反容量限制,則將指定的元素插入此雙端隊列表示的佇列中(即此雙端佇列的尾部),並在成功時返回 true;如果當前沒有空間可用,則返回 false。
boolean offer(E e)
// 將指定的元素插入此雙端隊列表示的佇列中(即此雙端佇列的尾部),必要時將在指定的等待時間內一直等待可用空間。
boolean offer(E e, long timeout, TimeUnit unit)
// 如果立即可行且不違反容量限制,則將指定的元素插入此雙端佇列的開頭,並在成功時返回 true;如果當前沒有空間可用,則返回 false。
boolean offerFirst(E e)
// 將指定的元素插入此雙端佇列的開頭,必要時將在指定的等待時間內等待可用空間。
boolean offerFirst(E e, long timeout, TimeUnit unit)
// 如果立即可行且不違反容量限制,則將指定的元素插入此雙端佇列的末尾,並在成功時返回 true;如果當前沒有空間可用,則返回 false。
boolean offerLast(E e)
// 將指定的元素插入此雙端佇列的末尾,必要時將在指定的等待時間內等待可用空間。
boolean offerLast(E e, long timeout, TimeUnit unit)
// 獲取但不移除此雙端隊列表示的佇列的頭部(即此雙端佇列的第一個元素);如果此雙端佇列為空,則返回 null。
E peek()
// 獲取,但不移除此雙端佇列的第一個元素;如果此雙端佇列為空,則返回 null。
E peekFirst()
// 獲取,但不移除此雙端佇列的最後一個元素;如果此雙端佇列為空,則返回 null。
E peekLast()
// 獲取並移除此雙端隊列表示的佇列的頭部(即此雙端佇列的第一個元素);如果此雙端佇列為空,則返回 null。
E poll()
// 獲取並移除此雙端隊列表示的佇列的頭部(即此雙端佇列的第一個元素),如有必要將在指定的等待時間內等待可用元素。
E poll(long timeout, TimeUnit unit)
// 獲取並移除此雙端佇列的第一個元素;如果此雙端佇列為空,則返回 null。
E pollFirst()
// 獲取並移除此雙端佇列的第一個元素,必要時將在指定的等待時間等待可用元素。
E pollFirst(long timeout, TimeUnit unit)
// 獲取並移除此雙端佇列的最後一個元素;如果此雙端佇列為空,則返回 null。
E pollLast()
// 獲取並移除此雙端佇列的最後一個元素,必要時將在指定的等待時間內等待可用元素。
E pollLast(long timeout, TimeUnit unit)
// 從此雙端佇列所表示的堆疊中彈出一個元素。
E pop()
// 將元素推入此雙端隊列表示的棧。
void push(E e)
// 將指定的元素插入此雙端隊列表示的佇列中(即此雙端佇列的尾部),必要時將一直等待可用空間。
void put(E e)
// 將指定的元素插入此雙端佇列的開頭,必要時將一直等待可用空間。
void putFirst(E e)
// 將指定的元素插入此雙端佇列的末尾,必要時將一直等待可用空間。
void putLast(E e)
// 返回理想情況下(沒有記憶體和資源約束)此雙端佇列可不受阻塞地接受的額外元素數。
int remainingCapacity()
// 獲取並移除此雙端隊列表示的佇列的頭部。
E remove()
// 從此雙端佇列移除第一次出現的指定元素。
boolean remove(Object o)
// 獲取並移除此雙端佇列第一個元素。
E removeFirst()
// 從此雙端佇列移除第一次出現的指定元素。
boolean removeFirstOccurrence(Object o)
// 獲取並移除此雙端佇列的最後一個元素。
E removeLast()
// 從此雙端佇列移除最後一次出現的指定元素。
boolean removeLastOccurrence(Object o)
// 返回此雙端佇列中的元素數。
int size()
// 獲取並移除此雙端隊列表示的佇列的頭部(即此雙端佇列的第一個元素),必要時將一直等待可用元素。
E take()
// 獲取並移除此雙端佇列的第一個元素,必要時將一直等待可用元素。
E takeFirst()
// 獲取並移除此雙端佇列的最後一個元素,必要時將一直等待可用元素。
E takeLast()
// 返回以恰當順序(從第一個元素到最後一個元素)包含此雙端佇列所有元素的陣列。
Object[] toArray()
// 返回以恰當順序包含此雙端佇列所有元素的陣列;返回陣列的執行時型別是指定陣列的執行時型別。
<T> T[] toArray(T[] a)
// 返回此 collection 的字串表示形式。
String toString()