1. 程式人生 > >RabbitMQ(一)publish訊息確認

RabbitMQ(一)publish訊息確認


Using standard AMQP, the only way to guarantee that a message isn't lost is by using transactions -- make the channel transactional, publish the message, commit. In this case, transactions are unnecessarily heavyweight and decrease throughput by a factor of 250. To remedy this, a confirmation mechanism was introduced.


如果採用標準的 AMQP 協議,則唯一能夠保證訊息不會丟失的方式是利用事務機制 -- 令 channel 處於 transactional 模式、向其 publish 訊息、執行 commit 動作。在這種方式下,事務機制會帶來大量的多餘開銷,並會導致吞吐量下降 250% 。為了補救事務帶來的問題,引入了 confirmation 機制(即 Publisher Confirm)。


To enable confirms, a client sends the confirm.select method. Depending on whether no-wait was set or not, the broker may respond with a confirm.select-ok. Once the confirm.select method is used on a channel, it is said to be in confirm mode. A transactional channel cannot be put into confirm mode and once a channel is in confirm mode, it cannot be made transactional.


為了使能 confirm 機制,client 首先要傳送 confirm.select 方法幀。取決於是否設定了 no-wait 屬性,broker 會相應的判定是否以 confirm.select-ok 進行應答。一旦在 channel 上使用 confirm.select方法,channel 就將處於 confirm 模式處於 transactional 模式的 channel 不能再被設定成 confirm 模式,反之亦然。


Once a channel is in confirm mode, both the broker and the client count messages (counting starts at 1 on the first confirm.select). The broker then confirms messages as it handles them by sending a basic.ack on the same channel. The delivery-tag field contains the sequence number of the confirmed message. The broker may also set the multiple field in basic.ack to indicate that all messages up to and including the one with the sequence number have been handled.


一旦 channel 處於 confirm 模式,broker 和 client 都將啟動訊息計數(以 confirm.select 為基礎從 1 開始計數)。broker 會在處理完訊息後,在當前 channel 上通過傳送 basic.ack 的方式對其進行 confirm 。delivery-tag 域的值標識了被 confirm 訊息的序列號。broker 也可以通過設定 basic.ack 中的 multiple 域來表明到指定序列號為止的所有訊息都已被 broker 正確的處理了。


In exceptional cases when the broker is unable to handle messages successfully, instead of a basic.ack, the broker will send a basic.nack. In this context, fields of the basic.nack have the same meaning as the corresponding ones in basic.ack and the requeue field should be ignored. By nack'ing one or more messages, the broker indicates that it was unable to process the messages and refuses responsibility for them; at that point, the client may choose to re-publish the messages.

在異常情況中,broker 將無法成功處理相應的訊息,此時 broker 將傳送 basic.nack 來代替 basic.ack 。在這個情形下,basic.nack 中各域值的含義與 basic.ack 中相應各域含義是相同的,同時 requeue 域的值應該被忽略。通過 nack 一或多條訊息broker 表明自身無法對相應訊息完成處理,並拒絕為這些訊息的處理負責。在這種情況下,client 可以選擇將訊息 re-publish 。


After a channel is put into confirm mode, all subsequently published messages will be confirmed or nack'd once. No guarantees are made as to how soon a message is confirmed. No message will be both confirmed and nack'd.

在 channel 被設定成 confirm 模式之後,所有被 publish 的後續訊息都將被 confirm(即 ack) 或者被 nack 一次。但是沒有對訊息被 confirm 的快慢做任何保證,並且同一條訊息不會既被 confirm 又被 nack 。


An example in Java that publishes a large number of messages to a channel in confirm mode and waits for the acknowledgements can be found 
here.
一個 Java 示例展現了 publish 大量訊息到一個處於 confirm 模式的 channel 並等待獲取 acknowledgement 的情況,示例在這裡


When will messages be confirmed?
訊息會在何時被 confirm?


The broker will confirm messages once:

broker 將在下面的情況中對訊息進行 confirm :

  • it decides a message will not be routed to queues
    (if the mandatory flag is set then the basic.return is sent first) or
    broker 發現當前訊息無法被路由到指定的 queues 中(如果設定了 mandatory 屬性,則 broker 會先發送basic.return
  • a transient message has reached all its queues (and mirrors) or
    非持久屬性的訊息到達了其所應該到達的所有 queue 中(和映象 queue 中)
  • a persistent message has reached all its queues (and mirrors) and been persisted to disk (and fsynced) or
    持久訊息到達了其所應該到達的所有 queue 中(和映象 queue 中),並被持久化到了磁碟(被 fsync)
  • a persistent message has been consumed (and if necessary acknowledged) from all its queues
    持久訊息從其所在的所有 queue 中被 consume 了(如果必要則會被 acknowledge)



Notes


The broker loses persistent messages if it crashes before said messages are written to disk. Under certain conditions, this causes the broker to behave in surprising ways.

broker 會丟失持久化訊息,如果 broker 在將上述訊息寫入磁碟前異常。在一定條件下,這種情況會導致 broker 以一種奇怪的方式執行。


For instance, consider this scenario:

例如,考慮下述情景:

  1. a client publishes a persistent message to a durable queue
    一個 client 將持久訊息 publish 到持久 queue 中
  2. a client consumes the message from the queue (noting that the message is persistent and the queue durable), but doesn't yet ack it,
    另一個 client 從 queue 中 consume 訊息(注意:該訊息具有持久屬性,並且 queue 是持久化的),當尚未對其進行 ack 
  3. the broker dies and is restarted, and
    broker 異常重啟
  4. the client reconnects and starts consuming messages.
    client 重連並開始 consume 訊息

At this point, the client could reasonably assume that the message will be delivered again. This is not the case: the restart has caused the broker to lose the message. In order to guarantee persistence, a client should use confirms. If the publisher's channel had been in confirm mode, the publisher would 
not have received an ack for the lost message (since the consumer hadn't ack'd it and it hadn't been written to disk).

在上述情景下,client 有理由認為訊息需要被(broker)重新 deliver 。但這並非事實:重啟(有可能)會令 broker 丟失訊息。為了確保永續性,client 應該使用 confirm 機制。如果 publisher 使用的 channel 被設定為 confirm 模式,publisher 將不會收到已丟失訊息的 ack(這是因為 consumer 沒有對訊息進行 ack ,同時該訊息也未被寫入磁碟)。


下面是用rabbitmq-c實現publish訊息確認的程式碼:

/*
 *
 * gcc -o amqp_sendstring amqp_sendstring.c utils.c -I/usr/local/rabbitmq-c/include -L/usr/local/rabbitmq-c/lib -lrabbitmq
 *
 */

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/time.h>
#include <unistd.h>
#include <stdint.h>

#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>

#include "utils.h"

int main(int argc, char const *const *argv)
{
  char const *hostname;
  int port, status;
  char const *exchange;
  char const *routingkey;
  char const *messagebody;
  amqp_socket_t *socket = NULL;
  amqp_connection_state_t conn;

  if (argc < 6) {
    fprintf(stderr, "Usage: amqp_sendstring host port exchange routingkey messagebody\n");
    return 1;
  }

  hostname = argv[1];
  port = atoi(argv[2]);
  exchange = argv[3];
  routingkey = argv[4];
  messagebody = argv[5];

  conn = amqp_new_connection();

  socket = amqp_tcp_socket_new(conn);
  if (!socket) {
    die("creating TCP socket");
  }

  status = amqp_socket_open(socket, hostname, port);
  if (status) {
    die("opening TCP socket");
  }

  die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
                    "Logging in");
  amqp_channel_open(conn, 1);
  die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

  {
    amqp_basic_properties_t props;
    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
    props.content_type = amqp_cstring_bytes("text/plain");
    props.delivery_mode = 2; /* persistent delivery mode */

    amqp_confirm_select(conn, 1);  //在通道上開啟Publish確認
    die_on_error(amqp_basic_publish(conn,
                                    1,
                                    amqp_cstring_bytes(exchange),
                                    amqp_cstring_bytes(routingkey),
                                    0,   //mandatory標誌位,訊息不能到達佇列則返回basic.return
                                    0,   //immediate標誌位,訊息不能到達消費者返回basic.return
                                    &props,
                                    amqp_cstring_bytes(messagebody)),
                 "Publishing");
  }

  {
    /* Publish訊息後需要在當前通道上監聽返回的資訊,來判斷訊息是否成功投遞
     * 這裡要息根據投遞訊息的方式來過濾判斷幾個方法
     */
    amqp_frame_t frame;
    amqp_rpc_reply_t ret;

    if (AMQP_STATUS_OK != amqp_simple_wait_frame(conn, &frame)) {
      return;
    }

    if (AMQP_FRAME_METHOD == frame.frame_type) {
      amqp_method_t method = frame.payload.method;
      fprintf(stdout, "method.id=%08X,method.name=%s\n",
        method.id, amqp_method_name(method.id));
      switch (method.id) {
        case AMQP_BASIC_ACK_METHOD:{
          /* if we've turned publisher confirms on, and we've published a message
           * here is a message being confirmed
           */
          {
            amqp_basic_ack_t *s;
            s = (amqp_basic_ack_t *) method.decoded;
            fprintf(stdout, "Ack.delivery_tag=%d\n", s->delivery_tag);
            fprintf(stdout, "Ack.multiple=%d\n", s->multiple);
          }

          break;

        case AMQP_BASIC_NACK_METHOD:
          /* if we've turned publisher confirms on, and we've published a message
           * here is a message not being confirmed
           */
          {
            amqp_basic_nack_t *s;
            s = (amqp_basic_nack_t *) method.decoded;
            fprintf(stdout, "NAck.delivery_tag=%d\n", s->delivery_tag);
            fprintf(stdout, "NAck.multiple=%d\n", s->multiple);
            fprintf(stdout, "NAck.requeue=%d\n", s->requeue);
          }

          break;

        case AMQP_BASIC_RETURN_METHOD:
          /* if a published message couldn't be routed and the mandatory flag was set
           * this is what would be returned. The message then needs to be read.
           */
          {
            amqp_message_t message;
            amqp_basic_return_t *s;
            char str[1024];
            s = (amqp_basic_return_t *) method.decoded;
            fprintf(stdout, "Return.reply_code=%d\n", s->reply_code);
            strncpy(str, s->reply_text.bytes, s->reply_text.len); str[s->reply_text.len] = 0;
            fprintf(stdout, "Return.reply_text=%s\n", str);

            ret = amqp_read_message(conn, frame.channel, &message, 0);
            if (AMQP_RESPONSE_NORMAL != ret.reply_type) {
              return;
            }
            strncpy(str, message.body.bytes, message.body.len); str[message.body.len] = 0;
            fprintf(stdout, "Return.message=%s\n", str);

            amqp_destroy_message(&message);
          }

          break;

        case AMQP_CHANNEL_CLOSE_METHOD:
          /* a channel.close method happens when a channel exception occurs, this
           * can happen by publishing to an exchange that doesn't exist for example
           *
           * In this case you would need to open another channel redeclare any queues
           * that were declared auto-delete, and restart any consumers that were attached
           * to the previous channel
           */
          return;

        case AMQP_CONNECTION_CLOSE_METHOD:
          /* a connection.close method happens when a connection exception occurs,
           * this can happen by trying to use a channel that isn't open for example.
           *
           * In this case the whole connection must be restarted.
           */
          return;

        default:
          fprintf(stderr ,"An unexpected method was received %d\n", frame.payload.method.id);
          return;
      }
    }
  }

  }

  die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
  die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
  die_on_error(amqp_destroy_connection(conn), "Ending connection");

  return 0;
}


相關推薦

RabbitMQpublish訊息確認

Using standard AMQP, the only way to guarantee that a message isn't lost is by using transactions -- make the channel transactional,

python採用pika庫使用rabbitmqPublish\Subscribe(訊息釋出\訂閱)

之前的例子都基本都是1對1的訊息傳送和接收,即訊息只能傳送到指定的queue裡,但有些時候你想讓你的訊息被所有的Queue收到,類似廣播的效果,這時候就要用到exchange了, Exchange在定義的時候是有型別的,以決定到底是哪些Queue符合條件,可以接收訊息 fanout: 所有bin

SpringBoot 整合 RabbitMQ包含三種訊息確認機制以及消費端限流

目錄 說明 生產端 消費端 說明 本文 SpringBoot 與 RabbitMQ 進行整合的時候,包含了三種訊息的確認模式,如果查詢詳細的確認模式設定,請閱讀:RabbitMQ的三種訊息確認

訊息中介軟體——RabbitMQWindows/Linux環境搭建完整版

前言 最近在學習訊息中介軟體——RabbitMQ,打算把這個學習過程記錄下來。此章主要介紹環境搭建。此次主要是單機搭建(條件有限),包括在Windows、Linux環境下的搭建,以及RabbitMQ的監控平臺搭建。 環境準備 在搭建RabbitMQ之前,請先確保如下環境已經搭建完畢 Java環境(我

RabbitMQ——簡介

.json 文章 不同 ive 守護 運行 發布 密碼 機器學習 RabbitMQ(一) ——簡介 (轉載請附上本文鏈接——linhxx) 一、概述 RabbitMQ是一種消息的傳輸者(broker),除了消息持久化,不對消息內容本身做任何的處理。其類似於郵局,可以

RabbitMQ

加權 glib emc pat enable ava sse2 asm openssl RabbitMQ——Linux環境安裝配置指南 Erlang安裝 由於RabbitMQ是基於Erlang開發,所以需要安裝Erlang環境,可以到官網上下載安裝,也可以使用rpm命令安裝

python採用pika庫使用rabbitmq

  1 安裝: centos 執行yum install rabbitmq-server-3.5.2-1.noarch.rpm  進行安裝 ps:這裡不用 rpm -ivh rabbitmq-server-3.5.2-1.noarch.rpm進行安裝,而是用yum ins

Java架構之訊息佇列 訊息佇列的概述

訊息佇列系列分享大綱:  一、訊息佇列的概述 二、訊息佇列之RabbitMQ的使用 三、訊息佇列之Kafka的使用 四、訊息佇列之RabbitMQ的原理詳解 五、訊息佇列之Kafka的原理詳解 六、訊息佇列之面試集錦 1.訊息佇列的概述 訊息佇列(Me

學習之路-RabbitMQ:什麼是RabbitMQ

** RabbitMQ ** MQ全稱為Message Queue,即訊息佇列, RabbitMQ是由erlang語言開發,基於AMQP(Advanced Message Queue 高階訊息佇列協議)協議實現的訊息佇列,它是一種應用程式之間的通訊方法,訊息佇列在分散式系統開 發中應

訊息佇列】MSMQ——微軟訊息佇列簡介及安裝

一、前言       從這篇部落格開始小編就從一個簡單的例項來展示一下訊息佇列中MSMQ的基本使用方法,展示一下他對訊息的增刪改查,訊息佇列有很多種樣式,做.NET開發的程式猿,最容易安裝的就是MSM

RabbitMQ Win7系統下的安裝與配置

安裝 下載Erlang,地址:http://www.erlang.org/download/otp_win32_R15B.exe,雙擊安裝即可(首先裝) 下載RabbitMQ,地址:http://www.rabbitmq.com/releases/rabbitmq-serv

RabbitMQRabbitMQ安裝

一、安裝RabbitMQ 這裡為了方便,採用Docker安裝,不會Docker的同學,安裝windows版本的RabbitMQ吧,因為Linux系統安裝因為版本問題,可能會出現各種各樣的問題。

MFC——WINDOWS訊息機制

建立一個完整的視窗需要經過下面四個操作步驟: 設計一個視窗類; 註冊視窗類; 建立視窗; 顯示及更新視窗。 #include <Windows.h> #include <stdio.h> LRESULT CALLBACK Wi

SpringBoot整合RabbitMQ:簡單使用

我所用的Springboot 的版本是1.5.13先來看pom檔案:<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns

訊息中介軟體--RabbitMQ學習

Activemq介紹 Activemq是 Apache出品,最流行的能力強勁的開源訊息匯流排,並且它個完全支援MS規範的訊息中介軟體。 其豐富的AP、多種叢集構建模式使得他成為業界老牌訊息中介軟體,在中小型企業中應用廣泛。 MQ衡量指標:服務效能、資料儲存、叢集架構

python採用pika庫使用rabbitmq訊息確認Message acknowledgment

從上篇文章可知,每個工作者,都會依次分配到任務。那麼如果一個工作者,在處理任務的時候掛掉,這個任務就沒有完成,應當交由其他工作者處理。所以應當有一種機制,當一個工作者完成任務時,會反饋訊息。 訊息確認就是當工作者完成任務後,會反饋給rabbitmq 修改receive.py的內容: 1 def c

RabbitMQ訊息佇列: Detailed Introduction 詳細介紹

原文地址:http://blog.csdn.net/anzhsoft/article/details/19563091 1. 歷史     RabbitMQ是一個由erlang開發的AMQP(Advanced Message Queue )的開源實現。AMQP 的

RabbitMQ訊息佇列系列教程認識RabbitMQ

摘要 RabbitMQ是最為流行的訊息中介軟體,是處理高併發業務的利器。本系列教程,將跟大家一起學習RabbitMQ。 目錄 一、RabbitMQ是什麼? RabbitMQ是基於Erlang開發的目前最流行的開源訊息中介軟體,類似於MSMQ、ActiveMQ等訊息佇列元件。RabbitMQ是輕量級

PHP版 RabbitMQ小技巧用程式碼獲得伺服器上的訊息佇列名

  最近在新浪開發一個用了MQ的URL監控專案,對PHP版的RabbitMQ有一些瞭解。想先吐個槽,就算是今年1月份的某些MQ的教程,很多都是不對的,和我在4月份看到的 官方的Tutorial 都

C# Queue與RabbitMQ的愛恨情仇文末附原始碼:Q與MQ訊息佇列簡單應用

首先我們簡單瞭解一下什麼堆、棧、佇列。 堆是在程式執行時,而不是在程式編譯時,申請某個大小的記憶體空間。即動態分配記憶體,對其訪問和對一般記憶體的訪問沒有區別。 棧就是一個容器,後放進去的先拿出來,它下面本來有的東西要等它出來之後才能出來。(先進後出or後進先出) 佇列只能在隊頭做刪除操作,在隊尾做插入操作.