1. 程式人生 > >Java 8併發教程:執行緒和執行程式

Java 8併發教程:執行緒和執行程式

歡迎來到我的Java 8併發教程的第一部分。 本指南教你用Java 8進行併發程式設計 ,並具有易於理解的程式碼示例。 這是涵蓋Java併發API的一系列教程的第一部分。在接下來的15分鐘內,您將學習如何通過執行緒,任務和執行器服務並行執行程式碼。

併發API首先引入了Java 5的發行版,然後逐漸增強了每個新的Java版本。 本文中顯示的大多數概念也適用於較舊版本的Java。然而我的程式碼示例著重於Java 8,並大量使用lambda表示式和其他新功能。如果你還不熟悉lambdas,我建議先閱讀我的Java 8 Tutorial

Threads and Runnables

所有現代作業系統通過程序執行緒來支援併發 。程序是通常彼此獨立執行的程式的例項,例如,如果您啟動一個Java程式,則作業系統會產生一個與其他程式並行執行的新程序。 在這些程序中,我們可以利用執行緒同時執行程式碼,因此我們可以充分利用CPU的可用核心。

Java支援從JDK 1.0開始的執行緒 。在開始一個新執行緒之前,你必須指定要由此執行緒執行的程式碼,通常稱為任務這通過實現Runnable - 定義單個void no-args方法run()的功能介面來完成,如以下示例所示:

Runnable task = () -> {
    String threadName = Thread.currentThread().getName();
    System.out.println("Hello " + threadName);
};

task.run();

Thread thread = new Thread(task);
thread.start();

System.out.println("Done!");
由於Runnable是一個功能介面,我們可以使用Java 8 lambda表示式將當前執行緒名稱列印到控制檯。 首先,我們在開始一個新執行緒之前直接在主執行緒上執行runnable。

控制檯上的結果可能如下所示:

Hello main
Hello Thread-0
Done!

或者說:

Hello main
Done!
Hello Thread-0

由於併發執行,我們無法預測在列印完成之前或之後是否可以呼叫runnable。該順序是非確定性的,因此在大型應用中使併發程式設計成為一項複雜的任務。

執行緒可以進入睡眠一段時間。 這在本文的後續程式碼示例中模擬長時間執行的任務非常方便:

unnable runnable = () -> {
    try {
        String name = Thread.currentThread().getName();
        System.out.println("Foo " + name);
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Bar " + name);
    }
    catch (InterruptedException e) {
        e.printStackTrace();
    }
};

Thread thread = new Thread(runnable);
thread.start();

當您執行上述程式碼時,您將注意到第一個和第二個列印語句之間的延遲一秒鐘。 TimeUnit是用於處理單位時間的一個有用的列舉。 或者,您可以通過呼叫Thread.sleep(1000)來實現相同的Thread.sleep(1000)

使用Thread類可能非常繁瑣,容易出錯。由於這個原因, Concurrency API已經在2004年被推出了Java 5的發行版。該API位於包java.util.concurrentjava.util.concurrent,並且包含許多有用的類來處理併發程式設計。從那時起,併發API已經在每個新的Java版本中得到增強,甚至Java 8提供了處理併發性的新類和方法。

現在讓我們深入瞭解併發API的最重要的部分 - 執行程式服務。

Executors

Concurrency API將ExecutorService的概念引入直接處理執行緒的更高階替換。 執行器能夠執行非同步任務,通常管理一個執行緒池,所以我們不必手動建立新的執行緒。內部池的所有執行緒將被重新使用在引擎框架下執行任務,因此我們可以在單個執行程式服務的整個生命週期中執行儘可能多的並行任務。

這是第一個執行緒示例如何使用執行器:

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
    String threadName = Thread.currentThread().getName();
    System.out.println("Hello " + threadName);
});

// => Hello pool-1-thread-1
Executors類提供了方便的工廠方法來建立不同型別的執行器服務。在這個示例中,我們使用一個大小為1的執行緒池的執行器。

結果看起來類似於上面的例子,但是執行程式碼時會發現一個重要的區別:java程序永遠不會停止!執行者必須明確停止 - 否則他們會繼續監聽新任務。

ExecutorService提供了兩種方法: shutdown()等待當前執行的任務完成,而shutdownNow()中斷所有執行的任務,並立即關閉執行程式。

這是我通常關閉執行程式的首選方式:

try {
    System.out.println("attempt to shutdown executor");
    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
    System.err.println("tasks interrupted");
}
finally {
    if (!executor.isTerminated()) {
        System.err.println("cancel non-finished tasks");
    }
    executor.shutdownNow();
    System.out.println("shutdown finished");
}
執行者通過等待一段時間來終止當前執行的任務,輕輕地關閉。最長五秒鐘後,執行程式最終會關閉所有正在執行的任務。

Callables and Futures

除了Runnable執行器之外,還支援另一種名為Callable的任務。 可呼叫函式是像runnable這樣的功能介面,但不是void它們返回一個值。

這個lambda表示式定義了一個可以在休眠1秒後返回整數的可呼叫函式:

Callable<Integer> task = () -> {
    try {
        TimeUnit.SECONDS.sleep(1);
        return 123;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e);
    }
};

 Callable可以像runnable那樣提交給執行者服務。但是可呼叫的結果如何?由於submit()不等待任務完成,所以執行程式服務無法直接返回可呼叫的結果。相反,執行者返回一個型別為Future的特殊結果,可以用於在稍後的時間點檢索實際結果。

ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(task);

System.out.println("future done? " + future.isDone());

Integer result = future.get();

System.out.println("future done? " + future.isDone());
System.out.print("result: " + result);

在提交可執行程式後,我們首先檢查未來是否已經通過isDone()完成執行。我很確定不是這樣,因為上面的可呼叫休眠一秒鐘後返回整數。

呼叫方法get()阻止當前執行緒,並等待直到可執行完成,然後返回實際結果123現在未來終於完成了,我們在控制檯上看到如下結果:

future done? false
future done? true
result: 123

期貨與基礎執行者服務緊密相連。請記住,如果關閉執行程式,每個未終止的未來將丟擲異常:

executor.shutdownNow();
future.get();

您可能已經注意到執行器的建立與前面的示例略有不同。 我們使用newFixedThreadPool(1)建立由大小為1的執行緒池支援的執行器服務。 這相當於newSingleThreadExecutor()但是稍後可以通過傳遞大於1的值來增加池大小。

Timeouts

future.get()任何呼叫將阻止並等待底層可呼叫終止。在最壞的情況下,可以永久執行 - 從而使您的應用程式無響應。 您可以通過傳遞超時來簡單地抵制這些場景:

ExecutorService executor = Executors.newFixedThreadPool(1);

Future<Integer> future = executor.submit(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
        return 123;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e);
    }
});

future.get(1, TimeUnit.SECONDS);

執行上述程式碼會導致TimeoutException

Exception in thread "main" java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:205)

您可能已經猜到為什麼會丟擲此異常:我們指定了一秒鐘的最大等待時間,但是可呼叫實際上需要兩秒鐘才能返回結果。

 invokeAll

執行者支援通過invokeAll()一次性批量提交多個可轉換。 此方法接受可收回的集合並返回期貨列表。

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
        () -> "task1",
        () -> "task2",
        () -> "task3");

executor.invokeAll(callables)
    .stream()
    .map(future -> {
        try {
            return future.get();
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
    })
    .forEach(System.out::println);
在這個例子中,我們使用Java 8函式流來處理呼叫invokeAll返回的所有期貨。我們首先將每個未來對映到其返回值,然後將每個值列印到控制檯。 If you're not yet familiar with streams read my Java 8 Stream Tutorial .如果您還不熟悉流,請閱讀我的Java 8 Stream Tutorial

InvokeAny

批量提交可呼叫的另一種方法是invokeAny()invokeAny()的方法略有不同。 而不是返回將來的物件,此方法阻止直到第一個可呼叫終止並返回該可呼叫的結果。

為了測試這種行為,我們使用這個幫助方法來模擬不同持續時間的可呼叫。 該方法返回一個可以呼叫一段時間的可呼叫函式,直到返回給定的結果:

Callable<String> callable(String result, long sleepSeconds) {
    return () -> {
        TimeUnit.SECONDS.sleep(sleepSeconds);
        return result;
    };
}

我們使用這種方法來建立一堆不等於1秒到3秒的可呼叫。通過invokeAny()將執行者提交給執行者返回最快的可呼叫的字串結果 - 在這種情況下task2:

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
    callable("task1", 2),
    callable("task2", 1),
    callable("task3", 3));

String result = executor.invokeAny(callables);
System.out.println(result);

// => task2

上面的例子使用了另一種型別的通過newWorkStealingPool()建立的執行器。 此工廠方法是Java 8的一部分,並返回ForkJoinPool的執行程式,該執行程式與正常執行程式略有不同。而不是使用固定大小的執行緒池ForkJoinPools是為給定並行度大小建立的,預設值是主機CPU可用核心數。

 ForkJoinPools存在於Java 7之後,將在本系列的後續教程中詳細介紹。通過深入瞭解計劃的執行人員,完成本教程。

Scheduled Executors

我們已經學會了如何在執行者身上提交和執行一次任務。為了多次定期執行通用任務,我們可以利用預定的執行緒池。

ScheduledExecutorService能夠排程任務在經過一段時間後定期執行或一次執行。

此程式碼示例在通過三秒鐘的初始延遲後安排一個任務執行:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);

TimeUnit.MILLISECONDS.sleep(1337);

long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);
System.out.printf("Remaining Delay: %sms", remainingDelay);
計劃一個任務會產生一個ScheduledFuture型別的專門的未來,除了Future,還提供getDelay()方法來檢索剩餘的延遲。 在此延遲過後,任務將同時執行。

為了定期執行任務執行,執行器提供了scheduleAtFixedRate()scheduleWithFixedDelay()兩種方法。 第一種方法能夠以固定的時間速率執行任務,例如每秒一次,如本例所示:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());

int initialDelay = 0;
int period = 1;
executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);

此外,該方法接受一個初始延遲,它描述了第一次執行任務之前的前導等待時間。

請記住, scheduleAtFixedRate()不考慮任務的實際持續時間。 因此,如果您指定一秒鐘的時間段,但是任務需要2秒鐘才能執行,那麼執行緒池將很快工作。

在這種情況下,您應考慮使用scheduleWithFixedDelay()該方法的工作方式與上述對應方法一樣。不同的是,等待時間段適用於任務的結束和下一個任務的開始之間。 For example:例如:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Runnable task = () -> {
    try {
        TimeUnit.SECONDS.sleep(2);
        System.out.println("Scheduling: " + System.nanoTime());
    }
    catch (InterruptedException e) {
        System.err.println("task interrupted");
    }
};

executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);
該示例在執行結束和下一次執行的開始之間以固定的延遲時間安排一秒。 初始延遲為零,任務持續時間為2秒。 所以我們的執行時間間隔為0s,3s,6s,9s等等。正如你可以看到scheduleWithFixedDelay()如果您無法預測計劃的任務的持續時間,這是方便的。

這是一系列併發教程中的第一部分。我建議您自己修理顯示的程式碼示例。 您可以在GitHub上找到本文中的所有程式碼示例,因此請隨意分配repo並給我一個星

我希望你喜歡這篇文章。 如果您有任何其他問題,請通過以下評論或通過Twitter向我傳送反饋意見。