高併發之併發容器詳解(從入門到超神)
一、ConcurrentHashMap
在上面已經提到過 ConcurrentHashMap
, ConcurrentHashMap
相比 Hashtable
能夠進一步提高併發性,其原理圖如下:

ConcurrentHashMap原理
HashMap,Hashtable與ConcurrentHashMap都是實現的雜湊表資料結構,在隨機讀取的時候效率很高。Hashtable實現同步是利用synchronized關鍵字進行鎖定的,其是針對整張雜湊表進行鎖定的,即每次鎖住整張表讓執行緒獨佔,線上程安全的背後是巨大的浪費。ConcurrentHashMap和Hashtable主要區別就是圍繞著鎖的粒度進行區別以及如何區鎖定。
上圖中,左邊是Hashtable的實現方式,可以看到鎖住整個雜湊表;而右邊則是ConcurrentHashMap的實現方式,單獨鎖住每一個桶(segment).ConcurrentHashMap將雜湊表分為16個桶(預設值),諸如get(),put(),remove()等常用操作只鎖當前需要用到的桶,而size()才鎖定整張表。原來只能一個執行緒進入,現在卻能同時接受16個寫執行緒併發進入(寫執行緒需要鎖定,而讀執行緒幾乎不受限制),併發性的提升是顯而易見的。
而在迭代時,ConcurrentHashMap使用了不同於傳統集合的 快速失敗迭代器 (fast-fail iterator)的另一種迭代方式,稱為弱一致迭代器。在這種迭代方式中,當iterator被建立後集合再發生改變就不再是丟擲ConcurrentModificationException,取而代之的是在改變時例項化出新的資料從而不影響原有的資料,iterator完成後再將頭指標替換為新的資料,這樣iterator執行緒可以使用原來老的資料,而寫執行緒也可以併發的完成改變,更重要的,這保證了多個執行緒併發執行的連續性和擴充套件性,是效能提升的關鍵。
我們在上面闡述了ConcurrentHashMap的使用特點和原理,分別在同樣的一個高併發場景下,測試不同的方式產生的延時(ms):
Map<String, String> map = new ConcurrentHashMap<>();//483 Map<String, String> map = new ConcurrentSkipListMap<>(); //高併發並且排序 559 Map<String, String> map = new Hashtable<>(); //499 Map<String, String> map =Collections.synchronizedMap(new HashMap<>()); // 530 Map<String, String> map =Collections.synchronizedMap(new TreeMap()); //905
以ConcurrentLinkedQueue為例,他實現了Queue介面,例項化方式如下:
Queue<String> strs = new ConcurrentLinkedQueue<>();
新增元素的方法: offer()
取出隊頭的方法: poll()
判斷佇列長度: size()
對於雙端佇列,使用 ConcurrentLinkedDeque 型別來實現.
下面我們再看一個具體的例項:
public class T01_ConcurrentMap { public static void main(String[] args) { Map<String, String> map = new ConcurrentHashMap<String, String>(); //Map<String, String> map = new ConcurrentSkipListMap<String, String>(); //高併發並且排序 //Map<String, String> map = new Hashtable<>(); //Map<String, String> map = new HashMap<String, String>(); Random random = new Random(); Thread[] threads = new Thread[100]; CountDownLatch latch = new CountDownLatch(threads.length); long start = System.currentTimeMillis(); for (int i = 0; i < threads.length; i++) { threads[i] = new Thread(()->{ for(int j=0; j<10000;j++) map.put("a" + random.nextInt(100000), "a" + random.nextInt(100000)); latch.countDown(); }); } Arrays.asList(threads).forEach(t->t.start()); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } long end = System.currentTimeMillis(); System.out.println(end-start); } }
啟動100個執行緒,向圖中新增100000個元素,分別使用 Hashtable,HashMap,ConcurrentHashMap,ConcurrentSkipListMap
定義map,判斷程式完成的時間。最終發現,ConcurrentHashMap要比HashMap效率高,ConcurrentHashMap是將大鎖分成若干小鎖,實現多個執行緒共同執行,所以,效率有很大差距。ConcurrentSkipListMap較ConcurrentHashMap除了實現高併發外還能夠排序。
參考:
http://blog.csdn.net/sunxianghuang/article/details/52221913 http://www.educity.cn/java/498061.html
二、ConcurrentQueue
與ConcurrentHashMap相同,ConcurrentQueue也是通過同樣的方式來提高併發效能的。
我們在同步容器中提到過火車票問題:
有N張火車票,每張票都有一個編號,同時有10個視窗對外售票,寫一個模擬程式。
在上述問題中,也可以使用ConcurrentQueue進一步提高併發性:
static Queue<String> tickets = new ConcurrentLinkedQueue<>();
具體的程式碼是這樣的:
public class TicketSeller4 { static Queue<String> tickets = new ConcurrentLinkedQueue<>(); static { for(int i=0; i<1000; i++) tickets.add("票編號:" + i); } public static void main(String[] args) { for(int i=0; i<10; i++) { new Thread(()->{ while(true) { String s = tickets.poll(); if(s == null) break; else System.out.println("銷售了--" + s); } }).start(); } } }
這裡面通過ConcurrentLinkedQueue的poll()方法來實現獲取容器成員的。用這個型別可以進一步提高併發性。
具體基本操作例項
public class T04_ConcurrentQueue { public static void main(String[] args) { Queue<String> strings = new ConcurrentLinkedQueue<String>(); for (int i = 0; i < 10; i++) { strings.offer("a" + i); //相當於add, 放進佇列 } System.out.println(strings); System.out.println(strings.size()); System.out.println(strings.poll()); //取出並移除掉 System.out.println(strings.size()); System.out.println(strings.peek()); //取出,不會移除。相當於get(0) System.out.println(strings.size()); } }
三、CopyOnWriteArrayList
寫時複製容器,即 copy-on-write
,在多執行緒環境下,寫時效率低,讀時效率高,適合寫少讀多的環境。對比測試幾種情況:
List<String> lists = new ArrayList<>(); //這個會出併發問題!報錯:ArrayIndexOutOfBoundsException List<String> lists = new Vector();//111 ms List<String> lists = new CopyOnWriteArrayList<>();//5230 ms //測試核心程式碼: Runnable task = new Runnable() { @Override public void run() { for(int i=0; i<1000; i++) lists.add("a" +r.nextInt(10000)); } }; //多執行緒向該容器中不斷加入資料。
從JDK 5開始Java併發包裡提供了兩個使用CopyOnWrite機制實現的併發容器,它們是 CopyOnWriteArrayList
和 CopyOnWriteArraySet
。
當我們往一個容器新增元素的時候,不直接往當前容器新增,而是先將當前容器進行Copy,複製出一個新的容器,然後向新的容器裡新增元素,新增完元素之後,再將原容器的引用指向新的容器。這樣做的好處是我們可以對 CopyOnWrite
容器進行併發的讀,而不需要加鎖,因為在當前讀的容器中不會新增任何元素。所以 CopyOnWrite
容器是一種讀寫分離的思想,讀和寫對應不同的容器。
四、BlockingQueue
這種併發容器,會自動實現阻塞式的生產者/消費者模式。使用佇列解耦合,在實現非同步事物的時候很有用。下面的例子,實現了阻塞佇列:
LinkedBlockingQueue static BlockingQueue<String> strs = new LinkedBlockingQueue<>(10); strs.put("a" + i); //加入佇列,如果滿了,就會等待 strs.take(); //取出佇列元素,如果空了,就會等待
在例項化時,可以指定具體的佇列容量。
在加入成員的時候,除了使用put方法還可以使用其他方法:
Str.add(“aaa”); /* add如果在佇列滿了之後,再加入成員會丟擲異常,而這種情況下,put方法會一直等待被消費掉。 */ Str.offer(“aaa”); /* offer新增成員的時候,會有boolean型別的返回值,如果新增成功,會返回true,如果新增失敗,會返回false.除此之外,offer還可以按時段進行新增,例如: */ strs.offer("aaa", 1, TimeUnit.SECONDS); /* 如果佇列滿了,等待1秒,再進行成員的新增,如果新增失敗了,則返回false. */
五、ArrayBlockingQueue
static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10);
物件的方法和上面的BlockingQueue是一樣的,用法也是一樣的。
二者的區別主要是:
LinkedBlockingQueue LinkedBlockingQueue
相比於陣列實現的 ArrayBlockingQueue
的有界情況,我們稱之為 有界佇列 , LinkedBlockingQueue
可認為是 無界佇列 。當然,也可以向上面那樣指定佇列容量,但是這個引數常常是省略的,多用於任務佇列。
六、LinkedBlockingQueue例項
public class T05_LinkedBlockingQueue { private static BlockingQueue<String> strings = new LinkedBlockingQueue<String>(); private static Random r = new Random(); public static void main(String[] args) { new Thread(()->{ for (int i = 0; i < 100; i++) { try { strings.put("a" + i); //如果滿了,就會等待 TimeUnit.SECONDS.sleep(r.nextInt(10)); } catch (Exception e) { e.printStackTrace(); } } }, "p1").start(); for (int i = 0; i < 5; i++) { new Thread(()->{ for(;;){ try { System.out.println(Thread.currentThread().getName() + "take -" + strings.take()); //如果空了,就會等待 } catch (Exception e) { e.printStackTrace(); } } },"c" + i).start(); } } }
LinkedBlockingQueue
是使用連結串列是實現的阻塞式容器。
七、DelayQueue
DelayQueue
也是一個 BlockingQueue
,其特化的引數是 Delayed
。
Delayed擴充套件了Comparable介面,比較的基準為延時的時間值,Delayed介面的實現類getDelay()的返回值應為固定值(final).DelayQueue內部是使用PriorityQueue實現的,即:
DelayQueue = BlockingQueue + PriorityQueue + Delayed
可以說, DelayQueue
是一個使用優先佇列(PriorityQueue)實現的 BlockingQueue
,優先佇列的比較基準值是時間。這是一個無界的BlockingQueue,用於放置實現了Delayed介面的物件,其中的物件只能在其到期時才能從佇列中取走。這種佇列是有序的,即隊頭物件的延遲到期時間最長。但是要注意的是,不能將null元素放置到這種佇列中。
Delayed,一種混合風格的介面,用來標記那些應該在給定延遲時間之後執行的物件。此介面的實現類必須重寫一個 compareTo() 方法,該方法提供與此介面的 getDelay()方法一致的排序。
DelayQueue
儲存的物件是實現了Delayed介面的物件,在這個物件中,需要重寫 compareTo()
和 getDelay()
方法,例如:
static class MyTask implements Delayed { long runningTime; MyTask(long rt) { this.runningTime = rt; } @Override public int compareTo(Delayed o) { if(this.getDelay(TimeUnit.MILLISECONDS) <o.getDelay(TimeUnit.MILLISECONDS)) return -1; else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) return 1; else return 0; } @Override public long getDelay(TimeUnit unit) { return unit.convert(runningTime -System.currentTimeMillis(), TimeUnit.MILLISECONDS); } }
因此,當我們在main()函式中,向該佇列加入元素後再取出元素的過程,就會存在延時,可以這樣驗證:
long now = System.currentTimeMillis(); MyTask t1 = new MyTask(now + 1000); MyTask t2 = new MyTask(now + 2000); MyTask t3 = new MyTask(now + 1500); MyTask t4 = new MyTask(now + 2500); MyTask t5 = new MyTask(now + 500); tasks.put(t1); tasks.put(t2); tasks.put(t3); tasks.put(t4); tasks.put(t5); System.out.println(tasks); for(int i=0; i<5; i++) { System.out.println(tasks.take()); }
注意:為了方便檢視到效果,可以重寫toString()函式,來保證打印出來的結果有意義:
例如:
@Override public String toString() { return "" + runningTime; }
DelayQueue
可以用在諸如用監控執行緒來輪詢是否有超時任務出現,來處理某些具有等待時延的情況,這樣,可以避免由於數量巨大造成的輪詢效率差的問題。例如:
- 關閉空閒連線:伺服器中,有很多客戶端的連線,空閒一段時間之後需要關閉他們。
- 快取:快取中的物件,超過了空閒時間,需要從快取中移出。
- 任務超時處理:在網路協議滑動視窗請求應答式互動時,處理超時未響應的請求。
例項:
public class T07_DelayQueue { private static BlockingQueue<MyTask> tasks = new DelayQueue<>(); private static Random r = new Random(); static class MyTask implements Delayed{ long runningTime; public MyTask(long rt) { this.runningTime = rt; } @Override public int compareTo(Delayed o) { if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MICROSECONDS)) { return -1; }else if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) { return 1; }else { return 0; } } @Override public long getDelay(TimeUnit unit) { return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public String toString() { return "" + runningTime; } public static void main(String[] args) throws InterruptedException { long now = System.currentTimeMillis(); MyTask t1 = new MyTask(now + 1000); MyTask t2 = new MyTask(now + 2000); MyTask t3 = new MyTask(now + 1500); MyTask t4 = new MyTask(now + 2500); MyTask t5 = new MyTask(now + 500); tasks.put(t1); tasks.put(t2); tasks.put(t3); tasks.put(t4); tasks.put(t5); System.out.println(tasks); for (int i = 0; i < 5; i++) { System.out.println(tasks.take()); } } } }
八、LinkedTransferQueue
TransferQueue
是一個繼承了 BlockingQueue
的介面,並且增加若干新的方法。LinkedTransferQueue是TransferQueue介面的實現類,其定義為一個無界的佇列,具有先進先出(FIFO)的特性。
TransferQueue介面含有下面幾個重要方法:
-
transfer(E e)
若當前存在一個正在等待獲取的消費者執行緒,即立刻移交之;否則,會插入當前元素e到佇列尾部,並且等待進入阻塞狀態,到有消費者執行緒取走該元素。
-
tryTransfer(E e)
若當前存在一個正在等待獲取的消費者執行緒(使用take()或者poll()函式),使用該方法會即刻轉移/傳輸物件元素e;若不存在,則返回false,並且不進入佇列。這是一個不阻塞的操作。
-
tryTransfer(E e,long timeout,TimeUnit unit)
若當前存在一個正在等待獲取的消費者執行緒,會立即傳輸給它;否則將插入元素e到佇列尾部,並且等待被消費者執行緒獲取消費掉;若在指定的時間內元素e無法被消費者執行緒獲取,則返回false,同時該元素被移除。
-
hasWaitingConsumer()
判斷是否存在消費者執行緒。
-
getWaitingConsumerCount()
獲取所有等待獲取元素的消費執行緒數量。
-
size()
因為佇列的非同步特性,檢測當前佇列的元素個數需要逐一迭代,無法保證原子性,可能會得到一個不太準確的結果,尤其是在遍歷時有可能佇列發生更改。
使用方法:
LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();//例項化
如果當前沒有消費者執行緒(存在take方法的執行緒):
strs.transfer("aaa");
該方法會一直阻塞在這裡,知道有消費者執行緒存在。
而如果使用傳統的put()方法來加入元素的話,則不會發生阻塞現象。
strs.take()
同樣,獲取佇列中元素的方法take()也是阻塞在這裡等待獲取新的元素的。
九、SynchronousQueue
SynchronousQueue
也是一種 BlockingQueue
,是一種無緩衝的等待佇列。所以,在某次新增元素後必須等待其他執行緒取走後才能繼續新增;可以認為SynchronousQueue是一個快取值為0的阻塞佇列(也可以認為是1),它的 isEmpty()
方法永遠返回是true, remainingCapacity()
方法永遠返回是0.
remove()
和 removeAll()
方法永遠返回是false, iterator()
方法永遠返回空, peek()
方法永遠返回null.
在使用put()方法時,會一直阻塞在這裡,等待被消費:
BlockingQueue strs = new SynchronousQueue<>();//例項化 strs.put(“aaa”); //阻塞等待消費者消費 strs.add(“aaa”);//會產生異常,提示佇列滿了 strs.take();//該方法可以取出元素,同樣是阻塞的,需要線上程中去實現他,作為消費者.
例項:
public class T09_Synchronized { public static void main(String[] args) throws InterruptedException { BlockingQueue<String> strings = new SynchronousQueue<String>(); new Thread(()->{ try { System.out.println(strings.take()); } catch (Exception e) { e.printStackTrace(); } }).start(); strings.put("aaa"); //阻塞等待消費者消費 //strings.add("aaa"); System.out.println(strings.size()); } }
參考資料
https://blog.csdn.net/qq_34707744/article/details/79746622 https://blog.csdn.net/wang7807564/article/details/80048576