從零寫分散式RPC框架 系列 2.0 (2)RPC-Common模組設計實現
RPC-Common模組相對於1.0版本複雜了很多,最主要的變化在於將 Rpc的Netty處理器從RPC-Server和RPC-Client收回。1.0 版本的設計思路是儘可能減少冗餘依賴,所以RPC-Common一般只放通用的功能。現在則是儘可能都放在RPC-Common模組,以方便工程升級複雜化後的統一協調管理。以後功能將集中在一個模組下(名字不一定還是RPC-Common),RPC-Server和RPC-Client則會擔任將功能選擇性開放給使用者的輕量級角色。部分內容在 從零寫分散式RPC框架 系列 1.0 (2)RPC-Common模組設計實現 已有說明,本文不再贅述。
系列文章:
專欄:從零開始寫分散式RPC框架
專案GitHub地址:https://github.com/linshenkx/rpc-netty-spring-boot-starter
手寫通用型別負載均衡路由引擎(含隨機、輪詢、雜湊等及其帶權形式)
實現 序列化引擎(支援 JDK預設、Hessian、Json、Protostuff、Xml、Avro、ProtocolBuffer、Thrift等序列化方式)
從零寫分散式RPC框架 系列 2.0 (1)架構升級
從零寫分散式RPC框架 系列 2.0 (2)RPC-Common模組設計實現
從零寫分散式RPC框架 系列 2.0 (3)RPC-Server和RPC-Client模組改造
從零寫分散式RPC框架 系列 2.0 (4)使用BeanPostProcessor實現自定義@RpcReference註解注入
文章目錄
一 模組介紹
結構圖
流程圖
- RPC-Client 將服務請求封裝到遠端傳輸實體裡,經過編碼傳送到RPC-Server
- RPC-Server獲取請求,執行業務處理,返回結果
- RPC-Client獲取結果進行處理
二 協議相關實體類
RemotingTransporter
這裡需要注意的是invokeId我使用的是原子遞增的方式來確保唯一性,因為只需要對單客戶端保證唯一性,所以這種方式應該是夠用的。如果單客戶端有高併發請求的話(雖然不太合理),可以考慮其他方式。
Flag只佔一個位元組大小,但我這裡實現實現的效率比較低,後面應該得優化。
另外訊息體BodyContent只是個介面,根據訊息頭的標識,BodyContent可以代表不同的內容。
/**
* @version V1.0
* @author: lin_shen
* @date: 18-11-12
* @Description: 自定義遠端傳輸實體 (magic+flag+invokeId+bodyLength+bodyContent)
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RemotingTransporter {
/**
* 魔數
*/
public static final short MAGIC=(short)0x9826;
/**
* 用原子遞增的方式來獲取不重複invokeId
*/
private static final AtomicLong invokeIdGnerator=new AtomicLong(0L);
/**
* 標誌位, 一共8個地址位。
* 低四位用來表示訊息體資料用的序列化工具的型別
* 高四位中,第一位為1表示是request請求,為0表示是reponse應答
* TODO:第二位為1表示雙向傳輸(即有返回response)
* TODO:第三位為1表示是心跳ping事件
* TODO:預留位
*/
private Flag flag;
@Getter
@ToString
public static class Flag{
private boolean isRequest;
private boolean isTwoway;
private boolean isPing;
private boolean isOther;
private int serializeType;
private byte thisByte;
public Flag(boolean isRequest, boolean isTwoway, boolean isPing, boolean isOther, int serializeType) {
if(serializeType<0||serializeType>15){
throw new IllegalArgumentException("serializeType 對應整數應該在 0 到 15 之間");
}
this.isRequest = isRequest;
this.isTwoway = isTwoway;
this.isPing = isPing;
this.isOther = isOther;
this.serializeType = serializeType;
int byteTem= (isRequest?1:0)<<7;
byteTem=byteTem | ((isTwoway?1:0)<<6);
byteTem=byteTem | ((isPing?1:0)<<5);
byteTem=byteTem | ((isOther?1:0)<<4);
byteTem=byteTem | serializeType;
this.thisByte= (byte) byteTem;
}
public Flag(byte thisByte){
this.thisByte=thisByte;
isRequest=((thisByte>>>7)&1)==1;
isTwoway=((thisByte>>>6)&1)==1;
isPing=((thisByte>>>5)&1)==1;
isOther=((thisByte>>>4)&1)==1;
serializeType=thisByte & 15;
}
}
/**
* 每一個請求的唯一識別id(由於採用非同步通訊的方式,用來把請求request和返回的response對應上)
*/
private long invokeId=invokeIdGnerator.getAndIncrement();
/**
* 訊息體位元組陣列長度
*/
private int bodyLength;
/**
* 訊息體內容(還需要編碼序列化成位元組陣列)
*/
private transient BodyContent bodyContent;
}
BodyContent及其實現類
BodyContent只起標識作用,本身無規定任何方法。在傳輸過程中以位元組陣列形式存在,由 RemotingTransporter 的訊息頭 標識其代表型別及序列化方式。
目前其實現類有 RpcRequest 和 RpcResponse,分別代表Rpc請求和Rpc響應。注意這些實現類應該只包含自身業務資訊,其他的應由 RemotingTransporter 統一標識處理。
三 Netty編碼器和解碼器
RemotingTransporterEncoder 編碼器
編碼器的實現比較簡單。
這裡需要注意,相對於1.0版本 編碼器的處理型別由構造方法動態傳入的設計,這裡直接限制了處理型別為 RemotingTransporter 。這是因為在1.0版本中,不管是RpcRequest還是RpcResponse都會被編碼,而我們並不想對每種型別都專門寫一個編碼器,所以一個比較討巧的做法是將型別動態傳入。
而在2.0版本中,因為我們統一了不管是 RpcRequest還是 RpcResponse在傳輸過程中其表現實體都為RemotingTransporter,將其型別資訊封裝到訊息頭裡,遮蔽了各自傳輸實體的差異,所以直接指定 RemotingTransporter 即可。而且還可以減少型別資訊和程式碼的高耦合,將不同型別不同編解碼方式的判斷和處理收歸到編解碼器裡。
/**
* @version V1.0
* @author: lin_shen
* @date: 18-11-12
* @Description: TODO
*/
@Log4j2
public class RemotingTransporterEncoder extends MessageToByteEncoder<RemotingTransporter> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, RemotingTransporter remotingTransporter, ByteBuf byteBuf) throws Exception {
//獲取請求體陣列
//使用序列化引擎
byte[] body= SerializerEngine.serialize(remotingTransporter.getBodyContent(), SerializeTypeEnum.queryByCode(remotingTransporter.getFlag().getSerializeType()));
//magic+flag+invokeId+bodyLength+bodyContent
byteBuf.writeShort(RemotingTransporter.MAGIC)
.writeByte(remotingTransporter.getFlag().getThisByte())
.writeLong(remotingTransporter.getInvokeId())
.writeInt(body.length)
.writeBytes(body);
log.info("write end");
}
}
RemotingTransporterDecoder 解碼器
需要注意,這裡解碼器沒有繼承自ByteToMessageDecoder而是使用了ReplayingDecoder(當然它也繼承自ByteToMessageDecoder),其泛型引數指定了用於狀態管理的型別(不需要狀態管理可以使用Void),這裡我們是用了內部列舉類 RemotingTransporterDecoder.State。
其工作原理可簡單理解為:ByteBuf源源不斷地傳遞給解碼器,解碼器需要根據當前狀態判斷應該執行什麼讀取操作以進行解碼。因此一個狀態通常代表著傳輸體的一個變數,根據前面所說,這裡的狀態應該有:HEADER_MAGIC, HEADER_FLAG, HEADER_INVOKE_ID, HEADER_BODY_LENGTH, BODY 。另外需要注意讀取是狀態時連續且迴圈的,因此在根據狀態 switch 時應按順序編寫,並可不使用 break 以提高效率。每個狀態對應的解碼操作執行完的時候都應該將狀態及時切換,最後一個狀態執行完切換回預設初始狀態。最初的初始狀態在父類構造方法傳入。
/**
* @version V1.0
* @author: lin_shen
* @date: 18-11-12
* @Description: TODO
*/
@Log4j2
public class RemotingTransporterDecoder extends ReplayingDecoder<RemotingTransporterDecoder.State> {
private static final int MAX_BODY_SIZE = 1024 * 1024 * 5;
/**
* 用於暫存解碼RemotingTransporter資訊,一個就夠了
*/
private static RemotingTransporter remotingTransporter=RemotingTransporter.builder().build();
/**
* 用於ReplayingDecoder的狀態管理
*/
enum State {
HEADER_MAGIC, HEADER_FLAG, HEADER_INVOKE_ID, HEADER_BODY_LENGTH, BODY
}
public RemotingTransporterDecoder( ){
//設定 state() 的初始值,以便進入switch
super(State.HEADER_MAGIC);
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
//注意這裡在BODY之前都沒有break
switch (this.state()){
case HEADER_MAGIC:
checkMagic(byteBuf.readShort());
//移到下一檢查點(一是改變state的值的狀態,二是獲取到最新的讀指標的下標)
checkpoint(State.HEADER_FLAG);
case HEADER_FLAG:
remotingTransporter.setFlag(new RemotingTransporter.Flag(byteBuf.readByte()));
checkpoint(State.HEADER_INVOKE_ID);
case HEADER_INVOKE_ID:
remotingTransporter.setInvokeId(byteBuf.readLong());
checkpoint(State.HEADER_BODY_LENGTH);
case HEADER_BODY_LENGTH:
remotingTransporter.setBodyLength(byteBuf.readInt());
checkpoint(State.HEADER_BODY_LENGTH);
case BODY:
int bodyLength = checkBodyLength(remotingTransporter.getBodyLength());
byte[] bytes=new byte[bodyLength];
byteBuf.readBytes(bytes);
Class genericClass=remotingTransporter.getFlag().isRequest()?RpcRequest.class: RpcResponse.class;
BodyContent bodyContent= (BodyContent) SerializerEngine.deserialize(bytes,genericClass,SerializeTypeEnum.queryByCode(remotingTransporter.getFlag().getSerializeType()));
RemotingTransporter remotingTransporter1=RemotingTransporter.builder()
.flag(remotingTransporter.getFlag())
.invokeId(remotingTransporter.getInvokeId())
.bodyLength(remotingTransporter.getBodyLength())
.bodyContent(bodyContent)
.build();
list.add(remotingTransporter1);
break;
default:
break;
}
//順利讀完body後應置回起點
checkpoint(State.HEADER_MAGIC);
}
private int checkBodyLength(int bodyLength) throws RemotingContextException {
if (bodyLength > MAX_BODY_SIZE) {
throw new RemotingContextException("body of request is bigger than limit value "+ MAX_BODY_SIZE);
}
return bodyLength;
}
private void checkMagic(short magic) throws RemotingContextException{
//檢查魔數
if (RemotingTransporter.MAGIC != magic) {
log.error("魔數不匹配");
throw new RemotingContextException("magic value is not equal "+RemotingTransporter.MAGIC);
}
}
}
四 Netty服務端和客戶端處理器
RpcServerHandler 服務端處理器
服務端處理器的主要變化時增加了Map<String, Semaphore> serviceSemaphoreMap用於限制不同服務的工作執行緒數。使其同一時間進入handle的執行緒數受到限制。
/**
* @version V1.0
* @author: lin_shen
* @date: 2018/10/31
* @Description: RPC服務端處理器(處理RpcRequest)
*/
@Log4j2
public class RpcServerHandler extends SimpleChannelInboundHandler<RemotingTransporter> {
/**
* 存放 服務名稱 與 服務例項 之間的對映關係
*/
private final Map<String, Object> handlerMap;
/**
* 存放 服務名稱 與 訊號量 之間的對映關係
* 用於限制每個服務的工作執行緒數
*/
private final Map<String, Semaphore> serviceSemaphoreMap;
public RpcServerHandler(Map<String, Object> handlerMap,Map<String, Semaphore> serviceSemaphoreMap) {
this.handlerMap = handlerMap;
this.serviceSemaphoreMap=serviceSemaphoreMap;
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, RemotingTransporter remotingTransporter) throws Exception {
log.info("channelRead0 begin");
remotingTransporter.setFlag(new RemotingTransporter.Flag(false,true,false,false,remotingTransporter.getFlag().getSerializeType()));
RpcResponse rpcResponse=new RpcResponse();
RpcRequest rpcRequest=(RpcRequest)remotingTransporter.getBodyContent();
Semaphore semaphore = serviceSemaphoreMap.get(rpcRequest.getInterfaceName());
boolean acquire=false;
try {
// 處理 RPC 請求成功
log.info("進入限流");
acquire=semaphore.tryAcquire();
if(acquire){
Object result= handle(rpcRequest);
rpcResponse.setResult(result);
}
} catch (Exception e) {
// 處理 RPC 請求失敗
rpcResponse.setException(e);
log.error("handle result failure", e);
} finally {
if(acquire){
semaphore.release();
log.info("釋放訊號量");
}
}
remotingTransporter.setBodyContent(rpcResponse);
channelHandlerContext.writeAndFlush(remotingTransporter).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("server caught exception", cause);
ctx.close();
}
private Object handle(RpcRequest request) throws Exception {
log.info("開始執行handle");
// 獲取服務例項
String serviceName = request.getInterfaceName();
Object serviceBean = handlerMap.get(serviceName);
if (serviceBean == null) {
throw new RuntimeException(String.format("can not find service bean by key: %s", serviceName));
}
// 獲取反射呼叫所需的變數
Class<?> serviceClass = serviceBean.getClass();
String methodName = request.getMethodName();
log.info(methodName);
Class<?>[] parameterTypes = request.getParameterTypes();
log.info(parameterTypes[0].getName());
Object[] parameters = request.getParameters();
// 執行反射呼叫
Method method = serviceClass.getMethod(methodName, parameterTypes);
method.setAccessible(true);
return method.invoke(serviceBean, parameters);
}
}
RpcClientHandler 客戶端處理器
主要是將請求Id對應Response改成了對應RemotingTransporter,其他基本不變
/**
* @version V1.0
* @author: lin_shen
* @date: 2018/11/1
* @Description: TODO
*/
@Log4j2
public class RpcClientHandler extends SimpleChannelInboundHandler<RemotingTransporter> {
private ConcurrentMap<Long,RemotingTransporter> remotingTransporterMap;
public RpcClientHandler(ConcurrentMap<Long,RemotingTransporter> remotingTransporterMap){
this.remotingTransporterMap=remotingTransporterMap;
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, RemotingTransporter remotingTransporter) throws Exception {
log.info("read a Response,invokeId: "+remotingTransporter.getInvokeId());
remotingTransporterMap.put(remotingTransporter.getInvokeId(),remotingTransporter);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
log.error("client caught exception",cause);
ctx.close();
}
}