1. 程式人生 > >Github專案NettyRpc 閱讀(Netty+多執行緒+AQS+CAS+volatile)

Github專案NettyRpc 閱讀(Netty+多執行緒+AQS+CAS+volatile)

Github專案:https://github.com/luxiaoxun/NettyRpc

Fork: https://github.com/sw008/NettyRpc

此專案很適合學習多執行緒和Netty

RPC呼叫流程

大體思路:客戶端通過ConcurrentHashMap<String, RPCFuture>儲存請求ID和RPCFuture物件,然後把請求物件傳送給服務端,返回RPCFuture物件給呼叫者。服務端處理成功後返回響應物件(包含請求ID)。客戶端輸入流接收響應物件,通過請求ID在ConcurrentHashMap中找到傳送時建立的RPCFuture更新其相應資訊,並更新其AQS的狀態,release喚醒呼叫RPCFuture.get()而掛起的執行緒。

1 客戶端採用JDK動態代理建立ObjectProxy類代理物件,並與服務介面繫結。

2 客戶端呼叫服務介面方法,觸發動態代理物件的ObjectProxy.invoke()

3 客戶端傳送請求, ObjectProxy.invoke(Object proxy, Method method, Object[] args)  是JDK動態代理InvocationHandler介面的方法

   3.1 通過method、args,生成RpcRequest類物件(其包含成員變數 requestId、className、 methodName、parameterTypes、parameters)

   3.2 ConnectManage.getInstance().chooseHandler() :RpcClientHandler  一個簡單的負載均衡方法,找到應該呼叫的伺服器。因為Netty客服端主機與服務端主機是通過一條Channel連結,每一條Channel代表一個服務端主機。每個RpcClientHandler中包含一個Channel連結服務端,一個ConcurrentHashMap<String, RPCFuture>記錄請求ID和其對應的請求

   3.3 RpcClientHandler.sendRequest(RpcRequest request) 將請求物件傳送給服務端主機,等待對方接收成功後,返回RPCFuture物件實現非同步呼叫

RpcClientHandler類

ConcurrentHashMap<String, RPCFuture> pendingRPC;//儲存 請求ID+對應RPCFuture

public RPCFuture sendRequest(RpcRequest request) {
        final CountDownLatch latch = new CountDownLatch(1);

        //建立自定義非同步請求類RPCFuture物件 
        RPCFuture rpcFuture = new RPCFuture(request);

        //pendingRPC為ConcurrentHashMap<String, RPCFuture> 記錄請求ID和對應非同步請求
        //對方伺服器通過channel返回Response物件時,本機輸入流方法 通過pendingRPC+請求ID更新對應RPCFuture狀態
        pendingRPC.put(request.getRequestId(), rpcFuture);

        //傳送請求RpcRequest,並新增對方接收成功的非同步監聽物件,回撥物件ChannelFutureListener
        channel.writeAndFlush(request).addListener(
            new ChannelFutureListener() { //例項化 一個匿名區域性內部類物件
                //一個非同步監聽物件 ,監聽回撥由Netty框架實現
                //服務端接收到後 回撥此匿名內部類物件 的方法  (注意不是對方處理完回撥)     
                @Override
                public void operationComplete(ChannelFuture future) {
                    //此處使用區域性內部類的閉包特性,此區域性內部類物件可呼叫此方法的區域性變數latch
                    //對方接受成功,通過CountDownLatch喚醒當前執行緒
                    latch.countDown();
                }
        });
        try {
            //當前執行緒掛起 等待接收監聽回撥喚醒
            latch.await();
        } catch (InterruptedException e) {
            logger.error(e.getMessage());
        }

        //先返回RPCFuture,此時只代表請求送達,但是對方伺服器可能還沒有處理完成
        return rpcFuture;
}

 4 服務端接收處理資訊

   服務端RpcHandler類繼承Netty的SimpleChannelInboundHandler並實現channelRead0()方法,接收客戶端資訊,並通過反射執行。    

RpcHandler類

public void channelRead0(final ChannelHandlerContext ctx,final RpcRequest request) throws Exception {
        //接到資訊後,直接提交到RpcServer中的執行緒池執行
        RpcServer.submit(new Runnable() { 
            //同樣用到了區域性內部類的閉包特性,可以呼叫當前方法區域性變數
            @Override
            public void run() {
                RpcResponse response = new RpcResponse();
                //例項化RpcResponse 並裝配資訊 
                response.setRequestId(request.getRequestId());
                try {
                    Object result = handle(request);
                    response.setResult(result);
                } catch (Throwable t) {
                    response.setError(t.toString());
                }
                //傳送response到客戶端
                ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
                    //新增非同步監聽物件,傳送成功後回撥此物件方法
                    @Override
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        logger.debug("Send response for request " + request.getRequestId());
                    }
                });
            }
        });
    }

 5 客戶端接收響應資訊

   RpcClientHandler類繼承Netty的SimpleChannelInboundHandler並實現channelRead0方法,接收服務端響應資訊。

   可以發現客戶端傳送請求和接收響應的方法都是RpcClientHandler類實現,因為傳送和接收需要依靠同一個pendingRPC進行結果匹配,傳送時將RPCFuture放入其中,接收響應後通過請求ID更新對應RPCFuture。

RpcClientHandler類
//客戶端 收到響應資訊
ConcurrentHashMap<String, RPCFuture> pendingRPC;
@Override
public void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
        //用過請求ID 在pendingRPC中找到傳送時儲存的RPCFuture 
        String requestId = response.getRequestId();
        //pendingRPC儲存了傳送時的RPCFuture
        RPCFuture rpcFuture = pendingRPC.get(requestId);
        if (rpcFuture != null) {
            pendingRPC.remove(requestId);
            //更新對應rpcFuture,並且喚醒已經執行rpcFuture.get()的所有執行緒
            rpcFuture.done(response);
        }
}

6 RPCFuture類實現了Future介面,並通過AQS實現執行緒的掛起與喚醒。

sync物件實現了AbstractQueuedSynchronizer的tryRelease,tryAcquire方法。

當執行rpcFuture.done(response)時,將AQS中volatile int state通過CAS設定為1,喚醒已經執行rpcFuture.get()的所有執行緒。

RPCFuture類
//5中,接收到服務端響應後執行的方法rpcFuture.done(response);
public void done(RpcResponse reponse) {
        this.response = reponse;

        //sync為AQS物件,通過CAS更新AQS中的狀態值volatile int state;
        sync.release(1);

        invokeCallbacks();
        // Threshold
        long responseTime = System.currentTimeMillis() - startTime;
        if (responseTime > this.responseTimeThreshold) {
            logger.warn("Service response time is too slow. Request id = " + reponse.getRequestId() + ". Response Time = " + responseTime + "ms");
        }
    }

 當前程執行rpcFuture.get()時,判斷AQS中的volatile int state=1 ?,若還沒有響應資訊則當前執行緒進入掛起狀態。

RPCFuture類

@Override
    public Object get() throws InterruptedException, ExecutionException {

        //AQS中的狀態值volatile int state,判斷對方伺服器時候已經響應;
        sync.acquire(-1);

        if (this.response != null) {
            return this.response.getResult();
        } else {
            return null;
        }
    }

6 Sync類,是RPCFuture的靜態內部類。通過CAS控制volatile int state=1,決定呼叫執行緒是否需要掛起。volatile保證了可見性, CAS保證了原子性,整個過程是執行緒安全。使比較+賦值成為一個原子性操作,不會被其他執行緒打擾。可以把CAS理解成多執行緒的序列執行,再加上volatile的可見性有序性保障,所以是執行緒安全的。

AQS物件.acquire:請求資源,tryAcquire==false時掛起執行緒

AQS物件.release:釋放資源,tryRelease==true時喚醒一個掛起執行緒

http://www.cnblogs.com/waterystone/p/4920797.html

static class Sync extends AbstractQueuedSynchronizer {

        private static final long serialVersionUID = 1L;

        //future status
        private final int done = 1;
        private final int pending = 0;

        @Override
        //獲取資源
        protected boolean tryAcquire(int arg) {
            //判斷當前 volatile int state=1
            //返回false時,當前執行緒掛起
            return getState() == done;
        }

        @Override
        //釋放資源
        protected boolean tryRelease(int arg) {
            if (getState() == pending) {
                //CAS設定 volatile int state=1
                //CAS保證操作原子性,執行緒安全
                if (compareAndSetState(pending, done)) {                    
                    //因為只有傳送執行緒會執行其請求對應的RPCFuture的get方法,所以只會有一個執行緒掛起等待
                    //返回true時,AQS框架會喚醒第一個等待執行緒
                    return true;
                } else {
                    return false;
                }
            } else {
                return true;
            }
        }

        public boolean isDone() {
            getState();
            return getState() == done;
        }
    }