併發程式設計(二)concurrent 工具類
併發程式設計(二)concurrent 工具類
一、CountDownLatch
經常用於監聽某些初始化操作,等初始化執行完畢後,通知主執行緒繼續工作。
import java.util.concurrent.CountDownLatch; public class CountDownLatchTest extends Thread { private final static CountDownLatch countDown = new CountDownLatch(2); // (1) @Override public void run() { // 喚醒執行緒執行緒 countDown.countDown(); // (2) System.out.println(Thread.currentThread().getName() + "執行完畢..."); } public static void main(String[] args) { new Thread(new CountDownLatchTest()).start(); new Thread(new CountDownLatchTest()).start(); try { Thread.sleep(1000); countDown.await(); // (3) System.out.println(Thread.currentThread().getName() + "繼續執行..."); } catch (InterruptedException e) { e.printStackTrace(); } } }
宣告一個 CountDownLatch 物件,引數 2 表示被阻塞的執行緒需要被喚醒再次才能執行。
final CountDownLatch countDown = new CountDownLatch(2);
countDown() 呼叫兩次後,主執行緒才會繼續執行
countDown.countDown();
阻塞當前執行緒-main
countDown.await();
執行結果如下:
Thread-1執行完畢... Thread-0執行完畢... main繼續執行... // Thread-0, Thread-1 執行完成才會繼續執行主執行緒
二、CyclicBarrier
假設有隻有的一個場景:每個執行緒代表一個跑步運動員,當運動員都準備好後,才一起出發,只要有一個沒有準備了,大家都等待。
import java.io.IOException; import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class UseCyclicBarrier { static class Runner implements Runnable { private CyclicBarrier barrier; private String name; public Runner(CyclicBarrier barrier, String name) { this.barrier = barrier; this.name = name; } @Override public void run() { try { Thread.sleep(1000 * (new Random()).nextInt(5)); System.out.println(name + " 準備OK."); barrier.await(); //(1) } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println(name + " Go!!"); } } public static void main(String[] args) throws IOException, InterruptedException { CyclicBarrier barrier = new CyclicBarrier(2); // (2) ExecutorService executor = Executors.newFixedThreadPool(2); executor.submit(new Thread(new Runner(barrier, "Thread-1"))); executor.submit(new Thread(new Runner(barrier, "Thread-2"))); executor.shutdown(); } }
await() 阻塞當前的執行緒。
barrier.await();
宣告一個 CyclicBarrier 物件,引數 2 表示 barrier 必須有兩個執行緒都準備好了才能執行。
CyclicBarrier barrier = new CyclicBarrier(2);
執行結果如下:
Thread-1 準備OK. Thread-2 準備OK. Thread-1 Go!! Thread-2 Go!!
修改
CyclicBarrier barrier = new CyclicBarrier(3)
後這兩個執行緒都會被阻塞, 執行結果如下:Thread-1 準備OK. Thread-2 準備OK.
三、Future
四、Semaphore
Semaphore 訊號量非常適合高併發訪問。
public class UseSemaphore {
public static void main(String[] args) {
// 執行緒池
ExecutorService exec = Executors.newCachedThreadPool();
// 只能5個執行緒同時訪問
final Semaphore semp = new Semaphore(5); // (1)
// 模擬20個客戶端訪問
for (int index = 0; index < 20; index++) {
final int NO = index;
Runnable run = new Runnable() {
public void run() {
try {
// 獲取許可
semp.acquire(); // (2)
System.out.println("Accessing: " + NO);
//模擬實際業務邏輯
Thread.sleep((long) (Math.random() * 10000));
// 訪問完後,釋放
semp.release(); // (3)
} catch (InterruptedException e) {
;
}
}
};
exec.execute(run);
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(semp.getQueueLength());
// 退出執行緒池
exec.shutdown();
}
}
宣告一個 Semaphore 物件,引數 5 表示最多有5個執行緒同時訪問。
final Semaphore semp = new Semaphore(5);
semp.acquire()
獲取 semp 物件,如果超過5個執行緒,那麼其餘的執行緒就會阻塞,直到有執行緒執行完畢。semp.release()
釋放 semp 物件,這樣其餘的執行緒就可以執行了。
補充:
PV(page view) 網站的總訪問量,頁面瀏覽量或點選量,使用者每重新整理一次就會記錄一次。
UV(unique vistor) 訪問網站的一臺電腦客戶端為一個訪客。一般來講,時間上以00:00~24:00之內相同的客戶端記錄一次。
QPS(query per second) 即每秒查詢數,QPS 很大程度代表了系統業務的繁忙程度。一旦當前 QPS 超過所設定的預警閥值,可以考慮對叢集擴容,以免壓力過大導致宕機。
RT(response time) 即請求的響應時間,這個指標非常關鍵,直接說明客戶端的體驗,因此任何系統設計師都想降低 RT 時間。
對系統進行峰值評估,採用所謂的80/20原則,即80%的請求20%的時間到達:
QRS = (PV * 80%) / (24 * 60 * 60 * 20%)
五、ReentrantLock
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockTest implements Runnable {
private Lock lock = new ReentrantLock(); // (1)
public void run(){
try {
lock.lock(); // (2)
System.out.println(Thread.currentThread().getName() + "進入..");
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "退出..");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock(); // (3)
}
}
public static void main(String[] args) throws InterruptedException {
final ReentrantLockTest ur = new ReentrantLockTest();
for (int i = 0; i < 10; i++) {
new Thread(ur).start();
}
}
}
ReentrantLock 一般用法:
private Lock lock = new ReentrantLock(); try { lock.lock(); //do something } finally { lock.unlock(); }
condition 使用方法,注意 condition 可以例項化多個:
Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); condition.await(); //阻塞執行緒,釋放鎖 condition.signal();//喚醒執行緒,不釋放鎖
公平鎖(true)和非公平鎖(false),非公平鎖執行效率比公平鎖高
Lock lock = new ReentrantLock(boolean isFair);
讀寫鎖,實現讀寫分離的鎖,適用於讀多寫少的情況下(讀讀共享,讀寫互斥)
private ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock(); // (1) private ReadLock readLock = rwlock.readLock(); // (2) private WriteLock writeLock = rwlock.writeLock(); // (3) public void read(){ try { readLock.lock(); // do something } finally { readLock.unlock(); } } public void write(){ try { writeLock.lock(); // do something } finally { writeLock.unlock(); } }
每天用心記錄一點點。內容也許不重要,但習慣很重要!