1. 程式人生 > >Java8併發教程:Threads和Executors

Java8併發教程:Threads和Executors

歡迎閱讀我的Java8併發教程的第一部分。這份指南將會以簡單易懂的程式碼示例來教給你如何在Java8中進行併發程式設計。這是一系列教程中的第一部分。在接下來的15分鐘,你將會學會如何通過執行緒,任務(tasks)和 exector services來並行執行程式碼。

  • 第一部分:Threads和Executors
  • 第二部分:同步和鎖

併發在Java5中首次被引入並在後續的版本中不斷得到增強。在這篇文章中介紹的大部分概念同樣適用於以前的Java版本。不過我的程式碼示例聚焦於Java8,大量使用lambda表示式和其他新特性。如果你對lambda表示式不屬性,我推薦你首先閱讀我的Java 8 教程

Threads 和 Runnables

所有的現代作業系統都通過程序和執行緒來支援併發。程序是通常彼此獨立執行的程式的例項,比如,如果你啟動了一個Java程式,作業系統產生一個新的程序,與其他程式一起並行執行。在這些程序的內部,我們使用執行緒併發執行程式碼,因此,我們可以最大限度的利用CPU可用的核心(core)。

Java從JDK1.0開始執行執行緒。在開始一個新的執行緒之前,你必須指定由這個執行緒執行的程式碼,通常稱為task。這可以通過實現Runnable——一個定義了一個無返回值無引數的run()方法的函式介面,如下面的程式碼所示:

Runnable task = () -> {
    String threadName = Thread.currentThread().getName();
    System.out.println("Hello " + threadName);
};

task.run();

Thread thread = new Thread(task);
thread.start();

System.out.println("Done!");

因為Runnable是一個函式介面,所以我們利用lambda表示式將當前的執行緒名列印到控制檯。首先,在開始一個執行緒前我們在主執行緒中直接執行runnable。

控制檯輸出的結果可能像下面這樣:

Hello main
Hello Thread-0
Done!

或者這樣:

Hello main
Done!
Hello Thread-0

由於我們不能預測這個runnable是在列印’done’前執行還是在之後執行。順序是不確定的,因此在大的程式中編寫併發程式是一個複雜的任務。

我們可以將執行緒休眠確定的時間。在這篇文章接下來的程式碼示例中我們可以通過這種方法來模擬長時間執行的任務。

Runnable runnable = () -> {
    try {
        String name = Thread.currentThread().getName();
        System.out.println("Foo " + name);
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Bar " + name);
    }
    catch (InterruptedException e) {
        e.printStackTrace();
    }
};

Thread thread = new Thread(runnable);
thread.start();

當你執行上面的程式碼時,你會注意到在第一條列印語句和第二條列印語句之間存在一分鐘的延遲。TimeUnit在處理單位時間時一個有用的列舉類。你可以通過呼叫Thread.sleep(1000)來達到同樣的目的。

使用Thread類是很單調的且容易出錯。由於併發API在2004年Java5釋出的時候才被引入。這些API位於java.util.concurrent包下,包含很多處理併發程式設計的有用的類。自從這些併發API引入以來,在隨後的新的Java版本釋出過程中得到不斷的增強,甚至Java8提供了新的類和方法來處理併發。

接下來,讓我們走進併發API中最重要的一部——executor services。

Executors

併發API引入了ExecutorService作為一個在程式中直接使用Thread的高層次的替換方案。Executos支援執行非同步任務,通常管理一個執行緒池,這樣一來我們就不需要手動去建立新的執行緒。在不斷地處理任務的過程中,執行緒池內部執行緒將會得到複用,因此,在我們可以使用一個executor service來執行和我們想在我們整個程式中執行的一樣多的併發任務。

下面是使用executors的第一個程式碼示例:

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("Hello " + threadName);

});

// => Hello pool-1-thread-1

Executors類提供了便利的工廠方法來建立不同型別的 executor services。在這個示例中我們使用了一個單執行緒執行緒池的 executor。

程式碼執行的結果類似於上一個示例,但是當執行程式碼時,你會注意到一個很大的差別:Java程序從沒有停止!Executors必須顯式的停止-否則它們將持續監聽新的任務。

ExecutorService提供了兩個方法來達到這個目的——shutdwon()會等待正在執行的任務執行完而shutdownNow()會終止所有正在執行的任務並立即關閉execuotr。

這是我喜歡的通常關閉executors的方式:

try {
    System.out.println("attempt to shutdown executor");
    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
    }
catch (InterruptedException e) {
    System.err.println("tasks interrupted");
}
finally {
    if (!executor.isTerminated()) {
        System.err.println("cancel non-finished tasks");
    }
    executor.shutdownNow();
    System.out.println("shutdown finished");
}

executor通過等待指定的時間讓當前執行的任務終止來“溫柔的”關閉executor。在等待最長5分鐘的時間後,execuote最終會通過中斷所有的正在執行的任務關閉。

Callables 和 Futures

除了Runnable,executor還支援另一種型別的任務——Callable。Callables也是類似於runnables的函式介面,不同之處在於,Callable返回一個值。

下面的lambda表示式定義了一個callable:在休眠一分鐘後返回一個整數。

Callable<Integer> task = () -> {
    try {
        TimeUnit.SECONDS.sleep(1);
        return 123;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e);
    }
};

Callbale也可以像runnbales一樣提交給 executor services。但是callables的結果怎麼辦?因為submit()不會等待任務完成,executor service不能直接返回callable的結果。不過,executor 可以返回一個Future型別的結果,它可以用來在稍後某個時間取出實際的結果。

ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(task);

System.out.println("future done? " + future.isDone());

Integer result = future.get();

System.out.println("future done? " + future.isDone());
System.out.print("result: " + result);

在將callable提交給exector之後,我們先通過呼叫isDone()來檢查這個future是否已經完成執行。我十分確定這會發生什麼,因為在返回那個整數之前callable會休眠一分鐘、

在呼叫get()方法時,當前執行緒會阻塞等待,直到callable在返回實際的結果123之前執行完成。現在future執行完畢,我們可以在控制檯看到如下的結果:

future done? false
future done? true
result: 123

Future與底層的executor service緊密的結合在一起。記住,如果你關閉executor,所有的未中止的future都會丟擲異常。

executor.shutdownNow();
future.get();

你可能注意到我們這次建立executor的方式與上一個例子稍有不同。我們使用newFixedThreadPool(1)來建立一個單執行緒執行緒池的 execuot service。 這等同於使用newSingleThreadExecutor不過使用第二種方式我們可以稍後通過簡單的傳入一個比1大的值來增加執行緒池的大小。

Timeouts

任何future.get()呼叫都會阻塞,然後等待直到callable中止。在最糟糕的情況下,一個callable持續執行——因此使你的程式將沒有響應。我們可以簡單的傳入一個時長來避免這種情況。

ExecutorService executor = Executors.newFixedThreadPool(1);

    Future<Integer> future = executor.submit(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
        return 123;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e);
    }
});

    future.get(1, TimeUnit.SECONDS);

執行上面的程式碼將會產生一個TimeoutException

Exception in thread "main" java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:205)

你可能已經猜到為什麼會丟擲這個異常。我們指定的最長等待時間為1分鐘,而這個callable在返回結果之前實際需要兩分鐘。

invokeAll

Executors支援通過invokeAll()一次批量提交多個callable。這個方法結果一個callable的集合,然後返回一個future的列表。

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
        () -> "task1",
        () -> "task2",
        () -> "task3");

executor.invokeAll(callables)
    .stream()
    .map(future -> {
        try {
            return future.get();
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
    })
    .forEach(System.out::println);

在這個例子中,我們利用Java8中的函式流(stream)來處理invokeAll()呼叫返回的所有future。我們首先將每一個future對映到它的返回值,然後將每個值列印到控制檯。如果你還不屬性stream,可以閱讀我的Java8 Stream 教程

invokeAny

批量提交callable的另一種方式就是invokeAny(),它的工作方式與invokeAll()稍有不同。在等待future物件的過程中,這個方法將會阻塞直到第一個callable中止然後返回這一個callable的結果。

為了測試這種行為,我們利用這個幫助方法來模擬不同執行時間的callable。這個方法返回一個callable,這個callable休眠指定 的時間直到返回給定的結果。

Callable<String> callable(String result, long sleepSeconds) {
    return () -> {
        TimeUnit.SECONDS.sleep(sleepSeconds);
        return result;
    };
}

我們利用這個方法建立一組callable,這些callable擁有不同的執行時間,從1分鐘到3分鐘。通過invokeAny()將這些callable提交給一個executor,返回最快的callable的字串結果-在這個例子中為任務2:

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
callable("task1", 2),
callable("task2", 1),
callable("task3", 3));

String result = executor.invokeAny(callables);
System.out.println(result);

// => task2

上面這個例子又使用了另一種方式來建立executor——呼叫newWorkStealingPool()。這個工廠方法是Java8引入的,返回一個ForkJoinPool型別的 executor,它的工作方法與其他常見的execuotr稍有不同。與使用一個固定大小的執行緒池不同,ForkJoinPools使用一個並行因子數來建立,預設值為主機CPU的可用核心數。

ForkJoinPools 在Java7時引入,將會在這個系列後面的教程中詳細講解。讓我們深入瞭解一下 scheduled executors 來結束本次教程。

Scheduled Executors

我們已經學習瞭如何在一個 executor 中提交和執行一次任務。為了持續的多次執行常見的任務,我們可以利用排程執行緒池。

ScheduledExecutorService支援任務排程,持續執行或者延遲一段時間後執行。

下面的例項,排程一個任務在延遲3分鐘後執行:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);

TimeUnit.MILLISECONDS.sleep(1337);

long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);
System.out.printf("Remaining Delay: %sms", remainingDelay);

排程一個任務將會產生一個專門的future型別——ScheduleFuture,它除了提供了Future的所有方法之外,他還提供了getDelay()方法來獲得剩餘的延遲。在延遲消逝後,任務將會併發執行。

為了排程任務持續的執行,executors 提供了兩個方法scheduleAtFixedRate()scheduleWithFixedDelay()。第一個方法用來以固定頻率來執行一個任務,比如,下面這個示例中,每分鐘一次:

ScheduledExecutorService executor =     Executors.newScheduledThreadPool(1);

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());

int initialDelay = 0;
int period = 1;
executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);

另外,這個方法還接收一個初始化延遲,用來指定這個任務首次被執行等待的時長。

請記住:scheduleAtFixedRate()並不考慮任務的實際用時。所以,如果你指定了一個period為1分鐘而任務需要執行2分鐘,那麼執行緒池為了效能會更快的執行。

在這種情況下,你應該考慮使用scheduleWithFixedDelay()。這個方法的工作方式與上我們上面描述的類似。不同之處在於等待時間 period 的應用是在一次任務的結束和下一個任務的開始之間。例如:

ScheduledExecutorService executor =         Executors.newScheduledThreadPool(1);

Runnable task = () -> {
    try {
        TimeUnit.SECONDS.sleep(2);
        System.out.println("Scheduling: " + System.nanoTime());
    }
    catch (InterruptedException e) {
        System.err.println("task interrupted");
    }
};

executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);

這個例子排程了一個任務,並在一次執行的結束和下一次執行的開始之間設定了一個1分鐘的固定延遲。初始化延遲為0,任務執行時間為0。所以我們分別在0s,3s,6s,9s等間隔處結束一次執行。如你所見,scheduleWithFixedDelay()在你不能預測排程任務的執行時長時是很有用的。

這是併發系列教程的第以部分。我推薦你親手實踐一下上面的程式碼示例。你可以從 Github 上找到這篇文章中所有的程式碼示例,所以歡迎你fork這個repo,給我星星

我希望你會喜歡這篇文章。如果你有任何的問題都可以在下面評論或者通過 Twitter 給我回復。