1. 程式人生 > >【併發】神祕的java併發包

【併發】神祕的java併發包

  java併發包

一 同步控制工具

    1.  重量級鎖 Synchronized

類鎖,物件鎖,變數鎖

1.1.1 類鎖

 

就是對整個靜態的class檔案加鎖,也就是說一個地方用到了這個class檔案,其他地方,就要等待獲取到class的鎖。典型的比如我們的單例模式。就是控制類級別的鎖。

class ThreadEF {

         public void MethodA(){

                   synchronized(ThreadEF.class){         }

         }

         public static synchronized void MethodB(){}

 }

如上程式碼所示,如果一個地方正在使用這個類,那麼其他地方就不能夠再使用這個類了。這就是類級別的鎖。然而方法A,與B就是互斥的。也就是說這兩種寫法都是類鎖的寫法。

 

1.1.2 物件鎖

class ThreadEG {

         public void MethodA(){

                   synchronized(this){  }

         }

         public synchronized void MethodB(){}

}

如上所示的物件加鎖方法就是物件鎖,一個類可以有多個例項,然而多個例項直接各自呼叫各自的方法是不會影響的,但是如上所示加了物件鎖之後,那麼這個例項是不可以對同時呼叫A與B方法的,這兩個方法是同步的。但是如果是兩個例項的話,這兩個方法就是非同步的。如果兩個例項要實現同步,那麼就得把鎖上升到類鎖。

 

1.1.3 變數鎖

class ThreadEH {

         private Object object=new Object();

         public void MethodA(){

                   synchronized(object){}

         }

         public void MethodB(){

                   synchronized (object){ }

         }

 }

如上程式碼所示,是通過變數來實現,A,B方法的同步的,然而這樣的同步其實也是物件鎖的特例。它無非還是讓物件的A,B兩個方法不能同時執行。那麼上面的index++問題,大家應該知道怎麼處理了吧。那就是類鎖來解決。

 

    1.  重入索 ReentrantLock
public class ReetrantLock1 implements Runnable {

     private static int index=0;

    public static ReentrantLock lock=new ReentrantLock(true);

    

     @Override

     public void run() {

         lock.lock();

         try {

              for(int i=0;i<10000;i++){

                   index++;

              }

         } catch (Exception e) {

              // TODO: handle exception

         } finally{

              lock.unlock();

         }

     }

    

     public static void main(String[] args) throws InterruptedException {

         Thread thread=new Thread(new ReetrantLock1());

         Thread thread2=new Thread(new ReetrantLock1());

         thread.start();

         thread2.start();

         thread.join();

         thread2.join();

         System.out.println(index);

     }

}

1.2.1 ReentrantLock與synchronized比較

(1)  可以指定公平性。而synchronized鎖預設是不公平的鎖,重入鎖的公平鎖可以保證每個執行緒都有執行的機會。Synchronized不公平鎖就不可以了,它會使得優先順序比較低的執行緒處於飢餓狀態。

(2)  Synchronized的效能明顯要弱於重入鎖,但是實際開發中由於吞吐量的問題,不至於因為鎖導致巨大的效能影響。所以基本還是使用Synchronized。

   (3)  Synchronized是jvm 自動進行管理的,自動獲取鎖,自動釋放鎖。Lock是手動管理的需要手動的申請鎖資源或者釋放鎖資源。

   (4)  Synchronized不支援鎖中斷要麼等待,要麼鎖定。而ReentrantLock是支援鎖中斷的,在某些特殊情況下,鎖中斷也是非常有必要的。

   比如死鎖的程式碼

public class ReetranLockInterrupt {

    public static void main(String[] args) {

       ReentrantLock lock1=new ReentrantLock();

       ReentrantLock lock2=new ReentrantLock();

       ThreadM threadM=new ThreadM(lock1, lock2);

       ThreadH threadH=new ThreadH(lock1, lock2);

       threadM.start();

       threadH.start();

    }

}


class ThreadM extends Thread{

    private ReentrantLock lock1=null;

    private ReentrantLock lock2=null;

    public ThreadM(ReentrantLock lock1,ReentrantLock lock2) {

        this.lock1=lock1;

        this.lock2=lock2;

    }

   

    @Override

    public void run() {

        lock1.lock();

        try {

            Thread.sleep(1000);

            lock2.lock();

        }catch (InterruptedException e) {

        }finally{

            lock2.unlock();

            lock1.unlock();

        }

    }

}



class ThreadH extends Thread{

    private ReentrantLock lock1=null;

    private ReentrantLock lock2=null;

    public ThreadH(ReentrantLock lock1,ReentrantLock lock2) {

        this.lock1=lock1;

        this.lock2=lock2;

    }

    @Override

    public void run() {

        lock2.lock();

        try {

Thread.sleep(1000);

            lock1.lock();

        } catch (InterruptedException e) {

        }finally{

            lock1.unlock();

            lock2.unlock();

        }

    }

}

 

如上所示是死鎖的程式碼,如果是Synchronized,那麼這個鎖是不會輕易解鎖的,那麼lock怎麼進行解鎖呢。

public class ReetranLockInterrupt {

    public static void main(String[] args) throws InterruptedException {

        ReentrantLock lock1=new ReentrantLock();

        ReentrantLock lock2=new ReentrantLock();

        ThreadM threadM=new ThreadM(lock1, lock2);

        ThreadH threadH=new ThreadH(lock1, lock2);

        threadM.start();

        threadH.start();

          Thread.sleep(5000);

        threadH.interrupt();

        threadM.interrupt();

     }

}


class ThreadM extends Thread{

     private ReentrantLock lock1=null;

     private ReentrantLock lock2=null;

     public ThreadM(ReentrantLock lock1,ReentrantLock lock2) {

         this.lock1=lock1;

         this.lock2=lock2;

     }

    

     @Override

     public void run() {

         try {

              lock1.lockInterruptibly();

              Thread.sleep(1000);

              lock2.lockInterruptibly();

         }catch (InterruptedException e) {

              // TODO Auto-generated catch block

              e.printStackTrace();

         }finally{

              System.out.println("ThreadM---退出中斷");

              lock2.unlock();

              lock1.unlock();

         }

     }

}


class ThreadH extends Thread{

     private ReentrantLock lock1=null;

     private ReentrantLock lock2=null;

     public ThreadH(ReentrantLock lock1,ReentrantLock lock2) {

         this.lock1=lock1;

         this.lock2=lock2;

     }

    

     @Override

     public void run() {

         try {

              lock2.lockInterruptibly();

                   Thread.sleep(1000);

              lock1.lockInterruptibly();

         } catch (InterruptedException e) {

              // TODO Auto-generated catch block

              e.printStackTrace();

         }finally{

              System.out.println("ThreadH---退出中斷");

              lock1.unlock();

              lock2.unlock();

         }

     }

}

如上圖所示ReetrantLock提供了鎖的中斷機制。這樣可以有效的解決死鎖問題。

    1.  重入條件 newCondition
public class LockConditions {

    private static String name;

    private static String sex;

    private static Lock lock=new ReentrantLock();

    public static Condition condition=lock.newCondition();

    static boolean flag=false;

   

   /**

    * 寫執行緒

    * @author Administrator

    */

  static class WriteThread extends Thread{  

      int num=0;

      public WriteThread(){}

      public void run(){

          while(true){

              try {

                  lock.lock();

                      if(flag){

                          condition.await();

                      }

                      if(num%2==1){

                           name="張三";

                           sex="男";

                       }else{

                           name="李四";

                           sex="女";

                       } 

                      num++;

                      flag=true;

                      condition.signal();

                }catch(Exception lock){

                    lock.printStackTrace();

                }finally {

                    lock.unlock();

                }

            }

          }

      }

  

  

  

   /**

    * 讀執行緒

    * @author Administrator

    *

    */

  static class ReadThread extends Thread{

       public ReadThread(){ }

       public void run(){

            while(true){

                try {

                    lock.lock();

                    if(!flag){

                     condition.await();

                    }

                    Thread.sleep(1000);

                    System.out.println(name+":"+sex);

                    flag=false;

                    condition.signal();

                } catch (Exception e) {

                    // TODO Auto-generated catch block

                    e.printStackTrace();

                }finally{

                    lock.unlock();

                }

               

            } 

       }

   }

     public static void main(String[] args) {

            WriteThread thread=new WriteThread();

            thread.start();

            ReadThread readThread=new ReadThread();

            readThread.start();

    }

}

 

如上所示重入鎖中的await(),notify(),notifyAll(),類似於物件鎖中的wait()、notify()、notifyAll()是定義在Object類裡的方法,可以用來控制執行緒的狀態。這三個方法最終呼叫的都是jvm級的native方法。隨著jvm執行平臺的不同可能有些許差異。如果物件呼叫了wait方法就會使持有該物件的執行緒把該物件的控制權交出去,然後處於等待狀態。如果物件呼叫了notify方法就會通知某個正在等待這個物件的控制權的執行緒可以繼續執行。如果物件呼叫了notifyAll方法就會通知所有等待這個物件控制權的執行緒繼續執行。

 

    1.  限時等待鎖 lock.tryLock()

還是剛才重入鎖中的程式碼,假設我們還是用相同的方式實現死鎖,只是我們換一下鎖的方式,把lock換成trylock

public class ReetranLockInterrupt {

    public static void main(String[] args) {

        ReentrantLock lock1 = new ReentrantLock();

        ReentrantLock lock2 = new ReentrantLock();

        ThreadM threadM = new ThreadM(lock1, lock2);

        ThreadH threadH = new ThreadH(lock1, lock2);

        threadM.start();

        threadH.start();

    }

}


class ThreadM extends Thread {

    private ReentrantLock lock1 = null;

    private ReentrantLock lock2 = null;


    public ThreadM(ReentrantLock lock1, ReentrantLock lock2) {

        this.lock1 = lock1;

        this.lock2 = lock2;

    }


    @Override

    public void run() {

        while(true){

            if(lock1.tryLock()){

                try {

                    if(lock2.tryLock()){

                            System.out.println("事情做完了");

                    }

                } catch (Exception e) {

                    e.printStackTrace();

                } finally {

                    lock2.unlock();

                   

                }

            }

            lock1.unlock();

        }

    }

}


class ThreadH extends Thread {

    private ReentrantLock lock1 = null;

    private ReentrantLock lock2 = null;


    public ThreadH(ReentrantLock lock1, ReentrantLock lock2) {

        this.lock1 = lock1;

        this.lock2 = lock2;

    }


    @Override

    public void run() {

        while(true){

            if(lock2.tryLock()){

                try {

                    if(lock1.tryLock()){

                        System.out.println("事情做完了");

                    }

                } catch (Exception e) {

                    e.printStackTrace();

                } finally {

                    lock1.unlock();

                   

                }

            }

            lock2.unlock();

        }

      }

}

 

1.5 訊號量 Semaphore

Semaphore是一種基於計數的訊號量。它可以設定一個閾值,基於此,多個執行緒競爭獲取許可訊號,做自己的申請後歸還,超過閾值後,執行緒申請許可訊號將會被阻塞。Semaphore可以用來構建一些物件池,資源池之類的,比如資料庫連線池,我們也可以建立計數為1的Semaphore,將其作為一種類似互斥鎖的機制,這也叫二元訊號量,表示兩種互斥狀態,這個類似於作業系統概念中的pv操作,如果不放訊號量就不能做PV操作。它的用法如下:

availablePermits函式用來獲取當前可用的資源數量

wc.acquire(); //申請資源

wc.release();// 釋放資源

 

舉個例子

需求: 一個廁所只有3個坑位,但是有10個人來上廁所,那怎麼辦?假設10的人的編號分別為1-10,並且1號先到廁所,10號最後到廁所。那麼1-3號來的時候必然有可用坑位,順利如廁,4號來的時候需要看看前面3人是否有人出來了,如果有人出來,進去,否則等待。同樣的道理,4-10號也需要等待正在上廁所的人出來後才能進去,並且誰先進去這得看等待的人是否有素質,是否能遵守先來先上的規則。

程式碼:

public class Semaphones implements Runnable {

   

     private Semaphore semaphore;

     public Semaphones(Semaphore semaphore) {

         this.semaphore=semaphore;

     }

   

    @Override

public void run() {

       

    try{

            // 剩下的資源(剩下的茅坑)

    int availablePermits = semaphore.availablePermits();

        if (availablePermits > 0) {

            System.out.println(Thread.currentThread().getName()+"天助我也,終於有茅坑了...");

        } else {

                System.out.println(Thread.currentThread().getName()+"怎麼沒有茅坑了...");

        }

            //申請茅坑 如果資源達到3次,就等待

            semaphore.acquire();

            System.out.println(Thread.currentThread().getName()+"終於輪我上廁所了..爽啊");

               Thread.sleep(new Random().nextInt(1000)); // 模擬上廁所時間。

            System.out.println(Thread.currentThread().getName()+"廁所上完了...");

            semaphore.release();

        }catch(Exception e){

            e.printStackTrace();   

        }

    }

   

    public static void main(String[] args) {

        //三個茅坑

        Semaphore semaphore=new Semaphore(3);

        ExecutorService executorService=Executors.newFixedThreadPool(10);

        for(int i=0;i<10;i++){

            executorService.execute(new Semaphones(semaphore));

        }

    }

}

執行結果

這就是訊號量的作用,它可以限定資源的數量, 資源用完以後等待,直到下一波釋放資源以後再進行資源的搶佔。類似於作業系統的PV操作。

1.6 讀寫鎖 ReentrantReadWriteLock 

         讀寫分離可以有效的減少鎖的競爭,以提升系統的效能。用鎖分離機制來提升系統的效能非常容易理解。比如執行緒A1,A2,A3進行寫操作。執行緒B1,B2,B3進行讀操作。如果使用重入鎖或者內部鎖,理論上不管是讀寫,寫讀,讀讀都是序列操作的。然而往往讀資料是不會破壞資料一致性的。所以讀寫所提供了以上的訪問優勢。

這在讀操作明顯多於寫操作,並且讀操作比較耗時的場景下,可以明顯提高讀寫效能。

 

public class LockConditions {

    private static  String name;

    private static String sex;

    private static ReentrantReadWriteLock  readAndWritelock=new ReentrantReadWriteLock();

    private static Lock readLock= readAndWritelock.readLock();

    private static Lock wirtLock=readAndWritelock.writeLock();

    static boolean flag=false;

   

   /**

    * 寫執行緒

    * @author Administrator

    */

  static class WriteThread extends Thread{  

     

      private Lock lock=null;

      private int num=0;

      public WriteThread(Lock lock,int num){

          this.lock=lock;

          this.num=num;

      }

      public WriteThread(){}

      public void run(){

              try {

                  lock.lock();

                     Thread.sleep(2000);

                      if(num%2==1){

                           name="張三";

                           sex="男";

                       }else{

                           name="李四";

                           sex="女";

                       } 

                }catch(Exception lock){

                    lock.printStackTrace();

                }finally {

                    lock.unlock();

                }

            }

      }

  

  

  

   /**

    * 讀執行緒

    * @author Administrator

    *

    */

  static class ReadThread extends Thread{

     

       private Lock lock=null;

       public ReadThread(Lock lock){

           this.lock=lock;

       }

       public void run(){

                try {

                    lock.lock();

                    Thread.sleep(3000);

                    System.out.println(name+":"+sex);

                } catch (Exception e) {

                    e.printStackTrace();

                }finally{

                    lock.unlock();

                }

       }

   }

     public static void main(String[] args) {

        ExecutorService executorService=Executors.newFixedThreadPool(10);

        for(int i=0;i<10;i++){

                executorService.execute(new WriteThread(wirtLock,i));

            }

        for(int i=0;i<100;i++){

            executorService.execute(new ReadThread(readLock));

            }

       

            executorService.shutdown();

    }

}

 

如上程式碼段所示,明顯可以看出讀操作的次數,明顯多於寫操作的次數,而且讀操作又比較耗時,此時採用讀寫鎖的分離技術就可以明顯的提高系統性能。

1.7 計數器 CountDownLatch

CountDownLatch類位於java.util.concurrent包下,利用它可以實現類似計數器的功能。比如有一個任務A,它要等待其他4個任務執行完畢之後才能執行,此時就可以利用CountDownLatch來實現這種功能了。中文界把它叫做倒計時門閂。

原理圖

如上圖所示我們要等待所有的子執行緒完成所有的任務以後主執行緒才開始執行,它的程式碼類似於寫了多個執行緒,依次使用了join()方法進行了等待操作。程式碼示例

public class CountDownLocach implements Runnable {

    private static CountDownLatch countDownLatch;

    public CountDownLocach(CountDownLatch countDownLatch) {

        this.countDownLatch=countDownLatch;

    }

    @Override

    public void run() {

        try {

            Thread.sleep(1000);

        System.out.println(Thread.currentThread().getName()+":"+"執行完畢");

            countDownLatch.countDown();

    } catch (InterruptedException e) {

            // TODO Auto-generated catch block

            e.printStackTrace();

    }

       

    }


    public static void main(String[] args) {

        ExecutorService executorService=Executors.newFixedThreadPool(10);

        CountDownLatch countDownLatch=new CountDownLatch(10);

        for(int i=0;i<10;i++){

            executorService.execute(new CountDownLocach(countDownLatch));

        }

        try {

            countDownLatch.await();

          System.out.println("所有執行緒執行完畢了 可以執行主執行緒了");

        } catch (InterruptedException e) {

            // TODO Auto-generated catch block

            e.printStackTrace();

        }

    }

}

 

執行結果

1.8 迴圈柵欄 CyclicBarrier

CyclicBarrier初始化時規定一個數目,然後計算呼叫了CyclicBarrier.await()進入等待的執行緒數。當執行緒數達到了這個數目時,所有進入等待狀態的執行緒被喚醒並繼續。 

 CyclicBarrier就象它名字的意思一樣,可看成是個障礙, 所有的執行緒必須到齊後才能一起通過這個障礙。 CyclicBarrier初始時還可帶一個Runnable的引數, 此Runnable任務在CyclicBarrier的數目達到後,所有其它執行緒被喚醒前被執行。

迴圈欄杆的工作原理如上圖所示,他可以多次控制訊號量實際上類似於CountDownLatch

這樣的話他可以等待集合完成以後,在開始幹某一件事情,對於有序的併發來說也算是一個不錯的選擇。

public class CyclicBarrireA implements Runnable{

    private CyclicBarrier  cyclicBarriter;

    private static boolean flag=true;

   

    public CyclicBarrireA(CyclicBarrier  cyclicBarriter){

    this.cyclicBarriter=cyclicBarriter;

    }


    @Override

    public void run() {

            doGather();

            try {

                cyclicBarriter.await();

            } catch (InterruptedException e) {

                // TODO Auto-generated catch block

                e.printStackTrace();

            } catch (BrokenBarrierException e) {

                // TODO Auto-generated catch block

                e.printStackTrace();

            }

            if(flag){

                System.out.println("集合完成");

                flag=false;

            }

            doWork();

            try {

                cyclicBarriter.await();

            } catch (InterruptedException e) {

                // TODO Auto-generated catch block

                e.printStackTrace();

            } catch (BrokenBarrierException e) {

                // TODO Auto-generated catch block

                e.printStackTrace();

            }

    }

   

   

    public void doGather(){

        System.out.println(Thread.currentThread().getName()+":"+"集合");

    }

   

    public void doWork(){

        System.out.println(Thread.currentThread().getName()+":"+"幹活");

    }

   

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {

        ExecutorService executorService=Executors.newFixedThreadPool(10);

        CyclicBarrier cyclicBarriter=new CyclicBarrier(10);

        for(int i=0;i<10;i++){

            executorService.execute(new CyclicBarrireA(cyclicBarriter));

        }

        executorService.shutdown();

    }

}

上圖是執行結果,由上圖可以看出,它可以線上程裡面控制所有的執行緒進行同步等待。

二 執行緒池

2.1 執行緒池與優缺點

          為了避免頻繁的創建於銷燬執行緒。我們可以讓執行緒進行服用。如果大家進行過資料庫開發。對資料庫的連線池並不會陌生。為了避免頻繁的創建於銷燬資料庫連線。我們可以使用資料庫連線池來維護資料庫的連線,使其處於一個被啟用的狀態。當需要連線資料庫時,不是建立一個數據庫連線,而是在連線池中獲取一個已經開啟的資料庫連線。執行緒池也是這樣的例子,當你用完執行緒以後不是直接關閉它,而是把這個執行緒放回到執行緒池中。

 

執行緒池的優點

第一:降低源消耗。通重複利用已建的程降低建和造成的消耗。
第二:提高響速度。當任到達,任可以不需要等到建就能立即行。
第三:提高程的可管理性程是稀缺源,如果無限制地建,不會消耗系統資源,
會降低系定性,使用程池可以一分配、調優控。但是,要做到合理利用程池,必須對實現原理了如指掌。

 

2.2 執行緒池原理

就是如上圖所示的概念,在java中 jdk為我們提供了執行緒池的實現。我們可以直接呼叫,或者進行改造。

如上圖所示是執行緒池的模型。

Executor框架的最頂層實現是ThreadPoolExecutor類,Executors工廠類中提供的newScheduledThreadPool、newFixedThreadPool、newCachedThreadPool方法其實也只是ThreadPoolExecutor的建構函式引數不同而已。通過傳入不同的引數,就可以構造出適用於不同應用場景下的執行緒池,那麼它的底層原理是怎樣實現的呢,這篇就來介紹下ThreadPoolExecutor執行緒池的執行過程。

 

corePoolSize: 核心池的大小。 當有任務來之後,就會建立一個執行緒去執行任務,當執行緒池中的執行緒數目達到corePoolSize後,就會把到達的任務放到快取隊列當中

maximumPoolSize: 執行緒池最大執行緒數,它表示線上程池中最多能建立多少個執行緒;

keepAliveTime: 表示執行緒沒有任務執行時最多保持多久時間會終止。

unit: 引數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性

 

如上圖所示

1、判斷執行緒池裡的核心執行緒是否都在執行任務,如果不是(核心執行緒空閒或者還有核心執行緒沒有被建立)則建立一個新的工作執行緒來執行任務。如果核心執行緒都在執行任務,則進入下個流程。

2、執行緒池判斷工作佇列是否已滿,如果工作佇列沒有滿,則將新提交的任務儲存在這個工作佇列裡。如果工作佇列滿了,則進入下個流程。

3、判斷執行緒池裡的執行緒是否都處於工作狀態,如果沒有,則建立一個新的工作執行緒來執行任務。如果已經滿了,則交給飽和策略來處理這個任務。

 

2.3 執行緒池分類

Java通過Executors(jdk1.5併發包)提供四種執行緒池,分別為:
newCachedThreadPool建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒

newFixedThreadPool 建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。
newScheduledThreadPool 建立一個定長執行緒池,支援定時及週期性任務執行。
newSingleThreadExecutor 建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。

2.3.1 newCachedThreadPool

建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。示例程式碼如下:

       // 無限大小執行緒池 jvm自動回收

ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {

            final int temp = i;

            newCachedThreadPool.execute(new Runnable() {

                @Override

                public void run() {

                    try {

                        Thread.sleep(100);

                    } catch (Exception e) {

                    }

                    System.out.println(Thread.currentThread().getName() + ",i:" + temp);

                }

            });

        }

 

總結: 執行緒池為無限大,當執行第二個任務時第一個任務已經完成,會複用執行第一個任務的執行緒,而不用每次新建執行緒。

2.3.2 newFixedThreadPool

建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。示例程式碼如下:

ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(5);

         for (int i = 0; i < 10; i++) {

              final int temp = i;

              newFixedThreadPool.execute(new Runnable() {

                   @Override

                   public void run() {

                       System.out.println(Thread.currentThread().getId() + ",i:" + temp);

                   }

              });

         }

總結:因為執行緒池大小為3,每個任務輸出index後sleep 2秒,所以每兩秒列印3個數字。

定長執行緒池的大小最好根據系統資源進行設定。如Runtime.getRuntime().availableProcessors()

2.3.3 newScheduledThreadPool

建立一個定長執行緒池,支援定時及週期性任務執行。延遲執行示例程式碼如下:

ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(5);

         for (int i = 0; i < 10; i++) {

              final int temp = i;

              newScheduledThreadPool.schedule(new Runnable() {

                   public void run() {

                       System.out.println("i:" + temp);

                   }

              }, 3, TimeUnit.SECONDS);

}

2.3.4 newSingleThreadExecutor

建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。示例程式碼如下:

    ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();

         for (int i = 0; i < 10; i++) {

              final int index = i;

              newSingleThreadExecutor.execute(new Runnable() {

                   @Override

                   public void run() {

                       System.out.println("index:" + index);

                       try {

                            Thread.sleep(200);

                       } catch (Exception e) {}

                   }

              });

         }

注意: 結果依次輸出,相當於順序執行各個任務。

如上程式碼所示就是執行緒池的使用程式碼,如上我建立了一個固定有十個工作執行緒的執行緒池。

 

 

三 併發容器

3.1  concurrentHashMap

     是一個執行緒安全的hashMap,它可以解決多執行緒環境下,hashmap執行緒不安全的問題。解決hashmap執行緒不安全的問題的方式當然還有Collections.synchronizedMap(new HashMap())這種方式也可以實現hashMap的同步。然而concurrentHashMap是一個更高效的,併發的hashmap

如上是concurrentHashMap的資料結構,可以看出concurrentHashMap室友16個segment組成的。每個segment屬於自己的分段鎖。也就是說concurrentHashMap最大支援16個併發的執行緒進行寫操作。如果簡單的理解的話我們可以說hashtable是執行緒安全的。而concurrentHashMap是有16個小的hashtable構成的。不過實際上不是這樣的。每個segment

對應一張表。這張表又是一個HashBucket與多個HashEntry組成的。也就是說每個segment對應一個通過連結串列儲存解決衝突的雜湊表。

至於什麼是Hash表,還有衝突,請讀者自行翻看《資料結構的書》解決衝突的方式,包括雜湊,再雜湊,鏈地址法。而這裡所採用的方法就是鏈地址法。所以基於這樣的分段鎖的機制可以有效的保證資料的一致性與併發訪問的效率。

 

3.2  CopyOnWriteArrayList

這個集合已經自動實現了讀寫鎖的功能了,正如我們前在講解ReentrantReadWriteLock

的時候已經提到過的,加鎖的方式不應該影響讀與讀的操作,下面這個集合就是專門為讀寫操作設定的不需要手動加鎖的執行緒安全的集合類。

public class CopyOnWriteList implements Runnable{

    private List list=null;

    private ConcurrentLinkedQueue concurrentLinkedQueue=null;

    private CountDownLatch countdown=null;

    private CopyOnWriteArrayList copyOnWriteArrayList=null;

    private int num;

   

    public CopyOnWriteList(CopyOnWriteArrayList copyOnWriteArrayList,List list,ConcurrentLinkedQueue linkedQueue,CountDownLatch countdown,int num) {

        this.list=list;

        this.concurrentLinkedQueue=linkedQueue;

        this.countdown=countdown;

        this.num=num;

        this.copyOnWriteArrayList=copyOnWriteArrayList;

    }

 

    @Override

    public void run() {

        try {

           if(num==0){

               copyOnWriteArrayList.add("1");

           }else{

               Object peek = copyOnWriteArrayList.get(0);

               new Random().nextInt();

           }

           countdown.countDown();

        } catch (Exception e) {

            e.printStackTrace();

        }

       

    }

    @SuppressWarnings({ "rawtypes", "rawtypes" })

    public static void main(String[] args) throws InterruptedException {

         //執行時間:=74 li size:=30

         List list=Collections.synchronizedList(new ArrayList());

         //執行時間:=71 li size:=30

         ConcurrentLinkedQueue concurrentLinkedQueue=new ConcurrentLinkedQueue();

         ExecutorService executorService=Executors.newFixedThreadPool(10);

         CountDownLatch countdown=new CountDownLatch(100030);

         //執行時間:=64 li size:=30

         CopyOnWriteArrayList copyOnWriteArrayList=new CopyOnWriteArrayList();

         long timeA = System.currentTimeMillis();

         for(int i=0;i<30;i++){

             executorService.execute(new CopyOnWriteList(copyOnWriteArrayList,list, concurrentLinkedQueue,countdown,0));

         }

         

         for(int i=0;i<100000;i++){

             executorService.execute(new CopyOnWriteList(copyOnWriteArrayList,list, concurrentLinkedQueue,countdown,1));

         }

         countdown.await();

         long timeB = System.currentTimeMillis();

         //System.out.println("執行時間:="+(timeB-timeA)+" li size:="+list.size());

         System.out.println("執行時間:="+(timeB-timeA)+" li size:="+copyOnWriteArrayList.size());

         //long timeC = System.currentTimeMillis();

        // System.out.println("執行時間:="+(timeC-timeB)+" li size:="+concurrentLinkedQueue.size());

         executorService.shutdown();

    }

}

如上程式碼所示,不同的集合在讀寫比較大的情況下的使用情況,由於執行緒數還相對比較小,所以呢就只能看出微小的效能差異。

3.3  concurrentlinkedqueue

同concurrentHashMap類似,這是一個支援併發且執行緒安全的連結串列,它的底層使用了CAS無鎖操作。我們都知道ArrayList是不支援執行緒安全的。Collections.synchronizedList(new ArrayList())通過集合工具類我們可以實現ArrayList的執行緒安全。

public class Dbss implements Runnable{

    private List list=null;

    private ConcurrentLinkedQueue concurrentLinkedQueue=null;

    private CountDownLatch countdown=null;

   

    public Dbss(List list,ConcurrentLinkedQueue linkedQueue,CountDownLatch countdown) {

        this.list=list;

        this.concurrentLinkedQueue=linkedQueue;

        this.countdown=countdown;

    }

 

    @Override

    public void run() {

        try {

            for(int i=0;i<1000000;i++){

                concurrentLinkedQueue.add(i);

            }

            countdown.countDown();

        } catch (Exception e) {

            e.printStackTrace();

        }

       

    }

    @SuppressWarnings({ "rawtypes", "rawtypes" })

    public static void main(String[] args) throws InterruptedException {

         //執行時間:=2886 li size:=10000000

         List list=Collections.synchronizedList(new ArrayList());

         //執行時間:=1446 li size:=10000000

         ConcurrentLinkedQueue concurrentLinkedQueue=new ConcurrentLinkedQueue();

         ExecutorService executorService=Executors.newFixedThreadPool(10);

         CountDownLatch countdown=new CountDownLatch(10);

         long timeA = System.currentTimeMillis();

         for(int i=0;i<10;i++){

             executorService.execute(new Dbss(list, concurrentLinkedQueue,countdown));

         }

         countdown.await();

         long timeB = System.currentTimeMillis();

         //System.out.println("執行時間:="+(timeB-timeA)+" li size:="+list.size());

         System.out.println("執行時間:="+(timeB-timeA)+" li size:="+concurrentLinkedQueue.size());

         executorService.shutdown();

    }

}

如上所示程式碼可以看出concurrentLinkedQueue的程式碼所示,效能明顯可以快了近一倍,但當我們使用concurrentLinkedQueue.size()的時候,因為這個要遍歷整個集合,所以這個方法的效能不會特別好。所以這裡在判斷集合為空的時候,儘量用你isempty而不要用size()==0這樣不會導致效能問題。concurrentLinkedQueue效能好,但是concurrentLinkedQueue所需要的記憶體空間也比較大。所以這裡如果在計算次數比較多的情況下,要分配給足額的jvm堆記憶體。

3.4  Blockingqueue

public class ConcurrentCalculator {

    private static BlockingDeque<Msg>   addQ=new LinkedBlockingDeque<ConcurrentCalculator.Msg>();

    private static BlockingDeque<Msg>   mulQ=new LinkedBlockingDeque<ConcurrentCalculator.Msg>();

    private static BlockingDeque<Msg>   divQ=new LinkedBlockingDeque<ConcurrentCalculator.Msg>();

   

    static class AddThread implements Runnable{

        @Override

        public void run() {

            while(true){

                try {

                    Msg take = addQ.take();

                    int i = take.getI();

                    int j = take.getJ();

                    take.setJ(i+j);

                    mulQ.add(take);

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }

            }

        }

    }

   

    static class Multiple implements Runnable{

        @Override

        public void run() {

            while(true){

                try {

                    Msg take = mulQ.take();

                    int j = take.getJ();

                    take.setJ(j*10);

                    divQ.add(take);

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }

            }

        }

    }

   

    static class DivDid implements Runnable{

        @Override

        public void run() {

            while(true){

                Msg take;

                try {

                    take = divQ.take();

                    int j = take.getJ();

                    take.setJ(j/5);

                    long sxs=take.getI()+take.getJ();

                    System.out.println(take.getObj()+" "+sxs);

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }

            }

        }

    }


   static class Msg{

    private int i;

    private int j;

    private String obj=null;

        public int getI() {

            return i;

        }

        public void setI(int i) {

            this.i = i;

        }

        public int getJ() {

            return j;

        }

        public void setJ(int j) {

            this.j = j;

        }

        public String getObj() {

            return obj;

        }

        public void setObj(String obj) {

            this.obj = obj;

        }

    }

   

    public static void main(String[] args) {

        new Thread(new AddThread()).start();

        new Thread(new Multiple()).start();

        new Thread(new DivDid()).start();

       

        for(int i=0;i<1000;i++){

            for(int j=0;j<1000;j++){

                Msg msg=new Msg();

                msg.setI(i);

                msg.setJ(j);

                msg.setObj("i:="+i+" j:="+j);

                addQ.add(msg);

            }

        }

   

    }

}

Blockingqueue是一個介面,它名下有很多的阻塞佇列。

他的工作模式就是當佇列為空的時候消費執行緒等待,當佇列滿的時候寫入執行緒等待。如上程式碼段是一個平行計算的程式碼塊,通過blockingqueue實現了。流水線的平行計算,是一個通過阻塞佇列與多執行緒實現,把順序執行的步奏並行化的例子。一般來說阻塞佇列非常適合於生產者,消費者模式的執行緒。比如小區的物業管理與使用者的意見的意見箱,比如我們的MQ作為系統之間通訊的阻塞佇列。通過中介軟體就可以做到系統之間能夠平滑升級。

3.5  concurrentskiplistmap

Concurrentskiplistmap的底層實現是通過調錶的方式,底層通過CAS與微粒度鎖來解決併發的執行緒安全問題。調錶結構的特點就是查詢效能快,有序。如果需要儲存鍵值對而且還要保證鍵的有序性,那麼調錶就是不二的選擇。

 

public class ConcurrentSki implements Runnable{

    private ConcurrentSkipListMap concurrentSkipListMap=null;

    private CountDownLatch countdown=null;

    public ConcurrentSki(ConcurrentSkipListMap concurrentSkipListMap,CountDownLatch countdown){

        this.concurrentSkipListMap=concurrentSkipListMap;

        this.countdown=countdown;

    }

   

    @Override

    public void run() {

        this.concurrentSkipListMap.put(Thread.currentThread().getName(), "fghjk");

        countdown.countDown();

    }

   

    public static void main(String[] args) throws InterruptedException {

        CountDownLatch countdown=new CountDownLatch(10000);

        ConcurrentSkipListMap concurrentSkipListMap=new ConcurrentSkipListMap();

        ExecutorService executorService=Executors.newFixedThreadPool(100);

        for(int i=0;i<100;i++){

            executorService.execute(new ConcurrentSki(concurrentSkipListMap,countdown));

        }

        countdown.await();

        Set entrySet = concurrentSkipListMap.entrySet();

        Iterator iterator = entrySet.iterator();

        while(iterator.hasNext()){

            Entry<Integer, Integer> next = (Entry<Integer, Integer>) iterator.next();

            System.out.println("key:="+next.getKey()+" value"+next.getValue());

        }

    }

}

如上程式碼所示,可以看出這個結果是一個按照key排序後的結果,而且也並沒有出現執行緒不安全的問題。效能也得到了提高。