1. 程式人生 > >並行化-你的高併發大殺器

並行化-你的高併發大殺器

1.前言

想必熱愛遊戲的同學小時候,都幻想過要是自己要是能像鳴人那樣會多重影分身之術,就能一邊打遊戲一邊上課了,可惜漫畫就是漫畫,現實中並沒有這個技術,你要麼只有老老實實的上課,要麼就只有逃課去打遊戲了。雖然在現實中我們無法實現多重影分身這樣的技術,但是我們可以在計算機世界中實現我們這樣的願望。

2.計算機中的分身術

計算機中的分身術不是天生就有了。在1971年,1971年,英特爾推出的全球第一顆通用型微處理器4004,由2300個電晶體構成。當時,公司的聯合創始人之一戈登摩爾就提出大名鼎鼎的“摩爾定律”——每過18個月,晶片上可以整合的電晶體數目將增加一倍。最初的主頻740kHz(每秒執行74萬次),現在過了快50年了,大家去買電腦的時候會發現現在的主頻都能達到4.0GHZ了(每秒40億次)。但是主頻越高帶來的收益卻是越來越小:

  • 據測算,主頻每增加1G,功耗將上升25瓦,而在晶片功耗超過150瓦後,現有的風冷散熱系統將無法滿足散熱的需要。有部分CPU都可以用來煎雞蛋了。

  • 流水線過長,使得單位頻率效能低下,越大的主頻其實整體效能反而不如小的主頻。

  • 戈登摩爾認為摩爾定律未來10-20年會失效。

在單核主頻遇到瓶頸的情況下,多核CPU應運而生,不僅提升了效能,並且降低了功耗。所以多核CPU逐漸成為現在市場的主流,這樣讓我們的多執行緒程式設計也更加的容易。

說到了多核CPU就一定要說GPU,大家可能對這個比較陌生,但是一說到顯示卡就肯定不陌生,筆者搞過一段時間的CUDA程式設計,我才意識到這個才是真正的平行計算,大家都知道圖片畫素點吧,比如19201080的圖片有210萬個畫素點,如果想要把一張圖片的每個畫素點都進行轉換一下,那在我們java裡面可能就要迴圈遍歷210萬次。 就算我們用多執行緒8核CPU,那也得迴圈幾十萬次。但是如果使用Cuda,最多可以365535*512=100661760(一億)個執行緒並行執行,就這種級別的圖片那也是馬上處理完成。但是Cuda一般適合於圖片這種,有大量的畫素點需要同時處理,但是其記憶體不多所以邏輯不能太複雜。GPU只是用來擴充套件介紹,感興趣可以和筆者交流。

3.應用中的並行

一說起讓你的服務高效能的手段,那麼非同步化,並行化這些肯定會第一時間在你腦海中顯現出來,在之前的文章:《非同步化,你的高併發大殺器》中已經介紹過了非同步化的優化手段,有興趣的朋友可以看看。並行化可以用來配合非同步化,也可以用來單獨做優化。

我們可以想想有這麼一個需求,在你下外賣訂單的時候,這筆訂單可能還需要查,使用者資訊,折扣資訊,商家資訊,菜品資訊等,用同步的方式呼叫,如下圖所示:

設想一下這5個查詢服務,平均每次消耗50ms,那麼本次呼叫至少是250ms,我們細想一下,在這個這五個服務其實並沒有任何的依賴,誰先獲取誰後獲取都可以,那麼我們可以想想,是否可以用多重影分身之術,同時獲取這五個服務的資訊呢?優化如下:

將這五個查詢服務並行查詢,在理想情況下可以優化至50ms。當然說起來簡單,我們真正如何落地呢?

3.1 CountDownLatch/Phaser

CountDownLatch和Phaser是JDK提供的同步工具類Phaser是1.7版本之後提供的工具類而CountDownLatch是1.5版本之後提供的工具類。這裡簡單介紹一下CountDownLatch,可以將其看成是一個計數器,await()方法可以阻塞至超時或者計數器減至0,其他執行緒當完成自己目標的時候可以減少1,利用這個機制我們可以將其用來做併發。 可以用如下的程式碼實現我們上面的下訂單的需求:

  1. public class CountDownTask {

  2.    private static final int CORE_POOL_SIZE = 4;

  3.    private static final int MAX_POOL_SIZE = 12;

  4.    private static final long KEEP_ALIVE_TIME = 5L;

  5.    private final static int QUEUE_SIZE = 1600;

  6.    protected final static ExecutorService THREAD_POOL = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE,

  7.            KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(QUEUE_SIZE));

  8.    public static void main(String[] args) throws InterruptedException {

  9.        // 新建一個為5的計數器

  10.        CountDownLatch countDownLatch = new CountDownLatch(5);

  11.        OrderInfo orderInfo = new OrderInfo();

  12.        THREAD_POOL.execute(() -> {

  13.            System.out.println("當前任務Customer,執行緒名字為:" + Thread.currentThread().getName());

  14.            orderInfo.setCustomerInfo(new CustomerInfo());

  15.            countDownLatch.countDown();

  16.        });

  17.        THREAD_POOL.execute(() -> {

  18.            System.out.println("當前任務Discount,執行緒名字為:" + Thread.currentThread().getName());

  19.            orderInfo.setDiscountInfo(new DiscountInfo());

  20.            countDownLatch.countDown();

  21.        });

  22.        THREAD_POOL.execute(() -> {

  23.            System.out.println("當前任務Food,執行緒名字為:" + Thread.currentThread().getName());

  24.            orderInfo.setFoodListInfo(new FoodListInfo());

  25.            countDownLatch.countDown();

  26.        });

  27.        THREAD_POOL.execute(() -> {

  28.            System.out.println("當前任務Tenant,執行緒名字為:" + Thread.currentThread().getName());

  29.            orderInfo.setTenantInfo(new TenantInfo());

  30.            countDownLatch.countDown();

  31.        });

  32.        THREAD_POOL.execute(() -> {

  33.            System.out.println("當前任務OtherInfo,執行緒名字為:" + Thread.currentThread().getName());

  34.            orderInfo.setOtherInfo(new OtherInfo());

  35.            countDownLatch.countDown();

  36.        });

  37.        countDownLatch.await(1, TimeUnit.SECONDS);

  38.        System.out.println("主執行緒:"+ Thread.currentThread().getName());

  39.    }

  40. }

建立一個執行緒池(具體配置根據具體業務,具體機器配置),進行併發的執行我們的任務(生成使用者資訊,菜品資訊等),最後利用await方法阻塞等待結果成功返回。

3.2CompletableFuture

相信各位同學已經發現,CountDownLatch雖然能實現我們需要滿足的功能但是其任然有個問題是,在我們的業務程式碼需要耦合CountDownLatch的程式碼,比如在我們獲取使用者資訊之後我們會執行countDownLatch.countDown(),很明顯我們的業務程式碼顯然不應該關心這一部分邏輯,並且在開發的過程中萬一寫漏了,那我們的await方法將只會被各種異常喚醒。

所以在JDK1.8中提供了一個類CompletableFuture,它是一個多功能的非阻塞的Future。(什麼是Future:用來代表非同步結果,並且提供了檢查計算完成,等待完成,檢索結果完成等方法。)在我之前的這篇文章中詳細介紹了《非同步技巧之CompletableFuture》,有興趣的可以看這篇文章。我們將每個任務的計算完成的結果都用CompletableFuture來表示,利用CompletableFuture.allOf匯聚成一個大的CompletableFuture,那麼利用get()方法就可以阻塞。

  1. public class CompletableFutureParallel {

  2.    private static final int CORE_POOL_SIZE = 4;

  3.    private static final int MAX_POOL_SIZE = 12;

  4.    private static final long KEEP_ALIVE_TIME = 5L;

  5.    private final static int QUEUE_SIZE = 1600;

  6.    protected final static ExecutorService THREAD_POOL = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE,

  7.            KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(QUEUE_SIZE));

  8.    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {

  9.        OrderInfo orderInfo = new OrderInfo();

  10.        //CompletableFuture 的List

  11.        List<CompletableFuture> futures = new ArrayList<>();

  12.        futures.add(CompletableFuture.runAsync(() -> {

  13.            System.out.println("當前任務Customer,執行緒名字為:" + Thread.currentThread().getName());

  14.            orderInfo.setCustomerInfo(new CustomerInfo());

  15.        }, THREAD_POOL));

  16.        futures.add(CompletableFuture.runAsync(() -> {

  17.            System.out.println("當前任務Discount,執行緒名字為:" + Thread.currentThread().getName());

  18.            orderInfo.setDiscountInfo(new DiscountInfo());

  19.        }, THREAD_POOL));

  20.        futures.add( CompletableFuture.runAsync(() -> {

  21.            System.out.println("當前任務Food,執行緒名字為:" + Thread.currentThread().getName());

  22.            orderInfo.setFoodListInfo(new FoodListInfo());

  23.        }, THREAD_POOL));

  24.        futures.add(CompletableFuture.runAsync(() -> {

  25.            System.out.println("當前任務Other,執行緒名字為:" + Thread.currentThread().getName());

  26.            orderInfo.setOtherInfo(new OtherInfo());

  27.        }, THREAD_POOL));

  28.        CompletableFuture allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));

  29.        allDoneFuture.get(10, TimeUnit.SECONDS);

  30.        System.out.println(orderInfo);

  31.    }

  32. }

可以看見我們使用CompletableFuture能很快的完成的需求,當然這還不夠。

3.3 Fork/Join

我們上面用CompletableFuture完成了我們對多組任務並行執行,但是其依然是依賴我們的執行緒池,在我們的執行緒池中使用的是阻塞佇列,也就是當我們某個執行緒執行完任務的時候需要通過這個阻塞佇列進行,那麼肯定會發生競爭,所以在JDK1.7中提供了ForkJoinTask和ForkJoinPool。

ForkJoinPool中每個執行緒都有自己的工作佇列,並且採用Work-Steal演算法防止執行緒飢餓。 Worker執行緒用LIFO的方法取出任務,但是會用FIFO的方法去偷取別人佇列的任務,這樣就減少了鎖的衝突。

網上這個框架的例子很多,我們看看如何使用程式碼其完成我們上面的下訂單需求:

  1. public class OrderTask extends RecursiveTask<OrderInfo> {

  2.    @Override

  3.    protected OrderInfo compute() {

  4.        System.out.println("執行"+ this.getClass().getSimpleName() + "執行緒名字為:" + Thread.currentThread().getName());

  5.        // 定義其他五種並行TasK

  6.        CustomerTask customerTask = new CustomerTask();

  7.        TenantTask tenantTask = new TenantTask();

  8.        DiscountTask discountTask = new DiscountTask();

  9.        FoodTask foodTask = new FoodTask();

  10.        OtherTask otherTask = new OtherTask();

  11.        invokeAll(customerTask, tenantTask, discountTask, foodTask, otherTask);

  12.        OrderInfo orderInfo = new OrderInfo(customerTask.join(), tenantTask.join(), discountTask.join(), foodTask.join(), otherTask.join());

  13.        return orderInfo;

  14.    }

  15.    public static void main(String[] args) {

  16.        ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() -1 );

  17.        System.out.println(forkJoinPool.invoke(new OrderTask()));

  18.    }

  19. }

  20. class CustomerTask extends RecursiveTask<CustomerInfo>{

  21.    @Override

  22.    protected CustomerInfo compute() {

  23.        System.out.println("執行"+ this.getClass().getSimpleName() + "執行緒名字為:" + Thread.currentThread().getName());

  24.        return new CustomerInfo();

  25.    }

  26. }

  27. class TenantTask extends RecursiveTask<TenantInfo>{

  28.    @Override

  29.    protected TenantInfo compute() {

  30.        System.out.println("執行"+ this.getClass().getSimpleName() + "執行緒名字為:" + Thread.currentThread().getName());

  31.        return new TenantInfo();

  32.    }

  33. }

  34. class DiscountTask extends RecursiveTask<DiscountInfo>{

  35.    @Override

  36.    protected DiscountInfo compute() {

  37.        System.out.println("執行"+ this.getClass().getSimpleName() + "執行緒名字為:" + Thread.currentThread().getName());

  38.        return new DiscountInfo();

  39.    }

  40. }

  41. class FoodTask extends RecursiveTask<FoodListInfo>{

  42.    @Override

  43.    protected FoodListInfo compute() {

  44.        System.out.println("執行"+ this.getClass().getSimpleName() + "執行緒名字為:" + Thread.currentThread().getName());

  45.        return new FoodListInfo();

  46.    }

  47. }

  48. class OtherTask extends RecursiveTask<OtherInfo>{

  49.    @Override

  50.    protected OtherInfo compute() {

  51.        System.out.println("執行"+ this.getClass().getSimpleName() + "執行緒名字為:" + Thread.currentThread().getName());

  52.        return new OtherInfo();

  53.    }

  54. }

我們定義一個OrderTask並且定義五個獲取資訊的任務,在compute中分別fork執行這五個任務,最後在將這五個任務的結果通過Join獲得,最後完成我們的並行化的需求。

3.4 parallelStream

在jdk1.8中提供了並行流的API,當我們使用集合的時候能很好的進行並行處理,下面舉了一個簡單的例子從1加到100:

  1. public class ParallelStream {

  2.    public static void main(String[] args) {

  3.        ArrayList<Integer> list = new ArrayList<Integer>();

  4.        for (int i = 1; i <= 100; i++) {

  5.            list.add(i);

  6.        }

  7.        LongAdder sum = new LongAdder();

  8.        list.parallelStream().forEach(integer -> {

  9. //            System.out.println("當前執行緒" + Thread.currentThread().getName());

  10.            sum.add(integer);

  11.        });

  12.        System.out.println(sum);

  13.    }

  14. }

parallelStream中底層使用的那一套也是Fork/Join的那一套,預設的併發程度是可用CPU數-1。

3.5 分片

可以想象有這麼一個需求,每天定時對id在某個範圍之間的使用者發券,比如這個範圍之間的使用者有幾百萬,如果給一臺機器發的話,可能全部發完需要很久的時間,所以分散式排程框架比如:elastic-job都提供了分片的功能,比如你用50臺機器,那麼id%50=0的在第0臺機器上,=1的在第1臺機器上發券,那麼我們的執行時間其實就分攤到了不同的機器上了。

4.並行化注意事項

  • 執行緒安全:在parallelStream中我們列舉的程式碼中使用的是LongAdder,並沒有直接使用我們的Integer和Long,這個是因為在多執行緒環境下Integer和Long執行緒不安全。所以執行緒安全我們需要特別注意。

  • 合理引數配置:可以看見我們需要配置的引數比較多,比如我們的執行緒池的大小,等待佇列大小,並行度大小以及我們的等待超時時間等等,我們都需要根據自己的業務不斷的調優防止出現佇列不夠用或者超時時間不合理等等。

5.最後

本文介紹了什麼是並行化,並行化的各種歷史,在Java中如何實現並行化,以及並行化的注意事項。希望大家對並行化有個比較全面的認識。最後給大家提個兩個小問題:

  1. 在我們並行化當中有某個任務如果某個任務出現了異常應該怎麼辦?

  2. 在我們並行化當中有某個任務的資訊並不是強依賴,也就是如果出現了問題這部分資訊我們也可以不需要,當並行化的時候,這種任務出現了異常應該怎麼辦?

最後打個廣告,如果你覺得這篇文章對你有文章,可以加下我的群:777158424 ,最近作者收集了很多最新的學習資料視訊以及面試資料,加群通過之後可領取,你的關注和轉發是對我最大的支援,O(∩_∩)O