
JAVA RPC (7): Writing a Production-Grade RPC from Scratch, Step by Step: Client Requests


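The previous installment covered the generic request proxy: every reference to a Spring bean actually goes through koalasClientProxy, and the real sending logic lives in the proxy method. Let's first look at what a native Thrift request looks like.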

public void startClient(String userName) {
        TTransport transport = null;
        try {
            // transport = new TSocket(SERVER_IP, SERVER_PORT, TIMEOUT);
            transport = new TFramedTransport(new TSocket(SERVER_IP, SERVER_PORT, TIMEOUT));
            // The protocol must match the one used by the server.
            TProtocol protocol = new TBinaryProtocol(transport);
            // TProtocol protocol = new TCompactProtocol(transport);
            // TProtocol protocol = new TJSONProtocol(transport);
            Client client = new Client(protocol);
            transport.open();
            String result = client.sayHello(userName);
            System.out.println("Thrift client result =: " + result);
        } catch (TTransportException e) {
            e.printStackTrace();
        } catch (TException e) {
            e.printStackTrace();
        } finally {
            // Always close the transport, even when the call fails.
            if (null != transport) {
                transport.close();
            }
        }
    }

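Take the synchronous call as an example. First you create a TSocket object, which is the socket object Thrift maintains; internally it wraps and manages a plain Socket. Most readers already know raw sockets well, so I won't go into detail here. Anyone familiar with sockets knows that each time the connection is opened (transport.open() in the code above), the client performs a TCP three-way handshake with the remote server. That handshake costs an extra network round trip before any data can be sent, which is a heavy overhead for cross-region or cross-datacenter calls, so we cannot afford to repeat it on every request. Instead, we put connections that have already completed the handshake into a connection pool: borrow a socket from the pool for each call, and return it to the pool when the call finishes. The principle is the same as database connection pools such as DBCP or Alibaba's Druid. For the pool details, see the AbstractBaseIcluster implementations in my Git repository.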

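Because the socket objects are cached in a pool, every request reuses an established TCP connection, which greatly speeds up requests. This is an essential part of any enterprise-grade RPC framework.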
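The borrow/return/invalidate cycle described above can be sketched with nothing but the JDK. This is only an illustration: SimplePool, giveBack, and createdCount are hypothetical names, and the article's real code uses commons-pool's GenericObjectPool via AbstractBaseIcluster rather than anything like this.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical minimal pool; not the GenericObjectPool used by koalas-rpc. */
final class SimplePool<T> {
    private final BlockingQueue<T> idle;       // connections waiting to be reused
    private final Supplier<T> factory;         // opens a new connection (the expensive step)
    private final int maxTotal;                // upper bound on live connections
    private final AtomicInteger created = new AtomicInteger();

    SimplePool(int maxTotal, Supplier<T> factory) {
        this.maxTotal = maxTotal;
        this.factory = factory;
        this.idle = new ArrayBlockingQueue<>(maxTotal);
    }

    /** Reuse an idle connection if there is one; otherwise create one, up to maxTotal. */
    T borrow(long timeoutMillis) {
        T conn = idle.poll();
        if (conn != null) {
            return conn;
        }
        if (created.incrementAndGet() <= maxTotal) {
            try {
                return factory.get();
            } catch (RuntimeException e) {
                created.decrementAndGet();     // creation failed, free the slot
                throw e;
            }
        }
        created.decrementAndGet();             // pool is full, wait for a returned connection
        try {
            conn = idle.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a connection", e);
        }
        if (conn == null) {
            throw new IllegalStateException("pool exhausted");
        }
        return conn;
    }

    /** Return a healthy connection so that the next caller can reuse it. */
    void giveBack(T conn) {
        idle.offer(conn);
    }

    /** Drop a broken connection; its slot becomes available for a fresh one. */
    void invalidate(T conn) {
        created.decrementAndGet();
    }

    int createdCount() {
        return created.get();
    }
}
```

A caller would borrow a connection, use it, and call giveBack on success or invalidate when the connection is known to be broken (for example after a read timeout), which mirrors the returnObject and invalidateObject calls in the article's invoke method.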

Let's look at the core implementation.

 @Override
    public Object invoke(MethodInvocation invocation) throws InvocationTargetException, IllegalAccessException {

        Method method = invocation.getMethod ();
        String methodName = method.getName ();
        Object[] args = invocation.getArguments ();

        Class<?>[] parameterTypes = method.getParameterTypes ();
        if (method.getDeclaringClass () == Object.class) {
            try {
                return method.invoke ( this, args );
            } catch (IllegalAccessException e) {
                LOG.error ( e.getMessage (), e );
                return null;
            }
        }
        if ("toString".equals ( methodName ) && parameterTypes.length == 0) {
            return this.toString ();
        }
        if ("hashCode".equals ( methodName ) && parameterTypes.length == 0) {
            return this.hashCode ();
        }
        if ("equals".equals ( methodName ) && parameterTypes.length == 1) {
            return this.equals ( args[0] );
        }

        boolean serviceTop =false;

        Transaction transaction=null;
        if(TraceThreadContext.get () ==null){
            serviceTop=true;
            transaction = Cat.newTransaction("Service", method.getDeclaringClass ().getName ().concat ( "." ).concat ( methodName ).concat ( ".top" ));

            MessageTree tree = Cat.getManager().getThreadLocalMessageTree();
            String messageId = tree.getMessageId();

            if (messageId == null) {
                messageId = Cat.createMessageId();
                tree.setMessageId(messageId);
            }

            String childId = Cat.getProducer().createRpcServerId("default");

            String root = tree.getRootMessageId();

            if (root == null) {
                root = messageId;
            }
            Cat.logEvent(CatConstants.TYPE_REMOTE_CALL, "", Event.SUCCESS, childId);

            KoalasTrace koalasTrace = new KoalasTrace (  );
            koalasTrace.setChildId (childId  );
            koalasTrace.setParentId (  messageId);
            koalasTrace.setRootId ( root );
            TraceThreadContext.set (koalasTrace);
        } else{
            KoalasTrace currentKoalasTrace = TraceThreadContext.get ();
            String child_Id = Cat.getProducer().createRpcServerId("default");
            Cat.logEvent(CatConstants.TYPE_REMOTE_CALL, "", Event.SUCCESS, child_Id);
            currentKoalasTrace.setChildId ( child_Id );
        }
        try {
            TTransport socket = null;
            int currTryTimes = 0;
            while (currTryTimes++ < retryTimes) {
                ServerObject serverObject = icluster.getObjectForRemote ();
                if (serverObject == null) throw new IllegalStateException("no server list to use :" + koalasClientProxy.getServiceInterface ().getName ());
                GenericObjectPool<TTransport> genericObjectPool = serverObject.getGenericObjectPool ();
                try {
                    long before = System.currentTimeMillis ();
                    socket = genericObjectPool.borrowObject ();
                    long after = System.currentTimeMillis ();
                    LOG.debug ( "get Object from pool with {} ms", after - before );
                } catch (Exception e) {
                    if (socket != null)
                        genericObjectPool.returnObject ( socket );
                    LOG.error ( e.getMessage (), e );
                    if(transaction!=null)
                        transaction.setStatus ( e );
                    throw new IllegalStateException("borrowObject error :" + koalasClientProxy.getServiceInterface ().getName ());
                }

                Object obj = koalasClientProxy.getInterfaceClientInstance ( socket, serverObject.getRemoteServer ().getServer () );

                if (obj instanceof TAsyncClient) {
                    ((TAsyncClient) obj).setTimeout ( asyncTimeOut );
                    if (args.length < 1) {
                        genericObjectPool.returnObject ( socket );
                        throw new IllegalStateException ( "args number error" );
                    }

                    Object argslast = args[args.length - 1];
                    if (!(argslast instanceof AsyncMethodCallback)) {
                        genericObjectPool.returnObject ( socket );
                        throw new IllegalStateException ( "args type error" );
                    }

                    AsyncMethodCallback callback = (AsyncMethodCallback) argslast;
                    ReleaseResourcesKoalasAsyncCallBack releaseResourcesKoalasAsyncCallBack = new ReleaseResourcesKoalasAsyncCallBack ( callback, serverObject, socket );
                    args[args.length - 1] = releaseResourcesKoalasAsyncCallBack;

                }
                try {
                    Object o = method.invoke ( obj, args );
                    if (socket instanceof TSocket) {
                        genericObjectPool.returnObject ( socket );

                    }
                    if(transaction!=null)
                        transaction.setStatus ( Transaction.SUCCESS );
                    return o;
                } catch (Exception e) {
                    Throwable cause = (e.getCause () == null) ? e : e.getCause ();

                    boolean ifreturn = false;
                    if (cause instanceof TApplicationException) {
                        if (((TApplicationException) cause).getType () == 6666) {
                            LOG.info ( "the server{}--serverName【{}】 thread pool is busy ,retry it!", serverObject.getRemoteServer (),koalasClientProxy.getServiceInterface ().getName () );
                            if (socket != null) {
                                genericObjectPool.returnObject ( socket );
                                ifreturn = true;
                            }
                            Thread.yield ();
                            if (retryRequest)
                                continue;
                        }

                        if (((TApplicationException) cause).getType () == 9999) {
                            LOG.error ( "rsa error with service--{}--serverName【{}】", serverObject.getRemoteServer (),koalasClientProxy.getServiceInterface ().getName () );
                            if (socket != null) {
                                genericObjectPool.returnObject ( socket );
                            }
                            if(transaction!=null)
                                transaction.setStatus ( cause );
                            throw new IllegalStateException("rsa error with service" + serverObject.getRemoteServer ().toString ()+koalasClientProxy.getServiceInterface ().getName () );
                        }

                        if (((TApplicationException) cause).getType () == 6699) {
                            LOG.error ( "this client is not rsa support,please get the privateKey and publickey with server--{}--serverName【{}】", serverObject.getRemoteServer (),koalasClientProxy.getServiceInterface ().getName () );
                            if (socket != null) {
                                genericObjectPool.returnObject ( socket );
                            }
                            if(transaction!=null)
                                transaction.setStatus ( cause );
                            throw new IllegalStateException("this client is not rsa support,please get the privateKey and publickey with server" + serverObject.getRemoteServer ().toString ()+koalasClientProxy.getServiceInterface ().getName ());
                        }

                        if (((TApplicationException) cause).getType () == TApplicationException.INTERNAL_ERROR) {
                            LOG.error ( "this server is error please take the error log with server--{}--serverName【{}】the remote server error message data【{}】", serverObject.getRemoteServer (),koalasClientProxy.getServiceInterface ().getName (),((TApplicationException) cause).getMessage () );
                            if (socket != null) {
                                genericObjectPool.returnObject ( socket );
                            }
                            if(transaction!=null)
                                transaction.setStatus ( cause );
                            throw new IllegalStateException("this server is error please take the error log with server" + serverObject.getRemoteServer ()+koalasClientProxy.getServiceInterface ().getName ());
                        }

                        if (((TApplicationException) cause).getType () == TApplicationException.MISSING_RESULT) {
                            if (socket != null) {
                                genericObjectPool.returnObject ( socket );
                            }
                            return null;
                        }
                    }

                    if (cause instanceof RSAException) {
                        LOG.error ( "this client privateKey or publicKey is error,please check it! --{}--serverName【{}】", serverObject.getRemoteServer (),koalasClientProxy.getServiceInterface ().getName () );
                        if (socket != null) {
                            genericObjectPool.returnObject ( socket );
                        }
                        if(transaction!=null)
                            transaction.setStatus ( cause );
                        throw new IllegalStateException("this client privateKey or publicKey is error,please check it!" + serverObject.getRemoteServer ()+ koalasClientProxy.getServiceInterface ().getName ());
                    }

                    if(cause instanceof OutMaxLengthException){
                        LOG.error ( (cause ).getMessage (),cause );
                        if (socket != null) {
                            genericObjectPool.returnObject ( socket );
                        }
                        if(transaction!=null)
                            transaction.setStatus ( cause );
                        throw new IllegalStateException("to big content!" + serverObject.getRemoteServer ()+ koalasClientProxy.getServiceInterface ().getName ());
                    }

                    if (cause.getCause () != null && cause.getCause () instanceof ConnectException) {
                        LOG.error ( "the server {}--serverName【{}】 maybe is shutdown ,retry it!", serverObject.getRemoteServer (),koalasClientProxy.getServiceInterface ().getName () );
                        try {
                            if (socket != null) {
                                genericObjectPool.returnObject ( socket );
                                ifreturn = true;
                            }

                            if (retryRequest)
                                continue;
                        } catch (Exception e1) {
                            LOG.error ( "invalidateObject error!", e1 );
                        }
                    }

                    if (cause.getCause () != null && cause.getCause () instanceof SocketTimeoutException) {
                        LOG.error ( "read timeout SocketTimeoutException,retry it! {}--serverName【{}】", serverObject.getRemoteServer (),koalasClientProxy.getServiceInterface ().getName () );
                        if (socket != null) {
                            try {
                                genericObjectPool.invalidateObject ( socket );
                                ifreturn = true;
                            } catch (Exception e1) {
                                LOG.error ( "invalidateObject error ,", e1 );
                                if(transaction!=null)
                                    transaction.setStatus ( e1 );
                                throw new IllegalStateException("SocketTimeout and invalidateObject error" + serverObject.getRemoteServer () + koalasClientProxy.getServiceInterface ().getName ());
                            }
                        }
                        if (retryRequest)
                            continue;
                    }

                    if(cause instanceof TTransportException){
                        if(((TTransportException) cause).getType () == TTransportException.END_OF_FILE){
                            LOG.error ( "TTransportException,END_OF_FILE! {}--serverName【{}】", serverObject.getRemoteServer (),koalasClientProxy.getServiceInterface ().getName () );
                            if (socket != null) {
                                try {
                                    genericObjectPool.invalidateObject ( socket );
                                } catch (Exception e1) {
                                    LOG.error ( "invalidateObject error", e1 );
                                    if(transaction!=null)
                                    transaction.setStatus ( e1 );
                                    throw new IllegalStateException("TTransportException and invalidateObject error" + serverObject.getRemoteServer () + koalasClientProxy.getServiceInterface ().getName ());
                                }
                            }
                            if(transaction!=null)
                            transaction.setStatus ( cause );
                            throw new IllegalStateException("the remote server error!" + serverObject.getRemoteServer () + koalasClientProxy.getServiceInterface ().getName ());
                        }
                        if(cause.getCause ()!=null && cause.getCause () instanceof SocketException){
                            if(genericObjectPool.isClosed ()){
                                LOG.warn ( "serverObject {} is close!,retry it",serverObject );
                                if (retryRequest)
                                    continue;
                            }
                        }
                    }

                    if (socket != null && !ifreturn)
                        genericObjectPool.returnObject ( socket );
                    LOG.error ( "invoke server error,server ip -【{}】,port -【{}】--serverName【{}】", serverObject.getRemoteServer ().getIp (), serverObject.getRemoteServer ().getPort (),koalasClientProxy.getServiceInterface ().getName ()  );
                    if(transaction!=null)
                    transaction.setStatus ( cause );
                    throw e;
                }
            }
            IllegalStateException finallyException = new IllegalStateException("error!retry time out of:" + retryTimes + "!!! " + koalasClientProxy.getServiceInterface ().getName () );
            if(transaction!=null)
            transaction.setStatus ( finallyException );
            throw finallyException;
        } finally {
            if(transaction!=null)
                transaction.complete ();
            if(serviceTop)
                TraceThreadContext.remove ();
        }
    }

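First, CAT instrumentation is added to build the service call chain; you can ignore that part for now. The sending logic then runs inside the retry loop. When retryRequest is enabled, the request is retried on recoverable errors such as a connection timeout, a refused connection, or a busy server thread pool.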
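The retry idea can be reduced to a few lines. The sketch below is a simplification under stated assumptions: RetryDemo, callWithRetry, and demo are hypothetical names, only ConnectException and SocketTimeoutException are treated as retryable, and the pool handling and Thrift error codes of the real invoke method are left out.

```java
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.concurrent.Callable;

final class RetryDemo {

    /**
     * Hypothetical helper: runs the call at most maxAttempts times.
     * Transient network errors trigger another attempt; any other
     * exception propagates to the caller immediately.
     */
    static <T> T callWithRetry(Callable<T> call, int maxAttempts) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (ConnectException | SocketTimeoutException e) {
                // Transient: another server or a fresh connection may succeed.
                last = e;
            }
        }
        throw new IllegalStateException(
                "retries exhausted after " + maxAttempts + " attempts", last);
    }

    /** Simulates a server that refuses the first two connections. */
    static String demo() {
        int[] attempts = {0};
        try {
            String result = callWithRetry(() -> {
                if (++attempts[0] < 3) {
                    throw new ConnectException("connection refused");
                }
                return "ok";
            }, 3);
            return result + " after " + attempts[0] + " attempts";
        } catch (Exception e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "ok after 3 attempts"
    }
}
```

Keeping the set of retryable exceptions small matters: retrying after an application-level error, or after a request the server may already have processed, can repeat side effects, which is presumably why the article's code makes retries optional through retryRequest.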

                    socket = genericObjectPool.borrowObject ();

This borrows a socket connection from the pool.

                Object obj = koalasClientProxy.getInterfaceClientInstance ( socket, serverObject.getRemoteServer ().getServer () );
 public Object getInterfaceClientInstance(TTransport socket,String server) {

        if (!async) {
            Class<?> clazz = getSynClientClass ();
            try {
                if (synConstructor == null) {
                    synConstructor = clazz.getDeclaredConstructor ( TProtocol.class );
                }
                TTransport transport  = new TKoalasFramedTransport ( socket, maxLength_ );
                if(this.getPrivateKey ()!=null && this.getPublicKey () != null){
                    ((TKoalasFramedTransport) transport).setRsa ( (byte) 1 );
                    ((TKoalasFramedTransport) transport).setPrivateKey ( this.privateKey );
                    ((TKoalasFramedTransport) transport).setPublicKey ( this.publicKey );
                }

                TProtocol protocol = new KoalasBinaryProtocol ( transport );

                return synConstructor.newInstance ( protocol );

            } catch (NoSuchMethodException e) {
                logger.error ( "the clazz can't find the Constructor with TProtocol.class" );
            } catch (InstantiationException e) {
                logger.error ( "get InstantiationException", e );
            } catch (IllegalAccessException e) {
                logger.error ( "get IllegalAccessException", e );
            } catch (InvocationTargetException e) {
                logger.error ( "get InvocationTargetException", e );
            }
        } else {
                if (null == asyncClientManagerList) {
                    synchronized (this) {
                        if (null == asyncClientManagerList) {
                            asyncClientManagerList = new ArrayList<> ();
                            for (int i = 0; i < asyncSelectorThreadCount; i++) {
                                try {
                                    asyncClientManagerList.add(new TAsyncClientManager());
                                } catch (IOException e) {
                                    e.printStackTrace ();
                                }
                            }
                        }
                    }
                }
            Class<?> clazz = getAsyncClientClass ();

            if (asyncConstructor == null) {
                try {
                    asyncConstructor = clazz.getDeclaredConstructor ( TProtocolFactory.class, TAsyncClientManager.class, TNonblockingTransport.class );
                } catch (NoSuchMethodException e) {
                    e.printStackTrace ();
                }
            }

            try {
                // floorMod keeps the index non-negative even if hashCode() is negative
                return asyncConstructor.newInstance ( new KoalasBinaryProtocol.Factory (), asyncClientManagerList.get (Math.floorMod ( socket.hashCode (), asyncSelectorThreadCount )), socket );
            } catch (InstantiationException e) {
                logger.error ( "get InstantiationException", e );
            } catch (IllegalAccessException e) {
                logger.error ( "get IllegalAccessException", e );
            } catch (InvocationTargetException e) {
                logger.error ( "get InvocationTargetException", e );
            }

        }
        return null;
    }

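This obtains the Thrift sending object, that is, the xxxxx.Client class from native Thrift code, which is what ultimately sends the request. The proxy then invokes it reflectively to call the server and returns the result to the caller. That completes the synchronous call logic on the client side.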
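The reflective part (look up the constructor once, build the client, invoke the method, unwrap the real cause of a failure) can be shown in isolation. In this sketch EchoClient is a hypothetical stand-in for a generated Thrift Client class, its String constructor argument stands in for the TProtocol, and ReflectiveCallDemo, newClient, and invoke are illustrative names only.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

final class ReflectiveCallDemo {

    /** Hypothetical stand-in for a generated Thrift Client class. */
    public static class EchoClient {
        private final String prefix;

        public EchoClient(String prefix) {
            this.prefix = prefix;
        }

        public String sayHello(String name) {
            if (name == null) {
                throw new IllegalArgumentException("name is null");
            }
            return prefix + name;
        }
    }

    // Cached so the constructor lookup happens only once.
    // This demo caches for a single class; a real proxy would cache per client class.
    private static volatile Constructor<?> cachedConstructor;

    static Object newClient(Class<?> clazz, String prefix) throws ReflectiveOperationException {
        Constructor<?> c = cachedConstructor;
        if (c == null) {
            c = clazz.getDeclaredConstructor(String.class);
            cachedConstructor = c;
        }
        return c.newInstance(prefix);
    }

    /** Invokes the named one-argument method and rethrows the real cause on failure. */
    static Object invoke(Object client, String methodName, Object arg) throws Throwable {
        Method m = client.getClass().getMethod(methodName, String.class);
        try {
            return m.invoke(client, arg);
        } catch (InvocationTargetException e) {
            // Reflection wraps whatever the target method threw; unwrap it.
            throw e.getCause();
        }
    }

    static String demo() {
        try {
            Object client = newClient(EchoClient.class, "hello, ");
            return (String) invoke(client, "sayHello", "koalas");
        } catch (Throwable t) {
            return "error: " + t;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "hello, koalas"
    }
}
```

Unwrapping InvocationTargetException is the same step the article's invoke method performs with e.getCause(): the retry and error-handling branches need to see the underlying Thrift or socket exception, not the reflection wrapper.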

https://gitee.com/a1234567891/koalas-rpc

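koalas-RPC is a personal project, shared for discussion and learning. Send me a private message if you have feedback; criticism is welcome. The client uses the Thrift protocol; the server supports Netty and Thrift's TThreadedSelectorServer half-sync/half-async threading model, along with dynamic scaling, services going online and offline, dynamic weights, availability configuration, page traffic statistics, and more. The goal is to keep providing a reliable RPC framework for individuals and small to mid-sized companies.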

For more learning material, join the advanced Java QQ group: 825199617
