1. 程式人生 > >多執行緒程式設計CompletableFuture與parallelStream

多執行緒程式設計CompletableFuture與parallelStream

一、簡介

平常在頁面中我們會使用非同步呼叫$.ajax()函式,如果是多個的話他會並行執行相互不影響,實際上Completable我理解也是和它類似,是java 8裡面新出的非同步實現類,CompletableFuture類實現了Future介面,CompletableFuture與Stream的設計都遵循了類似的設計模式:使用Lambda表示式以及流水線的思想,從這個角度可以說CompletableFuture與Future的關係類似於Stream與Collection的關係。

二、程式碼

直接上程式碼,執行之後可以看出CompletableFuture是呼叫的時候就開始執行,當後續程式碼調到get的取值方法時,如果內部已經返回結果則直接拿到,如果還沒有返回將阻塞執行緒等待結果,可以設定超時時間避免長時間等待。

以下是模擬並行呼叫多個方法的場景,比如查詢頁可能會有多個條件選擇,這些條件需要後臺資料相互之間有沒有聯絡的場景,就不需要序列執行,非同步執行可以節省大量時間

import org.joda.time.LocalDateTime;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; public class AppTest { public void printlnConsole(String msg) { String time = new LocalDateTime().toString("yyyy-MM-dd HH:mm:ss"); System.out.println(String.format("[%s]%s", time, msg)); } /** * 多工單次非同步執行 */ @Test public
void testManyFunAsync() { long start = System.nanoTime();//程式開始時間 try { int id = 1;//模擬一個引數,如學校Id printlnConsole("呼叫非同步任務..."); //使用非同步方式呼叫方法【呼叫時就會開始執行方法】 CompletableFuture futureClassCount = CompletableFuture.supplyAsync(() -> getClassCount(id)); CompletableFuture futureStudentCount = CompletableFuture.supplyAsync(() -> getStudentCount(id)); //do something 做了一些其他的事情超過了非同步任務執行的時間 printlnConsole("做一些其他的事情..."); Thread.sleep(3000); printlnConsole("其他事情完成"); //下面獲取非同步任務的結果,就會立即拿到返回值 printlnConsole("獲取非同步任務結果..."); Object classCount = futureClassCount.get(); //Object classCount = futureClassCount.get(2, TimeUnit.SECONDS);//可以設定超時時間,超過這個時間時將不再等待,返回異常 Object studentCount = futureStudentCount.get(); //Object studentCount = futureStudentCount.get(2, TimeUnit.SECONDS); printlnConsole("非同步任務結果獲取完成"); printlnConsole("ClassCount:" + classCount); printlnConsole("StudentCount:" + studentCount); } catch (Exception e) { e.printStackTrace(); } long end = System.nanoTime();//程式結束時間 long time = (end - start) / 1000000;//總耗時 System.out.println("執行時間:" + time); } public int getClassCount(int id) { try { Thread.sleep(2000); printlnConsole("getClassCount(" + id + ")執行完畢"); } catch (InterruptedException e) { e.printStackTrace(); } return 20; } public int getStudentCount(int id) { try { Thread.sleep(1000); printlnConsole("getStudentCount(" + id + ")執行完畢"); } catch (InterruptedException e) { e.printStackTrace(); } return 100; } }

anyOf()為任意一個子任務執行緒執行完畢後返回
allOf()為等待所有子任務執行緒全部執行完畢後返回
getNow()表示我需要立即拿到結果,如果當前的執行緒並未執行完成,則使用我傳入的值進行任務呼叫,引數為無法獲取結果時使用我傳入的值
get()獲取子執行緒運算的結果,會丟擲檢查到的異常
join()獲取子執行緒運算的結果,不會丟擲異常

package com.ysl;

import org.joda.time.LocalDateTime;
import org.junit.Test;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

public class AppTest {

    public void printlnConsole(String msg) {
        String time = new LocalDateTime().toString("yyyy-MM-dd HH:mm:ss");
        System.out.println(String.format("[%s]%s", time, msg));
    }

    /**
     * 並行執行等待全部結果或等待任意結果
     */
    @Test
    public void testAllOfAnyOf() {
        long start = System.nanoTime();
        try {
            printlnConsole("呼叫非同步任務...");
            List<Integer> ids = Arrays.asList(1, 3, 5);//準備的請求引數
            //建立非同步方法陣列
            CompletableFuture[] futures = ids.stream().map(id -> CompletableFuture.supplyAsync(() -> getClassName(id))).toArray(size -> new CompletableFuture[size]);
            //指定該非同步方法陣列的子任務執行緒等待型別
            CompletableFuture.anyOf(futures).join();//anyOf()為任意一個子任務執行緒執行完畢後返回
            //CompletableFuture.allOf(futures).join();//allOf()為等待所有子任務執行緒全部執行完畢後返回

            printlnConsole("做一些其他的事情...");
            Thread.sleep(2000);
            printlnConsole("其他事情完成");

            printlnConsole("獲取非同步任務結果:");
            for (CompletableFuture f : futures) {
                //Object obj = f.getNow(1);//getNow()表示我需要立即拿到結果,如果當前的執行緒並未執行完成,則使用我傳入的值進行任務呼叫,引數為無法獲取結果時使用我傳入的值
                Object obj = f.get();//get()獲取子執行緒運算的結果,會丟擲檢查到的異常
                //Object obj = f.join();//join()獲取子執行緒運算的結果,不會丟擲異常
                printlnConsole(String.valueOf(obj));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        long end = System.nanoTime();
        long time = (end - start) / 1000000;
        System.out.println("執行時間:" + time);
    }

    public String getClassName(int id) {
        try {
            Thread.sleep(id * 1000);
            printlnConsole("getClassName(" + id + ")執行完畢");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "taiyonghai-" + id;
    }
}

下面是並行流的演示parallelStream也是java 8新特性

 ids.stream()轉化為流.map()對映每個元素對應的結果.collect(Collectors.toList)把結果歸納為List;還有.filter()過濾元素.sorted()對元素進行排序.limit()獲取指定數量元素;也可以toArray(size -> new Class[size])轉化為陣列

以下是模擬根據Id查詢學生名稱的場景,接收到的是一個集合又都是呼叫同一個方法獲取,就可以使用並行流同時非同步請求等待返回結果

import org.joda.time.LocalDateTime;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class AppTest {

    public void printlnConsole(String msg) {
        String time = new LocalDateTime().toString("yyyy-MM-dd HH:mm:ss");
        System.out.println(String.format("[%s]%s", time, msg));
    }

    /**
     * 單任務多次並行流執行
     */
    @Test
    public void testParallelStream() {
        long start = System.nanoTime();
        try {
            printlnConsole("呼叫非同步任務...");
            List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5);//準備的請求引數
            //序列執行會等待每一個方法執行完畢後在繼續執行下一個
            //List<String> names = ids.stream().map(id -> getStudentName(id)).collect(Collectors.toList());
            //並行執行會同時呼叫多個方法待全部執行完畢後一起返回(parallelStream是非執行緒安全的,配合collect達到執行緒安全,後續驗證一下)
            List<String> names = ids.parallelStream().map(id -> getStudentName(id)).collect(Collectors.toList());
            //無論stream()或者parallelStream()呼叫時均會阻斷執行緒執行
            printlnConsole("做一些其他的事情...");
            Thread.sleep(3000);
            printlnConsole("其他事情完成");

            printlnConsole("獲取非同步任務結果:");
            names.forEach(item -> printlnConsole(item));
        } catch (Exception e) {
            e.printStackTrace();
        }
        long end = System.nanoTime();
        long time = (end - start) / 1000000;
        System.out.println("執行時間:" + time);
    }

    public String getStudentName(int id) {
        try {
            Thread.sleep(2000);
            printlnConsole("getStudentName(" + id + ")執行完畢");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "taiyonghai-" + id;
    }
}

 上面能看到並行流雖然是並行執行但等待結果是阻塞執行緒的,所以可以利用非同步CompletableFuture配合序列流來實現

以下是採用序列流配合非同步實現的併發處理

import org.joda.time.LocalDateTime;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class AppTest {

    public void printlnConsole(String msg) {
        String time = new LocalDateTime().toString("yyyy-MM-dd HH:mm:ss");
        System.out.println(String.format("[%s]%s", time, msg));
    }

    /**
     * 單任務多次非同步執行
     */
    @Test
    public void testOneFunAsync() {
        long start = System.nanoTime();
        try {
            printlnConsole("呼叫非同步任務...");
            List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5);//準備的請求引數
            //ids.stream()轉化為流.map()對映每個元素對應的結果.collect()把結果歸納為List;還有.filter()過濾元素.sorted()對元素進行排序.limit()獲取指定數量元素;
            List<CompletableFuture<String>> futures = ids.stream().map(id -> CompletableFuture.supplyAsync(() -> getStudentName(id))).collect(Collectors.toList());

            //不用並行流parallelStream()呼叫時就不會阻斷執行緒執行
            printlnConsole("做一些其他的事情...");
            Thread.sleep(3000);
            printlnConsole("其他事情完成");

            printlnConsole("獲取非同步任務結果:");
            futures.forEach(f -> {
                try {
                    Object obj = f.get();
                    printlnConsole(String.valueOf(obj));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            });
        }catch (Exception e){
            e.printStackTrace();
        }

        long end = System.nanoTime();
        long time = (end - start) / 1000000;
        System.out.println("執行時間:" + time);
    }

    public String getStudentName(int id) {
        try {
            Thread.sleep(2000);
            printlnConsole("getStudentName(" + id + ")執行完畢");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "taiyonghai-" + id;
    }
}

 當我的並行任務數量超過了我機器的核心數就會產生等待,我電腦是8核使用並行流執行數量就可以開8個子執行緒,當多餘這個數量時剩下的就需要等待前面執行緒執行完再執行

當需要並行執行的任務數量大於核心數的時候,產生的等待是我們不想看到的,這時CompletableFuture就更加適用,它可以手動這隻執行緒池大小,避免並行任務過多時的等待

我們將程式碼做些修正

以下是原始碼,這樣就可以提高對多工並行處理的支援了

import org.joda.time.LocalDateTime;
import org.junit.Test;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

public class AppTest {

    public void printlnConsole(String msg) {
        String time = new LocalDateTime().toString("yyyy-MM-dd HH:mm:ss");
        System.out.println(String.format("[%s]%s", time, msg));
    }

    /**
     * 手動配置執行緒執行器的執行緒池大小
     */
    private final Executor myExecutor = Executors.newFixedThreadPool(20, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            //使用守護執行緒保證不會阻止程式的關停
            t.setDaemon(true);
            return t;
        }
    });
    /**
     * 單任務多次非同步執行
     */
    @Test
    public void testOneFunAsync() {
        long start = System.nanoTime();
        try {
            printlnConsole("呼叫非同步任務...");
            List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);//準備的請求引數
            //ids.stream()轉化為流.map()對映每個元素對應的結果.collect()把結果歸納為List;還有.filter()過濾元素.sorted()對元素進行排序.limit()獲取指定數量元素;
            List<CompletableFuture<String>> futures = ids.stream().map(id -> CompletableFuture.supplyAsync(() -> getStudentName(id), myExecutor)).collect(Collectors.toList());

            //不用並行流parallelStream()呼叫時就不會阻斷執行緒執行
            printlnConsole("做一些其他的事情...");
            Thread.sleep(3000);
            printlnConsole("其他事情完成");

            printlnConsole("獲取非同步任務結果:");
            futures.forEach(f -> {
                try {
                    Object obj = f.get();
                    printlnConsole(String.valueOf(obj));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }

        long end = System.nanoTime();
        long time = (end - start) / 1000000;
        System.out.println("執行時間:" + time);
    }

    public String getStudentName(int id) {
        try {
            Thread.sleep(2000);
            printlnConsole("getStudentName(" + id + ")執行完畢");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "taiyonghai-" + id;
    }
}

java 8的新特性也只做到了會用,很多深入的還不瞭解,還望指導謝謝,下面備份一下別人的總結我覺得挺有用的:

選擇正確的執行緒池大小
《Java併發程式設計實戰》中給出如下公式:

Number = NCpu * Ucpu * ( 1 + W/C)
Number : 執行緒數量
NCpu : 處理器核數
UCpu : 期望cpu利用率
W/C : 等待時間與計算時間比
我們這裡:99%d的時間是等待商店響應 W/C = 99 ,cpu利用率期望 100% ,NCpu = 9,推斷出 number = 800。但是為了避免過多的執行緒搞死計算機,我們選擇商店數與計算值中較小的一個。

並行流與CompletableFuture
目前,我們對集合進行計算有兩種方式:1.並行流 2.CompletableFuture;

1、而CompletableFuture更加的靈活,我們可以配置執行緒池的大小確保整體的計算不會因為等待IO而發生阻塞。

書上給出的建議如下:如果是計算密集型的操作並且沒有IO推薦stream介面,因為實現簡單效率也高,如果所有的執行緒都是計算密集型的也就沒有必要建立比核數更多的執行緒。

2、反之,如果任務涉及到IO,網路等操作:CompletableFuture靈活性更好,因為大部分執行緒處於等待狀態,需要讓他們更加忙碌,並且再邏輯中加入異常處理可以更有效的監控是什麼原因觸發了等待。