1. 程式人生 > >java高併發系統之非同步非阻塞

java高併發系統之非同步非阻塞

在做電商系統時,流量入口如首頁、活動頁、商品詳情頁等系統承載了網站的大部分流量,而這些系統的主要職責包括聚合資料拼裝模板、熱點統計、快取、下游功能降級開關、託底資料等等。其中聚合資料需要呼叫其它多個系統服務獲取資料、拼裝資料/模板然後返回給前端,聚合資料來源主要有依賴系統/服務、快取、資料庫等;而系統之間的呼叫可以通過如http介面呼叫(如HttpClient)、SOA服務呼叫(如dubbo、thrift)等等。

在Java中,如使用Tomcat,一個請求會分配一個執行緒進行請求處理,該執行緒負責獲取資料、拼裝資料或模板然後返回給前端;在同步呼叫獲取資料介面的情況下(等待依賴系統返回資料),整個執行緒是一直被佔用並阻塞的。如果有大量的這種請求,每個請求佔用一個執行緒,但執行緒一直處於阻塞,降低了系統的吞吐量,這將導致應用的吞吐量下降;我們希望在呼叫依賴的服務響應比較慢,此時應該讓出執行緒和CPU來處理下一個請求,當依賴的服務返回了再分配相應的執行緒來繼續處理。而這應該有更好的解決方案:非同步/協程。而Java是不支援協程的(雖然有些Java框架說支援,但還是高層API的封裝),因此在Java中我們還可以使用非同步來提升吞吐量。目前java一些開源框架(HttpClient\HttpAsyncClient、dubbo、thrift等等)大部分都支援。

幾種呼叫方式

同步阻塞呼叫

即序列呼叫,響應時間為所有服務的響應時間總和;

半非同步(非同步Future)

執行緒池,非同步Future,使用場景:併發請求多服務,總耗時為最長響應時間;提升總響應時間,但是阻塞主請求執行緒,高併發時依然會造成執行緒數過多,CPU上下文切換;

全非同步(Callback)

Callback方式呼叫,使用場景:不考慮回撥時間且只能對結果做簡單處理,如果依賴服務是兩個或兩個以上服務,則不能合併兩個服務的處理結果;不阻塞主請求執行緒,但使用場景有限。

非同步回撥鏈式編排

非同步回撥鏈式編排(JDK8 CompletableFuture),使用場景:其實不是非同步呼叫方式,只是對依賴多服務的Callback呼叫結果處理做結果編排,來彌補Callback的不足,從而實現全非同步鏈式呼叫。

接下來看看如何設計利用全非同步Callback呼叫和非同步回撥鏈式編排處理結果來實現全非同步系統設計。

同步阻塞呼叫

public class Test {

   public static void main(String[] args) throws Exception {

       RpcService rpcService = new RpcService();

       HttpService httpService = new HttpService();

       //耗時10ms

       Map<String, String> result1 = rpcService.getRpcResult();

       //耗時20ms

       Integer result2 = httpService.getHttpResult();

       //總耗時30ms

    }

   static class RpcService {

       Map<String, String> getRpcResult() throws Exception {

           //呼叫遠端方法(遠端方法耗時約10ms,可以使用Thread.sleep模擬)

       }

    }

   static class HttpService {

       Integer getHttpResult() throws Exception {

           //呼叫遠端方法(遠端方法耗時約20ms,可以使用Thread.sleep模擬)

           Thread.sleep(20);

           return 0;

       }

    }

}

半非同步(非同步Future)

public class Test {

   final static ExecutorService executor = Executors.newFixedThreadPool(2);

   public static void main(String[] args) {

       RpcService rpcService = new RpcService();

       HttpService httpService = new HttpService();

       Future<Map<String, String>> future1 = null;

       Future<Integer> future2 = null;

       try {

           future1 = executor.submit(() -> rpcService.getRpcResult());

           future2 = executor.submit(() -> httpService.getHttpResult());

           //耗時10ms

           Map<String, String> result1 = future1.get(300, TimeUnit.MILLISECONDS);

           //耗時20ms

           Integer result2 = future2.get(300, TimeUnit.MILLISECONDS);

           //總耗時20ms

       } catch (Exception e) {

           if (future1 != null) {

                future1.cancel(true);

           }

           if (future2 != null) {

                future2.cancel(true);

           }

           throw new RuntimeException(e);

       }

    }

   static class RpcService {

       Map<String, String> getRpcResult() throws Exception {

           //呼叫遠端方法(遠端方法耗時約10ms,可以使用Thread.sleep模擬)

       }

    }

   static class HttpService {

       Integer getHttpResult() throws Exception {

           //呼叫遠端方法(遠端方法耗時約20ms,可以使用Thread.sleep模擬)

       }

    }

}

全非同步(Callback)

public class AsyncTest {

public staticHttpAsyncClient httpAsyncClient;

   public static CompletableFuture<String> getHttpData(String url) {

       CompletableFuture asyncFuture = new CompletableFuture();

       HttpPost post = new HttpPost(url);

       HttpAsyncRequestProducer producer = HttpAsyncMethods.create(post);

       AsyncCharConsumer<HttpResponse> consumer = newAsyncCharConsumer<HttpResponse>() {

            HttpResponse response;

           protected HttpResponse buildResult(final HttpContext context) {

                return response;

           }

…...

       };

       FutureCallback callback = new FutureCallback<HttpResponse>() {

           public void completed(HttpResponse response) {

               asyncFuture.complete(EntityUtils.toString(response.getEntity()));

           }

…...

       };

       httpAsyncClient.execute(producer, consumer, callback);

       return asyncFuture;

    }

   public static void main(String[] args) throws Exception {

       AsyncTest.getHttpData("http://www.jd.com");

       Thread.sleep(1000000);

    }

}

本示例使用HttpAsyncClient演示。

非同步回撥鏈式編排

CompletableFuture提供了50多個API,可以滿足所需的各種場景的非同步處理的編排,在此列舉三個場景:

場景1:三個服務併發非同步呼叫,返回CompletableFuture,不阻塞主執行緒;


方法test1:

   public static void test1() throws Exception {

       HelloClientDemoTest service = new HelloClientDemoTest();

       /**

        * 場景1 兩個以上服務併發非同步呼叫,返回CompletableFuture,不阻塞主執行緒

        * 並且兩個服務也是非同步非阻塞呼叫

        */

       CompletableFuture future1 = service.getHttpData("http://www.jd.com");

       CompletableFuture future2 = service.getHttpData("http://www.jd.com");

       CompletableFuture future3 =service.getHttpData("http://www.jd.com");

       List<CompletableFuture> futureList = Lists.newArrayList(future1,future2, future3);

       CompletableFuture<Void> allDoneFuture =CompletableFuture.allOf(futureList.toArray(newCompletableFuture[futureList.size()]));

       CompletableFuture<String> future4 =allDoneFuture.thenApply(v -> {

            List<Object> result =futureList.stream().map(CompletableFuture::join)

                   .collect(Collectors.toList());

            //注意順序

            String result1 = (String)result.get(0);

            String result2 = (String)result.get(1);

            String result3 = (String)result.get(2);

            //處理業務....

            return result1 + result2 + result3;

        }).exceptionally(e -> {

            //e.printStackTrace();

            return "";

        });

       //返回

    }

場景2、兩個服務併發非同步呼叫,返回CompletableFuture,不阻塞主執行緒;


方法test2:

   public void test2() throws Exception {

       HelloClientDemoTest service = new HelloClientDemoTest();

       /**

        * 場景2 兩個介面併發非同步呼叫,返回CompletableFuture,不阻塞主執行緒

        * 並且兩個服務也是非同步非阻塞呼叫

        */

       CompletableFuture future1 = service.getHttpData("http://www.jd.com");

       CompletableFuture future2 =service.getHttpData("http://www.jd.com");

       CompletableFuture future3 =future1.thenCombine(future2, (f1, f2) -> {

            //理業務....

            return f1 + "," + f2;

        }).exceptionally(e -> {

            return "";

        });

       //返回

    }

場景3、兩個服務,併發非同步呼叫兩個服務,並且一個服務的結果返回後再次呼叫另一服務,然後將三個結果後並處理,返回CompletableFuture,整個處理過程中不阻塞任何執行緒;

方法test3:

    publicvoid test3() throws Exception {

       HelloClientDemoTest service = new HelloClientDemoTest();

       /**

        * 場景3 兩請求依賴呼叫,然後與另一服務結果組合處理,返回CompletableFuture,不阻塞主執行緒

        * 並且兩個服務也是非同步非阻塞呼叫

        */

        CompletableFuture future1 = service.getHttpData("http://www.jd.com");

        CompletableFuture future2 = service.getHttpData("http://www.jd.com");

        CompletableFuture<String> future3= future1.thenApply((param) -> {

            CompletableFuture future4 =service.getHttpData("http://www.jd.com");

            return future4;

        });

        CompletableFuture future5 =future2.thenCombine(future3, (f2, f3) -> {

            //....處理業務

            return f2 + "," + f3;

        }).exceptionally(e -> {

            return "";

        });

        //返回future5

    }

全非同步Web系統設計

主要技術:servlet3,JDK8 CompletableFuture,支援非同步Callback呼叫的RPC框架。

先看一下處理流程圖:


servlet3:Servlet 接收到請求之後,可能首先需要對請求攜帶的資料進行一些預處理;接著,Servlet 執行緒將請求轉交給一個非同步執行緒來執行業務處理,執行緒本身返回至容器。針對業務處理較耗時的情況,這將大大減少伺服器資源的佔用,並且提高併發處理速度。servlet3可參考商品詳情頁系統的Servlet3非同步化實踐,結合其中講解的servlet3整合:

public void submitFuture(finalHttpServletRequest req, final Callable<CompletableFuture> task) throwsException{

       final String uri = req.getRequestURI();

       final Map<String, String[]> params = req.getParameterMap();

       final AsyncContext asyncContext = req.startAsync();

       asyncContext.getRequest().setAttribute("uri", uri);

       asyncContext.getRequest().setAttribute("params", params);

       asyncContext.setTimeout(asyncTimeoutInSeconds * 1000);

       if(asyncListener != null) {

           asyncContext.addListener(asyncListener);

       }

       CompletableFuture future = task.call();

       future.thenAccept(result -> {

           HttpServletResponse resp = (HttpServletResponse)asyncContext.getResponse();

           try {

                if(result instanceof String) {

                    byte[] bytes = new byte[0];

                    if (StringUtils.isBlank(result)){

                       resp.setContentType("text/html;charset=gbk");

                       resp.setContentLength(0);

                    } else {

                        bytes =result.getBytes("GBK");

相關推薦

java併發系統非同步阻塞

在做電商系統時,流量入口如首頁、活動頁、商品詳情頁等系統承載了網站的大部分流量,而這些系統的主要職責包括聚合資料拼裝模板、熱點統計、快取、下游功能降級開關、託底資料等等。其中聚合資料需要呼叫其它多個系統服務獲取資料、拼裝資料/模板然後返回給前端,聚合資料來源主要有依賴系統

聊聊java併發系統非同步阻塞

幾種呼叫方式 同步阻塞呼叫 即序列呼叫,響應時間為所有服務的響應時間總和; 半非同步(非同步Future) 執行緒池,非同步Future,使用場景:併發請求多服務,總耗時為最長響應時間;提升總響應時間,但是阻塞主請求執行緒,高併發時依然會造成執行緒數過多,CPU上下文切換; 全非同步(Cal

【轉】聊聊java併發系統非同步阻塞

在做電商系統時,流量入口如首頁、活動頁、商品詳情頁等系統承載了網站的大部分流量,而這些系統的主要職責包括聚合資料拼裝模板、熱點統計、快取、下游功能降級開關、託底資料等等。其中聚合資料需要呼叫其它多個系統服務獲取資料、拼裝資料/模板然後返回給前端,聚合資料來源主要有依賴系統/服務、快取、資料庫等;而系統之間

聊聊併發系統降級特技(轉)

在開發高併發系統時有三把利器用來保護系統:快取、降級和限流。之前已經有一些文章介紹過快取和限流了。本文將詳細聊聊降級。當訪問量劇增、服務出現問題(如響應時間慢或不響應)或非核心服務影響到核心流程的效能時,仍然需要保證服務還是可用的,即使是有損服務。系統可以根據一些關鍵資料進行自動降級,也

聊聊併發系統限流特技(二)(轉)

上一篇《聊聊高併發系統限流特技-1》講了限流演算法、應用級限流、分散式限流;本篇將介紹接入層限流實現。 接入層限流 接入層通常指請求流量的入口,該層的主要目的有:負載均衡、非法請求過濾、請求聚合、快取、降級、限流、 A/B 測試、服務質量監控

Java併發程式設計synchronized關鍵字(二)

上一篇文章講了synchronized的部分關鍵要點,詳見:Java高併發程式設計之synchronized關鍵字(一) 本篇文章接著講synchronized的其他關鍵點。 在使用synchronized關鍵字的時候,不要以字串常量作為鎖定物件。看下面的例子: public class

Java併發程式設計synchronized關鍵字(一)

首先看一段簡單的程式碼: public class T001 { private int count = 0; private Object o = new Object(); public void m() { //任何執行緒要執行下面這段程式碼

實戰java併發程式設計CountDownLatch原始碼分析

首先看第一個! CountDownLatch 使用場景 CountDownLatch類是常見的併發同步控制類,適用於某一執行緒的執行在其他多個執行緒執行完成之後,比如火箭發射前需要各項指標檢查,只有當各項指標檢查完才能發射,再比如解析多個excel文件,只有當

實戰java併發程式設計ReentrantReadWriteLoc原始碼分析

前面分析了併發工具類CountDownLatch和CyclicBarrier,本文分享分析比較重要的ReentrantReadWriteLock。 使用場景 以前的同步方式需要對讀、寫操作進行同步,讀讀之間,讀寫之間,寫寫之間等;工程師們發現讀讀之間並不會影響資

Java併發程式設計第一階段,多執行緒基礎深入淺出

汪文君高併發程式設計第一階段01講-課程大綱及主要內容介紹 汪文君高併發程式設計第一階段02講-簡單介紹什麼是執行緒 汪文君高併發程式設計第一階段03講-建立並啟動執行緒 汪文君高併發程式設計第一階段04講-執行緒生命週期以及start方法原始碼剖析 汪文君高併發程式設計第

聊聊併發系統HTTP快取

簡介 最近遇到很多人來諮詢我關於瀏覽器快取的一些問題,而這些問題都是類似的,因此總結本文來解答以後遇到類似問題的朋友。 因本文主要以瀏覽器快取場景介紹,所以非瀏覽器場景下的一些用法本文不會介紹,而且本文以chrome為測試瀏覽器。 瀏覽器快取是指當我們使用瀏覽器訪問一些網站頁面或者http服務時,根

聊聊併發系統佇列術

佇列在資料結構中是一種線性表,從一端插入資料,然後從另一端刪除資料。本文目的不是講解各種佇列演算法,而是在應用層面講述使用佇列能解決哪些場景問題。 在我開發過的系統中,不是所有的業務都必須實時處理、不是所有的請求都必須實時反饋結果給使用者、不是所有的請求/處理都必須100%處理成功、不知道誰依賴“

併發系統限流特技

在開發高併發系統時有三把利器用來保護系統:快取、降級和限流。快取的目的是提升系統訪問速度和增大系統能處理的容量,可謂是抗高併發流量的銀彈;而降級是當服務出問題或者影響到核心流程的效能則需要暫時遮蔽掉,待高峰或者問題解決後再開啟;而有些場景並不能用快取和降級來解決,比如稀缺資源(秒殺、搶購)、寫服務(如評論、

Java併發系統的限流策略

概要在大資料量高併發訪問時,經常會出現服務或介面面對暴漲的請求而不可用的情況,甚至引發連鎖反映導致整個系統崩潰。此時你需要使用的技術手段之一就是限流,當請求達到一定的併發數或速率,就進行等待、排隊、降級、拒絕服務等。在開發高併發系統時有三把利器用來保護系統:快取、降級和限流。快取快取比較好理解,在大型高併發系

實戰Java併發程式設計LockSupport

LockSupport簡介: LockSupport是一個非常方便實用的執行緒阻塞工具,它可以線上程任意位置讓執行緒阻塞.和Thread.suspend()相比,它彌補了由於resume()在前發生,導致執行緒無法繼續執行的情況.和Object.wait()相比,它不需要先

實戰Java併發程式設計Java記憶體模型和執行緒安全

Java記憶體模型 原子性: 是指一個操作是不可中斷的.即使多個執行緒一起執行的時候,一個操作一旦開始,就不會被其他執行緒干擾. 一般CPU的指令是原子的. Q:i++是原子操作嗎? A:不是.

聊聊併發系統限流特技

在開發高併發系統時有三把利器用來保護系統:快取、降級和限流。快取的目的是提升系統訪問速度和增大系統能處理的容量,可謂是抗高併發流量的銀彈;而降級是當服務出問題或者影響到核心流程的效能則需要暫時遮蔽掉,待高峰或者問題解決後再開啟;而有些場景並不能用快取和降級來解決,比如

併發系統降級

在開發高併發系統時有三把利器用來保護系統:快取、降級和限流。之前已經有一些文章介紹過快取和限流了。本文將詳細聊聊降級。當訪問量劇增、服務出現問題(如響應時間慢或不響應)或非核心服務影響到核心流程的效能時,仍然需要保證服務還是可用的,即使是有損服務。系統可以根據一些關鍵

併發系統降級特技

在開發高併發系統時有三把利器用來保護系統:快取、降級和限流。之前已經有一些文章介紹過快取和限流了。本文將詳細聊聊降級。當訪問量劇增、服務出現問題(如響應時間慢或不響應)或非核心服務影響到核心流程的效能時,仍然需要保證服務還是可用的,即使是有損服務。系統可以根據一些關鍵資料

併發系統限流特技:有了它,京東6.18如虎添翼!

轉載 ------ 2016-06-24 張開濤相關文章 在開發高併發系統時有三把利器用來保護系統:快取、降級和限流。快取的目的是提升系統訪問速度和增大系統能處理的容量,可謂是抗高併發流量的銀彈;而降級是當服務出問題或者影響到核心流程的效能則需要暫時遮蔽掉,待高峰或者問