1. 程式人生 > >Java高並發程序設計學習筆記(五):JDK並發包(各種同步控制工具的使用、並發容器及典型源碼分析(Hashmap等))

Java高並發程序設計學習筆記(五):JDK並發包(各種同步控制工具的使用、並發容器及典型源碼分析(Hashmap等))

pin 指定 timeunit executors .sh 部分 現象 arr span

轉自:https://blog.csdn.net/dataiyangu/article/details/86491786#2__696

1. 各種同步控制工具的使用
1.1. ReentrantLock
1.1.1.可重入
1.1.2. 可中斷 lockInterruptibly()
1.1.3. 可限時
1.1.4. 公平鎖
1.2. Condition
1.2.1. 概述
1.2.2. 主要接口
1.2.3. API詳解
1.3. Semaphore
1.3.1. 概述
1.3.2. 主要接口
1.4. ReadWriteLock
1.4.1. 概述
1.4.2. 訪問情況
1.4.3. 主要接口
1.5. CountDownLatch

1.5.1. 概述
1.5.2. 主要接口
1.5.3. 示意圖
例子
1.6. CyclicBarrier
1.6.1. 概述
1.6.2. 主要接口
1.6.3. 示意圖
例子
1.7. LockSupport
1.7.1. 概述
1.7.2. 主要接口
1.7.3. 與suspend()比較
1.7.4. 中斷響應
例子
1.8. ReentrantLock 的實現
1.8.1. CAS狀態
1.8.2. 等待隊列
1.8.3. park()
源碼
2. 並發容器及典型源碼分析
2.1. 集合包裝
2.1.1. HashMap
源碼
2.1.2. List
2.1.3. Set
2.2. ConcurrentHashMap
2.2.1HashMap源碼分析
2.2.2. ConcurrentHashMap源碼分析
2.3. BlockingQueue
ArrayBlockingQueue源碼
2.4. ConcurrentLinkedQueue
1. 各種同步控制工具的使用
1.1. ReentrantLock
在synchronize的基礎上新加了功能,如果是特別簡單的場景,兩者是沒有太大的區別的。

1.1.1.可重入
單線程可以重復進入,但要重復退出
對於同一個線程,自己是可以重復進入的,否則會把自己卡死。
結論:
重入鎖是可重復獲得資源的鎖,已經獲得鎖的線程可以對當前的資源重入加鎖而不會引起阻塞;不可重入鎖是不可重復獲得資源的鎖,當已經獲得鎖的線程對當前資源再次加鎖時,會把自己阻塞

廣義上的可重入鎖指的是可重復可遞歸調用的鎖,在外層使用鎖之後,在內層仍然可以使用,並且不發生死鎖(前提得是同一個對象或者class),這樣的鎖就叫做可重入鎖。ReentrantLock和synchronized都是可重入鎖,下面是一個用synchronized實現的例子:

public class ReentrantTest implements Runnable {

public synchronized void get() {
System.out.println(Thread.currentThread().getName());
set();
}

public synchronized void set() {
System.out.println(Thread.currentThread().getName());
}

public void run() {
get();
}

public static void main(String[] args) {
ReentrantTest rt = new ReentrantTest();
for(;;){
new Thread(rt).start();
}
}
}

整個過程沒有發生死鎖的情況,截取一部分輸出結果如下:

Thread-8492
Thread-8492
Thread-8494
Thread-8494
Thread-8495
Thread-8495
Thread-8493
Thread-8493



set()和get()同時輸出了線程名稱,表明即使遞歸使用synchronized也沒有發生死鎖,證明其是可重入的。

以上原文:https://blog.csdn.net/rickiyeat/article/details/78314451

import java.util.concurrent.locks.ReentrantLock;

public class Lock implements Runnable{
public static ReentrantLock lock = new ReentrantLock();
public static int i = 0;
public void run() {
for (int j = 0; j < 100000; j++) {
lock.lock();
lock.lock();
try {
i++;
} finally {
//不論如何都進行unlock
//加兩次鎖必須解鎖兩次。
lock.unlock();
lock.unlock();
}

}
}
//synchronize只要在大括號外面,出了大括號,虛擬機會自動釋放鎖,可是lock是通過unlock來控制什麽時候釋放鎖

public static void main(String[] args) throws InterruptedException {
Lock tl = new Lock();
//同時開兩個一樣的線程
Thread t1 = new Thread(tl);
Thread t2 = new Thread(tl);
t1.start();t2.start();
t1.join();t2.join();
System.out.println(i);
}
}

技術分享圖片

1.1.2. 可中斷 lockInterruptibly()
在發生死鎖或者其他的導致長期等待的情況,希望鎖停下來的功能,synchronize沒有這個功能,ReentrantLock提供了可中斷。

import java.util.concurrent.locks.ReentrantLock;

public class Interruptible implements Runnable{
public static ReentrantLock lock1 = new ReentrantLock();
public static ReentrantLock lock2 = new ReentrantLock();
int lock;
public Interruptible(int lock){
this.lock = lock;
}

public void run() {
try{
//lock=1的時候lock1上鎖,lock=2的時候lock2上鎖,剛才的lock1上了鎖
//之後還要給lock2上鎖,lock2上了鎖之後還要給lock1上鎖,
// 可是lock2在lock=2的時候被locl=2拿到了,lock1在lock=1的時候
//被locl=1拿到了所以形成了死鎖
if (lock == 1){
lock1.lockInterruptibly();
try{
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock2.lockInterruptibly();
}else{
lock2.lockInterruptibly();
try{
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock1.lockInterruptibly();
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//如果lock1還拿著這把鎖的話,就解掉。
if (lock1.isHeldByCurrentThread())
lock1.unlock();
if (lock2.isHeldByCurrentThread())
lock2.unlock();
System.out.println(Thread.currentThread().getId()+":線程退出");
}
}

public static void main(String[] args) throws InterruptedException {
Interruptible interruptible1 = new Interruptible(1);
Interruptible interruptible2 = new Interruptible(2);
Thread t1 = new Thread(interruptible1);
Thread t2 = new Thread(interruptible2);
t1.start();t2.start();
Thread.sleep(1000);
DeadlockChecker.check();
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
DeadlockChecker.check();如果將這句話註釋,將會進行無限期的死鎖。

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class DeadLockChecker {
private final static ThreadMXBean mbean = ManagementFactory.getThreadMXBean();
final static Runnable deadLockCheck = new Runnable() {
public void run() {
while (true) {
long [] deadLockedThreadIds = mbean.findDeadlockedThreads();
if (deadLockedThreadIds != null) {
ThreadInfo[] threadInfos = mbean.getThreadInfo(deadLockedThreadIds);
for (Thread t : Thread.getAllStackTraces().keySet()) {
for (int i = 0; i<threadInfos.length ; i++) {
if (t.getId() == threadInfos[i].getThreadId()) {
t.interrupt();
}
}
}
}
try{
Thread.sleep(5000);
}catch (InterruptedException e){

}
}
}
};

public static void check(){
Thread t = new Thread(deadLockCheck);
t.setDaemon(true);
t.start();
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
1.1.3. 可限時
超時不能獲得鎖,就返回false,不會永久等待構成死鎖

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class TimeLock implements Runnable{
public static ReentrantLock lock = new ReentrantLock();

public void run() {
try{
//如果線程在5秒之內沒有拿到鎖就走else裏面的內容
if (lock.tryLock(5, TimeUnit.SECONDS)) {
Thread.sleep(6000);
}else{
System.out.println("get lock failed");
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}

public static void main(String[] args) {
TimeLock tl = new TimeLock();
Thread t1 = new Thread(tl);
Thread t2 = new Thread(t1);
t1.start();
t2.start();

}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
1.1.4. 公平鎖
什麽是公平鎖?
先來先得,避免產生饑餓現象,但是性能差很多。所以不是特殊情況下,不要使用公平鎖。
public ReentrantLock(boolean fair)
public static ReentrantLock fairLock = new ReentrantLock(true);

1.2. Condition
1.2.1. 概述
類似於 Object.wait()和Object.notify() 與ReentrantLock結合使用

1.2.2. 主要接口
void await() throws InterruptedException;//等待
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();//通知繼續往下走
void signalAll();

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ReenTerLockCondition implements Runnable {
public static ReentrantLock lock = new ReentrantLock();
public static Condition condition = lock.newCondition();

public void run() {
try{
lock.lock();
condition.await();
System.out.println("Thread is going on ");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}

public static void main(String[] args) throws InterruptedException {
ReenTerLockCondition tl = new ReenTerLockCondition();
Thread t1 = new Thread(tl);
t1.start();
Thread.sleep(2000);
lock.lock();
condition.signal();
lock.unlock();
}
}
//必須在lock和unlock執行完了之後才會到上面將t1的鎖解開。
//類似於之前的Synchronize代碼塊。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
1.2.3. API詳解
await()方法會使當前線程等待,同時釋放當前鎖,當其他線程中使用signal()時或者signalAll()方法時,線
程會重新獲得鎖並繼續執行。或者當線程被中斷時,也能跳出等待。這和Object.wait()方法很相似。

awaitUninterruptibly()方法與await()方法基本相同,但是它並不會再等待過程中響應中斷。不會被中斷。
singal()方法用於喚醒一個在等待中的線程。相對的singalAll()方法會喚醒所有在等待中的線程。這和Obej ct.notify()方法很類似。

1.3. Semaphore
信號量
互斥的、排他的
可以有多個線程去共享這個臨界區,廣義上的鎖,比如每個信號量指定十個許可,每個許可分配若幹個線程,當然每個線程也可以持有多個許可,如果有多余的許可就可以進入,如果沒有許可,後面的線程就必須等待,不能進入。
信號量允許多個線程進入臨界區,信號量的許可等於一的時候就相當於一把鎖。

1.3.1. 概述
共享鎖 運行多個線程同時臨界區

1.3.2. 主要接口
public void acquire()//獲得信號量

public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//也可以拿到多個許可
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
1
2
3
4
5
6
7
8
public void acquireUninterruptibly()//不支持中斷的獲得信號量
public boolean tryAcquire()//不會等待,只是試一試,拿不到就返回false
public boolean tryAcquire(long timeout, TimeUnit unit)//try的時間,等待的時間,和上線功能差差不多
public void release()釋放。
例子:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemapDemo implements Runnable {
final Semaphore semp = new Semaphore(5);

public void run() {
try{
//也可以semp.acquire(2);下面將會每兩個線程輸出一次
semp.acquire();
Thread.sleep(2000);
System.out.println(Thread.currentThread().getId()+":done!");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semp.release();
}
}

public static void main(String[] args) {
ExecutorService exec = Executors.newFixedThreadPool(20);
final SemapDemo demo = new SemapDemo();
for (int i = 0; i <20 ; i++) {
exec.submit(demo);
}
}
}

技術分享圖片

可以看到每五個線程輸出一次,每次停留兩秒,同樣的如果semp.acquire(2);每個線程拿到兩個許可,一共有五個許可,release(2),所以每次只有兩個線程輸出。

1.4. ReadWriteLock
寫會修改數據,讀不會修改數據,從性能上來看,如果不管讀還是寫都加鎖,會十分影響性能,synchronized並行度只有一,一次只允許一個線程經過。如果沒有寫操作,這個所有的read線程必然是無等待的。

1.4.1. 概述
ReadWriteLock是JDK5中提供的讀寫分離鎖

1.4.2. 訪問情況
讀-讀不互斥:讀讀之間不阻塞。
讀-寫互斥:讀阻塞寫,寫也會阻塞讀。
寫-寫互斥:寫寫阻塞。
技術分享圖片

1.4.3. 主要接口
private static ReentrantReadWriteLock readWriteLock=newReentrantReadWriteLock();
private static Lock readLock = readWriteLock.readLock();
獲得readLock,然後通過readLock.lock()驚醒操作
private static Lock writeLock = readWriteLock.writeLock();
獲得readLock,然後通過readLock.lock()驚醒操作

1.5. CountDownLatch
1.5.1. 概述
倒數計時器 一種典型的場景就是火箭發射。在火箭發射前,為了保證萬無一失,往往還要進行各項設備、儀器的檢查。 只有等所有檢查完畢後,引擎才能點火。這種場景就非常適合使用CountDownLatch。它可以使得點火線程 ,等待所有檢查線程全部完工後,再執行

1.5.2. 主要接口
static final CountDownLatch end = new CountDownLatch(10); //加入有十個檢查項(十個線程)
end.countDown();//沒通過一個就減一
end.await();//見到最後就返回,執行比如發射火箭的操作

1.5.3. 示意圖

技術分享圖片

主線程在這裏等待,等到所有的檢查任務都到達臨界點之後,主線程就繼續執行。

例子
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchDemo implements Runnable {
static final CountDownLatch end = new CountDownLatch(10);//開啟十個線程來進行檢查
static final CountDownLatchDemo demo = new CountDownLatchDemo();
public void run() {
try {
Thread.sleep(new Random().nextInt(10)*1000);
System.out.println("check complete");
end.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newFixedThreadPool(10);
for (int i = 0; i <10 ; i++) {
exec.submit(demo);
}
//等待檢查
end.await();
//發射火箭
System.out.println("Fire");
exec.shutdown();
}
}

技術分享圖片

1.6. CyclicBarrier
1.6.1. 概述
循環柵欄 Cyclic意為循環,也就是說這個計數器可以反復使用。比如,假設我們將計數器設置為10。那麽湊齊第一批1 0個線程後,計數器就會歸零,然後接著湊齊下一批10個線程
它和CountDownLatch的區別是CountDownLatch的計數器只能有一次,而CyclicBarrier可以再重復利用。

1.6.2. 主要接口
public CyclicBarrier(int parties(幾個參與者相當於上一節中的10), Runnable barrierAction(所有的線程到了之後,柵欄(系統)執行的動作))
barrierAction就是當計數器一次計數完成後,系統會執行的動作
await() //都到了之後再往下執行

1.6.3. 示意圖
技術分享圖片

例子
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
public static class Soldier implements Runnable{
private String soldier;
private final CyclicBarrier cyclic;

Soldier(CyclicBarrier cyclic,String soldierName) {
this.soldier = soldierName;
this.cyclic = cyclic;
}

public void run() {
try {
cyclic.await();
doWork();
cyclic.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}

void doWork(){
try {
Thread.sleep(Math.abs(new Random().nextInt()%10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(soldier+":任務完成");
}
}
public static class BarrierRun implements Runnable{
boolean flag;
int N;

public BarrierRun(boolean flag, int N) {
this.flag = flag;
this.N = N;
}
public void run() {
if (flag) {
System.out.println("司令:[士兵"+N+"個,任務完成!]");
}else{
System.out.println("司令:[士兵"+N+"個,集合完畢!]");
flag = true;
}
}
}

public static void main(String[] args) {
final int N = 10;
Thread[] allSoldier = new Thread[N];
boolean flag = false;
CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));
System.out.println("集合隊伍!");
for (int i = 0; i <N ; i++) {
System.out.println("士兵"+i+"報道!");
allSoldier[i] = new Thread(new Soldier(cyclic, "士兵" + i));
allSoldier[i].start();
// if (i == 5) {
// allSoldier[0].interrupt();
// }
}
}
}
技術分享圖片

//			if (i == 5) {
//				allSoldier[0].interrupt();
//			}

將這個註釋打開的話

技術分享圖片

第零個終端之後,將會拋出InterruptedException異常,剩下的九個線程將會拋出BrokenBarrierException異常。

1.7. LockSupport
1.7.1. 概述
比較偏底層。
提供線程阻塞原語
掛起

1.7.2. 主要接口
LockSupport.park(); //停下來、掛起,將線程掛起,除非許可是可用的。
LockSupport.unpark(t1);//將t1繼續執行,使得許可是可用的

1.7.3. 與suspend()比較
LockSupport不容易引起線程凍結,suspend將來可能會被廢棄。
有點類似於信號量中的許可,如果unpark發生在park之前,park並不會將線程阻塞住。
如果rewiew發生在suspend之前,suspend就不能再繼續執行了,永久掛起。

1.7.4. 中斷響應
wait等是能夠try catch 終端異常的,但是park是沒有捕獲這個異常的,所以:
能夠響應中斷,但不拋出異常。
中斷響應的結果是,park()函數的返回,可以從Thread.interrupted()得到中斷標誌

例子
import java.util.concurrent.locks.LockSupport;

public class LockSupportDemo {
public static Object u = new Object();
static ChangeObjectThread t1 = new ChangeObjectThread("t1");
static ChangeObjectThread t2 = new ChangeObjectThread("t2");

public static class ChangeObjectThread extends Thread{
public ChangeObjectThread(String name) {
super.setName(name);
}

@Override
public void run() {
synchronized (u){
System.out.println("in"+getName());
LockSupport.park();
}
}
}

public static void main(String[] args) throws InterruptedException {
t1.start();
Thread.sleep(100);
t2.start();
LockSupport.unpark(t1);
LockSupport.unpark(t2);
t1.join();
t2.join();
}
}

技術分享圖片

不論unpark在park的前面還是後面都不會阻塞。

1.8. ReentrantLock 的實現
1.8.1. CAS狀態
鎖到底有沒有被人占用,通過是否到達期望值,通過值是否改變來判斷是否應該拿到鎖。

1.8.2. 等待隊列
如果沒有拿到鎖,線程應該怎麽辦呢,應該進入等待的隊列,如果有多個線程進來,多個線程在隊列中進行等待。

1.8.3. park()
在隊列中的線程都進行park操作。unlock的時候,從等待的隊列中挑出一個出來進行unpark操作。

源碼
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))//期望值是0,更新成1
//如果成功了,便能拿到鎖,就能繼續往下走。
setExclusiveOwnerThread(Thread.currentThread());
else
//否則就嘗試去做申請。
acquire(1);
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
public final void acquire(int arg) {
//在嘗試一下,萬一別人釋放了呢?
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//如果嘗試了還是不行就把自己放到等待隊列中去
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//放到隊列的尾巴上去。
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
//返回的node然後再去嘗試請求鎖
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果確實是拿不到就會在這裏掛起,下面會有詳細
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//通過park進行掛起
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}


//unlock操作
public void unlock() {
sync.release(1);
}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//將隊列頭部的node進行unpark操作
unparkSuccessor(h);
return true;
}
return false;
}

private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

2. 並發容器及典型源碼分析
2.1. 集合包裝
2.1.1. HashMap
HashMap不是一個線程安全的容器,可以通過如下的方法變成線程安全的,但是只適用於並發量比較小的情況。
Collections.synchronizedMap
技術分享圖片

源碼
private static class SynchronizedMap<K,V>
implements Map<K,V>, Serializable {
private static final long serialVersionUID = 1978198479659022715L;

private final Map<K,V> m; // Backing Map
final Object mutex; // Object on which to synchronize

SynchronizedMap(Map<K,V> m) {
if (m==null)
throw new NullPointerException();
this.m = m;
mutex = this;
}

SynchronizedMap(Map<K,V> m, Object mutex) {
this.m = m;
this.mutex = mutex;
}

public int size() {
synchronized (mutex) {return m.size();}
}
public boolean isEmpty() {
synchronized (mutex) {return m.isEmpty();}
}
public boolean containsKey(Object key) {
synchronized (mutex) {return m.containsKey(key);}
}
public boolean containsValue(Object value) {
synchronized (mutex) {return m.containsValue(value);}
}
public V get(Object key) {
synchronized (mutex) {return m.get(key);}
}

public V put(K key, V value) {
synchronized (mutex) {return m.put(key, value);}
}
public V remove(Object key) {
synchronized (mutex) {return m.remove(key);}
}
public void putAll(Map<? extends K, ? extends V> map) {
synchronized (mutex) {m.putAll(map);}
}
public void clear() {
synchronized (mutex) {m.clear();}
}

private transient Set<K> keySet = null;
private transient Set<Map.Entry<K,V>> entrySet = null;
private transient Collection<V> values = null;

public Set<K> keySet() {
synchronized (mutex) {
if (keySet==null)
keySet = new SynchronizedSet<>(m.keySet(), mutex);
return keySet;
}
}

public Set<Map.Entry<K,V>> entrySet() {
synchronized (mutex) {
if (entrySet==null)
entrySet = new SynchronizedSet<>(m.entrySet(), mutex);
return entrySet;
}
}

public Collection<V> values() {
synchronized (mutex) {
if (values==null)
values = new SynchronizedCollection<>(m.values(), mutex);
return values;
}
}

public boolean equals(Object o) {
if (this == o)
return true;
synchronized (mutex) {return m.equals(o);}
}
public int hashCode() {
synchronized (mutex) {return m.hashCode();}
}
public String toString() {
synchronized (mutex) {return m.toString();}
}
private void writeObject(ObjectOutputStream s) throws IOException {
synchronized (mutex) {s.defaultWriteObject();}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
通過觀察分析,是將map封裝到了synchronizeMap中,並且,將get put 等操作都放在了Synchronize代碼塊中,下面的List和Set同理,因為是放在了synchronize代碼塊中所以是串行的不是並行的,只能適用於並發量比較小的場景。

2.1.2. List
synchronizedList

2.1.3. Set
synchronizedSet

2.2. ConcurrentHashMap
高性能HashMap(解決synchronize只適用於並發量小的場景)

2.2.1HashMap源碼分析
HashMap內部是一個數組
拿put方法來看

public V put(K key, V value) {
if (table == EMPTY_TABLE) {
inflateTable(threshold);
}
if (key == null)
return putForNullKey(value);
int hash = hash(key);
int i = indexFor(hash, table.length);
for (Entry<K,V> e = table[i]; e != null; e = e.next) {
Object k;
if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
V oldValue = e.value;
e.value = value;
e.recordAccess(this);
return oldValue;
}
}

modCount++;
addEntry(hash, key, value, i);
return null;
}
//進入inflateTable方法
private void inflateTable(int toSize) {
// Find a power of 2 >= toSize
int capacity = roundUpToPowerOf2(toSize);

threshold = (int) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1);
table = new Entry[capacity];
initHashSeedAsNeeded(capacity);
}
//看到是將疏浚存到了table中
transient Entry<K,V>[] table = (Entry<K,V>[]) EMPTY_TABLE;
//每一個table中是entry表象
static class Entry<K,V> implements Map.Entry<K,V> {
//每個entry裏面是key value next hash
final K key;
V value;
Entry<K,V> next;
int hash;

/**
* Creates new entry.
*/
Entry(int h, K k, V v, Entry<K,V> n) {
value = v;
next = n;
key = k;
hash = h;
}

public final K getKey() {
return key;
}

public final V getValue() {
return value;
}

public final V setValue(V newValue) {
V oldValue = value;
value = newValue;
return oldValue;
}

public final boolean equals(Object o) {
if (!(o instanceof Map.Entry))
return false;
Map.Entry e = (Map.Entry)o;
Object k1 = getKey();
Object k2 = e.getKey();
if (k1 == k2 || (k1 != null && k1.equals(k2))) {
Object v1 = getValue();
Object v2 = e.getValue();
if (v1 == v2 || (v1 != null && v1.equals(v2)))
return true;
}
return false;
}

public final int hashCode() {
return Objects.hashCode(getKey()) ^ Objects.hashCode(getValue());
}

public final String toString() {
return getKey() + "=" + getValue();
}

/**
* This method is invoked whenever the value in an entry is
* overwritten by an invocation of put(k,v) for a key k that‘s already
* in the HashMap.
*/
void recordAccess(HashMap<K,V> m) {
}

/**
* This method is invoked whenever the entry is
* removed from the table.
*/
void recordRemoval(HashMap<K,V> m) {
}
}
技術分享圖片

get(key)是通過哈希算法來判斷應該映射到哪個槽位的
兩個不同的key比如,key1 key2可能被映射到同一個槽位中,這裏叫做哈希沖突
一種解決方法是,既然你映射到了同一個槽位中,我就把你方法同一個槽位中,可是在一個entry的數組中如何放兩個entry呢??通過entry中的next指向下一個entry

事實上HashMap的內部主要實現是數組,數組中放著entry,每個entry都是鏈表中的一環,鏈表的頭部,當發生大量的hash沖突的時候蛻化成一個鏈表。

一般情況下HashMap不會放滿,因為放滿之後必然產生沖突,所以一般HashMap需要預留空間

2.2.2. ConcurrentHashMap源碼分析
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
1
Segment 段
如果大量線程進來,會一起競爭HashMap的賦值操作
把大的HashMap切割成若幹個小的HashMap,每個線程進來的時候,先把當前的key映射到其中的一個小HashMap中去,在小HashMap中做一個普通HashMap應該做的事情,假如大的HashMap中有十六個小的HashMap,意味著大的HashMap可以同時接受十六個線程的賦值操作,相比於之前只有一個HasnMap,性能提高了十六倍。

這裏的Segment就是上面說的小HashMap,通過移位操作拿到當前的偏移量如果不存在就創建一個:

private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}

將第零個Segment作為原型將第k個Segment設置出來。

註意這裏的put雖然是線程安全的但是並沒有使用鎖。

上面的s.put():


final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//這裏的tryLock還是一個CAS操作tryLock不會等待,Lock才會等待
HashEntry<K,V> node = tryLock() ? null :
//如果trylock失敗,代碼在線面詳細解釋這個函數
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
槽位映射到index上,拿出index中的第一個
HashEntry<K,V> first = entryAt(tab, index);
//嘗試將first插到entry中
for (HashEntry<K,V> e = first;;) {
//如果有hash沖突
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
//有Hash沖突就將value串起來
e = e.next;
}
//如果沒有hash沖突
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
//就將entry set到數組中去
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
//不斷的trylock
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
//如果trylock超過了一定的次數,就會掛起
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
//這裏可能是hash進行了擴容重hash等操作,將retrues復制為-1,
//再不斷的進行trylock
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}


這裏的不斷的trylock,如果到達一定的次數在掛起,是concurrentHashMap的核心優化
但是註意在concurrentHashMap中有一個size操作中的:

public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn‘t retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}


上面的代碼中重復進行了lock和unlock,因為在想要得到map的size的時候是不能夠再進行修改的,所以加上鎖,這裏可能會有性能問題,可是size操作用的並不是很頻繁,所以可以忽略。

rehash操作:

//在put操作中
//如果大於了hash的閾值,就會進行rehash
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);

/**
* Doubles size of table and repacks entries, also adding the
* given node to new table
*/
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}

rehash會將空間容量翻倍,將node放進去,是一個比較耗時的操作,做了一點油畫,盡量重用現有的元素,不去新建元素,也就是說在翻倍前和翻倍後,同一個元素,很可能在用一個位置

2.3. BlockingQueue
阻塞隊列

阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變為非空。當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列常用於生產者和消費者的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器裏拿元素。

以上兩段內容轉自:http://ifeve.com/java-blocking-queue/
技術分享圖片

是一個接口,不是一個實際的類,是一個並發容器,但不是一個高性能的並發容器,和concurrentHashMap(高性能)不一樣,但它本身的好處是在多個線程共享數據的容器

如上圖,如果隊列為空,還試圖往隊列裏面讀數據,讀的線程就會等待,等待有其他的線程往裏面寫數據的時候,才會喚醒,並且去拿到數據。如果隊列已經滿了,還想往隊列中存數據,寫的線程就會等待,等有人將數據拿掉之後才會寫進去。

所以會引起線程的阻塞。

ArrayBlockingQueue源碼
/** Main lock guarding all access */
final ReentrantLock lock;//保證線程安全
/** Condition for waiting takes */
private final Condition notEmpty;//提示讀操作不為空
/** Condition for waiting puts */
private final Condition notFull;//提示寫操作不為滿

public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
//put操作直接加鎖,所以是比較耗性能的。
lock.lockInterruptibly();
try {
while (count == items.length)
//如果是滿的會進行等待,直到下面的take操作
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
//這裏是為了讀操作
notEmpty.await();
//去掉寫操作的鎖,具體代碼往下看
return extract();
} finally {
lock.unlock();
}
}

private E extract() {
final Object[] items = this.items;
E x = this.<E>cast(items[takeIndex]);
items[takeIndex] = null;
takeIndex = inc(takeIndex);
--count;
//在這裏講鎖打開
notFull.signal();
return x;
}

2.4. ConcurrentLinkedQueue
高性能的鏈表隊列,處理類似於concurrentHashMap,內部使用大量的無鎖的算法。

Java高並發程序設計學習筆記(五):JDK並發包(各種同步控制工具的使用、並發容器及典型源碼分析(Hashmap等))