1. 程式人生 > >成為高階程式設計師不得不瞭解的併發

成為高階程式設計師不得不瞭解的併發

到目前為止,你學到的都是順序程式設計,順序程式設計的概念就是某一時刻只有一個任務在執行,順序程式設計固然能夠解決很多問題,但是對於某種任務,如果能夠併發的執行程式中重要的部分就顯得尤為重要,同時也可以極大提高程式執行效率,享受併發為你帶來的便利。但是,熟練掌握併發程式設計理論和技術,對於只會CRUD的你來說是一種和你剛學面向物件一樣的一種飛躍。

正如你所看到的,當並行的任務彼此干涉時,實際的併發問題就會接踵而至。而且併發問題不是很難復現,在你實際的測試過程中往往會忽略它們,因為故障是偶爾發生的,這也是我們研究它們的必要條件:如果你對併發問題置之不理,那麼你最終會承受它給你帶來的損害。

併發的多面性

更快的執行

速度問題聽起來很簡單,如果你想讓一個程式執行的更快一些,那麼可以將其切成多個分片,在單獨的處理器上執行各自的分片:前提是這些任務彼此之間沒有聯絡。

注意:速度的提高是以多核處理器而不是晶片的形式出現的。

如果你有一臺多處理器的機器,那麼你就可以在這些處理器之間分佈多個任務,從而極大的提高吞吐量。但是,併發通常是提高在單處理器上的程式的效能。在單處理上的效能開銷要比多處理器上的效能開銷大很多,因為這其中增加了執行緒切換(從一個執行緒切換到另外一個執行緒)的重要依據。表面上看,將程式的所有部分當作單個的任務執行好像是開銷更小一點,節省了執行緒切換的時間。

改進程式碼的設計

在單CPU機器上使用多工的程式在任意時刻仍舊只在執行一項工作,你肉眼觀察到控制檯的輸出好像是這些執行緒在同時工作,這不過是CPU的障眼法罷了,CPU為每個任務都提供了不固定的時間切片。Java 的執行緒機制是搶佔式的,也就是說,你必須編寫某種讓步語句才會讓執行緒進行切換,切換給其他執行緒。

基本的執行緒機制

併發程式設計使我們可以將程式劃分成多個分離的,獨立執行的任務。通過使用多執行緒機制,這些獨立任務中的每一項任務都由執行執行緒來驅動。一個執行緒就是程序中的一個單一的順序控制流。因此,單個程序可以擁有多個併發執行的任務,但是你的程式看起來每個任務都有自己的CPU一樣。其底層是切分CPU時間,通常你不需要考慮它。

定義任務

執行緒可以驅動任務,因此你需要一種描述任務的方式,這可以由 Runnable 介面來提供,要想定義任務,只需要實現 Runnable 介面,並在run 方法中實現你的邏輯即可。

public class TestThread implements Runnable{

    public static int i = 0;

    @Override
    public void run() {
        System.out.println("start thread..." + i);
        i++;
        System.out.println("end thread ..." + i);
    }

    public static void main(String[] args) {
        for(int i = 0;i < 5;i++){
            TestThread testThread = new TestThread();
            testThread.run();
        }
    }
}

任務 run 方法會有某種形式的迴圈,使得任務一直執行下去直到不再需要,所以要設定 run 方法的跳出條件(有一種選擇是從 run 中直接返回,下面會說到。)

在 run 中使用靜態方法 Thread.yield() 可以使用執行緒排程,它的意思是建議執行緒機制進行切換:你已經執行完重要的部分了,剩下的交給其他執行緒跑一跑吧。注意是建議執行,而不是強制執行。在下面新增 Thread.yield() 你會看到有意思的輸出

public void run() {
  System.out.println("start thread..." + i);
  i++;
  Thread.yield();
  System.out.println("end thread ..." + i);
}

Thread 類

將 Runnable 轉變工作方式的傳統方式是使用 Thread 類託管他,下面展示了使用 Thread 類來實現一個執行緒。

public static void main(String[] args) {
  for(int i = 0;i < 5;i++){
    Thread t = new Thread(new TestThread());
    t.start();
  }
  System.out.println("Waiting thread ...");
}

Thread 構造器只需要一個 Runnable 物件,呼叫 Thread 物件的 start() 方法為該執行緒執行必須的初始化操作,然後呼叫 Runnable 的 run 方法,以便在這個執行緒中啟動任務。可以看到,在 run 方法還沒有結束前,run 就被返回了。也就是說,程式不會等到 run 方法執行完畢就會執行下面的指令。

在 run 方法中打印出每個執行緒的名字,就更能看到不同的執行緒的切換和排程

@Override
public void run() {
  System.out.println(Thread.currentThread() + "start thread..." + i);
  i++;
  System.out.println(Thread.currentThread() + "end thread ..." + i);
}

這種執行緒切換和排程是交由 執行緒排程器 來自動控制的,如果你的機器上有多個處理器,執行緒排程器會在這些處理器之間默默的分發執行緒。每一次的執行結果都不盡相同,因為執行緒排程機制是未確定的。

使用 Executor

CachedThreadPool

JDK1.5 的java.util.concurrent 包中的執行器 Executor 將為你管理 Thread 物件,從而簡化了併發程式設計。Executor 在客戶端和任務之間提供了一個間接層;與客戶端直接執行任務不同,這個中介物件將執行任務。Executor 允許你管理非同步任務的執行,而無須顯示地管理執行緒的生命週期。

public static void main(String[] args) {
  ExecutorService service = Executors.newCachedThreadPool();
  for(int i = 0;i < 5;i++){
    service.execute(new TestThread());
  }
  service.shutdown();
}

我們使用 Executor 來替代上述顯示建立 Thread 物件。CachedThreadPool 為每個任務都建立一個執行緒。注意:ExecutorService 物件是使用靜態的 Executors 建立的,這個方法可以確定 Executor 型別。對 shutDown 的呼叫可以防止新任務提交給 ExecutorService ,這個執行緒在 Executor 中所有任務完成後退出。

FixedThreadPool

FixedThreadPool 使你可以使用有限的執行緒集來啟動多執行緒

public static void main(String[] args) {
  ExecutorService service = Executors.newFixedThreadPool(5);
  for(int i = 0;i < 5;i++){
    service.execute(new TestThread());
  }
  service.shutdown();
}

有了 FixedThreadPool 使你可以一次性的預先執行高昂的執行緒分配,因此也就可以限制執行緒的數量。這可以節省時間,因為你不必為每個任務都固定的付出建立執行緒的開銷。

SingleThreadExecutor

SingleThreadExecutor 就是執行緒數量為 1 的 FixedThreadPool,如果向 SingleThreadPool 一次性提交了多個任務,那麼這些任務將會排隊,每個任務都會在下一個任務開始前結束,所有的任務都將使用相同的執行緒。SingleThreadPool 會序列化所有提交給他的任務,並會維護它自己(隱藏)的懸掛佇列。

public static void main(String[] args) {
  ExecutorService service = Executors.newSingleThreadExecutor();
  for(int i = 0;i < 5;i++){
    service.execute(new TestThread());
  }
  service.shutdown();
}

從輸出的結果就可以看到,任務都是挨著執行的。我為任務分配了五個執行緒,但是這五個執行緒不像是我們之前看到的有換進換出的效果,它每次都會先執行完自己的那個執行緒,然後餘下的執行緒繼續“走完”這條執行緒的執行路徑。你可以用 SingleThreadExecutor 來確保任意時刻都只有唯一一個任務在執行。

從任務中產生返回值

Runnable 是執行工作的獨立任務,但它不返回任何值。如果你希望任務在完成時能夠返回一個值 ,這個時候你就需要考慮使用 Callable 介面,它是 JDK1.5 之後引入的,通過呼叫它的 submit 方法,可以把它的返回值放在一個 Future 物件中,然後根據相應的 get() 方法取得提交之後的返回值。

public class TaskWithResult implements Callable<String> {

    private int id;

    public TaskWithResult(int id){
        this.id = id;
    }

    @Override
    public String call() throws Exception {
        return "result of TaskWithResult " + id;
    }
}

public class CallableDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executors = Executors.newCachedThreadPool();
        ArrayList<Future<String>> future = new ArrayList<>();
        for(int i = 0;i < 10;i++){

            // 返回的是呼叫 call 方法的結果
            future.add(executors.submit(new TaskWithResult(i)));
        }
        for(Future<String> fs : future){
            System.out.println(fs.get());
        }
    }
}

submit() 方法會返回 Future 物件,Future 物件儲存的也就是你返回的結果。你也可以使用 isDone 來查詢 Future 是否已經完成。

休眠

影響任務行為的一種簡單方式就是使執行緒 休眠,選定給定的休眠時間,呼叫它的 sleep() 方法, 一般使用的TimeUnit 這個時間類替換 Thread.sleep() 方法,示例如下:

public class SuperclassThread extends TestThread{

    @Override
    public void run() {
        System.out.println(Thread.currentThread() + "starting ..." );

        try {
            for(int i = 0;i < 5;i++){
                if(i == 3){
                    System.out.println(Thread.currentThread() + "sleeping ...");
                    TimeUnit.MILLISECONDS.sleep(1000);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(Thread.currentThread() + "wakeup and end ...");
    }

    public static void main(String[] args) {
        ExecutorService executors = Executors.newCachedThreadPool();
        for(int i = 0;i < 5;i++){
            executors.execute(new SuperclassThread());
        }
        executors.shutdown();
    }
}

關於 TimeUnit 中的 sleep() 方法和 Thread.sleep() 方法的比較,請參考下面這篇部落格

(https://www.cnblogs.com/xiadongqing/p/9925567.html)

優先順序

上面提到執行緒排程器對每個執行緒的執行都是不可預知的,隨機執行的,那麼有沒有辦法告訴執行緒排程器哪個任務想要優先被執行呢?你可以通過設定執行緒的優先順序狀態,告訴執行緒排程器哪個執行緒的執行優先順序比較高,"請給這個騎手馬上派單",執行緒排程器傾向於讓優先順序較高的執行緒優先執行,然而,這並不意味著優先順序低的執行緒得不到執行,也就是說,優先順序不會導致死鎖的問題。優先順序較低的執行緒只是執行頻率較低。

public class SimplePriorities implements Runnable{

    private int priority;

    public SimplePriorities(int priority) {
        this.priority = priority;
    }

    @Override
    public void run() {
        Thread.currentThread().setPriority(priority);
        for(int i = 0;i < 100;i++){
            System.out.println(this);
            if(i % 10 == 0){
                Thread.yield();
            }
        }
    }

    @Override
    public String toString() {
        return Thread.currentThread() + " " + priority;
    }

    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        for(int i = 0;i < 5;i++){
            service.execute(new SimplePriorities(Thread.MAX_PRIORITY));
        }
        service.execute(new SimplePriorities(Thread.MIN_PRIORITY));
    }
}

toString() 方法被覆蓋,以便通過使用 Thread.toString() 方法來列印執行緒的名稱。你可以改寫執行緒的預設輸出,這裡採用了 Thread[pool-1-thread-1,10,main] 這種形式的輸出。

通過輸出,你可以看到,最後一個執行緒的優先順序最低,其餘的執行緒優先順序最高。注意,優先順序是在 run 開頭設定的,在構造器中設定它們不會有任何好處,因為這個時候執行緒還沒有執行任務。

儘管JDK有10個優先順序,但是一般只有MAX_PRIORITY,NORM_PRIORITY,MIN_PRIORITY 三種級別。

作出讓步

我們上面提過,如果知道一個執行緒已經在 run() 方法中執行的差不多了,那麼它就可以給執行緒排程器一個提示:我已經完成了任務中最重要的部分,可以讓給別的執行緒使用CPU了。這個暗示將通過 yield() 方法作出。

有一個很重要的點就是,Thread.yield() 是建議執行切換CPU,而不是強制執行CPU切換。

對於任何重要的控制或者在呼叫應用時,都不能依賴於 yield()方法,實際上, yield() 方法經常被濫用。

後臺執行緒

後臺(daemon) 執行緒,是指執行時在後臺提供的一種服務執行緒,這種執行緒不是屬於必須的。當所有非後臺執行緒結束時,程式也就停止了,同時會終止所有的後臺執行緒。反過來說,只要有任何非後臺執行緒還在執行,程式就不會終止。

public class SimpleDaemons implements Runnable{

    @Override
    public void run() {
        while (true){
            try {
                TimeUnit.MILLISECONDS.sleep(100);
                System.out.println(Thread.currentThread() + " " + this);
            } catch (InterruptedException e) {
                System.out.println("sleep() interrupted");
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for(int i = 0;i < 10;i++){
            Thread daemon = new Thread(new SimpleDaemons());
            daemon.setDaemon(true);
            daemon.start();
        }
        System.out.println("All Daemons started");
        TimeUnit.MILLISECONDS.sleep(175);
    }
}

在每次的迴圈中會建立10個執行緒,並把每個執行緒設定為後臺執行緒,然後開始執行,for迴圈會進行十次,然後輸出資訊,隨後主執行緒睡眠一段時間後停止執行。在每次run 迴圈中,都會列印當前執行緒的資訊,主執行緒執行完畢,程式就執行完畢了。因為 daemon 是後臺執行緒,無法影響主執行緒的執行。

但是當你把 daemon.setDaemon(true) 去掉時,while(true) 會進行無限迴圈,那麼主執行緒一直在執行最重要的任務,所以會一直迴圈下去無法停止。

ThreadFactory

按需要建立執行緒的物件。使用執行緒工廠替換了 Thread 或者 Runnable 介面的硬連線,使程式能夠使用特殊的執行緒子類,優先順序等。一般的建立方式為

class SimpleThreadFactory implements ThreadFactory {
  public Thread newThread(Runnable r) {
    return new Thread(r);
  }
}

Executors.defaultThreadFactory 方法提供了一個更有用的簡單實現,它在返回之前將建立的執行緒上下文設定為已知值

ThreadFactory 是一個介面,它只有一個方法就是建立執行緒的方法

public interface ThreadFactory {

    // 構建一個新的執行緒。實現類可能初始化優先順序,名稱,後臺執行緒狀態和 執行緒組等
    Thread newThread(Runnable r);
}

下面來看一個 ThreadFactory 的例子

public class DaemonThreadFactory implements ThreadFactory {

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    }
}

public class DaemonFromFactory implements Runnable{

    @Override
    public void run() {
        while (true){
            try {
                TimeUnit.MILLISECONDS.sleep(100);
                System.out.println(Thread.currentThread() + " " + this);
            } catch (InterruptedException e) {
                System.out.println("Interrupted");
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newCachedThreadPool(new DaemonThreadFactory());
        for(int i = 0;i < 10;i++){
            service.execute(new DaemonFromFactory());
        }
        System.out.println("All daemons started");
        TimeUnit.MILLISECONDS.sleep(500);
    }
}

Executors.newCachedThreadPool 可以接受一個執行緒池物件,建立一個根據需要建立新執行緒的執行緒池,但會在它們可用時重用先前構造的執行緒,並在需要時使用提供的ThreadFactory建立新執行緒。

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                60L, TimeUnit.SECONDS,
                                new SynchronousQueue<Runnable>(),
                                threadFactory);
}

加入一個執行緒

一個執行緒可以在其他執行緒上呼叫 join() 方法,其效果是等待一段時間直到第二個執行緒結束才正常執行。如果某個執行緒在另一個執行緒 t 上呼叫 t.join() 方法,此執行緒將被掛起,直到目標執行緒 t 結束才回復(可以用 t.isAlive() 返回為真假判斷)。

也可以在呼叫 join 時帶上一個超時引數,來設定到期時間,時間到期,join方法自動返回。

對 join 的呼叫也可以被中斷,做法是線上程上呼叫 interrupted 方法,這時需要用到 try...catch 子句

public class TestJoinMethod extends Thread{

    @Override
    public void run() {
        for(int i = 0;i < 5;i++){
            try {
                TimeUnit.MILLISECONDS.sleep(1000);
            } catch (InterruptedException e) {
                System.out.println("Interrupted sleep");
            }
            System.out.println(Thread.currentThread() + " " + i);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TestJoinMethod join1 = new TestJoinMethod();
        TestJoinMethod join2 = new TestJoinMethod();
        TestJoinMethod join3 = new TestJoinMethod();

        join1.start();
//        join1.join();

        join2.start();
        join3.start();
    }
}

join() 方法等待執行緒死亡。 換句話說,它會導致當前執行的執行緒停止執行,直到它加入的執行緒完成其任務。

執行緒異常捕獲

由於執行緒的本質,使你不能捕獲從執行緒中逃逸的異常,一旦異常逃出任務的run 方法,它就會向外傳播到控制檯,除非你採取特殊的步驟捕獲這種錯誤的異常,在 Java5 之前,你可以通過執行緒組來捕獲,但是在 Java5 之後,就需要用 Executor 來解決問題,因為執行緒組不是一次好的嘗試。

下面的任務會在 run 方法的執行期間丟擲一個異常,並且這個異常會拋到 run 方法的外面,而且 main 方法無法對它進行捕獲

public class ExceptionThread implements Runnable{

    @Override
    public void run() {
        throw new RuntimeException();
    }

    public static void main(String[] args) {
        try {
            ExecutorService service = Executors.newCachedThreadPool();
            service.execute(new ExceptionThread());
        }catch (Exception e){
            System.out.println("eeeee");
        }
    }
}

為了解決這個問題,我們需要修改 Executor 產生執行緒的方式,Java5 提供了一個新的介面 Thread.UncaughtExceptionHandler ,它允許你在每個 Thread 上都附著一個異常處理器。Thread.UncaughtExceptionHandler.uncaughtException() 會線上程因未捕獲臨近死亡時被呼叫。

public class ExceptionThread2 implements Runnable{

    @Override
    public void run() {
        Thread t = Thread.currentThread();
        System.out.println("run() by " + t);
        System.out.println("eh = " + t.getUncaughtExceptionHandler());
      
        // 手動丟擲異常
        throw new RuntimeException();
    }
}

// 實現Thread.UncaughtExceptionHandler 介面,建立異常處理器
public class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler{

    @Override
    public void uncaughtException(Thread t, Throwable e) {
        System.out.println("caught " + e);
    }
}

public class HandlerThreadFactory implements ThreadFactory {

    @Override
    public Thread newThread(Runnable r) {
        System.out.println(this + " creating new Thread");
        Thread t = new Thread(r);
        System.out.println("created " + t);
        t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
        System.out.println("ex = " + t.getUncaughtExceptionHandler());
        return t;
    }
}

public class CaptureUncaughtException {

    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool(new HandlerThreadFactory());
        service.execute(new ExceptionThread2());
    }
}

在程式中添加了額外的追蹤機制,用來驗證工廠建立的執行緒會傳遞給UncaughtExceptionHandler,你可以看到,未捕獲的異常是通過 uncaughtException 來捕獲的。

文章來源:

《Java程式設計思想》

https://www.javatpoint.com/join()-met