1. 程式人生 > >併發程式設計(二)concurrent 工具類

併發程式設計(二)concurrent 工具類

併發程式設計(二)concurrent 工具類

一、CountDownLatch

經常用於監聽某些初始化操作,等初始化執行完畢後,通知主執行緒繼續工作。

import java.util.concurrent.CountDownLatch;

public class CountDownLatchTest extends Thread {
    private final static CountDownLatch countDown = new CountDownLatch(2); // (1)

    @Override
    public void run() {
        // 喚醒執行緒執行緒
        countDown.countDown(); // (2)
        System.out.println(Thread.currentThread().getName() + "執行完畢...");
    }

    public static void main(String[] args) {

        new Thread(new CountDownLatchTest()).start();
        new Thread(new CountDownLatchTest()).start();
        try {
            Thread.sleep(1000);
            countDown.await();  // (3)
            System.out.println(Thread.currentThread().getName() + "繼續執行...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  1. 宣告一個 CountDownLatch 物件,引數 2 表示被阻塞的執行緒需要被喚醒再次才能執行。

    final CountDownLatch countDown = new CountDownLatch(2);
  2. countDown() 呼叫兩次後,主執行緒才會繼續執行

    countDown.countDown();
  3. 阻塞當前執行緒-main

    countDown.await();
  4. 執行結果如下:

    Thread-1執行完畢...
    Thread-0執行完畢...
    main繼續執行...  // Thread-0, Thread-1 執行完成才會繼續執行主執行緒

二、CyclicBarrier

假設有隻有的一個場景:每個執行緒代表一個跑步運動員,當運動員都準備好後,才一起出發,只要有一個沒有準備了,大家都等待。

import java.io.IOException;  
import java.util.Random;  
import java.util.concurrent.BrokenBarrierException;  
import java.util.concurrent.CyclicBarrier;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors; 
public class UseCyclicBarrier {

    static class Runner implements Runnable {  
        private CyclicBarrier barrier;  
        private String name;  
        
        public Runner(CyclicBarrier barrier, String name) {  
            this.barrier = barrier;  
            this.name = name;  
        }  
        @Override  
        public void run() {  
            try {  
                Thread.sleep(1000 * (new Random()).nextInt(5));  
                System.out.println(name + " 準備OK.");  
                barrier.await(); //(1) 
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            } catch (BrokenBarrierException e) {  
                e.printStackTrace();  
            }  
            System.out.println(name + " Go!!");  
        }  
    } 
    
    public static void main(String[] args) throws IOException, InterruptedException {  
        CyclicBarrier barrier = new CyclicBarrier(2);  // (2) 
        ExecutorService executor = Executors.newFixedThreadPool(2);  
        
        executor.submit(new Thread(new Runner(barrier, "Thread-1")));
        executor.submit(new Thread(new Runner(barrier, "Thread-2")));
  
        executor.shutdown();  
    }  
}
  1. await() 阻塞當前的執行緒。

    barrier.await();
  2. 宣告一個 CyclicBarrier 物件,引數 2 表示 barrier 必須有兩個執行緒都準備好了才能執行。

    CyclicBarrier barrier = new CyclicBarrier(2);
  3. 執行結果如下:

    Thread-1 準備OK.
    Thread-2 準備OK.
    Thread-1 Go!!
    Thread-2 Go!!
  4. 修改 CyclicBarrier barrier = new CyclicBarrier(3) 後這兩個執行緒都會被阻塞, 執行結果如下:

    Thread-1 準備OK.
    Thread-2 準備OK.

三、Future

future模式請參考這裡

四、Semaphore

Semaphore 訊號量非常適合高併發訪問。

public class UseSemaphore {  
    public static void main(String[] args) {  
        // 執行緒池  
        ExecutorService exec = Executors.newCachedThreadPool();  
        // 只能5個執行緒同時訪問  
        final Semaphore semp = new Semaphore(5); // (1)
        // 模擬20個客戶端訪問  
        for (int index = 0; index < 20; index++) {  
            final int NO = index;  
            Runnable run = new Runnable() {  
                public void run() {  
                    try {  
                        // 獲取許可  
                        semp.acquire(); // (2)
                        System.out.println("Accessing: " + NO);  
                        //模擬實際業務邏輯
                        Thread.sleep((long) (Math.random() * 10000));  
                        // 訪問完後,釋放  
                        semp.release(); // (3)
                    } catch (InterruptedException e) {
                        ;
                    }  
                }  
            };  
            exec.execute(run);  
        } 
        
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println(semp.getQueueLength());

        // 退出執行緒池  
        exec.shutdown();  
    }  
} 
  1. 宣告一個 Semaphore 物件,引數 5 表示最多有5個執行緒同時訪問。

    final Semaphore semp = new Semaphore(5);
  2. semp.acquire() 獲取 semp 物件,如果超過5個執行緒,那麼其餘的執行緒就會阻塞,直到有執行緒執行完畢。

  3. semp.release() 釋放 semp 物件,這樣其餘的執行緒就可以執行了。

補充:

  • PV(page view) 網站的總訪問量,頁面瀏覽量或點選量,使用者每重新整理一次就會記錄一次。

  • UV(unique vistor) 訪問網站的一臺電腦客戶端為一個訪客。一般來講,時間上以00:00~24:00之內相同的客戶端記錄一次。

  • QPS(query per second) 即每秒查詢數,QPS 很大程度代表了系統業務的繁忙程度。一旦當前 QPS 超過所設定的預警閥值,可以考慮對叢集擴容,以免壓力過大導致宕機。

  • RT(response time) 即請求的響應時間,這個指標非常關鍵,直接說明客戶端的體驗,因此任何系統設計師都想降低 RT 時間。

對系統進行峰值評估,採用所謂的80/20原則,即80%的請求20%的時間到達:

QRS = (PV * 80%) / (24 * 60 * 60 * 20%)

五、ReentrantLock

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockTest implements Runnable {
    
    private Lock lock = new ReentrantLock(); // (1)
    
    public void run(){
        try {
            lock.lock(); // (2)
            System.out.println(Thread.currentThread().getName() + "進入..");
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + "退出..");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock(); // (3)
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        final ReentrantLockTest ur = new ReentrantLockTest();

        for (int i = 0; i < 10; i++) {
            new Thread(ur).start();
        }
    }
}
  1. ReentrantLock 一般用法:

    private Lock lock = new ReentrantLock();
    try {
        lock.lock();
        //do something
    } finally {
        lock.unlock();
    }
    
  2. condition 使用方法,注意 condition 可以例項化多個:

    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();
    condition.await(); //阻塞執行緒,釋放鎖
    condition.signal();//喚醒執行緒,不釋放鎖
  3. 公平鎖(true)和非公平鎖(false),非公平鎖執行效率比公平鎖高

    Lock lock = new ReentrantLock(boolean isFair);
  4. 讀寫鎖,實現讀寫分離的鎖,適用於讀多寫少的情況下(讀讀共享,讀寫互斥)

    private ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock(); // (1)
    private ReadLock readLock = rwlock.readLock();    // (2)
    private WriteLock writeLock = rwlock.writeLock(); // (3)
    
    public void read(){
        try {
            readLock.lock();
            // do something
        } finally {
            readLock.unlock();
        }
    }
    
    public void write(){
        try {
            writeLock.lock();
            // do something
        } finally {
            writeLock.unlock();
        }
    }

每天用心記錄一點點。內容也許不重要,但習慣很重要!