【多執行緒高併發】 同步容器和佇列
關鍵字: 同步容器,佇列,ConcurrentMap, Copy-On-Write容器,併發Queue, ConcurrentLinkedQueue, BlockQueue介面, ArrayBlockingQueue , LinkedBlockingQueue, PriorityBlockingQueue, DelayQueue, SynchronousQueue
github地址: https://github.com/zhaikaishun/concurrent_programming
程式碼主要在 Multi_003 , Multi_003\Multi_003\src\com\kaishun\base\coll013
1.1 同步容器
同步容器都是執行緒安全的,但是在某些場景下可能需要加鎖來保護複合操作,複合類操作如 迭代,跳轉,條件運算等,這些複合操作在多執行緒併發執行的時候。可能會出現意外行為,最經典的便是ConcurrentModificationException,原因是當容器迭代的過程中,被併發的修改了內容。
同步類容器: 如古老的Vector,HashTable。這些容器的同步功能其實都是由JDK的Collections.synchronized等工廠方法去實現的,其底層的機制無非就是用傳統的synchronized關鍵字對每個公用的方法都進行同步,使得每次只能有一個執行緒訪問容器的狀態。這很明顯不符合今天網際網路時代高併發的需求,在保證執行緒安全的同時,也必須要有足夠好的效能。
JDK5.0以後提供了多種併發容器來替代同步容器從而改善效能。併發容器是專門針對併發設計的,使用ConcurrentHashMap來代替給予雜湊的HashTable,而且在ConcurrentHashMap中,添加了一些常見覆合操作的支援,以及使用CopyOnWriteArrayList代替Voctor, 併發的CopyOnWriteArraySet, 以及併發的Queue, ConcurrentLinkedQueue和LinkedBlockingQueue, 前者是高效能的佇列,後者是以阻塞形式的佇列,具體實現Queue還有很多,例如ArrayBlockingQueue, PriorityBlockingQueue, SynchronousQueue等。
1.2 ConcurrentMap
ConcurrentMap介面有兩個重要的實現
- ConcurrentHashMap
- ConcurrentSkipListMap(支援併發排序功能,彌補ConcurrnetHashMap)
ConcurrentHashMap主要是利用了Segment(段)的方式,來減小鎖的粒度,從而實現提高併發效能的機制, 最大可以分成16段。並且程式碼中大多共享變數使用Volatile關鍵字宣告,目的是第一時間獲取修改的內容,效能非常好
如圖,傳統的HashTable,只有一段,對整個map進行加鎖,鎖的粒度比較大。而CorrentHashMap, 對這個map的某一個小Segment來進行加鎖,在哪一段操作,只鎖定哪一個段,其他段不影響,鎖的粒度比較小,從而提高併發的效能
具體如何使用,和之前的HashMap幾乎是一模一樣的,還是隨便看個例子吧
ConcurrentHashMap<String, Object> chm = new ConcurrentHashMap<String, Object>();
chm.put("k1", "v1");
chm.put("k2", "v2");
chm.put("k3", "v3");
chm.putIfAbsent("k4", "vvvv"); // 如果key不存在,就加進去
1.3 Copy-On-Write容器
Copy-On-Write簡稱COW, 是一種用於程式設計中的優化策略。
Copy容器即寫時複製的容器,先將當前容器進行Copy,複製出一個新的容器,然後想信的容器裡面新增元素,新增完後,在講原容器的引用指向新的容器。這樣的好處是我們可以對CopyOnWrite容器進行併發的讀而不用加鎖,因為當前容器不會新增任何元素,所以CopyOnWrite也是一種讀寫分離的思想,讀和寫不同的容器,適用於讀多寫少的場景。
JDK裡的COW容器有兩種:CopyOnWriteArrayList和CopyOnWriteArraySet, COW容器非常有用,可以在非常多的併發容器場景中使用到。
使用方法也和原始的ArrayList, set一樣
CopyOnWriteArrayList<String> cwal = new CopyOnWriteArrayList<String>();
CopyOnWriteArraySet<String> cwas = new CopyOnWriteArraySet<String>();
2.1 併發Queue
在併發佇列上JDK提供了兩套實現,一個是以ConcurrentLinkedQueue為代表的高效能佇列,一個是以BlockingQueue介面為代表的阻塞佇列,無論哪種都繼承自Queue
2.2 ConcurrentLinkedQueue
ConcurrentLinkedQueue: 是一個適用於高併發場景下的佇列,通過無鎖的方式,實現了高併發狀態下的高效能,通常ConcurrentLinkedQueue效能好於BlockingQueue,他是一個基於連結節點的無界執行緒安全佇列,該佇列的元素遵循先進先出的原則,頭是最新加入,尾是最近加入。該佇列不允許null元素。
ConcurrentLinkedQueue重要方法:
add()和offer都是加入元素的方法(在ConcurrentLinkedQueue中,這兩個方法沒有任何區別)
poll()和peek()都是取頭元素節點,區別在於前者會刪除元素,後者不會。
示例:
com.kaishun.base.coll013.UseQueue
高效能無阻塞無界佇列:ConcurrentLinkedQueue
//高效能無阻塞無界佇列:ConcurrentLinkedQueue
ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<String>();
q.offer("a");
q.offer("b");
q.offer("c");
q.offer("d");
q.add("e");
System.out.println(q.poll()); //a 從頭部取出元素,並從佇列裡刪除
System.out.println(q.size()); //4
System.out.println(q.peek()); //b
System.out.println(q.size()); //4
----輸出-----
a
4
b
4
2.3 BlockQueue介面
有5種queue的實現。
ArrayBlockingQueue
基於陣列的阻塞佇列實現,在ArrayBlockingQueue內部,維護了一個定常陣列,一邊快取佇列中的資料物件,其內部沒有實現讀寫分離,也就意味著生產和消費不能完全並行,長度是需要定義的,可以指定先進先出或者先進後出。也叫有界佇列,在很多場合下非常適用。
ArrayBlockingQueue<String> array = new ArrayBlockingQueue<String>(5);
array.put("a");
array.put("b");
array.add("c");
array.add("d");
array.add("e");
// array.add("f");
System.out.println(array.offer("a", 3, TimeUnit.SECONDS));
由於指定的是5個長度,前面已經加了5個了,後面再次新增的時候,3秒內都加不進去,3秒後返回一個false,輸出
false
若超過了還是用add方法,就會拋異常 IllegalStateException: Queue full ,例如
ArrayBlockingQueue<String> array = new ArrayBlockingQueue<String>(5);
array.put("a");
array.put("b");
array.add("c");
array.add("d");
array.add("e");
array.add("f"); //這裡接回拋異常
System.out.println(array.offer("a", 3, TimeUnit.SECONDS));
LinkedBlockingQueue:
基於連結串列的阻塞佇列,同ArrayBlockingQueue類似,其內部也維護者一個數據緩衝佇列(該佇列是由一個連結串列構成),LinkBlockingQueue之所以能夠搞笑的處理併發資料,是因為其內部實現了讀寫分離鎖,從而實現了生產者和消費者的完全並行執行,他是一個無界佇列。
//阻塞佇列
LinkedBlockingQueue<String> q = new LinkedBlockingQueue<String>();
q.offer("a");
q.offer("b");
q.offer("c");
q.offer("d");
q.offer("e");
q.add("f");
System.out.println(q.size());
for (Iterator iterator = q.iterator(); iterator.hasNext();) {
String string = (String) iterator.next();
System.out.println(string);
}
-----------輸出-----------
6
a
b
c
d
e
f
drainTo一次取多個元素
//阻塞佇列
LinkedBlockingQueue<String> q = new LinkedBlockingQueue<String>();
q.offer("a");
q.offer("b");
q.offer("c");
q.offer("d");
q.offer("e");
q.add("f");
System.out.println(q.size());
List<String> list = new ArrayList<String>();
//取3個元素,放入到 list 集合中去
System.out.println(q.drainTo(list, 3));
System.out.println(list.size());
for (String string : list) {
System.out.println(string);
}
-----------輸出-----------
6
3
3
a
b
c
PriorityBlockingQueue:
基於優先順序的阻塞佇列(優先順序的判斷通過建構函式傳入的Compator物件來決定,也就是說傳入佇列的物件必須實現Comparable介面),在實現PriorityBlockingQueue時,內部控制執行緒同步的鎖採用的是公平鎖,他也是一個無界的佇列。
示例:
Task類,實現了Comparable的方法,重寫compareTo方法
public class Task implements Comparable<Task>{
private int id ;
private String name;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public int compareTo(Task task) {
return this.id > task.id ? 1 : (this.id < task.id ? -1 : 0);
}
public String toString(){
return this.id + "," + this.name;
}
}
UsePriorityBlockingQueue類
測試是否是有序佇列
public class UsePriorityBlockingQueue {
public static void main(String[] args) throws Exception{
PriorityBlockingQueue<Task> q = new PriorityBlockingQueue<Task>();
Task t1 = new Task();
t1.setId(3);
t1.setName("id為3");
Task t2 = new Task();
t2.setId(4);
t2.setName("id為4");
Task t3 = new Task();
t3.setId(1);
t3.setName("id為1");
//return this.id > task.id ? 1 : 0;
q.add(t1); //3
q.add(t2); //4
q.add(t3); //1
// 1 3 4
System.out.println("容器:" + q);
System.out.println(q.take().getId());
System.out.println("容器:" + q);
System.out.println(q.take().getId());
System.out.println(q.take().getId());
}
}
輸出
容器:[1,id為1, 4,id為4, 3,id為3]
1
容器:[3,id為3, 4,id為4]
3
4
說明,這種佇列在沒有take的時候,還不是排序的,take()時,才利用了排序,比較的方法
DelayQueue:
帶有延遲時間的Queue, 其中的元素只有當其指定的延遲時間到了,才能夠從佇列中獲取到該元素, DelayQueue的元素必須實現Delayed 介面, DelayQueue是一個沒有大小限制的佇列,應用場景很多,比如對快取超時的資料進行一處,任務超時處理,空閒連線的關閉等等。
經典的網咖上機案例:
Wangmin 實現了Delayed介面
public class Wangmin implements Delayed {
private String name;
//身份證
private String id;
//截止時間
private long endTime;
//定義時間工具類
private TimeUnit timeUnit = TimeUnit.SECONDS;
public Wangmin(String name,String id,long endTime){
this.name=name;
this.id=id;
this.endTime = endTime;
}
public String getName(){
return this.name;
}
public String getId(){
return this.id;
}
/**
* 用來判斷是否到了截止時間
*/
@Override
public long getDelay(TimeUnit unit) {
//return unit.convert(endTime, TimeUnit.MILLISECONDS) - unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
return endTime - System.currentTimeMillis();
}
/**
* 相互批較排序用
*/
@Override
public int compareTo(Delayed delayed) {
Wangmin w = (Wangmin)delayed;
return this.getDelay(this.timeUnit) - w.getDelay(this.timeUnit) > 0 ? 1:0;
}
WangBa類
public class WangBa implements Runnable {
private DelayQueue<Wangmin> queue = new DelayQueue<Wangmin>();
public boolean yinye =true;
public void shangji(String name,String id,int money){
Wangmin man = new Wangmin(name, id, 1000 * money + System.currentTimeMillis());
System.out.println("網名"+man.getName()+" 身份證"+man.getId()+"交錢"+money+"塊,開始上機...");
this.queue.add(man);
}
public void xiaji(Wangmin man){
System.out.println("網名"+man.getName()+" 身份證"+man.getId()+"時間到下機...");
}
@Override
public void run() {
while(yinye){
try {
Wangmin man = queue.take();
xiaji(man);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String args[]){
try{
System.out.println("網咖開始營業");
WangBa siyu = new WangBa();
Thread shangwang = new Thread(siyu);
shangwang.start();
siyu.shangji("路人甲", "123", 1);
siyu.shangji("路人乙", "234", 10);
siyu.shangji("路人丙", "345", 5);
}
catch(Exception e){
e.printStackTrace();
}
}
}
輸出
網咖開始營業
網名路人甲 身份證123交錢1塊,開始上機...
網名路人乙 身份證234交錢10塊,開始上機...
網名路人丙 身份證345交錢5塊,開始上機...
網名路人甲 身份證123時間到下機...
網名路人丙 身份證345時間到下機...
網名路人乙 身份證234時間到下機...
SynchronousQueue:
一種沒有緩衝的佇列,生產者產生的資料直接會被消費者獲取並消費
個人理解為虛擬佇列,這個佇列不存元素,生產與消費相互扔而已
final SynchronousQueue<String> q = new SynchronousQueue<String>();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(q.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
q.add("asdasd");
}
});
t2.start();
---------輸出-------
asdasd
– 本文總結自前人經驗,總結得挺累的,特別感謝網際網路架構師白鶴翔老師