1. 程式人生 > >聊聊java高併發系統之非同步非阻塞

聊聊java高併發系統之非同步非阻塞

幾種呼叫方式

同步阻塞呼叫

即序列呼叫,響應時間為所有服務的響應時間總和;

半非同步(非同步Future)

執行緒池,非同步Future,使用場景:併發請求多服務,總耗時為最長響應時間;提升總響應時間,但是阻塞主請求執行緒,高併發時依然會造成執行緒數過多,CPU上下文切換;

全非同步(Callback)

Callback方式呼叫,使用場景:不考慮回撥時間且只能對結果做簡單處理,如果依賴服務是兩個或兩個以上服務,則不能合併兩個服務的處理結果;不阻塞主請求執行緒,但使用場景有限。

非同步回撥鏈式編排

非同步回撥鏈式編排(JDK8 CompletableFuture),使用場景:其實不是非同步呼叫方式,只是對依賴多服務的Callback呼叫結果處理做結果編排,來彌補Callback的不足,從而實現全非同步鏈式呼叫。

接下來看看如何設計利用全非同步Callback呼叫和非同步回撥鏈式編排處理結果來實現全非同步系統設計。

同步阻塞呼叫:

public class Test {
   public static void main(String[] args) throws Exception {
       RpcService rpcService = new RpcService();
       HttpService httpService = new HttpService();
       //耗時10ms
       Map result1 = rpcService.getRpcResult();
       //耗時20ms
       Integer result2 = httpService.getHttpResult();
       //總耗時30ms
    }
   static class RpcService {
       Map getRpcResult() throws Exception {
           //呼叫遠端方法(遠端方法耗時約10ms,可以使用Thread.sleep模擬)
       }
    }
   static class HttpService {
       Integer getHttpResult() throws Exception {
           //呼叫遠端方法(遠端方法耗時約20ms,可以使用Thread.sleep模擬)
           Thread.sleep(20);
           return 0;
       }
    }
}
  半非同步(非同步Future):
public class Test {
   final static ExecutorService executor = Executors.newFixedThreadPool(2);
   public static void main(String[] args) {
       RpcService rpcService = new RpcService();
       HttpService httpService = new HttpService();
       Future> future1 = null;
       Future future2 = null;
       try {
           future1 = executor.submit(() -> rpcService.getRpcResult());
           future2 = executor.submit(() -> httpService.getHttpResult());
           //耗時10ms
           Map result1 = future1.get(300, TimeUnit.MILLISECONDS);
           //耗時20ms
           Integer result2 = future2.get(300, TimeUnit.MILLISECONDS);
           //總耗時20ms
       } catch (Exception e) {
           if (future1 != null) {
                future1.cancel(true);
           }
           if (future2 != null) {
                future2.cancel(true);
           }
           throw new RuntimeException(e);
       }
    }
   static class RpcService {
       Map getRpcResult() throws Exception {
           //呼叫遠端方法(遠端方法耗時約10ms,可以使用Thread.sleep模擬)
       }
    }
   static class HttpService {
       Integer getHttpResult() throws Exception {
           //呼叫遠端方法(遠端方法耗時約20ms,可以使用Thread.sleep模擬)
       }
    }
}<strong>

</strong>

全非同步(callback):
public class AsyncTest {
public staticHttpAsyncClient httpAsyncClient;
   public static CompletableFuture getHttpData(String url) {
       CompletableFuture asyncFuture = new CompletableFuture();
       HttpPost post = new HttpPost(url);
       HttpAsyncRequestProducer producer = HttpAsyncMethods.create(post);
       AsyncCharConsumer consumer = newAsyncCharConsumer() {
            HttpResponse response;
           protected HttpResponse buildResult(final HttpContext context) {
                return response;
           }
…...
       };
       FutureCallback callback = new FutureCallback() {
           public void completed(HttpResponse response) {
               asyncFuture.complete(EntityUtils.toString(response.getEntity()));
           }
…...
       };
       httpAsyncClient.execute(producer, consumer, callback);
       return asyncFuture;
    }
 
   public static void main(String[] args) throws Exception {
       AsyncTest.getHttpData("http://www.jd.com");
       Thread.sleep(1000000);
    }
}

非同步回撥鏈式編排:

CompletableFuture提供了50多個API,可以滿足所需的各種場景的非同步處理的編排,下面舉一個場景:

三個服務併發非同步呼叫,返回CompletableFuture,不阻塞主執行緒;


   public static void test1() throws Exception {
       HelloClientDemoTest service = new HelloClientDemoTest();
       /**
        * 場景1 兩個以上服務併發非同步呼叫,返回CompletableFuture,不阻塞主執行緒
        * 並且兩個服務也是非同步非阻塞呼叫
        */
       CompletableFuture future1 = service.getHttpData("http://www.jd.com");
       CompletableFuture future2 = service.getHttpData("http://www.jd.com");
       CompletableFuture future3 =service.getHttpData("http://www.jd.com");
       List futureList = Lists.newArrayList(future1,future2, future3);
       CompletableFuture allDoneFuture =CompletableFuture.allOf(futureList.toArray(newCompletableFuture[futureList.size()]));
       CompletableFuture future4 =allDoneFuture.thenApply(v -> {
            List result =futureList.stream().map(CompletableFuture::join)

                   .collect(Collectors.toList());
            //注意順序
            String result1 = (String)result.get(0);
            String result2 = (String)result.get(1);
            String result3 = (String)result.get(2);
            //處理業務....
            return result1 + result2 + result3;
        }).exceptionally(e -> {
            //e.printStackTrace();
            return "";
        });
       //返回
    }