1. 程式人生 > >CompletableFuture的多執行緒和非同步監聽實現

CompletableFuture的多執行緒和非同步監聽實現

大家好,我是烤鴨:
今天給大家說的是多執行緒併發的非同步監聽的情況。
這裡不得不說一下CompletableFuture這個類,普通我們執行多執行緒的時候只需要另外啟動一條執行緒。
說一下執行緒的3種方式:

extends Thread,implements Runnable,implements Callable。

        同步的實現方式有很多。這裡貼一下我的。

       這個handler是可以注入其他的比如service或者dao,完成業務邏輯,我這裡是注入的redis。

package com.mys.my.wechat.handler;

import com.mys.my.wechat.config.redis.RedisClient;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Executor;

@Service("musicHandler")
public class MusicHandler {
    public static Log logger = LogFactory.getLog(MusicHandler.class);
    public String redisString;
    public String openId;

    @Autowired
    private RedisClient redisClient;
    @Autowired
    private Executor taskAsyncPool;

    public void doAllHandler() {
        try {
            taskAsyncPool.execute(new Runnable() {
                @Override
                public void run() {
                    logger.info("xiami 任務啟動");
                    Date time = new Date();
//                  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//                  String re_StrTime = sdf.format(time);
                    //過期時間1小時
                    redisClient.set("xiamiMusic:"+openId,redisString,60*60);
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

呼叫:

                   //存redis
                    musicHandler.redisString = toJson;
                    musicHandler.openId = openId;
                    musicHandler.doAllMusicHandler();

以上就是同步呼叫,但是這樣只是執行,你無法監聽結果。

我現在說一下場景:

燒水的同時,洗衣機洗衣服,電腦下載,手機充電,我們生活中

也會有同時幹幾件事的情況,而需求是這幾件事都幹完了我才能出門,多執行緒確實能執行,但是怎麼監聽結果呢。

以上也許可以說時間是可以預測的。

但是具體的業務場景,如果需要你去呼叫4個介面,而他們之間的沒有任何影響,但是又必須

4個介面都執行完才能返回資料。這樣如果實現多執行緒的非同步監聽呢?

最常用的就是爬蟲,我想同時抓取幾個網站或者幾個網頁的資料,如果是單執行緒,效率很低。

多執行緒又必須保證每條執行緒完成抓取並返回資料。以下是一個小例子。

用CompletableFuture,程式碼如下:

package com.mys.my.wechat.service.impl;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final Integer res = 0;
        final ArrayList<Integer> integers = new ArrayList<>();
            CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
                //模擬執行耗時任務
                System.out.println("task 1 doing...");
                try {
                   Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                //返回結果
                return 1;
            });
            //註冊完成事件
            completableFuture1.thenAccept(result -> {
                integers.add(1);
            });
        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            //模擬執行耗時任務
            System.out.println("task 2 doing...");
            try {
                Thread.sleep(2000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            //返回結果
            return 1;
        });
        //註冊完成事件
        completableFuture2.thenAccept(result -> {
            integers.add(1);
        });
        CompletableFuture<Integer> completableFuture3 = CompletableFuture.supplyAsync(() -> {
            //模擬執行耗時任務
            System.out.println("task 3 doing...");
            try {
                Thread.sleep(3000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            //返回結果
            return 1;
        });
        //註冊完成事件
        completableFuture3.thenAccept(result -> {
            integers.add(1);
        });
        while(true){
            try {
                Thread.sleep(1000);
                if(integers.size()== 3){
                    System.out.println("done");
                    break;
                }
                System.out.println("s:"+integers.size());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
這裡我們可以看到,主執行緒一直在監聽,其他新開啟的3個執行緒,如果他們執行完畢,就可以返回資料,
如果他們有沒執行完的,主執行緒就一直等。這樣就分工明確了,主執行緒的任務就是監視其他是否完畢,

而同時開啟3條執行緒執行速度也會很快。

這只是一個demo和想法實現,歡迎交流。