1. 程式人生 > >java併發與多執行緒總結

java併發與多執行緒總結

Java併發總結

標籤(空格分隔): Java

1.多執行緒的優點

  • 資源利用率更好
  • 程式在某些情況下更簡單
  • 程式響應更快

2.建立執行緒

1.實現Runnable介面

new Thread(Runnable).start()

  • 可以避免由於java單繼承帶來的侷限
  • 增強程式的健壯性,程式碼能夠被多個執行緒共享,程式碼和資料是=資料是獨立的
  • 適合多個相同程式程式碼的執行緒區處理同意資源的情況

2.繼承Thread類

new MyThread().start()

注意:啟動執行緒的方式必須是start(),若是直接呼叫Thread.run()程式碼也能執行,但是就變成了普通方法的呼叫了,並沒有啟動執行緒

3.執行緒狀態

1.執行緒狀態介紹

執行緒在一定條件下,狀態會發生變化。執行緒一共有以下幾種狀態:

  • 新建狀態(New):新建立了一個執行緒物件。

  • 就緒狀態(Runnable):執行緒物件建立後,其他執行緒呼叫了該物件的start()方法。該狀態的執行緒位於“可執行執行緒池”中,變得可執行,只等待獲取CPU的使用權。即在就緒狀態的程序除CPU之外,其它的執行所需資源都已全部獲得。

  • 執行狀態(Running):就緒狀態的執行緒獲取了CPU,執行程式程式碼。

  • 阻塞狀態(Blocked):阻塞狀態是執行緒因為某種原因放棄CPU使用權,暫時停止執行。直到執行緒進入就緒狀態,才有機會轉到執行狀態。阻塞的情況分三種:

    • 等待阻塞:執行的執行緒執行wait()方法,該執行緒會釋放佔用的所有資源,JVM會把該執行緒放入“等待池”中。進入這個狀態後,是不能自動喚醒的,必須依靠其他執行緒呼叫notify()或notifyAll()方法才能被喚醒,

    • 同步阻塞:執行的執行緒在獲取物件的同步鎖時,若該同步鎖被別的執行緒佔用,則JVM會把該執行緒放入“鎖池”中。

    • 其他阻塞:執行的執行緒執行sleep()或join()方法,或者發出了I/O請求時,JVM會把該執行緒置為阻塞狀態。當sleep()狀態超時、join()等待執行緒終止或者超時、或者I/O處理完畢時,執行緒重新轉入就緒狀態。

  • 死亡狀態(Dead):執行緒執行完了或者因異常退出了run()方法,該執行緒結束生命週期。

執行緒狀態圖

2.中斷機制

  • 可中斷的阻塞狀態:Thread.sleep(),Object.wait(),Thread.join(),ReenTrantLock.lockInterruptibly()。

  • 不可中斷的阻塞狀態:
    synchronized和I/O阻塞狀態

1.可以通過呼叫Thread物件的interrupt()方法來中斷執行緒,但是此方法只是將中斷標誌設定為true標誌,並不能直接中斷執行緒,若執行interrupt()方法時執行緒處於:

  • 未阻塞狀態,那麼此時阻塞標誌已經設為true,等到下一次阻塞狀態來臨時,就會直接丟擲InterruptedException異常,但是隻會被捕捉到,可以在catch塊內自行return來結束run方法,否則,只是異常被捕捉,執行緒仍然可以繼續往下執行

  • 可中斷的阻塞狀態,執行緒收到中斷訊號後,會立即丟擲InterruptedException, 同時會把中斷狀態置回為false。

  • 不可中斷的阻塞狀態,不丟擲InterruptedException,也不會退出阻塞狀態

2.檢查中斷狀態:

  • 使用 Thread物件的isInterrupted()方法判斷中斷狀態,當呼叫了interrupt(),isInterrupted()返回true,一旦丟擲中斷異常中斷標誌被置為false,isInterrupted()返回false

  • Thread.interrupted()只是靜態方法,只用來判斷當前呼叫它的執行緒的中斷狀態,和Thread物件的isInterrupted不同的是,它在每次呼叫一定會將中斷狀態置為false

4.守護執行緒

Java中有兩類執行緒:

1.使用者執行緒:執行在前臺的執行緒
2.守護執行緒:執行在後臺的執行緒,併為前臺執行緒的執行提供便利服務(比如垃圾回收執行緒),當所有的使用者執行緒都結束了,那麼守護執行緒也會結束,因為被守護者沒有了。因此,不要在守護執行緒中執行業務邏輯操作(比如對資料的讀寫等)。

  • 可以用Thread物件的setDaemon(true)方法設定當前執行緒為守護執行緒,但要在start()之前,否則無效.
  • 在守護執行緒中建立的子執行緒也是守護執行緒
  • 不是所有的應用都可以分配給守護執行緒來進行服務,比如讀寫操作或者計算邏輯
  • 後臺程序在不執行finally子句的情況下就會終止其run()方法

5.同步機制

1.原子性和可見性

  • 原子性不能被執行緒排程器中斷的操作,是不可分割的。由Java記憶體模型來直接保證的原子性變數操作包括read、load、assign、use、store、和write這六個,基本資料型別的訪問讀寫是具備原子性的(long和double)的非原子性協定例外),synchronized塊之間的操作也具備原子性

  • 可見性,Java允許多個執行緒儲存共享成員變數的私有拷貝,等到進行完操作後,再賦值回主存(減少了同主存通訊的次數,提高了執行的速度)。因此執行緒對變數的修改是互相不可見的,在賦值的時候就會發生覆蓋,這樣就引出一個問題-變數可見性因此,當一個執行緒修改了共享變數的值,其他執行緒能夠立即得知這個修改,就稱這個變數具有可見性

2.volatile關鍵字

private volatile boolean value;  

volatile關鍵字具有可見性,被它修飾的變數不能被執行緒拷貝,即直接在主存讀和寫,保證了新值能立即重新整理,每個執行緒時刻看到的都是最新值。因此,保證了多執行緒操作時變數的可見性,而不具有原子性,因為它不會阻塞執行緒,是一種稍弱的同步機制,要使volatile變數提供理想的執行緒安全(同時可見性和原子性),必須同時滿足下面兩個條件,否則要加鎖來保證原子性:

  • 對變數的寫操作不依賴於當前值。例如自增操作就依賴當前值,因為它是一個讀取-修改-寫入操作序列組成的組合操作,必須以原子方式執行,而 volatile 不能提供必須的原子特性
  • 該變數沒有包含在具有其他變數的不變式中。

3.synchronised關鍵字

採用synchronized修飾符實現的同步機制叫做互斥鎖機制,每一個物件都有一個monitor(鎖標記),只能分配給一個執行緒。當執行緒擁有這個鎖標記時才能訪問這個資源,沒有鎖標記便進入鎖池,因此叫做互斥鎖,synchronised同時具有可見性和原子性,原子性是因為鎖內的操作不可分割,可見性因為 入鎖(進入synchronized)會獲取主存中變數的最新值和出鎖(退出synchronized)會將變數的最新值重新整理回主存

1.例項方法的同步與例項方法內的同步塊
例項方法內的synchronized是同步在某個例項物件上。即物件鎖

public class MyClass {
public synchronized void log1(String msg1, String msg2){
        //...
}
public void log2(String msg1, String msg2){
        synchronized(object){
            //...
        }
    }}

2.靜態方法的同步與靜態方法內的同步塊
靜態方法內的synchronized是同步在類物件上,鎖住的是整個類,即類鎖
public class MyClass {

public static synchronized void log1(String msg1, String msg2){
        //...
}
public void log2(String msg1, String msg2){
        synchronized(MyClass.class){
            //...
        }
    }
}

使用同步機制獲取互斥鎖的情況,進行幾點說明:

  • 一旦某個被某個執行緒獲取,那麼其他所有在這個鎖(同一個物件或類)上競爭的執行緒都會阻塞,不管是不是同一個方法或程式碼塊(仔細體會)。以物件鎖為例,假如有三個synchronized方法a,b,c,當執行緒A進入例項物件M中的方法a時,它便獲得了該M物件鎖,其他的執行緒會在M的所有的synchronized方法處阻塞,即在方法 a,b,c 處都要阻塞

  • 物件級別鎖,鎖住的是物件,有上面說的鎖的特性.

  • 類級別鎖,鎖住的是整個類,它用於控制對 static 成員變數以及 static 方法的併發訪問。有上面說的鎖的特性

  • 互斥是實現同步的一種手段,臨界區、互斥量和訊號量都是主要的互斥實現方式。synchronized 關鍵字經過編譯後,會在同步塊的前後分別形成 monitorenter 和 monitorexit這兩個位元組碼指令。根據虛擬機器規範的要求,在執行 monitorenter指令時,首先要嘗試獲取物件的鎖,如果獲得了鎖,把鎖的計數器加 1,相應地,在執行 monitorexit 指令時會將鎖計數器減 1,當計數器為 0 時,鎖便被釋放了。由於synchronized同步塊對同一個執行緒是可重入的,一個執行緒可以多次獲得同一個物件的互斥鎖,要釋放相應次數的該互斥鎖,才能最終釋放掉該鎖。

4.顯示的Lock鎖

  • unlock()需放在finally子句中,try中必須有return,以確保unlock()不會過早的發生。

  • 顯示的Lock物件在加鎖和釋放鎖方面,相比synchronized,還賦予了更細粒度的控制力。

  • Lock物件必須被顯示的建立、鎖定和釋放。相比synchronized,程式碼缺乏優雅性。

  • 在使用Lock鎖時,某些事物失敗丟擲一個異常,可以使用finally去做清理工作,以維護系統使其處於良好的狀態,這是synchronized不具有的

  • ReentrantLock允許嘗試獲取但最終未獲取鎖,如果其他執行緒已經獲取這個鎖,可以決定離開去執行其他一些事情,而不是等待鎖被釋放。這是synchronized不具有的

5.synchronized 和 Volatile的比較

  • 在訪問volatile變數時不會執行加鎖操作,因此也不會使執行緒阻塞。

  • 加鎖機制既可以確保可見性又可以確保原子性,而 volatile 變數只能確保可見性。

  • 如果過度依賴volatile變數來控制狀態的可見性,通常會比使用鎖的程式碼更脆弱。僅當volatile變數能簡化程式碼的實現以及對同步策略的驗證時,才使用它

  • 在需要同步的時候,第一選擇應該是synchronized關鍵字,這是最安全的方式,

6.TheadLocal

ThreadLocal類的例項,即便被多個執行緒鎖共享,但是在每個執行緒當中都有一份私有拷貝,並且多個執行緒無法看到對方的值,即執行緒對於此變數的使用完全是在自己拷貝物件上。

private ThreadLocal myThreadLocal = new ThreadLocal();

ThreadLocal可以儲存任意物件

myThreadLocal.set("A thread local value");//儲存此物件的值
String threadLocalValue = (String) myThreadLocal.get();//讀取
ThreadLocal myThreadLocal1 = new ThreadLocal<String>();//泛型使用

7.死鎖

1.普通迴圈等待死鎖

如果在同一時間,執行緒A持有鎖M並且想獲得鎖N,執行緒B持有鎖N並且想獲得鎖M,那麼這兩個執行緒將永遠等待下去,這種情況就是最簡單的死鎖形式。

public class DeadLock{
private final Object left = new Object();
private final Object right = new Object();

public void leftRight() throws Exception
{
    synchronized (left)
    {
        Thread.sleep(2000);
        synchronized (right)
        {
            System.out.println("leftRight end!");
        }
    }
}

public void rightLeft() throws Exception
{
    synchronized (right)
    {
        Thread.sleep(2000);
        synchronized (left)
        {
            System.out.println("rightLeft end!");
        }
    }
}
}

死鎖的四個必要條件:

  • 互斥條件:執行緒對所分配到的資源進行排他性使用,即在一段時間內,某資源只能被一個執行緒佔用。如果此時還有其它執行緒請求該資源,則只能等待,直到佔有該資源的執行緒用畢釋放。
  • 請求和保持條件:已經保持了至少一個資源,但是又提出新的資源請求,而資源已被其他執行緒佔有,此時請求執行緒只能等待,但對自己已獲得的資源保持不放。
  • 不可搶佔條件:執行緒已獲得的資源在未使用完之前不能被搶佔,只能執行緒使用完之後自己釋放。
  • 迴圈等待條件:在發生死鎖時,一個任務等待其他任務所持有的資源,後者又在等待另一個任務所持有的資源,這樣一直下去,直到有一個任務所持有的資源,使得大家被鎖住。

避免死鎖的方式:
1、只在必要的最短時間內持有鎖,考慮使用同步語句塊代替整個同步方法;
2、設計時考慮清楚鎖的順序,儘量減少潛在的加鎖互動數量
3、既然死鎖的產生是兩個執行緒無限等待對方持有的鎖,我們可以使用Lock類中的tryLock方法去嘗試獲取鎖,這個方法可以指定一個超時時限,獲取鎖超時後會返回一個失敗資訊,放棄取鎖。

2.重入鎖死

如果一個執行緒在兩次呼叫lock()間沒有呼叫unlock()方法,那麼第二次呼叫lock()就會被阻塞,這就出現了重入鎖死。避免重入鎖死有兩個選擇:

  • 編寫程式碼時避免再次獲取已經持有的鎖
  • 使用可重入鎖

8.多執行緒集合的安全使用

在 Collections 類中有多個靜態方法,它們可以獲取通過同步方法封裝非同步集合而得到的集合:
public static Collection synchronizedCollention(Collection c)
public static List synchronizedList(list l)
public static Map synchronizedMap(Map m)
public static Set synchronizedSet(Set s)
public static SortedMap synchronizedSortedMap(SortedMap sm)
public static SortedSet synchronizedSortedSet(SortedSet ss)

在多執行緒環境中,當遍歷當前集合中的元素時,希望阻止其他執行緒新增或刪除元素。安全遍歷的實現方法如下:

import java.util.*;  

public class SafeCollectionIteration extends Object {  
public static void main(String[] args) {  
    //為了安全起見,僅使用同步列表的一個引用,這樣可以確保控制了所有訪問  
    //集合必須同步化,這裡是一個List  
    List wordList = Collections.synchronizedList(newArrayList());  

    //wordList中的add方法是同步方法,會自動獲取wordList例項的物件鎖  
    wordList.add("Iterators");  
    wordList.add("require");  
    wordList.add("special");  
    wordList.add("handling");  

    //獲取wordList例項的物件鎖,  
    //迭代時,此時必須阻塞其他執行緒呼叫add或remove等方法修改元素
    synchronized ( wordList ) {  
        Iterator iter = wordList.iterator();  
        while ( iter.hasNext() ) {  
            String s = (String) iter.next();  
            System.out.println("found string: " + s + ", length=" + s.length());  
        }  
    }  
}  
} 

大部分的執行緒安全類都是相對執行緒安全的,也就是我們通常意義上所說的執行緒安全,像Vector這種,add、remove方法都是原子操作,不會被打斷,但也僅限於此,如果有個執行緒在遍歷某個Vector、有個執行緒同時在add這個Vector,99%的情況下都會出現·ConcurrentModificationException·,也就是fail-fast機制

6.多執行緒協作

1.wait、notify、notifyAll的使用

wait():將當前執行緒置入休眠狀態,直到接到喚醒通知或被中斷為止。呼叫後當前執行緒立即釋放鎖。

notify():用來通知那些正在等待該物件的物件鎖的其他執行緒。如果有多個執行緒等待,則執行緒規劃器任意挑選出其中一個wait()狀態的執行緒來發出通知喚醒它。其他執行緒繼續阻塞,但是呼叫後當前執行緒不會立馬釋放該物件鎖,直到程式出鎖

notifyAll():notifyAll會使在該物件鎖上wait的所有執行緒統統退出wait的狀態(即全部被喚醒),待程式出鎖後,所有被喚醒的執行緒共同競爭該鎖,沒有競爭到鎖的執行緒會一直競爭(不是阻塞)

注意:在呼叫wait、notify、notifyAll方法之前,必須先獲得物件鎖,並且只能在synchronized程式碼塊或者方法中呼叫。否則丟擲IllegalMonitorStateException異常

總結:

  • 如果執行緒呼叫了物件的wait()方法,那麼該執行緒便會處於該物件的 等待池 中,等待池中的執行緒不會去競爭該物件的鎖。

  • 如果執行緒呼叫了物件的notifyAll()方法(喚醒所有wait執行緒)或notify()方法(只隨機喚醒一個wait執行緒),被喚醒的的執行緒便會進入該物件的鎖池中,鎖池中的執行緒會去競爭該物件鎖。

  • 優先順序高的執行緒競爭到物件鎖的概率大,假若某執行緒沒有競爭到該物件鎖,它還會留在鎖池中。

2.notify 通知的遺漏

當執行緒 A 還沒開始 wait 的時候,執行緒 B 已經 notify 了,這樣,執行緒 B 的通知是沒有任何響應的,當 執行緒B 退出 synchronized 程式碼塊後,執行緒A 再開始 wait,便會一直阻塞等待。也就是說這個通知訊號提前來了,沒有wait執行緒收到,因此丟失了訊號,為了避免丟失訊號,可以設定一個成員變數來標誌訊號。

public class MyWaitNotify{
MonitorObject myMonitorObject = new MonitorObject();

//一旦呼叫notify,則設為true表示喚醒訊號發出來了,則設為false表示喚醒訊號已經被其他某個執行緒消耗了,
boolean wasSignalled = false;
public void doWait(){
    synchronized(myMonitorObject){
    //自旋鎖,迴圈檢查,只有當為true時才表示有喚醒訊號來了
    while(!wasSignalled){
             try{
               myMonitorObject.wait();
           } catch(InterruptedException e){...}
          }
       //clear signal and continue running.
         wasSignalled = false;
  }
 }
 public void doNotify(){
     synchronized(myMonitorObject){
        wasSignalled = true;
        myMonitorObject.notify();
   }
 }
}

如果有多個執行緒被notifyAll()喚醒,所有被喚醒的執行緒都會在while迴圈裡檢查wasSignalled變數值,但是隻有一個執行緒可以獲得物件鎖並且退出wait()方法並清除wasSignalled標誌(設為false)。這時這個標誌已經被第一個喚醒的執行緒消耗了,所以其餘的執行緒會檢查到標誌為false,還是會回到等待狀態。

3.字串常量或全域性物件作為鎖的隱患

字串常量全域性物件在不同的例項當中是同一個物件,即其實用的是同一把鎖。因此本來在不同例項物件的執行緒會互相干擾,例如在例項A中的執行緒呼叫notifyAll()可能會喚醒例項B當中的wait執行緒。因此應該避免使用這兩種物件作為監視器物件,而應使用每個例項中唯一的物件

String myMonitorObject = "";//相同String 常量賦值在記憶體當中只會有一份物件

4.生產者-消費者模型synchronized實現

生產者和消費者在同一時間段內共用同一儲存空間,生產者向空間裡生產資料,而消費者取走資料。問題在於如何通過執行緒之間的協作使得生產和消費輪流進行。

package job_3;

import java.util.Random;

public class Datebuf {
private Integer i = 0;
private int result;
private Random random;

public Datebuf() {
    random = new Random();
}

public void sendData() {
    while (!Thread.interrupted()) {
        synchronized (this) {
            try {
                while (i != 0) {
                    this.wait();
                }
                i = random.nextInt(100);
                System.out.println("執行緒 " + Thread.currentThread().getName()
                        + "生產" + i);
                this.notify();
            } catch (InterruptedException e) {
                return;
            }
        }
    }
}

public void addData() {
    while (!Thread.interrupted()) {
        synchronized (this) {
            try {
                while (i == 0) {
                    this.wait();
                }
                result += i;
                System.out.println("執行緒 " + Thread.currentThread().getName()
                        + "消費" + i + "--result=" + result);
                i = 0;
                this.notify();
            } catch (InterruptedException e) {
                return;
            }
        }
    }
}

}

5.其他協調方法

1.join()
一個執行緒可以呼叫其他執行緒的join()方法,其效果是等待其他執行緒結束才繼續執行。如果某個執行緒呼叫t.join(),此執行緒將被掛起,直到目標執行緒t結束才恢復(即t.isAlive()為假)。

2.yield()
建議執行緒排程器讓其他具有相同優先順序的執行緒優先執行,但只是建議,並不一定就是別的執行緒先運行了

7.執行緒池

1.ExecutorService介紹

ExecutorService的生命週期包括三種狀態:執行,關閉,終止。建立後便進入了執行狀態,當呼叫了 shutdown()方法時,便進入關閉狀態。

使用執行緒池的好處:

  • 降低資源消耗。通過重複利用已創的執行緒降低執行緒建立和銷燬在成的消耗
  • 提高響應速度。當任務到達的時候,不需要再去建立執行緒就能立即執行
  • 提高執行緒的可管理性。執行緒是稀缺資源,如果無限制的建立不僅會消耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行統一的分配、調優和監控

2.自定義執行緒池

public ThreadPoolExecutor (
            int corePoolSize, 
            int maximumPoolSize, 
            long keepAliveTime, 
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue)
  • corePoolSize:執行緒池中所儲存的核心執行緒數,包括空閒執行緒。

  • maximumPoolSize:池中允許的最大執行緒數

  • keepAliveTime:執行緒池中的空閒執行緒所能持續的最長時間,超過將被執行緒池移除,可以通過調大此值來提高執行緒的利用率。

  • unit:持續時間的單位

  • workQueue:任務執行前儲存任務的佇列,僅儲存由 execute 方法提交的 Runnable 任務。

    • ArrayBlockingQueue:是一個基於陣列結構的有界阻塞佇列,此佇列按 FIFO(先進先出)原則對元素進行排序。

    • LinkedBlockingQueue:一個基於連結串列結構的阻塞佇列,此佇列按FIFO (先進先出) 排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個佇列。

    • SynchronousQueue:一個不儲存元素的阻塞佇列。每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處於阻塞狀態(緩衝區為1的生產者消費者模式)

    • PriorityBlockingQueue:一個具有優先順序的無限阻塞佇列。

  • 飽和策略
    • AbortPolicy: 直接丟擲異常。
    • CallerRunsPolicy:只用呼叫者所線上程來執行任務。
    • DiscardOldestPolicy:丟棄佇列裡最近的一個任務,並執行當前任務。
    • DiscardPolicy:不處理,丟棄掉。
    • 當然也可以根據應用場景需要來實現RejectedExecutionHandler介面自定義策略。如記錄日誌或持久

3.建立執行緒池

以下四個執行緒池底層都是呼叫了ThreadPoolExecutor的構造方法,所以它們主要只是參構造數設定上的差異,理解了它們的預設構造引數值就能明白它們的區別

newCachedThreadPool()

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                            60L, TimeUnit.SECONDS,
                            new SynchronousQueue<Runnable>());
}
  • 直接提交策略SynchronousQueue,無界執行緒池(maximumPoolSize無限大),可以進行自動執行緒回收

newFixedThreadPool(int)

public static ExecutorService newFixedThreadPool(int nThreads){
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
  • 使用了LinkedBlockingQueue無界佇列(workQueue無限大),執行緒數當超過了coreSize,此佇列由於是鏈式則可以無限新增(資源耗盡,另當別論),永遠也不會觸發產生新的執行緒,且執行緒結束即死亡不會被重複利用。

newScheduledThreadPool(int)

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

延遲呼叫週期執行示例,表示延遲1秒後每3秒執行一次:

scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
           @Override
           public void run() {
System.out.println("delay 1 seconds, and excute every 3     seconds");
}
}, 1, 3, TimeUnit.SECONDS);
  • 定長排程型執行緒池,這個池子裡的執行緒可以按 schedule 依次 delay 執行,或週期執行,這是特殊的DelayedWorkQueue的效果.

SingleThreadExecutor()

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(
    new ThreadPoolExecutor(1, 1,0L, 
    TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
  • 單例執行緒,任意時間池中只能有一個執行緒,如果向該執行緒池提交了多個任務,這些任務將排隊

當試圖通過 excute 方法將一個 Runnable 任務新增到執行緒池中時,按照如下順序來處理:

Created with Raphaël 2.1.0提交任務核心執行緒池滿了?緩衝佇列無法加入?最大執行緒池滿了?飽和政策處理建立新執行緒加入緩衝佇列yesnoyesnoyesno

4.幾種排隊的策略

  • 直接提交。緩衝佇列採用 SynchronousQueue,它將任務直接交給執行緒處理而不保持它們。如果不存在可用於立即執行任務的執行緒(即無空閒執行緒),則試圖把任務加入緩衝佇列將會失敗,因此會構造一個新的執行緒來處理新新增的任務,並將其加入到執行緒池中。直接提交通常要求無界maximumPoolSizes(Integer.MAX_VALUE) 以避免拒絕新提交的任務。newCachedThreadPool 採用的便是這種策略。

  • 無界佇列,典型的便是的LinkedBlockingQueue(鏈式),當執行緒數超過corePoolSize時,新的任務將在緩衝佇列無限排隊。因此,建立的執行緒就不會超過corePoolSize,maximumPoolSize的值也就無效了。當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界佇列。newFixedThreadPool採用的便是這種策略。

  • 有界佇列。當使用有限的 maximumPoolSizes 時,有界佇列(一般緩衝佇列使用ArrayBlockingQueue,並制定佇列的最大長度)有助於防止資源耗盡,但是可能較難調整和控制,佇列大小和最大池大小需要相互折衷,需要設定合理的引數。

5.關閉執行緒池

  • shutdown:不可以再submit新的task,已經submit(分兩種,正在執行的和在緩衝佇列的)的將繼續執行。並interrupt()空閒執行緒。
  • shutdownNow:試圖停止當前正執行的task,清除未執行的任務並返回尚未執行的task的list。
  • 只要呼叫了這兩個關閉方法的其中一個,isShutdown方法就會返回true。當所有的任務都已關閉後,才表示執行緒池關閉成功,這時呼叫isTerminaed方法會返回true

6.Executor執行Runnable和Callable任務

Runnable
無返回值,無法丟擲經過檢查的異常,通過execute方法新增

ExecutorService executorService = Executors.newSingleThreadExecutor();//建立執行緒池
executorService.execute(new TestRunnable());//新增任務

Callable
返回Future物件,獲取返回結果時可能會丟擲異常,通過submit方法新增

package threadLearn;

import java.util.ArrayList;   
import java.util.List;   
import java.util.concurrent.*;   

public class CallableDemo{   
public static void main(String[] args){   
    ExecutorService executorService=Executors.newCachedThreadPool();   
    List<Future<String>> resultList = new ArrayList<Future<String>>();   
    //建立10個任務並執行   
        for (int i = 0; i < 10; i++){   
        //使用ExecutorService執行Callable型別的任務,並將結果儲存在future變數中   
        Future<String> future = executorService.submit(new TaskWithResult(i));   
        //將任務執行結果儲存到List中   
        resultList.add(future);   
    }   
    //遍歷任務的結果   
    for (Future<String> fs : resultList){   
            try{   
                while(!fs.isDone()){//Future返回如果沒有完成,則一直迴圈等待,直到Future返回完成  
                    System.out.println("還沒完成");
                }
                System.out.println(fs.get());//列印各個執行緒(任務)執行的結果   
            }catch(InterruptedException e){   
                e.printStackTrace();   
            }catch(ExecutionException e){   
                e.printStackTrace();   
            }finally{   
                //啟動一次順序關閉,執行以前提交的任務,但不接受新任務  
                executorService.shutdown();   
            }   
    }   
}   
}   

class TaskWithResult implements Callable<String>{   
private int id;   

public TaskWithResult(int id){   
    this.id = id;   
}   

/**  
 * 任務的具體過程,一旦任務傳給ExecutorService的submit方法, 
 * 則該方法自動在一個執行緒上執行 
 */   
public String call() throws Exception {  
    System.out.println("call()方法被自動呼叫" + Thread.currentThread().getName());   
    Thread.sleep(1000);
    //該返回結果將被Future的get方法得到  
    return "call()方法被自動呼叫,任務返回的結果是:" + id + "-----" + Thread.currentThread().getName();   
    }   
} 

如果真正的結果的返回尚未完成,則get()方法會阻塞等待,可以通過呼叫 isDone()方法判斷 Future 是否完成了返回。

9.執行緒異常處理

由於執行緒的本質特性(可以理解不同的執行緒是平行空間),從某個執行緒中逃逸的異常是無法被別的執行緒捕獲的。一旦異常逃出任務的run()方法,就會向外傳向控制檯。Thread.UncaughtExceptionHandler是JavaSE5中的新介面,它允許在每個Thread物件上都附著一個異常處理器。Thread.UncaughtExceptionHandler.uncaughtException()會線上程因未捕獲的異常而臨近死亡時被呼叫。

Thread t = new Thread(r);
t.setUncaughtExceptionHandler(new  MyUncaughtExceptionHandler());

8.Lock 鎖與Condition

1.Lock鎖的介紹

Lock介面有3個實現它的類:ReentrantLock、ReetrantReadWriteLock.ReadLock和ReetrantReadWriteLock.WriteLock,即重入鎖、讀鎖和寫鎖。

  • lock 必須被顯式地建立、鎖定和釋放,為了使用更多的功能,一般用 ReentrantLock 為其例項化
  • 為了保證鎖最終一定會被釋放(可能會有異常發生),要把互斥區放在 try 語句塊內,並在finally語句塊中釋放鎖,尤其當有return語句時,return 語句必須放在try字句中,以確保unlock()不會過早發生,從而將資料暴露給第二個任務。

(1)ReentrantLock與synchronized的比較

  • 等待可中斷:當持有鎖的執行緒長期不釋放鎖時,正在等待的執行緒可以選擇放棄等待,改為處理其他事情,由synchronized產生的互斥鎖時,會一直阻塞,是不能被中斷的

  • 可實現公平鎖:多個執行緒在等待同一個鎖時,必須按照申請鎖的時間順序排隊等待,通過構造方法 ReentrantLock(ture)來要求使用公平鎖

  • 鎖可以繫結多個條件:ReentrantLock 物件可以同時繫結多個 Condition 物件(名曰:條件變數或條件佇列),我們還可以通過繫結 Condition 物件來判斷當前執行緒通知的是哪些執行緒(即與Condition物件繫結在一起的其他執行緒

(2)ReetrantLock的忽略中斷鎖和響應中斷鎖

忽略中斷鎖與 synchronized實現的互斥鎖一樣,不能響應中斷,而響應中斷鎖可以響應中斷。

ReentrantLock lock = new ReentrantLock();  
lock.lockInterruptibly();//獲取響應中斷鎖  

(3)讀寫鎖

用讀鎖來鎖定讀操作,用寫鎖來鎖定寫操作,這樣寫操作和寫操作之間會互斥,讀操作和寫操作之間會互斥,但讀操作和讀操作就不會互斥。

ReadWriteLock rwl = new ReentrantReadWriteLock();  
rwl.writeLock().lock()  //獲取寫鎖  
rwl.readLock().lock()  //獲取讀鎖 

2.生產者-消費者模型Lock與Condition實現

package job_3;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class LockDatebuf implements Data {
private Integer i = 0;
private int result;
private Random random;
private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public LockDatebuf() {
    random = new Random();
}

public void sendData() throws InterruptedException {
    while (!Thread.interrupted()) {
        lock.lockInterruptibly();
            try {
                while (i != 0) {
                    condition.await();
                }
                i = random.nextInt(100);
                System.out.println("執行緒 " + Thread.currentThread().getName()
                        + "生產" + i);
                condition.signal();
            } catch (InterruptedException e) {

            }finally{
                lock.unlock();
            }
    }
}

public void addData() throws InterruptedException {
    while (!Thread.interrupted()) {
        lock.lockInterruptibly();
            try {
                while (i == 0) {
                    condition.await();
                }
                result += i;
                System.out.println("執行緒 " + Thread.currentThread().getName()
                        + "消費" + i + "--result=" + result);
                i = 0;
                condition.signal();
            } catch (InterruptedException e) {

            }finally{
                lock.unlock();
            }
    }
}
}

9.併發新特性

1.CountDownLatch

可以讓一組任務必須在另一組任務全部結束後才開始執行,向CountDownLatch物件設定一個初始計數值,任何在這個物件上呼叫await()方法的執行緒都將阻塞,直至計數值子減為0。其他任務在結束其工作時,可以呼叫countDown()來減小這個計數值。CountDownLatch被設計為只觸發一次。

package threadLearn;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
    // 只觸發一次,計數值不能被重置
    int size = 5;
    CountDownLatch latch = new CountDownLatch(size);
    ExecutorService exec = Executors.newCachedThreadPool();
    exec.execute(new Waiting(latch,"wait執行緒"));
    for (int i = 0; i < size; i++) {
        exec.execute(new OtherTask(latch));
    }
    TimeUnit.SECONDS.sleep(1);
    exec.shutdown();
}
}

class Waiting implements Runnable {
    private CountDownLatch latch;
    private String name;
    public Waiting(CountDownLatch latch,String name) {
        this.latch = latch;
        this.name = name;
    }
    public void run() {
        try {
            latch.await();
            System.out.println(name+"最後執行的任務...");
        } catch (Exception e) {
            return;
        }
    }
}

class OtherTask implements Runnable {
    private CountDownLatch latch;
    private Random rand = new Random();
    public OtherTask(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {
        try {
            doWork();
            latch.countDown();
        } catch (Exception e) {
            return;
        }
    }

    private void doWork() throws InterruptedException {
        TimeUnit.MICROSECONDS.sleep(1000);
        System.out.println(Thread.currentThread().getName() + " 執行完畢");
    }
}

2.障礙器 CyclicBarrier

它適用於這樣一種情況:你希望建立一組任務,它們併發地執行工作,另外的一個任務在這一組任務併發執行結束前一直阻塞等待,直到該組任務全部執行結束,這個任務才得以執行。這非常像CountDownLatch,只是 CountDownLatch是隻觸發一次的事件,而CyclicBarrier可以多次重用

package threadLearn;
import java.util.concurrent.BrokenBarrierException;   
import java.util.concurrent.CyclicBarrier;   
public class CyclicBarrierTest {   
    public static void main(String[] args) {   
            //建立CyclicBarrier物件,  
            //並設定執行完一組5個執行緒的併發任務後,再執行MainTask任務  
            CyclicBarrier cb = new CyclicBarrier(5, new MainTask());   
            new SubTask("A", cb).start();   
            new SubTask("B", cb).start();   
            new SubTask("C", cb).start();   
            new SubTask("D", cb).start();   
            new SubTask("E", cb).start();  
    }   
}   

/**  
* 最後執行的任務 
*/   
class MainTask implements Runnable {   
    public void run() {   
            System.out.println("......終於要執行最後的任務了...    ...");   
    }   
}   

/**  
* 一組併發任務  
*/   
class SubTask extends Thread {   
    private String name;   
    private CyclicBarrier cb;   

    SubTask(String name, CyclicBarrier cb) {   
            this.name = name;   
            this.cb = cb;   
    }   

    public void run() {   
            System.out.println("[併發任務" + name + "]  開始執行");   
            for (int i = 0; i < 999999; i++) ;    //模擬耗時的任務   
            System.out.println("[併發任務" + name + "]  開始執行完畢,通知障礙器");   
            try {   
                    //每執行完一項任務就通知障礙器   
                    cb.await();   
            } catch (InterruptedException e) {   
                    e.printStackTrace();   
            } catch (BrokenBarrierException e) {   
                    e.printStackTrace();   
            }   
    }   
}  

3.訊號量Semaphore

訊號量 Semaphore 實際上是一個功能完畢的計數訊號量,從概念上講,它維護了一個許可集合,對控制一定資源的消費與回收有著很重要的意義。Semaphore 可以控制某個資源被同時訪問的任務數,它通過acquire()獲取一個許可,release()釋放一個許可。如果被同時訪問的任務數已滿,則其他 acquire 的任務進入等待狀態,直到有一個任務被release掉,它才能得到許可。Semaphore 僅僅是對資源的併發訪問的任務數進行監控,而不會保證執行緒安全,因此,在訪問的時候,要自己控制執行緒的安全訪問。

10.效能調優

(1)比較各類互斥技術

  • Atomic類
    不激烈情況下,效能比synchronized略遜,而激烈的時候,也能維持常態。激烈的時候,Atomic的效能會優於ReentrantLock一倍左右。缺點是隻能同步一個值,一段程式碼中只能出現一個Atomic的變數,多於一個同步無效。因為他不能在多個Atomic之間同步。

  • 關鍵字synchronized
    在資源競爭不是很激烈的情況下,Synchronized的效能要優於ReetrantLock,原因在於,編譯程式通常會盡可能的進行優化synchronize

  • Lock
    在資源競爭很激烈的情況下,Synchronized的效能會下降幾十倍,但是ReetrantLock的效能能維持常態。ReentrantLock提供了多樣化的同步,比如有時間限制的同步,可以被Interrupt的同步(synchronized的同步是不能Interrupt的)等。ReentrantLock擁有Synchronized相同的併發性和記憶體語義,此外還多了 鎖投票,定時鎖等候和中斷鎖等候。可以被中斷。

(2)免鎖容器

CopyOnWiteArrayList的寫入將導致建立整個底層陣列的副本,而原陣列將保留在原地,使得複製的陣列在被修改時,讀取操作可以安全的執行。當修改完成時,一個原子性的操作把新的陣列換入,使得新的讀取操作可以看到這個新的修改。

  • 好處是當多個迭代器同時遍歷和修改這個列表時,不會丟擲ConcurrentModificationException。
  • CopyOnWriteArraySet將使用CopyOnWriteArrayList來實現其免鎖行為。
  • ConcurrenHashMap和ConcurrentLinkedQueue使用了類似的技術,允許併發的讀取和寫入,但是容器中只有部分內容而不是整個容器可以被複制和修改。在修改完成之前,讀取者仍舊不能看到他們。

(3)ReadWriteLock

對向資料結構相對不頻繁的寫入,但是有多個任務要經常讀取這個資料結構的這類情況進行了優化。ReadWriteLock使得你可以同時有多個讀者,只要他們都不試圖寫入即可。如果寫鎖已經被其他任務持有,那麼任何讀者都不能訪問,直至這個寫鎖被釋放為止。即適用於讀者多於寫者的情況。

對於ReadWriteLock的應用主要是:快取和提高對資料結構的併發性。

  • 鎖降級:重入還允許從寫入鎖降級為讀取鎖,其實現方式是:先獲取寫入鎖,然後獲取讀取鎖,最後釋放寫入鎖。但是,從讀取鎖升級到寫入鎖是不可能的。

  • 鎖獲取的中斷:讀取鎖和寫入鎖都支援鎖獲取期間的中斷。

  • Condition 支援 :寫入鎖提供了一個Condition實現,對於寫入鎖來說,該實現的行為與 ReentrantLock.newCondition() 提供的 Condition 實現對 ReentrantLock 所做的行為相同。當然,此 Condition 只能用於寫入鎖。
    讀取鎖不支援 Condition,readLock().newCondition() 會丟擲 UnsupportedOperationException。
  • 重入:此鎖允許 reader 和 writer 按照 ReentrantLock 的樣式重新獲取讀取鎖或寫入鎖。在寫入執行緒保持的所有寫入鎖都已經釋放後,才允許重入 reader 使用它們。

下面的程式碼展示瞭如何利用重入來執行鎖降級:

class CachedData {
Object data;
volatile boolean cacheValid;
ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

void processCachedData() {
    rwl.readLock().lock();
    if (!cacheValid) {
    //在獲取寫鎖之前必須釋放讀鎖
        rwl.readLock().unlock();
        rwl.writeLock().lock();
        // 重新檢查狀態,因為可能其他執行緒已經獲取到讀鎖了
        if (!cacheValid) {
            data = ...
            cacheValid = true;
        }
        rwl.readLock().lock();
        rwl.writeLock().unlock(); 
    }
    use(data);
    rwl.readLock().unlock();
}

}

相關推薦

java併發執行總結

Java併發總結 標籤(空格分隔): Java 1.多執行緒的優點 資源利用率更好 程式在某些情況下更簡單 程式響應更快 2.建立執行緒 1.實現Runnable介面 new Thread(Runnable).start()

java併發執行API學習

Executor介面 public interface Executor { void execute(Runnable command); }     Executor介面中之定義了一個方法execute(Runnable command),該

python 程序併發執行併發總結

本文對python支援的幾種併發方式進行簡單的總結。 Python支援的併發分為多執行緒併發與多程序併發(非同步IO本文不涉及)。概念上來說,多程序併發即執行多個獨立的程式,優勢在於併發處理的任務都由作業系統管理,不足之處在於程式與各程序之間的通訊和資料共享不

Java併發執行java.util.concurrent併發總結

引言前面已經針對Java多執行緒框架中具體的點介紹了很多了,現在是需要一個概括性總結的時候了,正好從網上找到一張描述java.util.concurrent包組成結構的類圖,正好可以對java多執行緒中

Java併發執行(一)-----概念

其實之前一直想專門寫一篇,單獨說一說Java的多執行緒與高併發,但是一直以來,都沒有想到能夠用什麼比較有趣的表現形式去表達出來,而且網上充斥著很多類似的部落格,有好的又不好的,有簡介的有繁瑣的,所以也一直沒寫。 但是想了想既然之前有這個想法,而且也已經好久沒有寫過部落格了,索性還是寫一寫,儘量寫的有意思一點

Java併發執行(二)-----執行的實現方式

今天,我們開始Java高併發與多執行緒的第二篇,執行緒的實現方式。     通常來講,執行緒有三種基礎實現方式,一種是繼承Thread類,一種是實現Runnable介面,還有一種是實現Callable介面,當然,如果我們鋪開,擴充套件一下,會有很多種實現方式,但是歸根溯源,其實都是這幾種實

併發執行的關係、區別、高併發的技術方案

高併發與多執行緒的關係、區別、高併發的技術方案 http://youzhixueyuan.com/high-concurrency-and-multithreading-high-concurrency-technical-solutions.html 什麼是高併發? 高併發(High

java併發執行

volatile—保證可見性、禁止指令重排序,不保證原子性 出於執行速率的考慮,java編譯器會把經常訪問的變數存放在快取,直接從快取中讀取變數,多執行緒下記憶體與快取不一樣 volatile不會被快取到暫存器,多執行緒下可見 使用條件: 只有單個執行緒更新變數的值 該變數不與

java併發模擬——執行計數

java併發模擬——多執行緒計數 最近在學習多執行緒與高併發的知識,這是一個能力進階的必要途徑。在大量的系統中,都會多多少少存在併發問題,如何更好的解決高併發是一個探究的問題。 下面我準備了一個簡單的多執行緒計數demo來模擬併發操作,觀察列印輸出情況,真正的去感受一下併發操作。 首先環

C++11 併發執行篇(未完成)

從C++11新標準開始,C++語言本身增加了對多執行緒的支援,意味著使用C++可實現多執行緒程式的可移植,跨平臺。 在標準的C++程式中,主執行緒從main()開始執行,我們自己在C++中建立的執行緒,也需要從一個函式開始執行(這個函式叫做初始函式),一旦這個函式執行完

Java多媒體執行處理

Java多媒體與多執行緒處理 編寫一個Java 多執行緒程式,完成三個售票視窗同時出售20張票(如下圖所示); 程式分析:1.票數要使用同一個靜態值;                    

【本人禿頂程式設計師】你分得清分散式、高併發執行嗎?

←←←←←←←←←←←← 快,點關注! 當提起這三個詞的時候,是不是很多人都認為分散式=高併發=多執行緒? 當面試官問到高併發系統可以採用哪些手段來解決,或者被問到分散式系統如何解決一致性的問題,是不是一臉懵逼? 確實,在一開始接觸的時候,不少人都會將三者混淆,誤以為所謂的分散式

JAVA併發執行程式設計之同步“監視器monitor”(三)

在JAVA虛擬機器中,每個物件(Object和class)通過某種邏輯關聯監視器,為了實現監視器的互斥功能,每個物件(Object和class)都關聯著一個鎖(有時也叫“互斥量”),這個鎖在作業系統書籍中稱為“訊號量”,互斥(“mutex”)是一個二進位制的訊號量。 如果一個執行緒擁有了某些資料的鎖,其他的

如何分清分散式、高併發執行嗎?

當提起這三個詞的時候,是不是很多人都認為分散式=高併發=多執行緒? 當面試官問到高併發系統可以採用哪些手段來解決,或者被問到分散式系統如何解決一致性的問題,是不是一臉懵逼?   確實,在一開始接觸的時候,不少人都會將三者混淆,誤以為所謂的分散式高併發的

併發執行基礎之執行之間共享資料

1、共享資料帶來什麼問題?        A、條件競爭:併發中競爭條件的形成,取決於一個以上執行緒的相對執行順序,每個執行緒都搶著完成自己的任務。大多數情況下,即使改變執行順序,也是良性競爭,其結果可以接受。例如,有兩個執行緒同時向一個處理佇列中新增任務,因為系統提供的不變數

Java併發程式設計--執行之HelloWorld

上篇部落格我們介紹了一些基本概念,程序、執行緒、併發。下面我們開始寫第一個多執行緒的程式。 兩種方式:一、實現Runnable介面;二、基礎Thread類。 一、實現Runnable介面 pack

Java併發03:執行實現三方式:繼承Thread類、實現Runnable介面、實現Callable介面

本章主要對Java多執行緒實現的三種方式進行學習。 1.序言 在JDK5版本之前,提供了兩種多執行緒的實現方式: 繼承Thread類,重寫run()方法 實現Runnable介面,實現run()方法 這兩種種方式的本質都是一個:實現Runna

IO流執行總結

IO流 1.概念 IO流用來處理裝置之間的資料傳輸 Java對資料的操作是通過流的方式 Java用於操作流的類都在IO包中 流按流向分為兩種:輸入流,輸出流。 流按操作型別分為兩種:位元組流與字元流。  位元組流可以操作任何資料,字元流只能操作純字元資料,比較方便。 2.I

C++11 併發執行(二)

1)執行緒間共享資料 執行緒間共享資料的問題 原因:由於修改資料引起,如果都只是讀資料,沒有任何問題; 競爭條件: 例子:電影院同時買熱門電影票,只剩幾個位置 **data r

併發執行區別

1、高併發     高併發是一種狀態,如果大量請求訪問閘道器介面。這種情況會發生大量執行操作,如資料庫操作、資源請求、硬體佔用等。這就需要對介面進行優化,而多執行緒是處理高併發的一種手段。 2、多執行緒     是一種非同步處理的一種方式,在同一時刻最大限度的利用計