• Netty 在服務端與客戶端的網路通訊中,使用的是非同步雙向通訊(雙工)的方式,即客戶端和服務端可以相互主動發請求給對方,發訊息後不會同步等響應。這樣就會有一下問題:
  1. 如何識別訊息是請求還是響應?
  2. 請求如何正確對應到響應?

1. 如何識別訊息是請求還是響應

為了識別訊息型別是請求或者響應,我們在訊息中加入了 messageType 的屬性,在上文我們也提到,這個訊息型別在自定義協議的頭部,他有幾種型別:請求、響應、心跳,我們先來說說請求、響應。

public enum MessageType {
/**
* 普通請求
*/
REQUEST((byte) 1), /**
* 普通響應
*/
RESPONSE((byte) 2), /**
* 心跳
*/
HEARTBEAT((byte) 3),
;
private final byte value;
}

請求(Request)的核心欄位如下:

public class RpcRequest {
/**
* 介面名
*/
private String interfaceName;
/**
* 方法名
*/
private String methodName;
/**
* 引數列表
*/
private Object[] params;
/**
* 引數型別列表
*/
private Class<?>[] paramTypes;
/**
* 介面版本
*/
private String version;
}

響應(Response)的核心欄位如下:

public class RpcResponse<T> {
/**
* 請求id
*/
private long requestId;
/**
* 響應碼
*/
private Integer code;
/**
* 提示訊息
*/
private String message;
/**
* 響應資料
*/
private T data;
}

傳送訊息的時候,按照訊息型別和結構體,將資料組裝好,寫到 channel 即可。接收訊息則要先解碼,從訊息頭拿到訊息型別,根據訊息型別來反序列化到對應的結構體。

2. 請求如何正確對應到響應

流程圖如下:



有幾個關鍵點:

  1. 客戶端請求之後拿到 Future
  2. 有一個 Map 存放未響應的請求,Key: RequestId,Value: Future
  3. 服務端響應的資料中,包含了客戶端的 RequestId,這是對應的關鍵
  4. 響應的結果會被 NettyClientHandler.channelRead0 監聽到,根據響應的 RequestId 取出對應的 Future
  5. 將結果寫到對應的 Future 中
  6. 客戶端通過 future.get() 獲取到資料

1) 客戶端發請求

程式碼如下:

public class NettyInvoker extends AbstractInvoker {

    private final NettyClient nettyClient = NettyClient.getInstance();

    @Override
protected RpcResult doInvoke(RpcRequest request, URL selected) throws RpcException {
// 獲取 Channel
Channel channel = nettyClient.getChannel(socketAddress);
// 構造一個空 Future
CompletableFuture<RpcResponse<?>> resultFuture = new CompletableFuture<>();
// 構建 RPC 訊息,此處會構建 requestId
RpcMessage rpcMessage = buildRpcMessage(request);
// 將 request 和 Future 對應放到 Map 中
UnprocessedRequests.put(rpcMessage.getRequestId(), resultFuture);
// 發出請求
channel.writeAndFlush(rpcMessage);
// 返回結果
return new AsyncResult(resultFuture);
}
// ...
}

返回的 AsyncResult 只是 future 的包裝。

public class AsyncResult implements RpcResult {

    private final CompletableFuture<?> future;

    public AsyncResult(CompletableFuture<?> future) {
this.future = future;
}
}

2) 請求暫存

這個儲存未響應的請求在 ccx-rpc 中是 UnprocessedRequests 類在管理:

public class UnprocessedRequests {
private static final Map<Long, CompletableFuture<RpcResponse<?>>> FUTURE_MAP = new ConcurrentHashMap<>(); public static void put(long requestId, CompletableFuture<RpcResponse<?>> future) {
FUTURE_MAP.put(requestId, future);
}
}

3) 服務端響應資料監聽

使用 Netty 的 Handler 監聽服務端響應的資料,當有資料響應,則呼叫 UnprocessedRequests.complete 寫入。

public class NettyClientHandler extends SimpleChannelInboundHandler<RpcMessage> {
@Override
protected void channelRead0(ChannelHandlerContext context, RpcMessage requestMsg) {
RpcResponse<?> response = (RpcResponse<?>) requestMsg.getData();
UnprocessedRequests.complete(response);
}
}

UnprocessedRequests.complete 實際上就是找出並刪除對應的請求,然後將資料寫入:future.complete(rpcResponse)

public class UnprocessedRequests {
private static final Map<Long, CompletableFuture<RpcResponse<?>>> FUTURE_MAP = new ConcurrentHashMap<>(); /**
* 完成響應
*
* @param rpcResponse 響應內容
*/
public static void complete(RpcResponse<?> rpcResponse) {
CompletableFuture<RpcResponse<?>> future = FUTURE_MAP.remove(rpcResponse.getRequestId());
if (future != null) {
future.complete(rpcResponse);
} else {
throw new IllegalStateException("future is null. rpcResponse=" + JSONUtil.toJsonStr(rpcResponse));
}
}
}

最後通過 AsyncResult.getData 可以獲取到資料。

public class AsyncResult implements RpcResult {

    private final CompletableFuture<?> future;

    public AsyncResult(CompletableFuture<?> future) {
this.future = future;
} @Override
public Object getData() {
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
log.error("getData error.", e);
}
return null;
}
}

總結

Netty 網路通訊是非同步雙工的,我們需要用正確 Request-Response 模型讓客戶端和服務端正確互動。

  1. 如何區分請求或響應?

    在訊息中,可以加入 messageType 欄位用來區分是請求或者響應。
  2. 如何把請求和響應對應?

    發出的請求需要用 RequestId 標記並用 Map 存起來。服務端收到請求之後,將 RequestId 原封不動寫到響應結果中。客戶端收到響應結果後,拿出 RequestId 找到對應的 Future 並寫入結果。

ccx-rpc 程式碼已經開源

Github:https://github.com/chenchuxin/ccx-rpc

Gitee:https://gitee.com/imccx/ccx-rpc