1. 程式人生 > >Java高併發程式設計 JDK併發包(上)

Java高併發程式設計 JDK併發包(上)

1. 重入鎖 

重入鎖可以完全替代synchronized關鍵字。使用java.util.concurrent.locks.ReentrantLock類實現,下面是一個重入鎖的簡單例子:


package cn.net.bysoft.java.concurrency.design.ch03;
import java.util.concurrent.locks.ReentrantLock;
public class Example1 implements Runnable {
public static ReentrantLock lock = new ReentrantLock();
public static int i = 0;

public static void main(String[] args) throws InterruptedException {
Example1 exp = new Example1();
Thread t1 = new Thread(exp);
Thread t2 = new Thread(exp);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(i);
}
@Override
public void run() {
for (int j = 0; j < 1000000; j++) {
lock.lock();
try {
i++;
} finally {

lock.unlock();
}
}
}
}

使用重入鎖保護臨界區資源i,確保多執行緒對i操作的安全性。與synchronized相比,重入鎖有著顯示的操作過程。開發人員必須手動指定何時加鎖,何時釋放鎖。也正因為這樣,重入鎖對邏輯控制的靈活性要遠遠好於synchronized。但在退出臨界區時,必須記得釋放鎖。


1.1 中斷響應 

對於synchronized來說,如果一個執行緒在等待鎖,那麼結果只有兩種,要麼獲得鎖,要麼就保持等待。而使用重入鎖,則提供另一種可能,那就是執行緒可以被中斷。


比如你和朋友約好去打球,如果你等了半小時,朋友還沒到,然後你接到一個電話,說不能如約了,那麼你就可以打道回府了。


中斷正式提供了一套類似的機制。如果一個執行緒正在等待鎖,那麼它依然可以收到一個通知,被告知無需再等待,可以停止工作了。這種情況對死鎖有一定的幫助。


下面的程式碼產生了一個死鎖,但得益於鎖中斷,我們可以很輕鬆地解鎖這個死鎖:


package cn.net.bysoft.java.concurrency.design.ch03;
import java.util.concurrent.locks.ReentrantLock;
public class Example2 implements Runnable {
public static ReentrantLock lock1 = new ReentrantLock();
public static ReentrantLock lock2 = new ReentrantLock();
int lock;
public Example2(int lock) {
this.lock = lock;
}
public static void main(String[] args) throws InterruptedException {
Example2 emp1 = new Example2(1);
Example2 emp2 = new Example2(2);
Thread t1 = new Thread(emp1);
Thread t2 = new Thread(emp2);
t1.start();
t2.start();
Thread.sleep(1000);
t2.interrupt();
}
@Override
public void run() {
try {
if (lock == 1) {
lock1.lockInterruptibly();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
lock2.lockInterruptibly();
} else {
lock2.lockInterruptibly();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
lock1.lockInterruptibly();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if(lock1.isHeldByCurrentThread())
lock1.unlock();
if(lock2.isHeldByCurrentThread())
lock2.unlock();
System.out.println(Thread.currentThread().getId() + ": 執行緒退出");
}
}
}

執行緒t1和t2啟動後,t1先佔用lock1,再佔用lock2;t2先佔用lock2,在請求lock1。因此很容易形成t1和t2的互相等待。使用lockInterruptiblu()方法,這是一個可以對中斷進行響應的鎖申請動作,即在等待鎖的過程中,可以響應中斷。


1.2 鎖申請等待限時 

出了等待外部中斷通知,要避免死鎖還有另一種方法,那就是限時等待。還是以打球為例,如果朋友遲遲不來,又無法聯絡到他,那麼,在等待1,2個小時後,就可以打道回府了。


對於執行緒來說,通常,我們無法判斷為什麼一個執行緒遲遲拿不到鎖。也許是因為死鎖了,也許是因為產生了飢餓。但如果給定一個等待時間,讓執行緒主動放棄,那麼對系統來說是有意義的,可以使用tryLock()方法進行一次限時等待:


package cn.net.bysoft.java.concurrency.design.ch03;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class Example3 implements Runnable {
public static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
Example3 exp = new Example3();
Thread t1 = new Thread(exp);
Thread t2 = new Thread(exp);
t1.start();
t2.start();
}
@Override
public void run() {
try {
if (lock.tryLock(5, TimeUnit.SECONDS))
Thread.sleep(6000);
else
System.out.println("get lock failed");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (lock.isHeldByCurrentThread())
lock.unlock();
}
}
}

tryLock()方法接收兩個引數,一個表示等待時長,一個表示計時單位。如果超過時間還沒有得到鎖,返回false,如果成功獲得鎖,則返回true。


tryLock()方法也可以不輸入引數直接執行。在這種情況下,當前執行緒會嘗試獲得鎖,如果鎖並未被其他執行緒佔用,則申請成功,並立即返回true。如果獲得不到鎖,則不會進行等待,立即返回false。這種模式不會引起執行緒等待,因此也不會產生死鎖:


package cn.net.bysoft.java.concurrency.design.ch03;
import java.util.concurrent.locks.ReentrantLock;
public class Example4 implements Runnable {
public static ReentrantLock lock1 = new ReentrantLock();
public static ReentrantLock lock2 = new ReentrantLock();
int lock;
public Example4(int lock) {
this.lock = lock;
}
public static void main(String[] args) {
Example4 r1 = new Example4(1);
Example4 r2 = new Example4(2);
Thread t1 = new Thread(r1);
Thread t2 = new Thread(r2);
t1.start();
t2.start();
}
@Override
public void run() {
if (lock == 1) {
while (true) {
if (lock1.tryLock()) {
try {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
if (lock2.tryLock()) {
try {
System.out.println(Thread.currentThread().getId() + ": My Job done");
return;
} finally {
lock2.unlock();
}
}
} finally {
lock1.unlock();
}
}
}
} else {
while (true) {
if (lock2.tryLock()) {
try {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
if (lock1.tryLock()) {
try {
System.out.println(Thread.currentThread().getId() + ": My Job done");
return;
} finally {
lock1.unlock();
}
}
} finally {
lock2.unlock();
}
}
}
}
}
}

上面的程式碼採用了非常容易死鎖的加鎖順序,引發死鎖。


但是使用tryLock()後,這種情況就大大改善了。只要執行足夠長的時間,執行緒總會得到所有需要的資源,從而正常執行。


1.3 公平鎖 

在大多數情況下,鎖的申請都是非公平的。而公平的鎖,則不是這樣,它會按照時間的先後順序,保證先到先得,後到後得。公平鎖的一大特點是:它不會產生飢餓現象。只要你排隊,最終還是可以等到資源:


public ReentrantLock(boolean fair); 

公平鎖看似優美,但是要實現公平鎖必然要系統維護一個有序佇列,因此公平鎖的實現成本比較高,效能相對也非常低下,因此,預設情況下,鎖是非公平的。


package cn.net.bysoft.java.concurrency.design.ch03;
import java.util.concurrent.locks.ReentrantLock;
public class Example5 implements Runnable {
public static ReentrantLock fairLock = new ReentrantLock(true);
public static void main(String[] args) {
Example5 exp = new Example5();
Thread t1 = new Thread(exp, "t1");
Thread t2 = new Thread(exp, "t2");
t1.start();
t2.start();
}
@Override
public void run() {
while (true) {
try {
fairLock.lock();
System.out.println(Thread.currentThread().getName());
} finally {
fairLock.unlock();
}
}
}
}

對上面ReentrantLock的幾個重要方法整理如下:

lock():獲得鎖,如果鎖已經被佔用,則等待; 
lockInterruptibly():獲得鎖,但優先響應中斷; 
tryLock():嘗試獲得鎖,如果成功,返回true,失敗返回false。該方法不等待; 
tryLock(long time, TimeUnit unit):在給定時間內嘗試獲得鎖; 
unlock():釋放鎖;

就重入鎖的實現來看,主要集中在Java層面。包含三個要素:

原子狀態; 
等待佇列; 
阻塞與恢復; 2. Condition條件 

Condition與重入鎖是相關聯的,通過Lock介面的newCondition()方法可以生成一個與當前重入鎖繫結的Condition例項。利用它,可以讓執行緒在合適的時間等待,或者在某一個特定的時刻得到通知,繼續執行。


Condition介面提供的基本方法如下:


void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUnitl(Date deadline) throws InterruptedException;
void signal();
void signalAll(); 

以上方法含義如下:

await()方法會使當前執行緒等待,同時釋放當前鎖,當其他執行緒中使用signal()或者signalAll()方法時,執行緒會重新獲得鎖繼續執行。或者當執行緒被中斷時,也能跳出等待; 
awaitUninterruptible()方法與await()方法基本相同,但是它並不會在等待過程中響應中斷; 
singal()方法用於喚醒一個在等待中的執行緒。相對的singalAll()方法會喚醒所有在等待中的執行緒;

下面的程式碼簡單演示了Condition的功能:


package cn.net.bysoft.java.concurrency.design.ch03;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class Example6 implements Runnable {
public static ReentrantLock lock = new ReentrantLock();
public static Condition condition = lock.newCondition();
public static void main(String[] args) throws InterruptedException {
Example6 exp = new Example6();
Thread t1 = new Thread(exp);
t1.start();
Thread.sleep(2000);
lock.lock();
condition.signal();
lock.unlock();
}
@Override
public void run() {
try {
lock.lock();
condition.await();
System.out.println("Thread is going on");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

當執行緒使用Condition.await()時,要求執行緒持有相關的重入鎖,在await()呼叫後,這個執行緒會釋放這把鎖,同理,在Condition.signal()方法呼叫時,也要求執行緒先獲得相關的鎖。在呼叫signal()方法後,一般需要釋放相關的鎖,謙讓給被喚醒的執行緒,讓它可以繼續執行。


3. 訊號量 

訊號量為多執行緒協作提供了更為強大的控制方法。廣義上說,訊號量是對鎖的擴充套件。無論是內部鎖synchronzied還是重入鎖ReentrantLock,一次都只允許一個執行緒訪問一個資源,而訊號量卻可以指定多個執行緒,同時訪問某一個資源。訊號量主要提供了一下建構函式:


public Semaphore(int permits)
public Semaphore(int permits, boolean fair) 

訊號量的主要邏輯方法有:


public void acquire();
public void acquireUninterruptibly();
public boolean tryAcquire();
public boolean tryAcquire(long timeout, TimeUnit unit);
public void release(); 

acquire()方法嘗試獲得一個准入的許可。若無法獲得,則執行緒會等待,直到有執行緒釋放一個許可或當前執行緒被中斷。acquireUninterruptibly()方法和acquire()方法類似,但是不影響中斷。tryAcquire()嘗試獲得一個許可,如果成功返回true,失敗則返回false,它不會進行等待,例子返回。release()用於線上程訪問資源結束後,釋放一個許可,以使其他等待許可的執行緒可以進行資源訪問:


package cn.net.bysoft.java.concurrency.design.ch03;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class Example7 implements Runnable {
final Semaphore semp = new Semaphore(5);
public static void main(String[] args) {
ExecutorService exec = Executors.newFixedThreadPool(20);
final Example7 exp = new Example7();
for (int i = 0; i < 20; i++) {
exec.submit(exp);
}
}
@Override
public void run() {
try {
semp.acquire();
Thread.sleep(2000);
System.out.println(Thread.currentThread().getId() + ": done!");
semp.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

這裡聲明瞭一個包含5個許可的訊號量。這意味著同時可以有5個執行緒進入程式碼段。申請訊號量使用acquire()操作,在離開時,務必使用release()釋放訊號量。


4. ReadWriteLock讀寫鎖 

讀寫分離鎖可以有效地幫助減少鎖競爭,以提升系統性能。用鎖分離的機制來提升效能非常容易理解,必須執行緒A1,A2,A3進行寫操作,B1,B2,B3進行讀操作,如果使用重入鎖或者內部鎖,則理論上說所有讀之間、讀與寫之間、寫和寫之間都是序列操作。


讀 
寫 


讀 
非阻塞 
阻塞 


寫 
阻塞 
阻塞 


讀-讀不互斥; 
讀-寫互斥; 
寫-寫互斥;

如果在系統中,讀操作次數遠遠大於寫操作,則讀寫鎖就可以發揮最大的功效:


package cn.net.bysoft.java.concurrency.design.ch03;
import java.util.Random;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Example8 {
private static ReentrantLock lock = new ReentrantLock();
private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private static Lock readLock = readWriteLock.readLock();
private static Lock writeLock = readWriteLock.readLock();
private int value;
public Object handleRead(Lock lock) throws InterruptedException {
try {
lock.lock();
Thread.sleep(1000);
return value;
} finally {
lock.unlock();
}
}
public void handleWrite(Lock lock, int index) throws InterruptedException {
try {
lock.lock();
Thread.sleep(1000);
value = index;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
final Example8 exp = new Example8();
Runnable readRunnable = new Runnable() {
@Override
public void run() {
try {
exp.handleRead(readLock);
// exp.handleRead(lock);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
Runnable writeRunnable = new Runnable() {
@Override
public void run() {
try {
exp.handleWrite(writeLock, new Random().nextInt());
// exp.handleWrite(lock, new Random().nextInt());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < 18; i++) {
new Thread(readRunnable).start();
}
for (int i = 18; i < 20; i++) {<