1. 程式人生 > >RabbitMQ 相關問題總結--RabbitMQ 如何確保訊息傳送和消費?

RabbitMQ 相關問題總結--RabbitMQ 如何確保訊息傳送和消費?

RabbitMQ 相關問題總結

HA 的RabbitMQ 叢集架構:


一、RabbitMQ 如何高可用部署,如何確保叢集不宕機?

RabbitMQ可用採用三種方式來部署叢集:

1. cluster 

    a. 不支援跨網段,用於同一個網段內的區域網。

    b.可用隨意的或者動態的增加 或者減少。

    c.節點之間需要執行相同版本的RabbitMQ 和Erlang

2.federation

    部署在廣域網,允許單臺伺服器上的交換機或者佇列,接收發布到另外一臺機器上的訊息和佇列。

3.sholve

   和federation類似,工作在更低層次,可以應用於廣域網

二、節點型別:

1.RAM Node

    記憶體節點,將所有的佇列,交換機, 繫結,使用者 許可權與vhost的元資料儲存在記憶體中,可以讓佇列和交換機宣告更加的便捷。

2. Disk Node 

    將元資料儲存在磁碟中,單節點系統,只執行 磁碟型別的節點,防止重啟RabbitMQ時,丟失系統的配置資訊。

三、ErLang Cookie

    ErLang Cookie 是保證不同節點之間的通訊,不同節點之間共享相同的Cookie,叢集部署時候,需要copy這個資料到不同的節點,使得cookie一致。

四、RabbitMQ叢集模式分類

1.普通模式:

     預設的叢集模式,假設叢集上有兩個節點 node1,node2, 訊息實體只存在一個節點,如果生產者生產一個訊息,丟向node1,但是消費者在node2進行消費,那麼就需要node2將node1中的訊息取出,並且傳送給消費者。

2. 映象模式:

    需要將消費者的佇列變成映象佇列,存在於多個節點。實現RabbitMQ的高可用,作用就是,訊息實體會在佇列之間同步。

RabbitMQ相關操作命令:

1. 單個節點的服務啟動 停止

rabbitmqctl stop
rabbitmq-server -detached 

2.查詢節點狀態
rabbitmqctl cluster_status

調研:

1. 產品定位(能做什麼,不能做什麼)
2. 產品特性(適合做什麼,不適合做什麼)
3. 產品背景(社群活躍度,團隊知名度,維護力度,口碑)
4. 產品架構(實現原理)
5. 產品安裝(使用者指南)
6. 產品維護(運維管理)

demo 驗證

RabbitMQ 如何確保訊息傳送和消費:



   從RabbitMQ 結構圖來看,有如下幾個 過程:

1. 訊息從生產者Producer傳送到交換機Exchange

2.交換機根據路由規則將訊息轉發到相應佇列

3. 佇列將訊息進行儲存

4.消費者訂閱佇列訊息,並進行消費

第一個過程  訊息 從生產者傳送到交換機

a.中間網路斷開怎麼辦? 

rabbitmq 的解決方案:

1). 設定通道channel 為事務模式 

   通過channel.txSelect 開啟事務,channel.txCommit 提交事務,channel.txRollback 用於事務回滾

如果在還沒有提交事務之前,RabbitMQ丟擲異常,我們可以 將其捕獲,然後進行事務回滾。缺點是 事務模式會極大的消耗RabbitMQ的效能。

2). 設定通道confirm 模式

    通過confirm.select開啟confirm模式,如果設定了no_wait為 false的話,那麼broker會返回confirm.select_ok,表示broker同意將通道設定為confirm模式。 但是這個優點是傳送方確認是非同步的,傳送方可以不等到確認就傳送下一條訊息。當訊息被brocker接收,broker 會發送basic.ack,生產者可以通過回撥函式確認這個訊息,如果因為RabbitMQ本身問題導致訊息丟失,broker會發送basic.nack,生產者通過回撥方法接收到nack後,可以考慮訊息重發

 要注意的是 事務模式 和confirm模式不能共存,是互斥的。

@Service("confirmCallBackListener")
public class ConfirmCallBackListener implements ConfirmCallback{

	@Override
	public void confirm(CorrelationData correlationData, boolean ack, String cause) {
		System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause);
	}

}


第二個過程  訊息由交換機路由到訊息佇列

    對於不能路由到訊息佇列的訊息,如果設定了mandatory的話,basic.return 會在basic.ack或者basic.nack之前返回。

    對於可以路由到訊息佇列的訊息,在confirm模式下,如果返回basic.ack的話說明如下:

     1.訊息被接收到所有的佇列中了 2.如果是映象佇列,說明被所有的映象佇列接收 3.如果是持久化的訊息到持久化佇列已經持久化到看硬碟。

@Service("returnCallBackListener")
public class ReturnCallBackListener implements ReturnCallback {

	@Override
	public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
		String msgId = "";
		if (message.getMessageProperties().getCorrelationId() != null) {
			msgId = new String(message.getMessageProperties().getCorrelationId());
		}
		System.out.println("return--message: msgId:" + msgId + ",msgBody:" + new String(message.getBody())
				+ ",replyCode:" + replyCode + ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:"
				+ routingKey);
	}

}

當然如果不想在生產者實行return 函式,可以採用備份交換機的方式,當交換機沒有路由到正確佇列,會將訊息轉發到這個備份交換機上,前提是這個備份交換機綁定了正確佇列,這樣就可以被消費了。channel.exchangeDeclare方法的時候新增alternate-exchange引數宣告備份交換機

第三個階段, 佇列訊息持久化階段

     如果佇列不設定持久化,即使訊息是持久化的,訊息依然不能儲存,皮之不存,毛將焉附需要佇列和訊息都是持久化的,佇列持久化只是儲存其元資料,在重啟,宕機後不丟失,但是要訊息也持久化,需要保證訊息是持久化的。並且如果是設定了confim模式的話,basic.ack 之前是會將資料進行落盤的, 並且RabbitMQ採用映象佇列,多個副本方式,Master 宕機,依然可以使用Slave 不影響 叢集正常使用,保證高可用

第4個階段  消費者消費訊息

為了保證訊息正常的到達消費者,RabbitMQ 提供了訊息 acknowledgement來確認訊息。

預設autoAck=None   isAutoAck=false 來確認訊息, autoAck = false 是表示,需要等到消費者傳送顯示的回覆確認訊號之後,訊息才從記憶體中移除,但是如果acknowledgeMode設定了Auto ,那麼isAutoAck= true  ,這個其實是不安全的,fire-And-forget, 訊息傳送出去之後,但是可能還沒到消費者,TCP連線就斷了(由於配置了auto,只要消費發出去了,就刪掉了),那麼TCP連線斷開了,這部分訊息就丟失了,是不安全的,但是對應能一直keepup連線的,是可以提高吞吐量的。

還可以設定autoAck = manual ,isAutoack= false,那麼就是 佇列每次向消費者傳送訊息之後,需要消費者手動確認basc.ack,basic,nack等,rabbitmq才可以將訊息刪除或者重新入隊。 isAutoack=false 情況下 ,一致沒有收到  消費者的 basic.ack, RabbitMQ如果檢測到 和消費者埠連線 埠,會重新發這條訊息。

還有注意如果basic.nack basic.reject 只是簡單的拒絕 ,而不是重新 requeue的話,那麼訊息是不會重新入隊的

檢視下basic.Nack說明

 /**
     * Reject one or several received messages.
     *
     * Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
     * or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.
     * @see com.rabbitmq.client.AMQP.Basic.Nack
     * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
     * @param multiple true to reject all messages up to and including
     * the supplied delivery tag; false to reject just the supplied
     * delivery tag.
     * @param requeue true if the rejected message(s) should be requeued rather
     * than discarded/dead-lettered
     * @throws java.io.IOException if an error is encountered
     */
    void basicNack(long deliveryTag, boolean multiple, boolean requeue)
            throws IOException;

最後一個引數設定true的話才會重新入隊
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);


  <bean id="bgate1001RequestlistenerContainer"  class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">  
        <property name="queueNames" value="Payt_Bgate1001Request_Payt"></property> 
        <property name="connectionFactory" ref="jyMqConnectionFactory"></property>  
        <property name="messageListener" ref="bgate1001RequestAdapter"></property>
        <property name="concurrentConsumers" value="100" />
        <property name="adviceChain">
            <array>
                <ref bean="retryInterceptor" />
            </array>
        </property>
        <property name="autoStartup" value="false" /> 
        <property name="acknowledgeMode" value="NONE"></property>
    </bean> 

檢視原始碼:

private void consumeFromQueue(String queue) throws IOException {
		this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(), "", false, this.exclusive,
				this.consumerArgs, this.consumer);
		if (logger.isDebugEnabled()) {
			logger.debug("Started on queue '" + queue + "': " + this);
		}
	}


還有個問題,消費者啥時候傳送basic.ack呢?

                Object[] listenerArguments = buildListenerArguments(convertedMessage);
		Object result = invokeListenerMethod(methodName, listenerArguments, message);
		if (result != null) {
			handleResult(result, message, channel);
		} else {
			logger.trace("No result object given - no result to handle");
		}
protected void doInvokeListener(ChannelAwareMessageListener listener, Channel channel, Message message)
			throws Exception {

		RabbitResourceHolder resourceHolder = null;
		Channel channelToUse = channel;
		boolean boundHere = false;
		try {
			if (!isExposeListenerChannel()) {
				// We need to expose a separate Channel.
				resourceHolder = getTransactionalResourceHolder();
				channelToUse = resourceHolder.getChannel();
				/*
				 * If there is a real transaction, the resource will have been bound; otherwise
				 * we need to bind it temporarily here. Any work done on this channel
				 * will be committed in the finally block.
				 */
				if (isChannelLocallyTransacted(channelToUse) &&
							!TransactionSynchronizationManager.isActualTransactionActive()) {
						resourceHolder.setSynchronizedWithTransaction(true);
						TransactionSynchronizationManager.bindResource(this.getConnectionFactory(),
								resourceHolder);
					boundHere = true;
				}
			}
			else {
				// if locally transacted, bind the current channel to make it available to RabbitTemplate
				if (isChannelLocallyTransacted(channel)) {
					RabbitResourceHolder localResourceHolder = new RabbitResourceHolder(channelToUse, false);
					localResourceHolder.setSynchronizedWithTransaction(true);
					TransactionSynchronizationManager.bindResource(this.getConnectionFactory(),
							localResourceHolder);
					boundHere = true;
				}
			}
			// Actually invoke the message listener...
			try {
				listener.onMessage(message, channelToUse);
			}
			catch (Exception e) {
				throw wrapToListenerExecutionFailedExceptionIfNeeded(e, message);
			}
		}
		finally {
			if (resourceHolder != null && boundHere) {
				// so the channel exposed (because exposeListenerChannel is false) will be closed
				resourceHolder.setSynchronizedWithTransaction(false);
			}
			ConnectionFactoryUtils.releaseResources(resourceHolder);
			if (boundHere) {
				// unbind if we bound
				TransactionSynchronizationManager.unbindResource(this.getConnectionFactory());
				if (!isExposeListenerChannel() && isChannelLocallyTransacted(channelToUse)) {
					/*
					 *  commit the temporary channel we exposed; the consumer's channel
					 *  will be committed later. Note that when exposing a different channel
					 *  when there's no transaction manager, the exposed channel is committed
					 *  on each message, and not based on txSize.
					 */
					RabbitUtils.commitIfNecessary(channelToUse);
				}
			}
		}
	}


還未有完整的原始碼分析文件,暫時先到此,如果有問題請指正!