1. 程式人生 > >RabbitMQ客戶端原始碼分析(三)之Command

RabbitMQ客戶端原始碼分析(三)之Command

RabbitMQ-java-client版本

  1. com.rabbitmq:amqp-client:4.3.0
  2. RabbitMQ版本宣告: 3.6.15

Command

  1. Command介面是AMQP方法-引數的容器介面,帶有可選的內容頭(content header)和內容體(content body)

  2. 原始碼

    /**
     * Container for one AMQP method and its arguments, together with the
     * content header and content body when the method carries content.
     */
    public interface Command {

        /**
         * Returns the {@link Method} carried by this command. Callers may
         * downcast it to a concrete (implementation-specific) subclass.
         *
         * @return the command's method
         */
        Method getMethod();

        /**
         * Returns the content header carried by this command, if there is one.
         * Callers may downcast it to one of the inner classes of AMQP, for
         * instance {@link AMQP.BasicProperties}.
         *
         * @return the command's {@link ContentHeader}, or null if none
         */
        ContentHeader getContentHeader();

        /**
         * Returns the body bytes that travelled as part of this command, if any.
         *
         * @return the command's content body, or null if none
         */
        byte[] getContentBody();
    }

AMQCommand

  1. AMQCommand委託CommandAssembler類進行位元組流與協議的轉換,CommandAssembler
    是執行緒安全的
  2. 核心方法原始碼,將Command轉換為多個Frame並且傳送
 /**
  * Writes this command to the connection owning {@code channel} as a sequence
  * of frames and then flushes the connection.
  *
  * <p>The method frame is written first. When the method carries content, a
  * content header frame (announcing the full body length) follows, and then
  * the body is written as one or more body frames.
  *
  * @param channel the channel whose number and connection are used for sending
  * @throws IOException if writing a frame or flushing the connection fails
  */
 public void transmit(AMQChannel channel) throws IOException {
        final int channelNo = channel.getChannelNumber();
        final AMQConnection conn = channel.getConnection();

        // All frames of one command are written while holding the assembler's monitor.
        synchronized (assembler) {
            Method method = this.assembler.getMethod();
            conn.writeFrame(method.toFrame(channelNo));

            if (method.hasContent()) {
                byte[] content = this.assembler.getContentBody();
                conn.writeFrame(this.assembler.getContentHeader().toFrame(channelNo, content.length));

                // A body longer than the per-frame payload limit is split over
                // several body frames. When the connection reports a frame max
                // of 0, the whole body is sent as a single fragment.
                int frameMax = conn.getFrameMax();
                int chunkMax;
                if (frameMax == 0) {
                    chunkMax = content.length;
                } else {
                    // EMPTY_FRAME_SIZE is declared outside this excerpt;
                    // presumably the fixed per-frame overhead - confirm.
                    chunkMax = frameMax - EMPTY_FRAME_SIZE;
                }

                int sent = 0;
                while (sent < content.length) {
                    int chunk = Math.min(content.length - sent, chunkMax);
                    conn.writeFrame(Frame.fromBodyFragment(channelNo, content, sent, chunk));
                    sent += chunkMax;
                }
            }
        }

        conn.flush();
    }

CommandAssembler

  1. 原始碼分析,CommandAssembler是執行緒安全的

        /**
         * Builds up a single AMQP command (method, optional content header,
         * optional content body) from the frames it is fed one at a time.
         *
         * <p>Every non-private method that reads or changes the assembler's
         * state is {@code synchronized} on the instance.
         */
        final class CommandAssembler {
            private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];

            /** Current state, used to decide how to handle each incoming frame. */
            private enum CAState {
                EXPECTING_METHOD, EXPECTING_CONTENT_HEADER, EXPECTING_CONTENT_BODY, COMPLETE
            }

            private CAState state;

            /** The method for this command */
            private Method method;

            /** The content header for this command */
            private AMQContentHeader contentHeader;

            /** The fragments of this command's content body - a list of byte[] */
            private final List<byte[]> bodyN;
            /** Sum of the lengths of all body fragments accumulated so far */
            private int bodyLength;

            /** Number of content body bytes not yet accumulated */
            private long remainingBodyBytes;

            /**
             * Creates an assembler, optionally pre-populated with parts of a command.
             * The initial state is derived from which parts are supplied.
             *
             * @param method the command's method, or null if it is still to be read from a frame
             * @param contentHeader the content header, or null if absent / still to be read
             * @param body an initial body fragment; null or empty means none
             */
            public CommandAssembler(Method method, AMQContentHeader contentHeader, byte[] body) {
                this.method = method;
                this.contentHeader = contentHeader;
                this.bodyN = new ArrayList<byte[]>(2);
                this.bodyLength = 0;
                this.remainingBodyBytes = 0;
                appendBodyFragment(body);
                if (method == null) {
                    this.state = CAState.EXPECTING_METHOD;
                } else if (contentHeader == null) {
                    this.state = method.hasContent() ? CAState.EXPECTING_CONTENT_HEADER : CAState.COMPLETE;
                } else {
                    // The header announces the total body size; whatever was
                    // passed in as 'body' already counts towards it.
                    this.remainingBodyBytes = contentHeader.getBodySize() - this.bodyLength;
                    updateContentBodyState();
                }
            }

            /** @return the method of this command, or null if not yet received */
            public synchronized Method getMethod() {
                return this.method;
            }

            /** @return the content header of this command, or null if none has been set */
            public synchronized AMQContentHeader getContentHeader() {
                return this.contentHeader;
            }

            /** @return true if the command is complete */
            public synchronized boolean isComplete() {
                return (this.state == CAState.COMPLETE);
            }

            /** Decides whether more body frames are expected */
            private void updateContentBodyState() {
                this.state = (this.remainingBodyBytes > 0) ? CAState.EXPECTING_CONTENT_BODY : CAState.COMPLETE;
            }

            /**
             * Reads the method from a method frame and moves to the next state.
             *
             * @throws UnexpectedFrameError if {@code f} is not a method frame
             */
            private void consumeMethodFrame(Frame f) throws IOException {
                if (f.type == AMQP.FRAME_METHOD) {
                    // The method is decoded from the frame's input stream.
                    this.method = AMQImpl.readMethodFrom(f.getInputStream());
                    this.state = this.method.hasContent() ? CAState.EXPECTING_CONTENT_HEADER : CAState.COMPLETE;
                } else {
                    throw new UnexpectedFrameError(f, AMQP.FRAME_METHOD);
                }
            }

            /**
             * Reads the content header from a header frame and records how many
             * body bytes it announces.
             *
             * @throws UnexpectedFrameError if {@code f} is not a content header frame
             */
            private void consumeHeaderFrame(Frame f) throws IOException {
                if (f.type == AMQP.FRAME_HEADER) {
                    this.contentHeader = AMQImpl.readContentHeaderFrom(f.getInputStream());
                    this.remainingBodyBytes = this.contentHeader.getBodySize();
                    updateContentBodyState();
                } else {
                    throw new UnexpectedFrameError(f, AMQP.FRAME_HEADER);
                }
            }

            /**
             * Appends the payload of a body frame to the accumulated content body.
             *
             * <p>The fragment is validated before any state is changed, so a
             * rejected frame leaves the assembler exactly as it was (in
             * particular it is not reported as complete).
             *
             * @throws UnexpectedFrameError if {@code f} is not a body frame
             * @throws UnsupportedOperationException if the fragment would make the
             *         body longer than the size announced in the content header
             */
            private void consumeBodyFrame(Frame f) {
                if (f.type != AMQP.FRAME_BODY) {
                    throw new UnexpectedFrameError(f, AMQP.FRAME_BODY);
                }
                byte[] fragment = f.getPayload();
                long remainingAfter = this.remainingBodyBytes - fragment.length;
                if (remainingAfter < 0) {
                    throw new UnsupportedOperationException(
                            "Content body frame exceeds the body size announced in the content header by "
                                    + (-remainingAfter) + " byte(s)");
                }
                this.remainingBodyBytes = remainingAfter;
                appendBodyFragment(fragment);
                updateContentBodyState();
            }

            /**
             * Stitches together a fragmented content body into a single byte array.
             * After merging, the fragment list is replaced by the single merged
             * array so later calls return it without copying again.
             */
            private byte[] coalesceContentBody() {
                if (this.bodyLength == 0) return EMPTY_BYTE_ARRAY;
                if (this.bodyN.size() == 1) return this.bodyN.get(0);

                byte[] body = new byte[bodyLength];
                int offset = 0;
                for (byte[] fragment : this.bodyN) {
                    System.arraycopy(fragment, 0, body, offset, fragment.length);
                    offset += fragment.length;
                }
                this.bodyN.clear();
                this.bodyN.add(body);
                return body;
            }

            /** @return the content body accumulated so far as one array; empty (never null) if there is none */
            public synchronized byte[] getContentBody() {
                return coalesceContentBody();
            }

            /** Records a body fragment and its length; null or empty fragments are ignored. */
            private void appendBodyFragment(byte[] fragment) {
                if (fragment == null || fragment.length == 0) return;
                bodyN.add(fragment);
                bodyLength += fragment.length;
            }

            /**
             * Incorporates a frame into the command, dispatching on the current state
             * to the handler for the kind of frame expected next.
             *
             * @param f frame to be incorporated
             * @return true if command becomes complete
             * @throws IOException if error reading frame
             */
            public synchronized boolean handleFrame(Frame f) throws IOException
            {
                switch (this.state) {
                  case EXPECTING_METHOD:          consumeMethodFrame(f); break;
                  case EXPECTING_CONTENT_HEADER:  consumeHeaderFrame(f); break;
                  case EXPECTING_CONTENT_BODY:    consumeBodyFrame(f);   break;

                  default:
                      // Reached when a frame arrives after the command is already COMPLETE.
                      throw new AssertionError("Bad Command State " + this.state);
                }
                return isComplete();
            }
        }