1. 程式人生 > >rabbitmq消費訊息的兩種方式

rabbitmq消費訊息的兩種方式

rabbitMQ中consumer通過建立到queue的連線,建立channel物件,通過channel通道獲取message,
Consumer可以宣告式的以API輪詢poll的方式主動從queue的獲取訊息,也可以通過訂閱的方式被動的從Queue中消費訊息,
最近翻閱了基於java的客戶端的相關原始碼,簡單做個分析。
程式設計模型虛擬碼如下:
ConnectionFactory factory = new ConnectionFactory();
Connection conn = factory.newConnection();
Channel channel=conn.createChannel();
建立Connection需要指定MQ的實體地址和埠,是socket tcp物理連線,而channel是一個邏輯的概念,支援在tcp連線上建立多個MQ channel
以下是基於channel上的兩種消費方式。

1、Subscribe訂閱方式

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             String routingKey = envelope.getRoutingKey();
             String contentType = properties.contentType;
             long deliveryTag = envelope.getDeliveryTag();
             // (process the message components here ...)
             channel.basicAck(deliveryTag, false);
         }
     });


訂閱方式其實是向queue註冊consumer,通過rpc向queue server傳送註冊consumer的訊息,rabbitMQ Server在收到訊息後,根據訊息的內容型別判斷這是一個訂閱訊息,
這樣當MQ 中queue有訊息時,會自動把訊息通過該socket(長連線)通道傳送出去。
參見ChannelN中的方法   

public String basicConsume(String queue, boolean autoAck, String consumerTag,
                               boolean noLocal, boolean exclusive, Map<String, Object> arguments,
                               final Consumer callback)
        throws IOException
    {
    ......
        rpc((Method)
            new Basic.Consume.Builder()
             .queue(queue)
             .consumerTag(consumerTag)
             .noLocal(noLocal)
             .noAck(autoAck)
             .exclusive(exclusive)
             .arguments(arguments)
            .build(),
            k);

        try {
            return k.getReply();
        } catch(ShutdownSignalException ex) {
            throw wrap(ex);
        }
    }


Consumer接收訊息的過程:
建立Connection後,會啟動MainLoop後臺執行緒,迴圈從socket(FrameHandler)中獲取資料包(Frame),呼叫channel.handleFrame(Frame frame)處理訊息,
    public void handleFrame(Frame frame) throws IOException {
        AMQCommand command = _command;
        if (command.handleFrame(frame)) { // 對訊息進行協議assemble
            _command = new AMQCommand(); // prepare for the next one
            handleCompleteInboundCommand(command);//對訊息消費處理
        }
    }
ChannelN.handleCompleteInboundCommand
       ---ChannelN.processAsync
           ----dispatcher.handleDelivery
                 ---QueueingConsumer.handleDelivery
                     ---this._queue.add(new Delivery(envelope, properties, body));//訊息最終放到佇列中
每個Consumer都有一個BlockQueue,用於快取從socket中獲取的訊息。
接下來,Consumer物件就可以呼叫api來從客戶端快取的_queue中依次獲取訊息,進行消費,參見QueueingConsumer.nextDelivery()

對於這種長連線的方式,沒看到心跳功能,以防止長連線的因網路等原因連線失效


2、poll API方式
ChannelN:
GetResponse basicGet(String queue, boolean autoAck)

這種方式比較簡單,直接通過RPC從MQ Server端獲取佇列中的訊息

關注公眾號: