1. 程式人生 > >RabbitMQ第三課 基本概念和exchange

RabbitMQ第三課 基本概念和exchange

通信通道 路由規則 消息發送 display param 產生 parent 需要 rabbitmq

Rabbitmq使用必須理解的一些概念
(轉自:http://www.linuxidc.com/Linux/2013-11/92591.htm)
channel:通道,amqp支持一個tcp連接上啟用多個mq通信通道,每個通道都可以被作為通信流。
producer:生產者,是消息產生的源頭。
exchange:交換機,可以理解為具有路由表的路由規則。
queues:隊列,裝載消息的緩存容器。
consumer:消費者,連接到隊列並取走消息的客戶端。
核心思想:在RabbitMQ中,生產者從不直接將消息發送給隊列。
事實上,有些生產者甚至不知道消息是否被送到某個隊列中去了。生產者只負責將消息送給交換機,而交換機確切地知道什麽消息應該送到哪。
bind:綁定,實際上可以理解為交換機的路由規則。每個消息都有一個稱為路由鍵的屬性(routing key),就是一個簡單的字符串。一個綁定將【交換機,路由鍵,消息送達隊列】三者綁定在一起,形成一條路由規則。
exchange type:交換機類型:
fanout:不處理路由鍵,轉發到所有綁定的隊列上
direct:處理路由鍵,必須完全匹配,即路由鍵字符串相同才會轉發
topic:路由鍵模式匹配,此時隊列需要綁定要一個模式上。符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*” 只會匹配到“audit.irs”


創建或者聲明一個exchange
/**
* amqp_exchange_declare
*
* @param [in] connect連接 amqp_new_connection獲取
* @param [in] channel the channel to do the RPC on,程序自己設置一個通道號,一個連接可以多個通道號。
* @param [in] exchange 指定exchange名稱 eg:amqp_cstring_bytes("exchange_cat")
* @param [in] type 指定exchange類型,amqp_cstring_bytes("direct")

* "fanout" 廣播的方式,發送到該exchange的所有隊列上。
* "direct" 通過路由鍵發送到指定的隊列上。
* "topic" 通過匹配路由鍵的方式獲取,使用通配符*,#
* @param [in] passive 檢測exchange是否存在,設為true,若隊列存在則命令成功返回(調用其他參數不會影響exchange屬性),若不存在不會創建exchange,返回錯誤。設為false,如果exchange不存在則創建exchange,調用成功返回。如果exchange已經存在,並且匹配現在exchange的話則成功返回,如果不匹配則exchange聲明失敗。
* @param [in] durable 隊列是否持久化
* @param [in] auto_delete 連接斷開的時候,exchange是否自動刪除
* @param [in] internal internal
* @param [in] arguments arguments
* @returns amqp_exchange_declare_ok_t
*/
AMQP_PUBLIC_FUNCTION
amqp_exchange_declare_ok_t *
AMQP_CALL amqp_exchange_declare(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange, amqp_bytes_t type, amqp_boolean_t passive, amqp_boolean_t durable, amqp_boolean_t auto_delete, amqp_boolean_t internal, amqp_table_t arguments);

調用函數之後,使用amqp_get_rpc_reply(conn)來獲取調用結果。

amqp_exchange_declare(conn, 1, amqp_cstring_bytes("exchange_cat"), amqp_cstring_bytes("direct"), 1, 1, 0,0, amqp_empty_table);

die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring exchange");


註意事項

amqp_exchange_declare函數中的參數passive設置為0,會出現在聲明exchange之後,聲明隊列amqp_queue_declare處於死循環的問題


amqp_exchange_declare(m_connState, 1, amqp_cstring_bytes(strExchange.c_str()), amqp_cstring_bytes("fanout"), 0, 1, 0, 0, amqp_empty_table);

amqp_queue_declare(m_connState, 1, amqp_cstring_bytes(strQueue.c_str()), 0, 0, 0, 1, amqp_empty_table);

amqp_queue_bind(m_connState, 1, amqp_cstring_bytes(strQueue.c_str()), amqp_cstring_bytes(strExchange.c_str()), amqp_empty_bytes, amqp_empty_table);



RabbitMQ第三課 基本概念和exchange