RabbitMQ Consumer獲取訊息的兩種方式(poll,subscribe)解析
http://blog.csdn.net/yangbutao/article/details/10395599
rabbitMQ中consumer通過建立到queue的連線,建立channel物件,通過channel通道獲取message,
Consumer可以宣告式的以API輪詢poll的方式主動從queue的獲取訊息,也可以通過訂閱的方式被動的從Queue中消費訊息,
最近翻閱了基於java的客戶端的相關原始碼,簡單做個分析。
程式設計模型虛擬碼如下:
ConnectionFactory factory = new ConnectionFactory();
Connection conn = factory.newConnection();
Channel channel=conn.createChannel();
建立Connection需要指定MQ的實體地址和埠,是socket tcp物理連線,而channel是一個邏輯的概念,支援在tcp連線上建立多個MQ channel
以下是基於channel上的兩種消費方式。
1、Subscribe訂閱方式
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.contentType;
long deliveryTag = envelope.getDeliveryTag();
// (process the message components here ...)
channel.basicAck(deliveryTag, false);
}
});
訂閱方式其實是向queue註冊consumer,通過rpc向queue server傳送註冊consumer的訊息,rabbitMQ Server在收到訊息後,根據訊息的內容型別判斷這是一個訂閱訊息,
這樣當MQ 中queue有訊息時,會自動把訊息通過該socket(長連線)通道傳送出去。
參見ChannelN中的方法
public String basicConsume(String queue, boolean autoAck, String consumerTag,
boolean noLocal, boolean exclusive, Map<String, Object> arguments,
final Consumer callback)
throws IOException
{
......
rpc((Method)
new Basic.Consume.Builder()
.queue(queue)
.consumerTag(consumerTag)
.noLocal(noLocal)
.noAck(autoAck)
.exclusive(exclusive)
.arguments(arguments)
.build(),
k);
try {
return k.getReply();
} catch(ShutdownSignalException ex) {
throw wrap(ex);
}
}
Consumer接收訊息的過程:
建立Connection後,會啟動MainLoop後臺執行緒,迴圈從socket(FrameHandler)中獲取資料包(Frame),呼叫channel.handleFrame(Frame frame)處理訊息,
public void handleFrame(Frame frame) throws IOException {
AMQCommand command = _command;
if (command.handleFrame(frame)) { // 對訊息進行協議assemble
_command = new AMQCommand(); // prepare for the next one
handleCompleteInboundCommand(command);//對訊息消費處理
}
}
ChannelN.handleCompleteInboundCommand
---ChannelN.processAsync
----dispatcher.handleDelivery
---QueueingConsumer.handleDelivery
---this._queue.add(new Delivery(envelope, properties, body));//訊息最終放到佇列中
每個Consumer都有一個BlockQueue,用於快取從socket中獲取的訊息。
接下來,Consumer物件就可以呼叫api來從客戶端快取的_queue中依次獲取訊息,進行消費,參見QueueingConsumer.nextDelivery()
對於這種長連線的方式,沒看到心跳功能,以防止長連線的因網路等原因連線失效
2、poll API方式
ChannelN:
GetResponse basicGet(String queue, boolean autoAck)
這種方式比較簡單,直接通過RPC從MQ Server端獲取佇列中的訊息