RocketMQ 底層通訊機制 原始碼分析
概述
下面我們通過原始碼分析下RocketMQ是怎麼利用Netty進行通訊的。
本文分析的是RocketMQ 最新版本 4.3.2版本。
RocketMQ 專案結構
首先來看下 RocketMQ 模組構成。

通過 RocketMQ 專案結構可以看出,RocketMQ 分了好多模組。 broker、client、filter、namesrv、remoting 等。
大家比較熟悉的幾個模組對應的原始碼如下:
Broker Master 和 Slave 對應的 broker 模組。
Producer 和 Consumer 對應的是 client 模組。
NameSerer 服務對應的是 namesrv 模組。
而各個服務之間的通訊則使用的 remoting 模組。
Remoting 模組

通過romoting 的模組結構大概瞭解,RocketMQ 通訊使用了Netty進行傳輸通訊。並在 org.apache.rocketmq.remoting.protocol 包中自定義了通訊協議。
通訊模組主要介面和類
RemotingService 介面
public interface RemotingService { //開啟服務 void start(); //關閉服務 void shutdown(); //註冊 hook (可以在呼叫之前和呼叫之後做一些擴充套件處理) void registerRPCHook(RPCHook rpcHook); }
RemotingService 定義了服務端和客戶端都需要的三個介面。
registerRPCHook() 方法可以註冊一個 hook。可以在遠端通訊之前和通訊之後,執行使用者自定的一些處理。類似前置處理器和後置處理器。
RPCHook 介面
public interface RPCHook { void doBeforeRequest(final String remoteAddr, final RemotingCommand request); void doAfterResponse(final String remoteAddr, final RemotingCommand request, final RemotingCommand response); }
在啟動服務之前,可以把自己實現的 RPCHook 註冊到服務中,執行遠端呼叫的時候處理一些業務邏輯。比如列印請求和響應的日誌資訊。
RemotingServer 和 RemotingClient 介面
RemotingServer 和 RemotingClient 介面都繼承了RemotingService 介面,並擴充套件了自己特有的方法。
RemotingServer 介面
public interface RemotingServer extends RemotingService { //註冊一個處理請求的處理器, 根據requestCode, 獲取處理器,處理請求 void registerProcessor(final int requestCode, final NettyRequestProcessor processor, final ExecutorService executor); //註冊一個預設的處理器,當根據requestCode匹配不到處理器,則使用這個預設的處理器 void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor); //獲取埠 int localListenPort(); //根據requestCode獲取請求處理器 Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode); //同步呼叫(同步傳送訊息) RemotingCommand invokeSync(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException; //非同步呼叫(非同步傳送訊息) void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; //單向傳送訊息,只發送訊息。不用處理髮送的結果。 void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; }
-
1、registerProcessor 方法
註冊一個處理請求的處理器, 存放到 HashMap中,requestCode為 Map 的 key。
HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable
-
2、registerDefaultProcessor 方法
註冊一個預設的處理器,當根據requestCode匹配不到處理器,則使用這個預設的處理器
-
3、invokeSync 方法
以同步的方式向客戶端傳送訊息。
-
4、invokeAsync 方法
以非同步的方式向客戶端傳送訊息。
-
5、invokeOneway 方法
只向客戶端傳送訊息,而不處理客戶端返回的訊息。該方法只是向socket中寫入資料,而不需要處理客戶端返回的訊息。
RemotingClient 介面
public interface RemotingClient extends RemotingService { //更新 NameServer 地址 void updateNameServerAddressList(final List<String> addrs); //獲取 NameServer 地址 List<String> getNameServerAddressList(); //同步呼叫(同步傳送訊息) RemotingCommand invokeSync(final String addr, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException; //非同步呼叫(非同步傳送訊息) void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; //單向傳送訊息,只發送訊息。不用處理髮送的結果。 void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; //註冊一個處理請求的處理器, 根據requestCode, 獲取處理器,處理請求 void registerProcessor(final int requestCode, final NettyRequestProcessor processor, final ExecutorService executor); //設定傳送非同步訊息的執行緒池,如果不設定,則使用預設的 void setCallbackExecutor(final ExecutorService callbackExecutor); //獲取執行緒池 ExecutorService getCallbackExecutor(); //判斷 channel 是否可寫 boolean isChannelWritable(final String addr); }
-
1、updateNameServerAddressList、getNameServerAddressList 方法
更新 NameServer 地址。
獲取 NameServer 地址。
-
2、invokeSync、invokeAsync、invokeOneway 方法
這三個方法參見 RemotingServer 介面中的方法。
-
3、setCallbackExecutor
設定處理非同步響應訊息的執行緒池。
服務端和客戶端的實現
- NettyRemotingServer 類實現了RemotingServer 介面
- NettyRemotingClient 類實現了RemotingClient介面
這兩個類使用Netty 來實現服務端和客戶端服務的。
NettyRemotingServer 解析
通過 NettyRemotingServer類中的start() 方法開啟一個 Netty 的服務端。
程式碼如下:
@Override public void start() { this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyServerConfig.getServerWorkerThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet()); } }); ServerBootstrap childHandler = this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_KEEPALIVE, false) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler(TlsSystemConfig.tlsMode)) .addLast(defaultEventExecutorGroup, //編碼 new NettyEncoder(), //解碼 new NettyDecoder(), //心跳檢測 new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), //連線管理handler,處理connect, disconnect, close等事件 new NettyConnectManageHandler(), //處理接收到RemotingCommand訊息後的事件, 收到伺服器端響應後的相關操作 new NettyServerHandler() ); } }); if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) { childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); } try { ChannelFuture sync = this.serverBootstrap.bind().sync(); InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress(); this.port = addr.getPort(); } catch (InterruptedException e1) { throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1); } if (this.channelEventListener != null) { this.nettyEventExecutor.start(); } this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { NettyRemotingServer.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000); }
從 start 方法中啟動一個Netty 的服務端。
NettyEncoder NettyDecoder NettyServerHandler
NettyRemotingClient 解析
通過 NettyRemotingClient 類中的 start 方法開啟一個 netty 客戶端
@Override public void start() { this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyClientConfig.getClientWorkerThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet()); } }); Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, false) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()) .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (nettyClientConfig.isUseTLS()) { if (null != sslContext) { pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc())); log.info("Prepend SSL handler"); } else { log.warn("Connections are insecure as SSLContext is null!"); } } pipeline.addLast( defaultEventExecutorGroup, //傳送訊息編碼 new NettyEncoder(), //接收訊息解碼 new NettyDecoder(), //心跳監測 new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()), //連線管理handler,處理connect, disconnect, close等事件 new NettyConnectManageHandler(), //處理接收到RemotingCommand訊息後的事件, 收到伺服器端響應後的相關操作 new NettyClientHandler()); } }); this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { NettyRemotingClient.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000); if (this.channelEventListener != null) { this.nettyEventExecutor.start(); } }
從 start 方法中啟動一個Netty 客戶端服務。
NettyEncoder NettyDecoder
序列化反序列化
通過分析 RemotingServer
和 RemotingClient
介面及實現可以發現,傳送訊息和接收到的訊息都是 RemotingCommand
物件。
經過分析 NettyEncoder
和 NettyDecoder
發現,序列化和反序列化呼叫的是 RemotingCommand
物件的 encode
和 decode
方法
訊息格式

- 第一部分是訊息的長度,佔用4個位元組。等於第二、三、四部分長度的總和。
- 第二部分是訊息頭的長度,佔用4個位元組。等於第三部分長度大小。
- 第三部分是通過Json序列化的訊息頭的資料。
- 第四部分是序列化的訊息資料。
具體的訊息格式我們通過 RemotingCommand類的 encode
和 decode
方法進行分析。
RemotingCommand.encode() 方法
public ByteBuffer encode() { // 1> header length size int length = 4; // 2> header data length byte[] headerData = this.headerEncode(); length += headerData.length; // 3> body data length if (this.body != null) { length += body.length; } ByteBuffer result = ByteBuffer.allocate(4 + length); // length result.putInt(length); // header length result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); // header data result.put(headerData); // body data; if (this.body != null) { result.put(this.body); } result.flip(); return result; }
length = 4
2、通過 this.headerEncode() 獲取序列化的 header data。
3、然後申請一個長度為 length + header length + header data +body
大小的ByteBuffer。
ByteBuffer result = ByteBuffer.allocate(4 + length);
4、然後向 ByteBuffer result
中填充資料
headerEncode 方法
該方法主要是實現了訊息頭的序列化。
private byte[] headerEncode() { this.makeCustomHeaderToNet(); if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) { return RocketMQSerializable.rocketMQProtocolEncode(this); } else { return RemotingSerializable.encode(this); } }
序列化訊息頭有兩種方式SerializeType.ROCKETMQ 和 SerializeType.JSON。
如果是SerializeType.JSON方式序列化比較簡單。
RemotingSerializable.encode 方法
SerializeType.JSON 型別序列化。
public static byte[] encode(final Object obj) { final String json = toJson(obj, false); if (json != null) { return json.getBytes(CHARSET_UTF8); } return null; }
直接把物件轉換成json字串,然後轉換成 byte[] 陣列
RocketMQSerializable.rocketMQProtocolEncode 方法
SerializeType.ROCKETMQ 型別序列化。
public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) { // String remark byte[] remarkBytes = null; int remarkLen = 0; if (cmd.getRemark() != null && cmd.getRemark().length() > 0) { remarkBytes = cmd.getRemark().getBytes(CHARSET_UTF8); remarkLen = remarkBytes.length; } // HashMap<String, String> extFields byte[] extFieldsBytes = null; int extLen = 0; if (cmd.getExtFields() != null && !cmd.getExtFields().isEmpty()) { extFieldsBytes = mapSerialize(cmd.getExtFields()); extLen = extFieldsBytes.length; } int totalLen = calTotalLen(remarkLen, extLen); ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen); // int code(~32767) headerBuffer.putShort((short) cmd.getCode()); // LanguageCode language headerBuffer.put(cmd.getLanguage().getCode()); // int version(~32767) headerBuffer.putShort((short) cmd.getVersion()); // int opaque headerBuffer.putInt(cmd.getOpaque()); // int flag headerBuffer.putInt(cmd.getFlag()); // String remark if (remarkBytes != null) { headerBuffer.putInt(remarkBytes.length); headerBuffer.put(remarkBytes); } else { headerBuffer.putInt(0); } // HashMap<String, String> extFields; if (extFieldsBytes != null) { headerBuffer.putInt(extFieldsBytes.length); headerBuffer.put(extFieldsBytes); } else { headerBuffer.putInt(0); } return headerBuffer.array(); }
可以看到 程式碼把 RemotingCommand 物件中的資料按照一定的順序轉換成位元組儲存到ByteBuffer 中。
從程式碼中可以看出訊息頭中包括,request code、請求端實現語言、版本等資訊。
RemotingCommand.decode() 方法
public static RemotingCommand decode(final ByteBuffer byteBuffer) { int length = byteBuffer.limit(); int oriHeaderLen = byteBuffer.getInt(); int headerLength = getHeaderLength(oriHeaderLen); byte[] headerData = new byte[headerLength]; byteBuffer.get(headerData); RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen)); int bodyLength = length - 4 - headerLength; byte[] bodyData = null; if (bodyLength > 0) { bodyData = new byte[bodyLength]; byteBuffer.get(bodyData); } cmd.body = bodyData; return cmd; }
這裡的byteBuffer中的資料包含 header length + header data +body data
。
為什麼不是包含 length+header length + header data +body data
呢?
因為netty在獲取這條訊息的時候是通過 io.netty.handler.codec.LengthFieldBasedFrameDecoder
進行拆包的。該拆包的原理就是通過 訊息的 length
長度進行拆分的。所以拆分出來的資料就是 header length + header data +body data
這部分。
1、從byteBuffer中獲取header length 長度。
2、然後再通過header length 長度從 byteBuffer 獲取 header data。
3、剩下的byteBuffer資料就是body的資料。
把解析出來的資料轉換成 RemotingCommand 物件。
private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) { switch (type) { case JSON: RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class); resultJson.setSerializeTypeCurrentRPC(type); return resultJson; case ROCKETMQ: RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData); resultRMQ.setSerializeTypeCurrentRPC(type); return resultRMQ; default: break; } return null; }
判斷該資料是通過 SerializeType.ROCKETMQ 序列化還是 SerializeType.JSON 序列化的。
然後根據型別進行反序列化操作。
RemotingSerializable.decode 方法
SerializeType.JSON 反序列化。
public static <T> T decode(final byte[] data, Class<T> classOfT) { final String json = new String(data, CHARSET_UTF8); return fromJson(json, classOfT); }
直接把 json 資料反序列化成物件。
RocketMQSerializable.rocketMQProtocolDecode 方法
SerializeType.ROCKETMQ 反序列化。
public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) { RemotingCommand cmd = new RemotingCommand(); ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray); // int code(~32767) cmd.setCode(headerBuffer.getShort()); // LanguageCode language cmd.setLanguage(LanguageCode.valueOf(headerBuffer.get())); // int version(~32767) cmd.setVersion(headerBuffer.getShort()); // int opaque cmd.setOpaque(headerBuffer.getInt()); // int flag cmd.setFlag(headerBuffer.getInt()); // String remark int remarkLength = headerBuffer.getInt(); if (remarkLength > 0) { byte[] remarkContent = new byte[remarkLength]; headerBuffer.get(remarkContent); cmd.setRemark(new String(remarkContent, CHARSET_UTF8)); } // HashMap<String, String> extFields int extFieldsLength = headerBuffer.getInt(); if (extFieldsLength > 0) { byte[] extFieldsBytes = new byte[extFieldsLength]; headerBuffer.get(extFieldsBytes); cmd.setExtFields(mapDeserialize(extFieldsBytes)); } return cmd; }
根據 encode 的順序進行反序列化操作。