RabbitMQ客戶端原始碼分析(三)之Command
阿新 • • 發佈:2018-12-16
RabbitMQ-java-client版本
com.rabbitmq:amqp-client:4.3.0
RabbitMQ
版本宣告: 3.6.15
Command
-
Command
介面是AMQP方法-引數的容器介面,帶有可選的內容頭(content header)和內容體(content body) -
原始碼
/** * Interface to a container for an AMQP method-and-arguments, with optional content header and body. * AMQP方法和引數的容器介面,帶有可選的content header and body */
AMQCommand
- MQCommand委託多個
CommandAssembler
類進行位元組流與協議的轉換,CommandAssembler
- 核心方法原始碼,將
Command
轉換為多個Frame
並且傳送
public void transmit(AMQChannel channel) throws IOException {
int channelNumber = channel.getChannelNumber();
AMQConnection connection = channel.getConnection();
synchronized (assembler) {
Method m = this.assembler.getMethod();
connection.writeFrame(m.toFrame(channelNumber));
if (m.hasContent()) {
byte[] body = this.assembler.getContentBody();
connection.writeFrame(this.assembler.getContentHeader()
.toFrame(channelNumber, body.length));
//如果body長度超過client與server協商出的最大幀長度,將分多個Frame傳送
int frameMax = connection.getFrameMax();
int bodyPayloadMax = (frameMax == 0) ? body.length : frameMax
- EMPTY_FRAME_SIZE;
for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
int remaining = body.length - offset;
int fragmentLength = (remaining < bodyPayloadMax) ? remaining
: bodyPayloadMax;
Frame frame = Frame.fromBodyFragment(channelNumber, body,
offset, fragmentLength);
connection.writeFrame(frame);
}
}
}
connection.flush();
}
CommandAssembler
-
原始碼分析,
CommandAssembler
是執行緒安全的final class CommandAssembler { private static final byte[] EMPTY_BYTE_ARRAY = new byte[0]; /** Current state, used to decide how to handle each incoming frame. * 當前狀態,用於決定如何處理每個傳入的幀,不過這個名字取名CAState真的好嗎????(哈哈) * */ private enum CAState { EXPECTING_METHOD, EXPECTING_CONTENT_HEADER, EXPECTING_CONTENT_BODY, COMPLETE } // private CAState state; /** The method for this command */ private Method method; /** The content header for this command */ private AMQContentHeader contentHeader; /** The fragments of this command's content body - a list of byte[] */ private final List<byte[]> bodyN; /** sum of the lengths of all fragments 所有內容幀body陣列加起來的長度*/ private int bodyLength; /** No bytes of content body not yet accumulated */ private long remainingBodyBytes; public CommandAssembler(Method method, AMQContentHeader contentHeader, byte[] body) { this.method = method; this.contentHeader = contentHeader; this.bodyN = new ArrayList<byte[]>(2); this.bodyLength = 0; this.remainingBodyBytes = 0; appendBodyFragment(body); if (method == null) { this.state = CAState.EXPECTING_METHOD; } else if (contentHeader == null) { this.state = method.hasContent() ? CAState.EXPECTING_CONTENT_HEADER : CAState.COMPLETE; } else { this.remainingBodyBytes = contentHeader.getBodySize() - this.bodyLength; updateContentBodyState(); } } public synchronized Method getMethod() { return this.method; } public synchronized AMQContentHeader getContentHeader() { return this.contentHeader; } /** @return true if the command is complete */ public synchronized boolean isComplete() { return (this.state == CAState.COMPLETE); } /** Decides whether more body frames are expected */ private void updateContentBodyState() { this.state = (this.remainingBodyBytes > 0) ? CAState.EXPECTING_CONTENT_BODY : CAState.COMPLETE; } private void consumeMethodFrame(Frame f) throws IOException { if (f.type == AMQP.FRAME_METHOD) { //如果是方法幀就讀取方法幀payload中的流,這裡的f.getInputStream()僅僅是payload,不包含幀頭資料 this.method = AMQImpl.readMethodFrom(f.getInputStream()); this.state = this.method.hasContent() ? CAState.EXPECTING_CONTENT_HEADER : CAState.COMPLETE; } else { throw new UnexpectedFrameError(f, AMQP.FRAME_METHOD); } } private void consumeHeaderFrame(Frame f) throws IOException { if (f.type == AMQP.FRAME_HEADER) { //內容頭幀 this.contentHeader = AMQImpl.readContentHeaderFrom(f.getInputStream()); this.remainingBodyBytes = this.contentHeader.getBodySize(); updateContentBodyState(); } else { throw new UnexpectedFrameError(f, AMQP.FRAME_HEADER); } } private void consumeBodyFrame(Frame f) { if (f.type == AMQP.FRAME_BODY) { byte[] fragment = f.getPayload(); this.remainingBodyBytes -= fragment.length; updateContentBodyState(); if (this.remainingBodyBytes < 0) { throw new UnsupportedOperationException("%%%%%% FIXME unimplemented"); } appendBodyFragment(fragment); } else { throw new UnexpectedFrameError(f, AMQP.FRAME_BODY); } } /** Stitches together a fragmented content body into a single byte array * 合併內容幀body陣列 * */ private byte[] coalesceContentBody() { if (this.bodyLength == 0) return EMPTY_BYTE_ARRAY; if (this.bodyN.size() == 1) return this.bodyN.get(0); byte[] body = new byte[bodyLength]; int offset = 0; for (byte[] fragment : this.bodyN) { System.arraycopy(fragment, 0, body, offset, fragment.length); offset += fragment.length; } this.bodyN.clear(); this.bodyN.add(body); return body; } public synchronized byte[] getContentBody() { return coalesceContentBody(); } private void appendBodyFragment(byte[] fragment) { if (fragment == null || fragment.length == 0) return; bodyN.add(fragment); bodyLength += fragment.length; } /** * 處理Frame,根據當前的狀態獲取相關的Frame * @param f frame to be incorporated * @return true if command becomes complete * @throws IOException if error reading frame */ public synchronized boolean handleFrame(Frame f) throws IOException { switch (this.state) { case EXPECTING_METHOD: consumeMethodFrame(f); break; case EXPECTING_CONTENT_HEADER: consumeHeaderFrame(f); break; case EXPECTING_CONTENT_BODY: consumeBodyFrame(f); break; default: throw new AssertionError("Bad Command State " + this.state); } return isComplete(); } }