1. 程式人生 > >rabbitmq消息流轉分析

rabbitmq消息流轉分析

lar turned nor tac eply null urn soc fault

連接(connection):消費者或者生產者與消息中間件建立的tcp連接;

頻道(channel):tcp連接建立之後,必須現在連接上開頻道,才能進行其他操作(原因?

登錄(logging):建立頻道之後,要登錄到特定的虛擬機,一組虛擬機持有一組交換機和隊列,其他虛擬機用戶無法訪問當前用戶對應的虛擬機中的交換機和隊列;

交換機(exchange):在rabbitmq消息中間件啟動時就會創建一個默認的交換機(當然也可以人為創建),與連接無關,負責整個消息中間件中消息的投遞;交換機不會存儲消息,

如果沒有任何隊列與之綁定,那麽交換機會丟棄收到的消息;

隊列(queue):用來存儲交換機投遞過來的消息,通過路由鍵與交換機綁定,進行消息的持久化存儲;

隊列由消費者或者生產者連上消息中間件後自行創建,人為指定隊列名稱,如果當前創建的隊列rabbitmq上已經存在,rabbitmq不會重復創建;

路由鍵(routingkey):交換機和隊列進行消息投遞的識別碼,人為指定;

一、生產者發送消息:

conn = amqp_new_connection();//

socket = amqp_tcp_socket_new(conn);

status = amqp_socket_open(socket, hostname, port);

amqp_login(conn, "ois", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "ois", "1");

amqp_channel_open(conn, 1);

die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

amqp_bytes_t queuename;

{

???? amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_cstring_bytes("myqueue"), 0, 0, 0, 1,

???????? amqp_empty_table);

???? die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");

???? queuename = amqp_bytes_malloc_dup(r->queue);

???? if (queuename.bytes == NULL) {

???????? fprintf(stderr, "Out of memory while copying queue name");

???????? return 1;

???? }

}//創建名稱為myqueue的隊列

amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(routingkey),

???? amqp_empty_table);

die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");//將隊列myqueue通過路由鍵routingkey綁定到交換機exchange上

?

for (;;)

{

amqp_basic_properties_t props;

props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;

props.content_type = amqp_cstring_bytes("text/plain");

props.delivery_mode = 2; /* persistent delivery mode */

die_on_error(amqp_basic_publish(conn,

1,

amqp_cstring_bytes(exchange),

amqp_cstring_bytes(routingkey),

0,

0,

&props,

amqp_cstring_bytes("test message")),

"Publishing");

????microsleep(1*1000*100);

}//生產者發布消息,發布消息需要指定接收消息的交換機,以及路由鍵,交換機需要根據路由鍵投遞消息到具體的隊列中

二、消費者獲取消息進行處理

static void run(amqp_connection_state_t conn)

{

uint64_t start_time = now_microseconds();

int received = 0;

int previous_received = 0;

uint64_t previous_report_time = start_time;

uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;

?

amqp_frame_t frame;

?

uint64_t now;

?

for (;;) {

amqp_rpc_reply_t ret;

amqp_envelope_t envelope;

?

now = now_microseconds();

if (now > next_summary_time) {

int countOverInterval = received - previous_received;

double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);

printf("%d ms: Received %d - %d since last report (%d Hz)\n",

(int)(now - start_time) / 1000, received, countOverInterval, (int) intervalRate);

?

previous_received = received;

previous_report_time = now;

next_summary_time += SUMMARY_EVERY_US;

}

?

amqp_maybe_release_buffers(conn);

ret = amqp_consume_message(conn, &envelope, NULL, 0);//定時獲取隊列中的消息

?

if (AMQP_RESPONSE_NORMAL != ret.reply_type) {

if (AMQP_RESPONSE_LIBRARY_EXCEPTION == ret.reply_type &&

AMQP_STATUS_UNEXPECTED_STATE == ret.library_error) {

if (AMQP_STATUS_OK != amqp_simple_wait_frame(conn, &frame)) {

return;

}

?

if (AMQP_FRAME_METHOD == frame.frame_type) {

switch (frame.payload.method.id) {

case AMQP_BASIC_ACK_METHOD:

/* if we‘ve turned publisher confirms on, and we‘ve published a message

* here is a message being confirmed

*/

?

break;

case AMQP_BASIC_RETURN_METHOD:

/* if a published message couldn‘t be routed and the mandatory flag was set

* this is what would be returned. The message then needs to be read.

*/

{

amqp_message_t message;

ret = amqp_read_message(conn, frame.channel, &message, 0);

if (AMQP_RESPONSE_NORMAL != ret.reply_type) {

return;

}

?

amqp_destroy_message(&message);

}

?

break;

?

case AMQP_CHANNEL_CLOSE_METHOD:

/* a channel.close method happens when a channel exception occurs, this

* can happen by publishing to an exchange that doesn‘t exist for example

*

* In this case you would need to open another channel redeclare any queues

* that were declared auto-delete, and restart any consumers that were attached

* to the previous channel

*/

return;

?

case AMQP_CONNECTION_CLOSE_METHOD:

/* a connection.close method happens when a connection exception occurs,

* this can happen by trying to use a channel that isn‘t open for example.

*

* In this case the whole connection must be restarted.

*/

return;

?

default:

fprintf(stderr ,"An unexpected method was received %u\n", frame.payload.method.id);

return;

}

}

}

?

} else {

amqp_destroy_envelope(&envelope);

}

?

received++;

}

}

?

int main(int argc, char const *const *argv)

{

conn = amqp_new_connection();

socket = amqp_tcp_socket_new(conn);

status = amqp_socket_open(socket, hostname, port);

die_on_amqp_error(amqp_login(conn, "ot", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),

"Logging in");

amqp_channel_open(conn, 1);

die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

{

amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1,

amqp_empty_table);

die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");

queuename = amqp_bytes_malloc_dup(r->queue);

if (queuename.bytes == NULL) {

fprintf(stderr, "Out of memory while copying queue name");

return 1;

}

}

?

amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey),

amqp_empty_table);//綁定消息隊列

die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");

?

amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);

die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");//指定這是一個消費者

?

run(conn);

?

die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");

die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");

die_on_error(amqp_destroy_connection(conn), "Ending connection");

?

return 0;

}

?

?

?

?

?

?

?

rabbitmq消息流轉分析