(六)RabbitMQ訊息佇列-訊息任務分發與訊息ACK確認機制(PHP版)
在前面一章介紹了在PHP中如何使用RabbitMQ,至此入門的的部分就完成了,我們內心中一定還有很多疑問:如果多個消費者消費同一個佇列怎麼辦?如果這幾個消費者分任務的權重不同怎麼辦?怎麼把同一個佇列不同級別的任務分發給不同的消費者?如果消費者異常離線怎麼辦?不要著急,後面將慢慢解開面紗。我們將結合實際的應用場景來講解更多的高階用法。
任務分發機制
設想如果把每個訊息當做一個任務,生產者把任務釋出到RabbitMQ,然後Consumer接收訊息處理任務,如果我們發現一個Consumer不能完成任務處理怎麼辦呢,我們會增加Consumer的數量。由一個Consumer增加到兩個Consumer,如圖由C變為C1和C2共同來分單工作。如果C1和C2是完全一樣的,那RabbitMQ會將任務平均分發到兩個消費者。
如下我們新建c1.php和c2.php來訂閱同一個佇列在接收到訊息後sleep1秒模擬任務處理的時間。
p.php程式碼,生產100條帶編號的訊息:
<?php
/*
* 釋出-訂閱-P
* create by superrd
*/
$queueName = 'superrd';
$exchangeName = 'superrd';
$routeKey = 'superrd';
$message = 'task--';
$connection = new AMQPConnection(array('host' => '10.99.121.137', 'port' => '5672' , 'vhost' => '/', 'login' => 'superrd', 'password' => 'superrd'));
$connection->connect() or die("Cannot connect to the broker!\n");
try {
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange ->setType(AMQP_EX_TYPE_DIRECT);
$exchange->setFlags(AMQP_DURABLE);
$exchange->declareExchange();
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
$queue->bind($exchangeName, $routeKey);
for($i=0 ; $i<100;$i++){
$exchange->publish($message.$i,$routeKey);
var_dump("[x] Sent $message $i");
}
} catch (AMQPConnectionException $e) {
var_dump($e);
exit();
}
$connection->disconnect();
c1.php和c2.php程式碼完全一樣:
<?php
/*
* 釋出-訂閱-c1c2
* create by superrd
*/
$queueName = 'superrd';
$exchangeName = 'superrd';
$connection->connect() or die("Cannot connect to the broker!\n");
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->setFlags(AMQP_DURABLE);
$exchange->declareExchange();
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
$queue->bind($exchangeName, $routeKey);
//阻塞模式接收訊息
echo "Message:\n";
while(True){
$queue->consume('processMessage');
//自動ACK應答
//$queue->consume('processMessage', AMQP_AUTOACK);
}
$conn->disconnect();
/*
* 消費回撥函式
* 處理訊息
*/
function processMessage($envelope, $q) {
$msg = $envelope->getBody();
sleep(1); //sleep1秒模擬任務處理
echo $msg."\n"; //處理訊息
$q->ack($envelope->getDeliveryTag()); //手動傳送ACK應答
}
開啟兩個中斷視窗分別執行c1.php和c2.php指令碼。確定兩個指令碼處於訂閱狀態,然後執行p.php指令碼。
看到上面兩幅圖結果就一目瞭然了。因為兩個指令碼sleep的時間相同所以任務是完全平均分發到兩個消費者的。我們修改下c2.php指令碼的sleep時間為2秒,看下結果會怎麼樣。
可以看到c1.php指令碼共收到66條訊息,c2.php指令碼收到34條訊息,基本是按照2:1來分配。那RabbitMQ是如何來保證這樣的分發機制呢,下面看RabbitMQ是如何通過ACK確認機制來實現任務分發的。
ACK訊息確認機制
首先RabbitMQ支援訊息確認機制來本證訊息被consumer正常處理,當然也可以通過no-ack不使用確認機制。RabbitMQ預設是使用ACK確認機制的。當Consumer接收到RabbitMQ釋出的訊息時需要在適當的時機發送一個ACK確認的包來告知RabbitMQ,自己接收到了訊息併成功處理。所以前面講到適當的時機建議是在處理完訊息任務後傳送。正如我們之前的程式碼。
$msg = $envelope->getBody();
sleep(1); //sleep1秒模擬任務處理
echo $msg."\n"; //處理訊息
$q->ack($envelope->getDeliveryTag()); //手動傳送ACK應答
那如果不傳送會怎樣呢?
在RabbitMQ中有一個prefetch_count的概念,這個引數的意思是允許Consumer最多同時處理幾個任務。我的版本的RabbitMQ預設這個引數是3,也就是說如果某一個Consumer在收到訊息後沒有傳送ACK確認包,RabbitMQ就會任務Consumer還在處理任務,當有3個訊息都沒有傳送ACK確認包時,RabbitMQ就不會再發送訊息給該Consumer。
我們把c2.php的sleep時間改回1秒,並且註釋掉ACK確認。
$msg = $envelope->getBody();
sleep(1); //sleep1秒模擬任務處理
echo $msg."\n"; //處理訊息
//$q->ack($envelope->getDeliveryTag()); //手動傳送ACK應答
發現c2指令碼只收到三條訊息。通過WEB管理工具也可以看到有三條訊息是沒有被ACK確認的。
當然任務並不會一直卡在這裡,在這是RabbitMQ任務c2在處理這三個任務。如果c2忽然終止RabbitMQ會重新分發任務。如下我終止c2指令碼。
三條任務被重新分發到了c1。再檢視下WEB管理工具,unackd已經為0
如果Consumer數量很多或者希望每個Consumer同時只處理一個任務可以通過在Consumer中設定PrefetchCount來實現更加均勻的任務分發。
$channel = new AMQPChannel($connection);
$channel->setPrefetchCount(1);
如下我修改了c2的PrefetchCount為1。在WEB管理外掛中可以看到已經有一個Consumer的PrefetchCount為1了。
RabbitMQ技術交流QQ群:327034977(新增時請備註RabbitMQ)