1. 程式人生 > >多執行緒 (一)基礎執行緒機制--執行緒狀態--Executor

多執行緒 (一)基礎執行緒機制--執行緒狀態--Executor

一、使用執行緒

有三種使用執行緒的方法:

  1. 實現 Runnable 介面;
  2. 實現 Callable 介面;
  3. 繼承 Thread 類。

實現 Runnable 和 Callable 介面的類只能當做一個可以線上程中執行的任務,不是真正意義上的執行緒,因此最後還需要通過 Thread 來呼叫。可以說任務是通過執行緒驅動從而執行的。

實現 Runnable 介面

需要實現 run() 方法。

通過 Thread 呼叫 start() 方法來啟動執行緒。

public class MyRunnable implements Runnable {
    public void run() {
        // ...
    }
    public
static void main(String[] args) { MyRunnable instance = new MyRunnable(); Tread thread = new Thread(instance); thread.start(); } }

實現 Callable 介面

與 Runnable 相比,Callable 可以有返回值,返回值通過 FutureTask 進行封裝。

public class MyCallable implements Callable<Integer> {
    public Integer
call() { // ... } public static void main(String[] args) { MyCallable mc = new MyCallable(); FutureTask<Integer> ft = new FutureTask<>(mc); Thread thread = new Thread(ft); thread.start(); System.out.println(ft.get()); } }

繼承 Thread 類

同樣也是需要實現 run() 方法,並且最後也是呼叫 start() 方法來啟動執行緒。

public class MyThread extends Thread {
    public void run() {
        // ...
    }
    public static void main(String[] args) {
        MyThread mt = new MyThread();
        mt.start();
    }
}

實現介面 VS 繼承 Thread

實現介面會更好一些,因為:

  1. Java 不支援多重繼承,因此繼承了 Thread 類就無法繼承其它類,但是可以實現多個介面;
  2. 類可能只要求可執行就行,繼承整個 Thread 類開銷會過大。

二、基礎執行緒機制

sleep()

Thread.sleep(millisec) 方法會休眠當前正在執行的執行緒,millisec 單位為毫秒。也可以使用 TimeUnit.TILLISECONDS.sleep(millisec)。

sleep() 可能會丟擲 InterruptedException。因為異常不能跨執行緒傳播回 main() 中,因此必須在本地進行處理。執行緒中丟擲的其它異常也同樣需要在本地進行處理。

public void run() {
    try {
        // ...
        Thread.sleep(1000);
        // ...
    } catch (InterruptedException e) {
        System.err.println(e);
    }
}

yield()

對靜態方法 Thread.yield() 的呼叫聲明瞭當前執行緒已經完成了生命週期中最重要的部分,可以切換給其它執行緒來執行。

public void run() {
    // ...
    Thread.yield();
}

join()

線上程中呼叫另一個執行緒的 join() 方法,會將當前執行緒掛起,直到目標執行緒結束。

可以加一個超時引數。

deamon

守護執行緒(deamon)是程式執行時在後臺提供服務的執行緒,並不屬於程式中不可或缺的部分。

當所有非後臺執行緒結束時,程式也就終止,同時會殺死所有後臺執行緒。

main() 屬於非後臺執行緒。

使用 setDaemon() 方法將一個執行緒設定為後臺執行緒。

三、結束執行緒

阻塞

一個執行緒進入阻塞狀態可能有以下原因:

  1. 呼叫 Thread.sleep() 使執行緒睡眠;
  2. 呼叫 wait() 使執行緒掛起,直到執行緒得到 notify() 或 notifyAll() 訊息(或者 java.util.concurrent 類庫中等價的 signal() 或 signalAll() 訊息;
  3. 等待某個 I/O 的完成;
  4. 試圖在某個物件上呼叫其同步控制方法,但是物件鎖不可用,因為另一個執行緒已經獲得了這個鎖。

阻塞 睡眠 掛起

阻塞是一種狀態,而睡眠和掛起是一種手段,通過睡眠和掛起可以讓一個執行緒進入阻塞狀態。

睡眠和掛起這兩種手段的區別是,掛起手段會釋放物件鎖,而阻塞手段不會。

應該注意的是,睡眠和掛起都可以設定一個等待時間,超過等待時間之後,執行緒會退出阻塞狀態。但是如果不為掛起設定等待時間,那麼它只能等到通知的到來才能退出阻塞狀態。

中斷

使用中斷機制即可終止阻塞的執行緒。

使用 interrupt() 方法來中斷某個執行緒,它會設定執行緒的中斷狀態。Object.wait(), Thread.join() 和 Thread.sleep() 三種方法在收到中斷請求的時候會清除中斷狀態,並丟擲 InterruptedException。

應當捕獲這個 InterruptedException 異常,從而做一些清理資源的操作。

1. 不可中斷的阻塞

不能中斷 I/O 阻塞和 synchronized 鎖阻塞。

2. Executor 的中斷操作

Executor 避免對 Thread 物件的直接操作,使用 shutdownNow() 方法來中斷它裡面的所有執行緒,shutdownNow() 方法會發送 interrupt() 呼叫給所有執行緒。

如果只想中斷一個執行緒,那麼使用 Executor 的 submit() 而不是 executor() 來啟動執行緒,就可以持有執行緒的上下文。submit() 將返回一個泛型 Futrue,可以在它之上呼叫 cancel(),如果將 true 傳遞給 cancel(),那麼它將會發送 interrupt() 呼叫給特定的執行緒。

3. 檢查中斷

通過中斷的方法來終止執行緒,需要執行緒進入阻塞狀態才能終止。如果編寫的 run() 方法迴圈條件為 true,但是該執行緒不發生阻塞,那麼執行緒就永遠無法終止。

interrupt() 方法會設定中斷狀態,可以通過 interrupted() 方法來檢查中斷狀態,從而判斷一個執行緒是否已經被中斷。

interrupted() 方法在檢查完中斷狀態之後會清除中斷狀態,這樣做是為了確保一次中斷操作只會產生一次影響。

四、執行緒之間的協作

同步與通訊的概念理解

在作業系統中,有三個概念用來描述程序間的協作關係:

  1. 互斥:多個程序在同一時刻只有一個程序能進入臨界區;
  2. 同步:多個程序按一定順序執行;
  3. 通訊:多個程序間的資訊傳遞。

通訊是一種手段,它可以用來實現同步。也就是說,通過在多個程序間傳遞資訊,可以控制多個程序以一定順序執行。

而同步又可以保證互斥。即程序按一定順序執行,可以保證在同一時刻只有一個程序能訪問臨界資源。但是同步不止用來實現互斥,例如生成者消費者問題,生產者和消費者程序之間的同步不是用來控制對臨界資源的訪問。

總結起來就是:通訊 -> 同步 -> 互斥。

程序和執行緒在一定程度上類似,也可以用這些概念來描述。

在 Java 語言中,這些概念描述有些差別:

  1. 同步:可以和作業系統的互斥等同;
  2. 通訊:可以和作業系統的同步等同。

很多時候這三個概念都會混在一起用,不同的文章有不同的解釋,不能說哪個是對的哪個是錯的,只要自己能理解就行。

執行緒同步

給定一個程序內的所有執行緒,都共享同一儲存空間,這樣有好處又有壞處。這些執行緒就可以共享資料,非常有用。不過,在兩個執行緒同時修改某一資源時,這也會造成一些問題。Java 提供了同步機制,以控制對共享資源的互斥訪問。

1. synchronized

同步一個方法

使多個執行緒不能同時訪問該方法。

public synchronized void func(String name) {
    // ...
}

同步一個程式碼塊

public void func(String name) {
    synchronized(this) {
        // ...
    }
}

2. ReentrantLock

可以使用 Lock 來對一個語句塊進行同步。

private Lock lock;
public int func(int value) {
   try {
       lock.lock();
       // ...
   } finally {
      lock.unlock();
   }
}

ReentrantLock 是 java.util.concurrent(J.U.C)包中的鎖,相比於 synchronized,它多了一些高階功能:

等待可中斷

當持有鎖的執行緒長期不釋放鎖的時候,正在等待的執行緒可以選擇放棄等待,改為處理其他事情,可中斷特性對處理執行時間非常長的同步塊很有幫助。

可實現公平鎖

公平鎖是指多個執行緒在等待同一個鎖時,必須按照申請鎖的時間順序來依次獲得鎖;而非公平鎖則不保證這一點,在鎖被釋放時,任何一個等待鎖的執行緒都有機會獲得鎖。synchronized 中的鎖是非公平的,ReentrantLock 預設情況下也是非公平的,但可以通過帶布林值的建構函式要求使用公平鎖。

鎖繫結多個條件

一個 ReentrantLock 物件可以同時繫結多個 Condition 物件,而在 synchronized 中,鎖物件的 wait() 和 notify() 或 notifyAll() 方法可以實現一個隱含的條件,如果要和多於一個的條件關聯的時候,就不得不額外地新增一個鎖,而 ReentrantLock 則無須這樣做,只需要多次呼叫 newCondition() 方法即可。

如果需要使用上述功能,選用 ReentrantLock 是一個很好的選擇。從效能上來看,在新版本的 JDK 中對 synchronized 進行了很多優化,例如自旋鎖等。目前來看它和 ReentrantLock 的效能基本持平了,因此效能因素不再是選擇 ReentrantLock 的理由,而且 synchronized 有更大的優化空間,因此優先考慮 synchronized。

執行緒通訊

1. wait() notify() notifyAll()

它們都屬於 Object 的一部分,而不屬於 Thread。

wait() 會在等待時將執行緒掛起,而不是忙等待,並且只有在 notify() 或者 notifyAll() 到達時才喚醒。可以通過這種機制讓一個執行緒阻塞,直到某種特定條件滿足。

sleep() 和 yield() 並沒有釋放鎖,但是 wait() 會釋放鎖。

只有在同步控制方法或同步控制塊裡才能呼叫 wait() 、notify() 和 notifyAll()。

private boolean flag = false;

public synchronized void after() {
    while(flag == false) {
        wait();
        // ...
    }
}

public synchronized void before() {
    flag = true;
    notifyAll();
}

wait() 和 sleep() 的區別

這兩種方法都能將執行緒阻塞,一種是使用掛起的方式,一種使用睡眠的方式。

  1. wait() 是 Object 類的方法,而 sleep() 是 Thread 的靜態方法;
  2. 掛起會釋放鎖,睡眠不會。

2. BlockingQueue

java.util.concurrent.BlockingQueue 介面有以下阻塞佇列的實現:

  • FIFO 佇列 :LinkedBlockingQueue、ArrayListBlockingQueue(固定長度)
  • 優先順序佇列 :PriorityBlockingQueue

提供了阻塞的 take() 和 put() 方法:如果佇列為空 take() 將阻塞,直到佇列中有內容;如果佇列為滿 put() 將阻塞,指到佇列有空閒位置。

它們響應中斷,當收到中斷請求的時候會丟擲 InterruptedException,從而提前結束阻塞狀態。

是執行緒安全的。

使用 BlockingQueue 實現生產者消費者問題

// 生產者
public class Producer implements Runnable {
    private BlockingQueue<String> queue;

    public Producer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " is making product.");
        String product = "Made By " + Thread.currentThread().getName();
        try {
            queue.put(product);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
// 消費者
public class Consumer implements Runnable {
    private BlockingQueue<String> queue;

    public Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            String product = queue.take();
            System.out.println(Thread.currentThread().getName() + " is consuming product." + "( " + product + " )");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
// 客戶端
public class Client {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(5);
        for (int i = 0; i < 2; i++) {
            new Thread(new Consumer(queue), "Consumer-" + i).start();
        }
        for (int i = 0; i < 5; i++) {
            // 只有兩個 Product,因此只能消費兩個,其它三個消費者被阻塞
            new Thread(new Producer(queue), "Producer-" + i).start();
        }
        for (int i = 2; i < 5; i++) {
            new Thread(new Consumer(queue), "Consumer-" + i).start();
        }
    }
}
// 執行結果
Producer-0 is making product.
Consumer-0 is consuming product.( Made By Producer-0 )
Producer-1 is making product.
Consumer-1 is consuming product.( Made By Producer-1 )
Producer-2 is making product.
Producer-3 is making product.
Producer-4 is making product.
Consumer-2 is consuming product.( Made By Producer-2 )
Consumer-3 is consuming product.( Made By Producer-3 )
Consumer-4 is consuming product.( Made By Producer-4 )

五、執行緒狀態轉換


  1. 新建(New):建立後尚未啟動;
  2. 可執行(Runnale):可能正在執行,也可能正在等待 CPU 時間片;
  3. 無限期等待(Waiting):等待其它執行緒顯示地喚醒,否則不會被分配 CPU 時間片;
  4. 限期等待(Timed Waiting):無需等待其它執行緒顯示地喚醒,在一定時間之後會被系統自動喚醒;
  5. 阻塞(Blocking):等待獲取一個排它鎖,如果其執行緒釋放了鎖就會結束此狀態;
  6. 死亡(Terminated):可以是執行緒結束任務之後自己結束,或者產生了異常而結束,中斷機制就是使用了丟擲中斷異常的方式讓一個阻塞的執行緒結束。

六、Executor

Executor 管理多個非同步任務的執行,而無需程式設計師顯示地管理執行緒的生命週期。

主要有三種 Executor:

  1. CachedTreadPool:一個任務建立一個執行緒;
  2. FixedThreadPool:所有任務只能使用固定大小的執行緒;
  3. SingleThreadExecutor:相當於大小為 1 的 FixedThreadPool。
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < 5; i++) {
    exec.execute(new MyRunnable());
}