1. 程式人生 > >Java多執行緒詳解

Java多執行緒詳解

如果對什麼是執行緒、什麼是程序仍存有疑惑,請先Google之,因為這兩個概念不在本文的範圍之內。

用多執行緒只有一個目的,那就是更好的利用cpu的資源,因為所有的多執行緒程式碼都可以用單執行緒來實現。說這個話其實只有一半對,因為反應“多角色”的程式程式碼,最起碼每個角色要給他一個執行緒吧,否則連實際場景都無法模擬,當然也沒法說能用單執行緒來實現:比如最常見的“生產者,消費者模型”。

很多人都對其中的一些概念不夠明確,如同步、併發等等,讓我們先建立一個數據字典,以免產生誤會。

  • 多執行緒:指的是這個程式(一個程序)執行時產生了不止一個執行緒
  • 並行與併發:
    • 並行:多個cpu例項或者多臺機器同時執行一段處理邏輯,是真正的同時。
    • 併發:通過cpu排程演算法,讓使用者看上去同時執行,實際上從cpu操作層面不是真正的同時。併發往往在場景中有公用的資源,那麼針對這個公用的資源往往產生瓶頸,我們會用TPS或者QPS來反應這個系統的處理能力。

併發與並行
  • 執行緒安全:經常用來描繪一段程式碼。指在併發的情況之下,該程式碼經過多執行緒使用,執行緒的排程順序不影響任何結果。這個時候使用多執行緒,我們只需要關注系統的記憶體,cpu是不是夠用即可。反過來,執行緒不安全就意味著執行緒的排程順序會影響最終結果,如不加事務的轉賬程式碼:
    void transferMoney(User from, User to, float amount){
      to.setMoney(to.getBalance() 
    + amount); from.setMoney(from.getBalance() - amount); }
  • 同步:Java中的同步指的是通過人為的控制和排程,保證共享資源的多執行緒訪問成為執行緒安全,來保證結果的準確。如上面的程式碼簡單加入@synchronized關鍵字。在保證結果準確的同時,提高效能,才是優秀的程式。執行緒安全的優先順序高於效能。

好了,讓我們開始吧。我準備分成幾部分來總結涉及到多執行緒的內容:

  1. 紮好馬步:執行緒的狀態
  2. 內功心法:每個物件都有的方法(機制)
  3. 太祖長拳:基本執行緒類
  4. 九陰真經:高階多執行緒控制類

紮好馬步:執行緒的狀態

先來兩張圖:


執行緒狀態
執行緒狀態轉換


各種狀態一目瞭然,值得一提的是"blocked"這個狀態:
執行緒在Running的過程中可能會遇到阻塞(Blocked)情況

  1. 呼叫join()和sleep()方法,sleep()時間結束或被打斷,join()中斷,IO完成都會回到Runnable狀態,等待JVM的排程。
  2. 呼叫wait(),使該執行緒處於等待池(wait blocked pool),直到notify()/notifyAll(),執行緒被喚醒被放到鎖定池(lock blocked pool ),釋放同步鎖使執行緒回到可執行狀態(Runnable)
  3. 對Running狀態的執行緒加同步鎖(Synchronized)使其進入(lock blocked pool ),同步鎖被釋放進入可執行狀態(Runnable)。

此外,在runnable狀態的執行緒是處於被排程的執行緒,此時的排程順序是不一定的。Thread類中的yield方法可以讓一個running狀態的執行緒轉入runnable。

內功心法:每個物件都有的方法(機制)

synchronized, wait, notify 是任何物件都具有的同步工具。讓我們先來了解他們


monitor


他們是應用於同步問題的人工執行緒排程工具。講其本質,首先就要明確monitor的概念,Java中的每個物件都有一個監視器,來監測併發程式碼的重入。在非多執行緒編碼時該監視器不發揮作用,反之如果在synchronized 範圍內,監視器發揮作用。

wait/notify必須存在於synchronized塊中。並且,這三個關鍵字針對的是同一個監視器(某物件的監視器)。這意味著wait之後,其他執行緒可以進入同步塊執行。

當某程式碼並不持有監視器的使用權時(如圖中5的狀態,即脫離同步塊)去wait或notify,會丟擲java.lang.IllegalMonitorStateException。也包括在synchronized塊中去呼叫另一個物件的wait/notify,因為不同物件的監視器不同,同樣會丟擲此異常。

再講用法:

  • synchronized單獨使用:
    • 程式碼塊:如下,在多執行緒環境下,synchronized塊中的方法獲取了lock例項的monitor,如果例項相同,那麼只有一個執行緒能執行該塊內容複製程式碼
      public class Thread1 implements Runnable {
         Object lock;
         public void run() {  
             synchronized(lock){
               ..do something
             }
         }
      }
      複製程式碼
    • 直接用於方法: 相當於上面程式碼中用lock來鎖定的效果,實際獲取的是Thread1類的monitor。更進一步,如果修飾的是static方法,則鎖定該類所有例項。
      public class Thread1 implements Runnable {
         public synchronized void run() {  
              ..do something
         }
      }
  • synchronized, wait, notify結合:典型場景生產者消費者問題

    複製程式碼
    /**
       * 生產者生產出來的產品交給店員
       */
      public synchronized void produce()
      {
          if(this.product >= MAX_PRODUCT)
          {
              try
              {
                  wait();  
                  System.out.println("產品已滿,請稍候再生產");
              }
              catch(InterruptedException e)
              {
                  e.printStackTrace();
              }
              return;
          }
    
          this.product++;
          System.out.println("生產者生產第" + this.product + "個產品.");
          notifyAll();   //通知等待區的消費者可以取出產品了
      }
    
      /**
       * 消費者從店員取產品
       */
      public synchronized void consume()
      {
          if(this.product <= MIN_PRODUCT)
          {
              try 
              {
                  wait(); 
                  System.out.println("缺貨,稍候再取");
              } 
              catch (InterruptedException e) 
              {
                  e.printStackTrace();
              }
              return;
          }
    
          System.out.println("消費者取走了第" + this.product + "個產品.");
          this.product--;
          notifyAll();   //通知等待去的生產者可以生產產品了
      }
    複製程式碼
    volatile

    多執行緒的記憶體模型:main memory(主存)、working memory(執行緒棧),在處理資料時,執行緒會把值從主存load到本地棧,完成操作後再save回去(volatile關鍵詞的作用:每次針對該變數的操作都激發一次load and save)。


volatile

針對多執行緒使用的變數如果不是volatile或者final修飾的,很有可能產生不可預知的結果(另一個執行緒修改了這個值,但是之後在某執行緒看到的是修改之前的值)。其實道理上講同一例項的同一屬性本身只有一個副本。但是多執行緒是會快取值的,本質上,volatile就是不去快取,直接取值。線上程安全的情況下加volatile會犧牲效能。

太祖長拳:基本執行緒類

基本執行緒類指的是Thread類,Runnable介面,Callable介面
Thread 類實現了Runnable介面,啟動一個執行緒的方法:

 MyThread my = new MyThread();
  my.start();

Thread類相關方法:

複製程式碼
//當前執行緒可轉讓cpu控制權,讓別的就緒狀態執行緒執行(切換)
public static Thread.yield() 
//暫停一段時間
public static Thread.sleep()  
//在一個執行緒中呼叫other.join(),將等待other執行完後才繼續本執行緒。    
public join()
//後兩個函式皆可以被打斷
public interrupte()
複製程式碼

關於中斷:它並不像stop方法那樣會中斷一個正在執行的執行緒。執行緒會不時地檢測中斷標識位,以判斷執行緒是否應該被中斷(中斷標識值是否為true)。終端只會影響到wait狀態、sleep狀態和join狀態。被打斷的執行緒會丟擲InterruptedException。
Thread.interrupted()檢查當前執行緒是否發生中斷,返回boolean
synchronized在獲鎖的過程中是不能被中斷的。

中斷是一個狀態!interrupt()方法只是將這個狀態置為true而已。所以說正常執行的程式不去檢測狀態,就不會終止,而wait等阻塞方法會去檢查並丟擲異常。如果在正常執行的程式中新增while(!Thread.interrupted()) ,則同樣可以在中斷後離開程式碼體

Thread類最佳實踐
寫的時候最好要設定執行緒名稱 Thread.name,並設定執行緒組 ThreadGroup,目的是方便管理。在出現問題的時候,列印執行緒棧 (jstack -pid) 一眼就可以看出是哪個執行緒出的問題,這個執行緒是幹什麼的。

如何獲取執行緒中的異常


不能用try,catch來獲取執行緒中的異常

Runnable

與Thread類似

Callable

future模式:併發模式的一種,可以有兩種形式,即無阻塞和阻塞,分別是isDone和get。其中Future物件用來存放該執行緒的返回值以及狀態

ExecutorService e = Executors.newFixedThreadPool(3);
 //submit方法有多重引數版本,及支援callable也能夠支援runnable介面型別.
Future future = e.submit(new myCallable());
future.isDone() //return true,false 無阻塞
future.get() // return 返回值,阻塞直到該執行緒執行結束

九陰真經:高階多執行緒控制類

以上都屬於內功心法,接下來是實際專案中常用到的工具了,Java1.5提供了一個非常高效實用的多執行緒包:java.util.concurrent, 提供了大量高階工具,可以幫助開發者編寫高效、易維護、結構清晰的Java多執行緒程式。

1.ThreadLocal類

用處:儲存執行緒的獨立變數。對一個執行緒類(繼承自Thread)
當使用ThreadLocal維護變數時,ThreadLocal為每個使用該變數的執行緒提供獨立的變數副本,所以每一個執行緒都可以獨立地改變自己的副本,而不會影響其它執行緒所對應的副本。常用於使用者登入控制,如記錄session資訊。

實現:每個Thread都持有一個TreadLocalMap型別的變數(該類是一個輕量級的Map,功能與map一樣,區別是桶裡放的是entry而不是entry的連結串列。功能還是一個map。)以本身為key,以目標為value。
主要方法是get()和set(T a),set之後在map裡維護一個threadLocal -> a,get時將a返回。ThreadLocal是一個特殊的容器。

2.原子類(AtomicInteger、AtomicBoolean……)

如果使用atomic wrapper class如atomicInteger,或者使用自己保證原子的操作,則等同於synchronized

//返回值為boolean
AtomicInteger.compareAndSet(int expect,int update)

該方法可用於實現樂觀鎖,考慮文中最初提到的如下場景:a給b付款10元,a扣了10元,b要加10元。此時c給b2元,但是b的加十元程式碼約為:

if(b.value.compareAndSet(old, value)){
   return ;
}else{
   //try again
   // if that fails, rollback and log
}

AtomicReference
對於AtomicReference 來講,也許物件會出現,屬性丟失的情況,即oldObject == current,但是oldObject.getPropertyA != current.getPropertyA。
這時候,AtomicStampedReference就派上用場了。這也是一個很常用的思路,即加上版本號

3.Lock類 

lock: 在java.util.concurrent包內。共有三個實現:

ReentrantLock
ReentrantReadWriteLock.ReadLock
ReentrantReadWriteLock.WriteLock

主要目的是和synchronized一樣, 兩者都是為了解決同步問題,處理資源爭端而產生的技術。功能類似但有一些區別。

區別如下:

lock更靈活,可以自由定義多把鎖的枷鎖解鎖順序(synchronized要按照先加的後解順序)
提供多種加鎖方案,lock 阻塞式, trylock 無阻塞式, lockInterruptily 可打斷式, 還有trylock的帶超時時間版本。
本質上和監視器鎖(即synchronized是一樣的)
能力越大,責任越大,必須控制好加鎖和解鎖,否則會導致災難。
和Condition類的結合。
效能更高,對比如下圖:

synchronized和Lock效能對比

ReentrantLock    
可重入的意義在於持有鎖的執行緒可以繼續持有,並且要釋放對等的次數後才真正釋放該鎖。
使用方法是:

1.先new一個例項

static ReentrantLock r=new ReentrantLock();
2.加鎖      
r.lock()或r.lockInterruptibly();

此處也是個不同,後者可被打斷。當a執行緒lock後,b執行緒阻塞,此時如果是lockInterruptibly,那麼在呼叫b.interrupt()之後,b執行緒退出阻塞,並放棄對資源的爭搶,進入catch塊。(如果使用後者,必須throw interruptable exception 或catch)    

3.釋放鎖   

r.unlock()

必須做!何為必須做呢,要放在finally裡面。以防止異常跳出了正常流程,導致災難。這裡補充一個小知識點,finally是可以信任的:經過測試,哪怕是發生了OutofMemoryError,finally塊中的語句執行也能夠得到保證。

ReentrantReadWriteLock

可重入讀寫鎖(讀寫鎖的一個實現)

 ReentrantReadWriteLock lock = new ReentrantReadWriteLock()
  ReadLock r = lock.readLock();
  WriteLock w = lock.writeLock();

兩者都有lock,unlock方法。寫寫,寫讀互斥;讀讀不互斥。可以實現併發讀的高效執行緒安全程式碼

4.容器類

這裡就討論比較常用的兩個:

BlockingQueue
ConcurrentHashMap

BlockingQueue
阻塞佇列。該類是java.util.concurrent包下的重要類,通過對Queue的學習可以得知,這個queue是單向佇列,可以在佇列頭新增元素和在隊尾刪除或取出元素。類似於一個管  道,特別適用於先進先出策略的一些應用場景。普通的queue介面主要實現有PriorityQueue(優先佇列),有興趣可以研究

BlockingQueue在佇列的基礎上添加了多執行緒協作的功能:


BlockingQueue


除了傳統的queue功能(表格左邊的兩列)之外,還提供了阻塞介面put和take,帶超時功能的阻塞介面offer和poll。put會在佇列滿的時候阻塞,直到有空間時被喚醒;take在隊 列空的時候阻塞,直到有東西拿的時候才被喚醒。用於生產者-消費者模型尤其好用,堪稱神器。

常見的阻塞佇列有:

ArrayListBlockingQueue
LinkedListBlockingQueue
DelayQueue
SynchronousQueue

ConcurrentHashMap
高效的執行緒安全雜湊map。請對比hashTable , concurrentHashMap, HashMap

5.管理類

管理類的概念比較泛,用於管理執行緒,本身不是多執行緒的,但提供了一些機制來利用上述的工具做一些封裝。
瞭解到的值得一提的管理類:ThreadPoolExecutor和 JMX框架下的系統級管理類 ThreadMXBean
ThreadPoolExecutor
如果不瞭解這個類,應該瞭解前面提到的ExecutorService,開一個自己的執行緒池非常方便:

複製程式碼
ExecutorService e = Executors.newCachedThreadPool();
    ExecutorService e = Executors.newSingleThreadExecutor();
    ExecutorService e = Executors.newFixedThreadPool(3);
    // 第一種是可變大小執行緒池,按照任務數來分配執行緒,
    // 第二種是單執行緒池,相當於FixedThreadPool(1)
    // 第三種是固定大小執行緒池。
    // 然後執行
    e.execute(new MyRunnableImpl());
複製程式碼

該類內部是通過ThreadPoolExecutor實現的,掌握該類有助於理解執行緒池的管理,本質上,他們都是ThreadPoolExecutor類的各種實現版本。請參見javadoc:


ThreadPoolExecutor引數解釋


翻譯一下:

複製程式碼
corePoolSize:池內執行緒初始值與最小值,就算是空閒狀態,也會保持該數量執行緒。
maximumPoolSize:執行緒最大值,執行緒的增長始終不會超過該值。
keepAliveTime:當池內執行緒數高於corePoolSize時,經過多少時間多餘的空閒執行緒才會被回收。回收前處於wait狀態
unit:
時間單位,可以使用TimeUnit的例項,如TimeUnit.MILLISECONDS 
workQueue:待入任務(Runnable)的等待場所,該引數主要影響排程策略,如公平與否,是否產生餓死(starving)
threadFactory:執行緒工廠類,有預設實現,如果有自定義的需要則需要自己實現ThreadFactory介面並作為引數傳入。
複製程式碼