聊聊java高併發系統之非同步非阻塞
阿新 • • 發佈:2018-12-27
幾種呼叫方式
同步阻塞呼叫
即序列呼叫,響應時間為所有服務的響應時間總和;
半非同步(非同步Future)
執行緒池,非同步Future,使用場景:併發請求多服務,總耗時為最長響應時間;提升總響應時間,但是阻塞主請求執行緒,高併發時依然會造成執行緒數過多,CPU上下文切換;
全非同步(Callback)
Callback方式呼叫,使用場景:不考慮回撥時間且只能對結果做簡單處理,如果依賴服務是兩個或兩個以上服務,則不能合併兩個服務的處理結果;不阻塞主請求執行緒,但使用場景有限。
非同步回撥鏈式編排
非同步回撥鏈式編排(JDK8 CompletableFuture),使用場景:其實不是非同步呼叫方式,只是對依賴多服務的Callback呼叫結果處理做結果編排,來彌補Callback的不足,從而實現全非同步鏈式呼叫。
接下來看看如何設計利用全非同步Callback呼叫和非同步回撥鏈式編排處理結果來實現全非同步系統設計。
同步阻塞呼叫:
半非同步(非同步Future):public class Test { public static void main(String[] args) throws Exception { RpcService rpcService = new RpcService(); HttpService httpService = new HttpService(); //耗時10ms Map result1 = rpcService.getRpcResult(); //耗時20ms Integer result2 = httpService.getHttpResult(); //總耗時30ms } static class RpcService { Map getRpcResult() throws Exception { //呼叫遠端方法(遠端方法耗時約10ms,可以使用Thread.sleep模擬) } } static class HttpService { Integer getHttpResult() throws Exception { //呼叫遠端方法(遠端方法耗時約20ms,可以使用Thread.sleep模擬) Thread.sleep(20); return 0; } } }
public class Test { final static ExecutorService executor = Executors.newFixedThreadPool(2); public static void main(String[] args) { RpcService rpcService = new RpcService(); HttpService httpService = new HttpService(); Future> future1 = null; Future future2 = null; try { future1 = executor.submit(() -> rpcService.getRpcResult()); future2 = executor.submit(() -> httpService.getHttpResult()); //耗時10ms Map result1 = future1.get(300, TimeUnit.MILLISECONDS); //耗時20ms Integer result2 = future2.get(300, TimeUnit.MILLISECONDS); //總耗時20ms } catch (Exception e) { if (future1 != null) { future1.cancel(true); } if (future2 != null) { future2.cancel(true); } throw new RuntimeException(e); } } static class RpcService { Map getRpcResult() throws Exception { //呼叫遠端方法(遠端方法耗時約10ms,可以使用Thread.sleep模擬) } } static class HttpService { Integer getHttpResult() throws Exception { //呼叫遠端方法(遠端方法耗時約20ms,可以使用Thread.sleep模擬) } } }<strong> </strong>
全非同步(callback):
public class AsyncTest {
public staticHttpAsyncClient httpAsyncClient;
public static CompletableFuture getHttpData(String url) {
CompletableFuture asyncFuture = new CompletableFuture();
HttpPost post = new HttpPost(url);
HttpAsyncRequestProducer producer = HttpAsyncMethods.create(post);
AsyncCharConsumer consumer = newAsyncCharConsumer() {
HttpResponse response;
protected HttpResponse buildResult(final HttpContext context) {
return response;
}
…...
};
FutureCallback callback = new FutureCallback() {
public void completed(HttpResponse response) {
asyncFuture.complete(EntityUtils.toString(response.getEntity()));
}
…...
};
httpAsyncClient.execute(producer, consumer, callback);
return asyncFuture;
}
public static void main(String[] args) throws Exception {
AsyncTest.getHttpData("http://www.jd.com");
Thread.sleep(1000000);
}
}
非同步回撥鏈式編排:
CompletableFuture提供了50多個API,可以滿足所需的各種場景的非同步處理的編排,下面舉一個場景:
三個服務併發非同步呼叫,返回CompletableFuture,不阻塞主執行緒;
public static void test1() throws Exception {
HelloClientDemoTest service = new HelloClientDemoTest();
/**
* 場景1 兩個以上服務併發非同步呼叫,返回CompletableFuture,不阻塞主執行緒
* 並且兩個服務也是非同步非阻塞呼叫
*/
CompletableFuture future1 = service.getHttpData("http://www.jd.com");
CompletableFuture future2 = service.getHttpData("http://www.jd.com");
CompletableFuture future3 =service.getHttpData("http://www.jd.com");
List futureList = Lists.newArrayList(future1,future2, future3);
CompletableFuture allDoneFuture =CompletableFuture.allOf(futureList.toArray(newCompletableFuture[futureList.size()]));
CompletableFuture future4 =allDoneFuture.thenApply(v -> {
List result =futureList.stream().map(CompletableFuture::join)
.collect(Collectors.toList());
//注意順序
String result1 = (String)result.get(0);
String result2 = (String)result.get(1);
String result3 = (String)result.get(2);
//處理業務....
return result1 + result2 + result3;
}).exceptionally(e -> {
//e.printStackTrace();
return "";
});
//返回
}