1. 程式人生 > >並發編程(三):從AQS到CountDownLatch與ReentrantLock

並發編程(三):從AQS到CountDownLatch與ReentrantLock

splay public 繼續 for admin font 通信 html integer

一、目錄

1、AQS簡要分析 2、談CountDownLatch 3、談ReentrantLock 4、談消費者與生產者模式(notfiyAll/wait、signAll/await、condition)

二、AQS簡要分析

問題:AQS是什麽?有什麽用? AQS是什麽? 字面上看,它被稱為抽象隊列式的同步器(AbstractQueuedSynchronizer)。簡單說,它就是一個同步隊列容器。 AQS有什麽用?
  1. 為什麽會產生ArrayList、LinkedList、HashMap這些容器?它們底層實現無非都是對數組、鏈表、樹的操作,至於它們的產生,就是因為對編程人員對於數組、鏈表、樹的增刪改查操作非常繁瑣而提出的解決方案。
  2. 那為什麽會產生AQS呢?談到同步,大家最容易想到的就是在多線程中如何確保安全的資源共享。那同步隊列就是為了解決資源共享的同步容器。像上述容器一樣,在頂層就設計好,編程人員只需要調用接口就能輕易實現復雜的資源共享問題。
既然談到資源共享,那同步容器怎麽實現資源共享呢? AQS定義兩種資源共享方式:Exclusive(獨占、只有一個線程執行,如ReentrantLock)和Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)。 那什麽是獨占式? 在談synchronized的資源共享實現方式的時候,當線程A訪問共享資源的時候,其它的線程全部被堵塞,直到線程A讀寫完畢,其它線程才能申請同步互斥鎖從而訪問共享資源。如果之前看過我關於synchronized的討論,這裏應該不難理解,為了照顧未了解過的讀者,再重新回顧一下。
以RenentrantLock為例,如何知道共享資源是否有線程正在被訪問呢?其實,它有一個state變量初始值為0,表示未鎖定狀態。當線程A訪問的時候,state+1,就代表該線程鎖定了共享資源,其他線程將無法訪問,而當線程A訪問完共享資源以後,state-1,直到state等於0,就將釋放對共享變量的鎖定,其他線程將可以搶占式或者公平式爭奪。當然,它支持可重入,那什麽是可重入呢?同一線程可以重復鎖定共享資源,每鎖定一次state+1,也就是鎖定多次。說明:鎖定多少次就要釋放多少次。 什麽是共享式呢? 以CountDownLatch為例,共享資源可以被N個線程訪問,也就是初始化的時候,state就被指定為N(N與線程個數相等),線程countDown()一次,state會CAS減1,直到所有線程執行完(state=0),那些await()的線程將被喚醒去執行執行剩余動作。
什麽是CAS?CAS的定義為Compare-And-Swap,語義為比較並且交換。在深入理解JVM書中,談到自旋鎖,因為鎖的堵塞釋放對於cpu資源的損害很高,那麽自旋鎖就是當線程A訪問共享資源的時候,其他線程並不放棄對鎖的持有,它們在不停循環,不斷嘗試的獲取鎖,直到獲得鎖就停止循環,自旋鎖是對於資源共享的一種優化手段,但是它適用於對鎖持有時間比較短的情況。 獨占式lock流程(unlock同理):
  1. 調用自定義同步器的tryAcquire()嘗試直接去獲取資源,如果成功就返回。
  2. 沒成功,則addWaiter()將線程加入等待隊列的尾部,並標記為獨享模式。
  3. acquireQueued()使線程在等待隊列中休息,有機會時會去嘗試獲得資源。獲得資源後返回。如果整個過程有中斷過返回true,否則返回false。
  4. 如果線程在等待過程中中斷過,它是不響應的。只是獲得資源後才再進行自我中斷selfInterrupt(),將中斷補上。
技術分享 共享式流程(類似於獨占式 ):
  1. tryAcquireShared()嘗試獲取資源,成功則直接返回。
  2. 失敗則通過 doAcquireShared()進入等待隊列,直到被喚醒或者中斷並且成功獲取資源才返回。
  3. 不同:獨占式是只喚醒後繼節點。共享式是喚醒後繼,後繼還會去喚醒它的後繼,從而實現共享。

以上是核心的關於CountDownLatch、ReentrantLock的分析。由於博主研究程度有限,想更深層次研究,請參考:Java並發AQS詳解

三、淺談CountDownLatch

CountDownLatch是什麽? 有什麽用? CountDownLatch是一個同步容器,但是有人叫它發令槍,也有人叫它門閂。初始化設定線程的個數,調用countDownLatch.await()阻塞所有線程,直到countDownLatch.countDown()為0,那麽將繼續執行剩余的操作。例如,跑步比賽,所有線程都await()在起跑線,當所有人告訴裁判準備好了,裁判發令槍一響,運動員開炮。門閂道理一樣,門不開全給我等著! 作用:為了實現同步共享數據的一種更加高效的解決辦法。
/**
 * CountDownLatch相當於指令槍或者門閂,所有線程都awit()阻塞在起跑線,只有countDown到state為0,其他線程才能往下運行。
 * @author qiuyongAaron
 */
public class CountDownLatchDemo {
     private static final int PLAYER_NUM=5;
 
     public static void main(String[] args) {
 
           CountDownLatch start=new CountDownLatch(1);
           CountDownLatch end =new CountDownLatch(PLAYER_NUM);
           Player [] players=new Player[PLAYER_NUM];
 
           for(int i=0;i<PLAYER_NUM;i++)
                players[i]=new Player(start, end, i);
           //指定線程個數的線程池!
           ExecutorService exe=Executors.newFixedThreadPool(PLAYER_NUM);
           for(Player player:players)
                exe.execute(player);
 
           System.out.println("比賽開始!");
           //比賽開始!
           start.countDown();
 
           try {
                end.await();
           } catch (InterruptedException e) {
                e.printStackTrace();
           }finally{
                System.out.println("比賽結束!");
                exe.shutdown();
           }
     }
 
}
 
class Player implements Runnable{
     private CountDownLatch start;
     private CountDownLatch end;
     private int id;
 
     Random random=new Random();
     public Player(CountDownLatch start,CountDownLatch end,int id) {
           this.start=start;
           this.end=end;
           this.id=id;
     }
 
     @Override
     public void run() {
           try {
                //等待比賽開始。
                start.await();
                TimeUnit.SECONDS.sleep(random.nextInt(10));
                System.out.println("Player-"+id+":arrived");
           } catch (InterruptedException e) {
                e.printStackTrace();
           }finally{
                //選手-id到達終點,end計數為0結束比賽!
                end.countDown();
           }
     }
}
 
//運行結果:
比賽開始!
Player-3:arrived
Player-4:arrived
Player-0:arrived
Player-1:arrived
Player-2:arrived
比賽結束!

三、談ReentrantLock

1、ReentrantLock是什麽?有什麽用? ReentrantLock跟synchronized作用差不多,是在於synchronized基礎上的一種簡易同步容器,並沒有深層次的原理剖析。 2、ReentrantLock的基礎用法 2.1 回顧synchronized如何實現線程同步。 技術分享
/**
 * 示例一:同步鎖的使用
 * reentrantlock用於替代synchronized
 * 本例中由於m1鎖定this,只有m1執行完畢的時候,m2才能執行
 * @author qiuyongAaron
 */
public class ReentrantLockOne {
     public synchronized void m1(){
           for(int i=0;i<10;i++){
                try {
                     TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                     e.printStackTrace();
                }
                System.out.println(i);
           }
     }
 
     public synchronized void m2(){
           System.out.println("hello m2!");
     }
 
     public static void main(String[] args) {
           ReentrantLockOne lock=new ReentrantLockOne();
 
           new Thread(()->lock.m1(),"t1").start();
 
           try {
                TimeUnit.SECONDS.sleep(2);
           } catch (InterruptedException e) {
                e.printStackTrace();
           }
 
           new Thread(()->lock.m2(),"t2").start();
     }
}
Synchronized實現線程同步

2.2 ReentrantLock實現線程同步-與synchronized作用一致!

技術分享
/**
 * 示例二:等價於同步鎖
 * 使用reentrantlock可以完成同樣的功能
 * 需要註意的是,必須要必須要必須要手動釋放鎖(重要的事情說三遍)
 * 使用syn鎖定的話如果遇到異常,jvm會自動釋放鎖,但是lock必須手動釋放鎖,因此經常在finally中進行鎖的釋放
 * @author qiuyongAaron
 */
public class ReentrantLockTwo {
     ReentrantLock lock =new ReentrantLock();
     public  void m1(){
           try {
                lock.lock();
                for(int i=0;i<10;i++){
                     TimeUnit.SECONDS.sleep(1);
                     System.out.println(i);
                }
           } catch (InterruptedException e) {
                e.printStackTrace();
           }finally{
                lock.unlock();
           }
     }
 
     public synchronized void m2(){
           lock.lock();
           System.out.println("hello m2!");
           lock.unlock();
     }
 
     public static void main(String[] args) {
           ReentrantLockTwo lock=new ReentrantLockTwo();
 
           new Thread(()->lock.m1(),"t1").start();
 
           try {
                TimeUnit.SECONDS.sleep(2);
           } catch (InterruptedException e) {
                e.printStackTrace();
           }
 
           new Thread(()->lock.m2(),"t2").start();
     }
}
ReentrantLock同步互斥 2.3 ReentrantLock嘗試獲取鎖,若指定時間無法獲取鎖放棄等待! 技術分享
/**
 * 示例三:tryLock
 * 使用reentrantlock可以進行“嘗試鎖定”tryLock,這樣無法鎖定,或者在指定時間內無法鎖定,線程可以決定是否繼續等待
 * @author qiuyongAaron
 */
public class ReentrantLockThree {
     ReentrantLock lock=new ReentrantLock();
 
     public void m1(){
           try {
                lock.lock();
                for(int i=0;i<10;i++){
                     TimeUnit.SECONDS.sleep(1);
                     System.out.println(i);
                }
           } catch (Exception e) {
                e.printStackTrace();
           }finally{
                lock.unlock();
           }
     }
 
     boolean locked=false;
     public void m2(){
           try {
                lock.tryLock(5,TimeUnit.SECONDS);
                System.out.println("m2:"+locked);
           } catch (Exception e) {
                e.printStackTrace();
           }finally{
                if(locked) lock.unlock();
           }
     }
 
     public static void main(String[] args) {
           ReentrantLockThree lock=new ReentrantLockThree();
 
           new Thread(()->lock.m1(),"t1").start();
 
           try {
                TimeUnit.SECONDS.sleep(1);
           } catch (InterruptedException e) {
                e.printStackTrace();
           }
 
           new Thread(()->lock.m2(),"t2").start();
     }
}
ReentrantLock嘗試獲取鎖

2.4 指定公平鎖或者搶占式鎖

技術分享
/**
 * ReentrantLock還可以指定為公平鎖
 * @author qiuyongAaron
 */
public class ReentrantLockFive extends Thread{
 
     //默認false:為非公平鎖  true:公平鎖
     ReentrantLock lock=new ReentrantLock();
 
     @Override
     public void run() {
           for(int i=0;i<100;i++){
                lock.lock();
                try {
                     TimeUnit.SECONDS.sleep(1);
                     System.out.println(Thread.currentThread().getName()+"獲得鎖"+"-"+i);
                } catch (InterruptedException e) {
                     e.printStackTrace();
                }finally{
                     lock.unlock();
                }
           }
 
     }
 
     public static void main(String[] args) {
           ReentrantLockFive lock=new ReentrantLockFive();
           new Thread(lock,"t1").start();
           new Thread(lock,"t2").start();
     }
}
 
運行結果:
//非公平鎖
t2獲得鎖-0 t2獲得鎖-1 t1獲得鎖-0 t1獲得鎖-1 t1獲得鎖-2 t2獲得鎖-2
//公平鎖
t1獲得鎖-0 t2獲得鎖-0 t1獲得鎖-1 t2獲得鎖-1 t1獲得鎖-2 t2獲得鎖-2
ReentrantLock公平鎖 3、ReentrantLock實現線程通信
/**
 * 模擬生產者消費者模式-線程之間通信 synchronized-notifyAll/wait
 * @author qiuyongAaron
 */
public class MyContainerOne {
     LinkedList<Integer> list=new LinkedList<Integer>();
      static final int MAX=10;
      int count=0;
 
      //生產者線程
      public synchronized void put(int i){
            while(list.size()==MAX){
                 try {
                     this.wait();
                } catch (InterruptedException e) {
                     e.printStackTrace();
                }
            }
            list.add(i);
            ++count;
            this.notifyAll();//通知消費者來消費
      }
 
      //消費者線程
      public synchronized int get(){
            while(list.size()==0){
                 try {
                     this.wait();
                } catch (InterruptedException e) {
                     e.printStackTrace();
                }
            }
            int num=list.removeFirst();
            count--;
            this.notifyAll();//通知生產者生產
            return num;
      }
 
      public static void main(String[] args) {
           MyContainerOne container=new MyContainerOne();
 
           //制造10個消費者
           for(int i=0;i<10;i++){
                new Thread(()->{
                     for(int j=0;j<5;j++) System.out.println(container.get());
                     },
                "c"+i).start();
           }
 
           try {
                TimeUnit.SECONDS.sleep(2);
           } catch (InterruptedException e) {
                e.printStackTrace();
           }
 
           //制造2個生產者
           for(int i=0;i<2;i++){
                new Thread(()->{
                     for(int j=0;j<25;j++) container.put(j);
                     },
                "p"+i).start();
           }
     }
}
/**
 * 模擬生產者消費者模式-reentrantLock-awit/signAll
 * @author qiuyongAaron
 */
public class MyContainerTwo {
 
     LinkedList<Integer> list=new LinkedList<Integer>();
      static final int MAX=10;
      int count=0;
 
      ReentrantLock lock=new ReentrantLock();
      Condition producer=lock.newCondition();
      Condition consumer=lock.newCondition();
 
      //生產者線程
      public  void put(int i){
            try {
                 lock.lock();
                 while(list.size()==MAX){
                     producer.await();
                 }
                 list.add(i);
                 ++count;
                 consumer.signalAll();//通知消費者來消費
            } catch (InterruptedException e){
                 e.printStackTrace();
            }finally{
                 lock.unlock();
            }
      }
 
      //消費者線程
      public  int get(){
            try{
                 lock.lock();
                 while(list.size()==0){
                      consumer.await();
                 }
                 int num=list.removeFirst();
                 count--;
                 producer.signalAll();//通知生產者生產
                 return num;
            }catch(Exception e){
                 e.printStackTrace();
            }finally{
                 lock.unlock();
            }
            return 0;
      }
 
      public static void main(String[] args) {
           MyContainerTwo container=new MyContainerTwo();
 
           //制造10個消費者
           for(int i=0;i<10;i++){
                new Thread(()->{
                     for(int j=0;j<5;j++) System.out.println(container.get());
                     },
                "c"+i).start();
           }
 
           try {
                TimeUnit.SECONDS.sleep(2);
           } catch (InterruptedException e) {
                e.printStackTrace();
           }
 
           //制造2個生產者
           for(int i=0;i<2;i++){
                new Thread(()->{
                     for(int j=0;j<25;j++) container.put(j);
                     },
                "p"+i).start();
           }
     }
}
總結:synchronized實現線程的消費者-生產者模式是通過wait/notifyAll實現,ReentrantLock是通過condition+await/signAll。那他們有什麽區別呢?synchronized要麽通過notify隨機喚醒一個,或者notifyAll喚醒所有不管你是消費者還是生產者、而ReentrantLock是喚醒指定的線程的,更加精確效率更高。

四、版權聲明

  作者:邱勇Aaron

  出處:http://www.cnblogs.com/qiuyong/

  您的支持是對博主深入思考總結的最大鼓勵。

  本文版權歸作者所有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,尊重作者的勞動成果。

  參考:馬士兵並發編程、並發編程實踐

     AQS詳解:http://www.cnblogs.com/waterystone/p/4920797.html

     CountDownLatch詳解:http://www.cnblogs.com/yezhenhan/archive/2012/01/07/2315652.html

並發編程(三):從AQS到CountDownLatch與ReentrantLock