1. 程式人生 > >(六)RabbitMQ訊息佇列-訊息任務分發與訊息ACK確認機制(PHP版)

(六)RabbitMQ訊息佇列-訊息任務分發與訊息ACK確認機制(PHP版)

在前面一章介紹了在PHP中如何使用RabbitMQ,至此入門的的部分就完成了,我們內心中一定還有很多疑問:如果多個消費者消費同一個佇列怎麼辦?如果這幾個消費者分任務的權重不同怎麼辦?怎麼把同一個佇列不同級別的任務分發給不同的消費者?如果消費者異常離線怎麼辦?不要著急,後面將慢慢解開面紗。我們將結合實際的應用場景來講解更多的高階用法。

任務分發機制

設想如果把每個訊息當做一個任務,生產者把任務釋出到RabbitMQ,然後Consumer接收訊息處理任務,如果我們發現一個Consumer不能完成任務處理怎麼辦呢,我們會增加Consumer的數量。由一個Consumer增加到兩個Consumer,如圖由C變為C1和C2共同來分單工作。如果C1和C2是完全一樣的,那RabbitMQ會將任務平均分發到兩個消費者。
RabbitMQ任務分發

如下我們新建c1.php和c2.php來訂閱同一個佇列在接收到訊息後sleep1秒模擬任務處理的時間。

p.php程式碼,生產100條帶編號的訊息:

<?php

/*
 * 釋出-訂閱-P
 * create by superrd
 */

$queueName = 'superrd';
$exchangeName = 'superrd';
$routeKey = 'superrd';
$message = 'task--';
$connection = new AMQPConnection(array('host' => '10.99.121.137', 'port' => '5672'
, 'vhost' => '/', 'login' => 'superrd', 'password' => 'superrd')); $connection->connect() or die("Cannot connect to the broker!\n"); try { $channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange
->setType(AMQP_EX_TYPE_DIRECT); $exchange->setFlags(AMQP_DURABLE); $exchange->declareExchange(); $queue = new AMQPQueue($channel); $queue->setName($queueName); $queue->setFlags(AMQP_DURABLE); $queue->declareQueue(); $queue->bind($exchangeName, $routeKey); for($i=0 ; $i<100;$i++){ $exchange->publish($message.$i,$routeKey); var_dump("[x] Sent $message $i"); } } catch (AMQPConnectionException $e) { var_dump($e); exit(); } $connection->disconnect();

c1.php和c2.php程式碼完全一樣:

<?php

/*
 * 釋出-訂閱-c1c2
 * create by superrd
 */

$queueName = 'superrd';
$exchangeName = 'superrd';

$connection->connect() or die("Cannot connect to the broker!\n");
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->setFlags(AMQP_DURABLE);
$exchange->declareExchange();

$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();

$queue->bind($exchangeName, $routeKey);
//阻塞模式接收訊息

echo "Message:\n";
while(True){
        $queue->consume('processMessage');
//自動ACK應答
        //$queue->consume('processMessage', AMQP_AUTOACK);
}

$conn->disconnect();
/*
* 消費回撥函式
* 處理訊息
*/
function processMessage($envelope, $q) {
    $msg = $envelope->getBody();
    sleep(1);  //sleep1秒模擬任務處理
    echo $msg."\n"; //處理訊息
    $q->ack($envelope->getDeliveryTag()); //手動傳送ACK應答
}

開啟兩個中斷視窗分別執行c1.php和c2.php指令碼。確定兩個指令碼處於訂閱狀態,然後執行p.php指令碼。
RabbitMQ任務分發

RabbitMQ任務分發

看到上面兩幅圖結果就一目瞭然了。因為兩個指令碼sleep的時間相同所以任務是完全平均分發到兩個消費者的。我們修改下c2.php指令碼的sleep時間為2秒,看下結果會怎麼樣。
RabbitMQ任務分發

RabbitMQ任務分發

可以看到c1.php指令碼共收到66條訊息,c2.php指令碼收到34條訊息,基本是按照2:1來分配。那RabbitMQ是如何來保證這樣的分發機制呢,下面看RabbitMQ是如何通過ACK確認機制來實現任務分發的。

ACK訊息確認機制

首先RabbitMQ支援訊息確認機制來本證訊息被consumer正常處理,當然也可以通過no-ack不使用確認機制。RabbitMQ預設是使用ACK確認機制的。當Consumer接收到RabbitMQ釋出的訊息時需要在適當的時機發送一個ACK確認的包來告知RabbitMQ,自己接收到了訊息併成功處理。所以前面講到適當的時機建議是在處理完訊息任務後傳送。正如我們之前的程式碼。

    $msg = $envelope->getBody();
    sleep(1);  //sleep1秒模擬任務處理
    echo $msg."\n"; //處理訊息
    $q->ack($envelope->getDeliveryTag()); //手動傳送ACK應答

那如果不傳送會怎樣呢?

在RabbitMQ中有一個prefetch_count的概念,這個引數的意思是允許Consumer最多同時處理幾個任務。我的版本的RabbitMQ預設這個引數是3,也就是說如果某一個Consumer在收到訊息後沒有傳送ACK確認包,RabbitMQ就會任務Consumer還在處理任務,當有3個訊息都沒有傳送ACK確認包時,RabbitMQ就不會再發送訊息給該Consumer。
我們把c2.php的sleep時間改回1秒,並且註釋掉ACK確認。

    $msg = $envelope->getBody();
    sleep(1);  //sleep1秒模擬任務處理
    echo $msg."\n"; //處理訊息
    //$q->ack($envelope->getDeliveryTag()); //手動傳送ACK應答

RabbitMQ訊息確認機制

RabbitMQ訊息確認機制

發現c2指令碼只收到三條訊息。通過WEB管理工具也可以看到有三條訊息是沒有被ACK確認的。
RabbitMQ訊息確認機制

當然任務並不會一直卡在這裡,在這是RabbitMQ任務c2在處理這三個任務。如果c2忽然終止RabbitMQ會重新分發任務。如下我終止c2指令碼。

RabbitMQ訊息確認機制

三條任務被重新分發到了c1。再檢視下WEB管理工具,unackd已經為0
RabbitMQ訊息確認機制

如果Consumer數量很多或者希望每個Consumer同時只處理一個任務可以通過在Consumer中設定PrefetchCount來實現更加均勻的任務分發。

$channel = new AMQPChannel($connection);
$channel->setPrefetchCount(1);

如下我修改了c2的PrefetchCount為1。在WEB管理外掛中可以看到已經有一個Consumer的PrefetchCount為1了。

RabbitMQACK確認機制

RabbitMQ技術交流QQ群:327034977(新增時請備註RabbitMQ)