1. 程式人生 > >RabbitMQ訊息佇列在PHP下的應用

RabbitMQ訊息佇列在PHP下的應用

訊息佇列的實現中,RabbitMQ以其健壯和可靠見長.公司的專案中選擇了它作為訊息佇列的實現.關於MQ的機制和原理網上有很多文章可以看,這裡就不再贅述,只講幾個比較容易混淆的問題

1,binding key和routing key

  binding key和routing key是都不過是自己設定的一組字元,只是用的地方不同,binding key是在繫結交換機和佇列時候通過方法傳遞的字串,routing key是在釋出訊息時候,順便帶上的字串,有些人說這兩個其實是一個東西,也對也不對,說對,是因為這兩個可以完全一樣,說不對,是因為這兩個起的作用不同,一個交換機可以繫結很多佇列,但是每個佇列也許需要的訊息型別不同,binding key就是這個繫結時候留在交換機和佇列之間的提示資訊,當訊息傳送出來後,隨著訊息一起傳送的routing key如果和binding key一樣就說明訊息是這個佇列要的東西,如果不一樣那就不要給這個佇列,交換機你找找下個佇列看看要不要.明白了吧,這兩個key就是暗號,對上了就是自己人,對不上那麻煩你再找找去.

  binding key和routing key的配對其實也不是就要完全一樣,還可以'相似'配對,建立交換機的時候,就要告訴MQ,我要宣告的這個交換機和它上面的佇列之間傳輸訊息時候要求routing key和binding key完全一樣,這種模式叫Direct,如果routing key和binding key可以'模糊'匹配,這種模式叫Topic,如果不需要匹配,儘管發,叫Fanout.

2,持久化

  交換機和佇列都可以在建立時候設定為持久化,重啟以後會回覆,但是其中的訊息未不會,如果要訊息也恢復,將訊息釋出到交換機的時候,可以指定一個標誌“Delivery Mode”(投遞模式),  1為非持久化,2為持久化.

3,流控機制

  當訊息生產的速度更快,而程序的處理能力低時,訊息就會堆積起來,佔用記憶體越來越多,導致MQ崩潰,所以rabbitmq有一個流控機制,當超過限定時候就會阻止接受訊息,mq流控有三種機制

      1,主動阻塞住發訊息太快的連線,這個無法調整,如果被阻塞了,在abbitmqctl 控制檯上會顯示一個blocked的狀態。        2,記憶體超過限量,會阻塞連線,在vm_memory_high_watermark可調        3,剩餘磁碟在限定以下mq會 主動阻塞所有的生產者,預設為50m,在disk_free_limit可調. 下面是在centos7上面的,MQ安裝過程.
1,必要的支援
yum install ncurses-devel   unixODBC unixODBC-devel  
2,erlang環境
wget http://www.erlang.org/download/ otp_src_17.3.tar.gz
tar zxvf otp_src_17.3.tar.gz
cd otp_src_17.3 
./configure --without-javac
#忽略警告
make && make install

3,安裝rabbitmq依賴檔案,安裝rabbitmq

複製程式碼
yum install xmlto
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.4.1/rabbitmq-server-3.4.1.tar.gz
tar zxvf rabbitmq-server-3.4.1.tar.gz
cd rabbitmq-server-3.4.1/
make TARGET_DIR=/usr/rabbitmq SBIN_DIR=/usr/rabbitmq/sbin MAN_DIR=/usr/rabbitmq/man DOC_INSTALL_DIR=/usr/rabbitmq/doc
make TARGET_DIR=/usr/rabbitmq SBIN_DIR=/usr/rabbitmq/sbin MAN_DIR=/usr/rabbitmq/man DOC_INSTALL_DIR=/usr/rabbitmq/doc  install
/usr/rabbitmq/sbin/rabbitmq-server -detached 啟動rabbitmq
/usr/rabbitmq/sbin/rabbitmqctl status 檢視狀態
/usr/rabbitmq/sbin/rabbitmqctl stop 關閉rabbitmq
複製程式碼

4,啟用管理外掛

複製程式碼
mkdir /etc/rabbitmq
cd  /usr/rabbitmq/sbin
./rabbitmq-plugins enable rabbitmq_management  (啟用外掛)
./rabbitmq-plugins disable rabbitmq_management (禁用外掛)
# 重啟rabbitmq
# 訪問 http://127.0.0.1:15672/
# 如果有iptables
# vi /etc/sysconfig/iptables  增加
#    -A INPUT -m state --state NEW -m tcp -p tcp --dport 15672 -j ACCEPT
# 重啟動iptable   systemctl restart iptables.service
複製程式碼

5,建立配置檔案

複製程式碼
#在/usr/rabbitmq/sbin/rabbitmq-defaults 檢視config檔案路徑
# 建立配置檔案 
touch/usr/rabbitmq/sbin
#vm_memory_high_watermark 記憶體低水位線,若低於該水位線,則開啟流控機制,阻止所有請求,預設值是0.4,即記憶體總量的40%,
#vm_memory_high_watermark_paging_ratio 記憶體低水位線的多少百分比開始通過寫入磁碟檔案來釋放記憶體
vi /usr/rabbitmq/sbin/rabbitmq.config 輸入
[
{rabbit, [{vm_memory_high_watermark_paging_ratio, 0.75},
         {vm_memory_high_watermark, 0.7}]}
].
複製程式碼

6,建立環境檔案

複製程式碼
touch /etc/rabbitmq/rabbitmq-env.conf
#輸入
    RABBITMQ_NODENAME=FZTEC-240088 節點名稱
    RABBITMQ_NODE_IP_ADDRESS=127.0.0.1 監聽IP
    RABBITMQ_NODE_PORT=5672 監聽埠
    RABBITMQ_LOG_BASE=/data/rabbitmq/log 日誌目錄
    RABBITMQ_PLUGINS_DIR=/data/rabbitmq/plugins 外掛目錄
    RABBITMQ_MNESIA_BASE=/data/rabbitmq/mnesia 後端儲存目錄
複製程式碼

7,安裝php的rabbitmq擴充套件

複製程式碼
yum install librabbitmq-devel.x86_64
wget http://pecl.php.net/get/amqp-1.4.0.tgz
tar zxvf amqp-1.4.0.tgz
cd amqp-1.4.0
/usr/local/php/bin/phpize
./configure --with-php-config=/usr/local/php/bin/php-config --with-amqp
make && make install
    
vim /usr/local/php/etc/php.ini
#輸入
          extension=amqp.so

service nginx reload
service php-fpm restart
複製程式碼

操作命令

複製程式碼
 檢視exchange資訊
          /usr/rabbitmq/sbin/rabbitmqctl list_exchanges name type durable auto_delete arguments

 檢視佇列資訊
          /usr/rabbitmq/sbin/rabbitmqctl list_queues name durable auto_delete messages consumers me
  檢視繫結資訊
          /usr/rabbitmq/sbin/rabbitmqctl list_bindings
 檢視連線資訊
          /usr/rabbitmq/sbin/rabbitmqctl list_connections
複製程式碼

php的server端指令碼

複製程式碼
<?php
$routingkey='key';
//設定你的連線
$conn_args = array('host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest');
$conn = new AMQPConnection($conn_args);
if ($conn->connect()) {
    echo "Established a connection to the broker \n";
}
else {
    echo "Cannot connect to the broker \n ";
}
//你的訊息
$message = json_encode(array('Hello World3!','php3','c++3:'));
//建立channel
$channel = new AMQPChannel($conn);
//建立exchange
$ex = new AMQPExchange($channel);
$ex->setName('exchange');//建立名字
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE);
//$ex->setFlags(AMQP_AUTODELETE);
//echo "exchange status:".$ex->declare();
echo "exchange status:".$ex->declareExchange();
echo "\n";
for($i=0;$i<100;$i++){
        if($routingkey=='key2'){
                $routingkey='key';
        }else{
                $routingkey='key2';
        }
        $ex->publish($message,$routingkey);
}
/*
$ex->publish($message,$routingkey);
建立佇列
$q = new AMQPQueue($channel);
設定佇列名字 如果不存在則新增
$q->setName('queue');
$q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
echo "queue status: ".$q->declare();
echo "\n";
echo 'queue bind: '.$q->bind('exchange','route.key');
將你的佇列繫結到routingKey
echo "\n";

$channel->startTransaction();
echo "send: ".$ex->publish($message, 'route.key'); //將你的訊息通過制定routingKey傳送
$channel->commitTransaction();
$conn->disconnect();
*/
複製程式碼

php客戶端指令碼

複製程式碼
<?php
$bindingkey='key2';
//連線RabbitMQ
$conn_args = array( 'host'=>'127.0.0.1' , 'port'=> '5672', 'login'=>'guest' , 'password'=> 'guest','vhost' =>'/');
$conn = new AMQPConnection($conn_args);
$conn->connect();
//設定queue名稱,使用exchange,繫結routingkey
$channel = new AMQPChannel($conn);
$q = new AMQPQueue($channel);
$q->setName('queue2');
$q->setFlags(AMQP_DURABLE);
$q->declare();
$q->bind('exchange',$bindingkey);
//訊息獲取
$messages = $q->get(AMQP_AUTOACK) ;
if ($messages){
var_dump(json_decode($messages->getBody(), true ));
}
$conn->disconnect();
?>
複製程式碼

翻譯了部分mq常量設定,不正確的地方,大家以試驗為準

複製程式碼
/**
 * Passing in this constant as a flag will forcefully disable all other flags.
 * Use this if you want to temporarily disable the amqp.auto_ack ini setting.
 * 傳遞這個引數作為標誌將完全禁用其他標誌,如果你想臨時禁用amqp.auto_ack設定起效
 */
define('AMQP_NOPARAM', 0);

/**
 * Durable exchanges and queues will survive a broker restart, complete with all of their data.
 * 持久化交換機和佇列,當代理重啟動後依然存在,幷包括它們中的完整資料
 */
define('AMQP_DURABLE', 2);

/**
 * Passive exchanges and queues will not be redeclared, but the broker will throw an error if the exchange or queue does not exist.
 * 被動模式的交換機和佇列不能被重新定義,但是如果交換機和佇列不存在,代理將扔出一個錯誤提示
 */
define('AMQP_PASSIVE', 4);

/**
 * Valid for queues only, this flag indicates that only one client can be listening to and consuming from this queue.
 * 僅對佇列有效,這個人標誌定義佇列僅允許一個客戶端連線並且從其消費訊息
 */
define('AMQP_EXCLUSIVE', 8);

/**
 * For exchanges, the auto delete flag indicates that the exchange will be deleted as soon as no more queues are bound
 * to it. If no queues were ever bound the exchange, the exchange will never be deleted. For queues, the auto delete
 * flag indicates that the queue will be deleted as soon as there are no more listeners subscribed to it. If no
 * subscription has ever been active, the queue will never be deleted. Note: Exclusive queues will always be
 * automatically deleted with the client disconnects.
 * 對交換機而言,自動刪除標誌表示交換機將在沒有佇列繫結的情況下被自動刪除,如果從沒有佇列和其繫結過,這個交換機將不會被刪除.
 * 對佇列而言,自動刪除標誌表示如果沒有消費者和你繫結的話將被自動刪除,如果從沒有消費者和其繫結,將不被刪除,獨佔佇列在客戶斷
 * 開連線的時候將總是會被刪除
 */
define('AMQP_AUTODELETE', 16);

/**
 * Clients are not allowed to make specific queue bindings to exchanges defined with this flag.
 * 這個標誌標識不允許自定義佇列繫結到交換機上
 */
define('AMQP_INTERNAL', 32);

/**
 * When passed to the consume method for a clustered environment, do not consume from the local node.
 * 在叢集環境消費方法中傳遞這個引數,表示將不會從本地站點消費訊息
 */
define('AMQP_NOLOCAL', 64);

/**
 * When passed to the {@link AMQPQueue::get()} and {@link AMQPQueue::get()} methods as a flag,
 * the messages will be immediately marked as acknowledged by the server upon delivery.
 * 當在佇列get方法中作為標誌傳遞這個引數的時候,訊息將在被伺服器輸出之前標誌為acknowledged (已收到)
 */
define('AMQP_AUTOACK', 128);

/**
 * Passed on queue creation, this flag indicates that the queue should be deleted if it becomes empty.
 * 在佇列建立時候傳遞這個引數,這個標誌表示佇列將在為空的時候被刪除
 */
define('AMQP_IFEMPTY', 256);

/**
 * Passed on queue or exchange creation, this flag indicates that the queue or exchange should be
 * deleted when no clients are connected to the given queue or exchange.
 * 在交換機或者佇列建立的時候傳遞這個引數,這個標誌表示沒有客戶端連線的時候,交換機或者佇列將被刪除
 */
define('AMQP_IFUNUSED', 512);

/**
 * When publishing a message, the message must be routed to a valid queue. If it is not, an error will be returned.
 * 當釋出訊息的時候,訊息必須被正確路由到一個有效的佇列,否則將返回一個錯誤
 */
define('AMQP_MANDATORY', 1024);

/**
 * When publishing a message, mark this message for immediate processing by the broker. (High priority message.)
 * 當釋出訊息時候,這個訊息將被立即處理.
 */
define('AMQP_IMMEDIATE', 2048);

/**
 * If set during a call to {@link AMQPQueue::ack()}, the delivery tag is treated as "up to and including", so that multiple
 * messages can be acknowledged with a single method. If set to zero, the delivery tag refers to a single message.
 * If the AMQP_MULTIPLE flag is set, and the delivery tag is zero, this indicates acknowledgement of all outstanding
 * messages.
 * 當在呼叫AMQPQueue::ack時候設定這個標誌,傳遞標籤將被視為最大包含數量,以便通過單個方法標示多個訊息為已收到,如果設定為0
 * 傳遞標籤指向單個訊息,如果設定了AMQP_MULTIPLE,並且傳遞標籤是0,將所有未完成訊息標示為已收到
 */
define('AMQP_MULTIPLE', 4096);

/**
 * If set during a call to {@link AMQPExchange::bind()}, the server will not respond to the method.The client should not wait
 * for a reply method. If the server could not complete the method it will raise a channel or connection exception.
 * 當在呼叫AMQPExchange::bind()方法的時候,伺服器將不響應請求,客戶端將不應該等待響應,如果伺服器無法完成該方法,將會丟擲一個異常
 */
define('AMQP_NOWAIT', 8192);

/**
 * If set during a call to {@link AMQPQueue::nack()}, the message will be placed back to the queue.
 * 如果在呼叫AMQPQueue::nack方法時候設定,訊息將會被傳遞迴佇列
 */
define('AMQP_REQUEUE', 16384);

/**
 * A direct exchange type.
 * direct型別交換機
 */
define('AMQP_EX_TYPE_DIRECT', 'direct');

/**
 * A fanout exchange type.
 * fanout型別交換機
 */
define('AMQP_EX_TYPE_FANOUT', 'fanout');

/**
 * A topic exchange type.
 * topic型別交換機
 */
define('AMQP_EX_TYPE_TOPIC', 'topic');

/**
 * A header exchange type.
 * header型別交換機
 */
define('AMQP_EX_TYPE_HEADERS', 'headers');

/**
 * socket連線超時設定
 */
define('AMQP_OS_SOCKET_TIMEOUT_ERRNO', 536870947);