1. 程式人生 > >非同步程式設計CompletableFuture實現高併發系統優化之請求合併

非同步程式設計CompletableFuture實現高併發系統優化之請求合併

  先說場景:

  根據Redis官網介紹,單機版Redis的讀寫效能是12萬/秒,批量處理可以達到70萬/秒。不管是快取或者是資料庫,都有批量處理的功能。當我們的系統達到瓶頸的時候,我們考慮充分的壓榨快取和資料庫的效能,應對更大的併發請求。適用於電商促銷雙十一,等特定高併發的場景,讓系統可以支撐更高的併發。

  思路:

一個使用者請求到後臺,我沒有立即去處理,而是把請求堆積到佇列中,堆積10毫秒的時間,由於是高併發場景,就堆積了一定數量的請求。

我定義一個定時任務,把佇列中的請求,按批處理的方式,向後端的Redis快取,或者資料庫發起批量的請求,拿到批量的結果,再把結果分發給對應的請求使用者。

對於單個使用者而言,他的請求變慢了10毫秒是無感知的。但是對於我們系統,卻可以提高几倍的抗併發能力。

這個請求合併,結果分發的功能,就要用到一個類CompletableFuture 實現非同步程式設計,不同執行緒之間的資料互動。

  執行緒1 如何建立非同步任務?

//建立非同步任務 
CompletableFuture<Map<String, Object>> future = new CompletableFuture<>();
      
//阻塞等待獲取結果。
Map<String, Object> result = future.get();

  執行緒2 如何把資料賦值給執行緒1 ?

// 執行緒2的處理結果 
Object result = "結果";
//執行緒2 的結果,賦值 給 執行緒1
future.complete(result);

  CompletableFuture 是由大牛 Doug Lea 在JDK1.8 提供的類,我們來看看complete()方法的原始碼。

    /**
     * If not already completed, sets the value returned by {@link
     * #get()} and related methods to the given value.
     *
     * @param value the result value
     * @return {@code true} if this invocation caused this CompletableFuture
     * to transition to a completed state, else {@code false}
     */
    public boolean complete(T value) {
        boolean triggered = completeValue(value);
        postComplete();
        return triggered;
    }

  翻譯:

      如果尚未完成,則將返回的值和相關方法get()設定為給定值。

  也就是說,

    執行緒1 的get() 方法,拿到的就是執行緒 2 的complete() 方法給的值。

 

看到這裡,應該基本明白這個異常程式設計的意思了。它的核心就是執行緒通訊,資料傳輸。直接上程式碼:

package www.itbac.com;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

public class CompletableFutureTest {

    //併發安全的阻塞佇列,積攢請求。(每隔N毫秒批量處理一次)
    LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue();

    // 定時任務的實現,每隔開N毫秒處理一次資料。
    @PostConstruct
    public void init() {
        // 定時任務執行緒池
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
//                捕獲異常
                try {
                    //1.從阻塞佇列中取出queue的請求,生成一次批量查詢。
                    int size = queue.size();
                    if (size == 0) {
                        return;
                    }
                    List<Request> requests = new ArrayList<>(size);
                    for (int i = 0; i < size; i++) {
                        // 移出佇列,並返回。
                        Request poll = queue.poll();
                        requests.add(poll);
                    }
                    //2.組裝一個批量查詢請求引數。
                    List<String> movieCodes = new ArrayList<>();
                    for (Request request : requests) {
                        movieCodes.add(request.getMovieCode());
                    }
                    //3. http 請求,或者 dubbo 請求。批量請求,得到結果list。
                    System.out.println("本次合併請求數量:"+movieCodes.size());
                    List<Map<String, Object>> responses = new ArrayList<>();

                    //4.把list轉成map方便快速查詢。
                    HashMap<String, Map<String, Object>> responseMap = new HashMap<>();
                    for (Map<String, Object> respons : responses) {
                        String code = respons.get("code").toString();
                        responseMap.put(code,respons);
                    }
                    //4.將結果響應給每一個單獨的使用者請求。
                    for (Request request : requests) {
                        //根據請求中攜帶的能表示唯一引數,去批量查詢的結果中找響應。
                        Map<String, Object> result = responseMap.get(request.getMovieCode());

                        //將結果返回到對應的請求執行緒。2個執行緒通訊,非同步程式設計賦值。
                        //complete(),原始碼註釋翻譯:如果尚未完成,則將由方法和相關方法返回的值設定為給定值
                        request.getFuture().complete(result);
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                }

            }
            // 立即執行任務,並間隔10 毫秒重複執行。
        }, 0, 10, TimeUnit.MILLISECONDS);

    }

    // 1萬個使用者請求,1萬個併發,查詢電影資訊
    public Map<String, Object> queryMovie(String movieCode) throws ExecutionException, InterruptedException {
        //請求合併,減少介面呼叫次數,提升效能。
        //思路:將不同使用者的同類請求,合併起來。
        //並非立刻發起介面呼叫,請求 。是先收集起來,再進行批量請求。
        Request request = new Request();
        //請求引數
        request.setMovieCode(movieCode);
        //非同步程式設計,建立當前執行緒的任務,由其他執行緒非同步運算,獲取非同步處理的結果。
        CompletableFuture<Map<String, Object>> future = new CompletableFuture<>();
        request.setFuture(future);

        //請求引數放入佇列中。定時任務去消化請求。
        queue.add(request);

        //阻塞等待獲取結果。
        Map<String, Object> stringObjectMap = future.get();
        return stringObjectMap;
    }

}
    //請求包裝類
    class Request {

    //請求引數: 電影id。
    private String movieCode;

    // 多執行緒的future接收返回值。
    //每一個請求物件中都有一個future接收請求。
    private CompletableFuture<Map<String, Object>> future;



    public CompletableFuture<Map<String, Object>> getFuture() {
        return future;
    }

    public void setFuture(CompletableFuture<Map<String, Object>> future) {
        this.future = future;
    }

    public Request() {
    }

    public Request(String movieCode) {
        this.movieCode = movieCode;
    }

    public String getMovieCode() {
        return movieCode;
    }

    public void setMovieCode(String movieCode) {
        this.movieCode = movieCode;
    }
}

  這樣就實現了請求合併,批量處理,結果分發響應。讓系統支撐更高的併發量。

當然,因為不是天天雙十一,沒有那麼大的併發量,就新增一個動態的配置,只有當特定的時間,才進行請求堆積。其他時間還是正常的處理。這部分邏輯就不寫出來了。

&n