1. 程式人生 > >Spring整合訊息佇列RabbitMQ(訊息失敗處理)

Spring整合訊息佇列RabbitMQ(訊息失敗處理)

1. RabbitMQ簡介

1.1. RabbitMQ

RabbitMQ是由Erlang(愛立信公司)語言開發,實現Advanced Message Queuing Protocol (AMQP高階訊息佇列協議)的訊息中介軟體。訊息中介軟體主要用於元件之間的解耦,訊息的傳送者無需知道訊息使用者的存在,反之亦然。

1.2. 結構圖

這裡寫圖片描述 
• Broker:訊息佇列伺服器實體,例如RabbitMQ服務 
• Vhost:虛擬主機,預設為“/”,一個broker裡可以有多個vhost,區分不同使用者許可權,類似java的命令空間
• Connection:應用程式與broker連線,可有多個連線 
• Channel:訊息通道,connection中可建立多個channel,每個channel代表一個會話任務,所有操作都在channel中進行。 
• Exchange:訊息交換機,channel中可有多個,用於投遞訊息。應用程式傳送訊息時先把訊息給交換機,由交換機投遞給佇列,不是直接給佇列。 
型別有三種:fanout(廣播)、Direct(處理路由鍵,輪播實現)、Topic(支援訊息模糊匹配) 
• Queue:佇列,用於存放訊息 
• Message:訊息,應用程式需要傳送的資料 
• Bind:根據routingKey繫結exchange與queue規則,決定訊息傳送的方向

1.3. 物件間關係

這裡寫圖片描述

2. rabbitMQ與spring整合

2.1. 傳送訊息Producer

傳送介面

public interface SimpleMQProducer {

    /**
     * 傳送訊息至MQ
     */
    public void sendDataToMQ(Object message); 

    /**
     * 傳送訊息至MQ
     */
    public void sendDataToMQ(Object message, String msgid); 

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

傳送介面實現

public class
SmartMQProducer implements InitializingBean,SimpleMQProducer{
protected final Loggerx logger = Loggerx.getLogger("dao"); protected RabbitTemplate rabbitTemplate = new RabbitTemplate(); protected String queue; protected String exchange; protected String routingKey; protected
ConnectionFactory connectionFactory; protected MessageConverter messageConverter; protected RetryTemplate retryTemplate; protected ConfirmCallback confirmCallback; protected ReturnCallback failedCallback; public RabbitTemplate getRabbitTemplate() { return rabbitTemplate; } public void setRabbitTemplate(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } public void setQueue(String queue) { this.queue = queue; } public void setExchange(String exchange) { this.exchange = exchange; } public void setRoutingKey(String routingKey) { this.routingKey = routingKey; } public void setConnectionFactory(ConnectionFactory connectionFactory) { this.connectionFactory = connectionFactory; } public void setMessageConverter(MessageConverter messageConverter) { this.messageConverter = messageConverter; } public void setRetryTemplate(RetryTemplate retryTemplate) { this.retryTemplate = retryTemplate; } public void setConfirmCallback(ConfirmCallback confirmCallback) { this.confirmCallback = confirmCallback; } public void setFailedCallback(ReturnCallback failedCallback) { this.failedCallback = failedCallback; } @Override public void sendDataToMQ(Object message) { CorrelationData correlationId = null; try { correlationId = new CorrelationData(GUID.genTxNo(25)); } catch (Exception e) { logger.error(LogType.EX, "產生訊息id失敗",e); correlationId = new CorrelationData(UUID.randomUUID().toString()); } this.rabbitTemplate.convertAndSend(this.routingKey, message, correlationId); logger.info(LogType.EX, "傳送到MQ的訊息內容["+JsonUtil.toJSONString(message)+"],訊息ID["+correlationId.getId()+"]"); } @Override public void sendDataToMQ(Object message, String msgid) { CorrelationData correlationId = new CorrelationData(msgid); this.rabbitTemplate.convertAndSend(this.routingKey, message, correlationId); logger.info(LogType.EX, "傳送到MQ的訊息內容["+JsonUtil.toJSONString(message)+"],訊息ID["+correlationId.getId()+"]"); } @Override public void afterPropertiesSet() throws Exception { this.rabbitTemplate.setQueue(this.queue); this.rabbitTemplate.setExchange(this.exchange); this.rabbitTemplate.setRoutingKey(this.routingKey); this.rabbitTemplate.setConnectionFactory(this.connectionFactory); this.rabbitTemplate.setMessageConverter(this.messageConverter); this.rabbitTemplate.setMandatory(true); if (null == this.failedCallback) { // 確認訊息是否到達broker伺服器,也就是隻確認是否正確到達exchange中即可,只要正確的到達exchange中,broker即可確認該訊息返回給客戶端ack。 this.rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { //記錄本地日誌 Object object = StringUtil.ByteToObject(message.getBody()); logger.error(LogType.EX, "訊息傳送到MQ失敗,內容["+object+"]"); } }); }else { this.rabbitTemplate.setReturnCallback(this.failedCallback); } //設定回撥 this.rabbitTemplate.setConfirmCallback(this.confirmCallback); } }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108

傳送到MQ失敗回撥處理

public abstract class SmartMQFailedCallBack implements ReturnCallback {

    protected final Loggerx logger = Loggerx.getLogger("bo");
    /**
     * 確認訊息是否到達broker伺服器,也就是隻確認是否正確到達queue中即可,只要正確的到達queue中,broker即可確認該訊息返回給客戶端ack。
     */
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        if (null != message) {
            Object object = StringUtil.ByteToObject(message.getBody());
            logger.error(LogType.EX, "訊息傳送到MQ失敗,內容["+object+"]");
            executeFailedMessage(object);
        }else {
            logger.error(LogType.EX, "訊息傳送到MQ失敗,訊息內容為null"); 
        }
    }

    public abstract void executeFailedMessage(Object message);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

傳送到MQ後回撥處理(不分成功或失敗)

public abstract class SmartMQConfirmCallBack implements ConfirmCallback{
    protected final Logger logger = Logger.getLogger("bo");

    /**
     * 確認訊息是否到達broker伺服器,也就是隻確認是否正確到達exchange中即可,只要正確的到達exchange中,broker即可確認該訊息返回給客戶端ack。
     * 
     */
    public void confirm(CorrelationData correlationData, boolean ack) {
        if (ack) {
            logger.info(LogType.INFO, "訊息成功消費,訊息ID["+correlationData.getId()+"]");
        } else {
            logger.error(LogType.EX, "訊息失敗消費,訊息ID["+correlationData.getId()+"]");
        }
        executeCallBack(correlationData.getId(),ack);
    }

    public abstract void executeCallBack(String msgID,boolean ack);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

傳送端spring

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="
            http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
                http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/rabbit
                http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- 連線服務配置 -->
    <bean id="mqConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="xxx.xxx.xx.xx"/>
        <property name="username" value="guest"/>
        <property name="password" value="guest"/>
        <property name="virtualHost" value="/"/>
        <property name="channelCacheSize" value="50"/>
        <property name="publisherConfirms" value="true"/>
        <property name="publisherReturns" value="true"/>
    </bean>

    <rabbit:admin connection-factory="mqConnectionFactory" />
    <!-- 宣告訊息轉換器為SimpleMessageConverter -->
    <bean id="msgConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter" />
    <!-- 訊息傳送重試 ,可選項-->
    <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
        <property name="backOffPolicy">
            <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
                <property name="initialInterval" value="500" />
                <property name="multiplier" value="10.0" />
                <property name="maxInterval" value="10000" />
            </bean>
        </property>
    </bean>
    <!-- queue 佇列宣告 -->
    <!-- durable=true,交換機持久化,rabbitmq服務重啟交換機依然存在,保證不丟失; durable=false,相反 -->
    <!-- auto-delete=true:無消費者時,佇列自動刪除; auto-delete=false:無消費者時,佇列不會自動刪除 -->
    <!-- 排他性,exclusive=true:首次申明的connection連線下可見; exclusive=false:所有connection連線下都可見-->
    <rabbit:queue id="test" durable="true" auto-delete="false" exclusive="false" name="test" />

    <!-- exchange queue binging key 繫結 -->
    <!-- durable=true,交換機持久化,rabbitmq服務重啟交換機依然存在,保證不丟失; durable=false,相反 -->
    <!-- auto-delete=true:無消費者時,佇列自動刪除; auto-delete=false:無消費者時,佇列不會自動刪除 -->
    <rabbit:direct-exchange name="test" durable="true" auto-delete="false" id="test">
        <rabbit:bindings>
            <rabbit:binding queue="test" key="test_key" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- 訊息傳送到mq回撥處理,需要處理錯誤訊息,可選項 -->
    <bean id="testfailedCallback" class="xxx.TestMsgFailedCallBack"></bean>
    <!-- 訊息傳送到mq回撥處理,接著業務處理 ,可選項-->
    <bean id="testconfirmCallback" class="xxx.TestconfirmCallback"></bean>
    <bean id="testProducer" class="XXXX.SmartMQProducer">
        <property name="connectionFactory" ref="mqConnectionFactory" />
        <property name="messageConverter" ref="msgConverter" />
        <property name="retryTemplate" ref="retryTemplate" />
        <property name="confirmCallback" ref="testconfirmCallback" />
        <property name="failedCallback" ref="testfailedCallback" />
        <property name="exchange" value="test" />
        <property name="queue" value="test" />
        <property name="routingKey" value="test" />
    </bean> 

</beans>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67

2.2. 消費端Consumer

接收端spring配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:aop="http://www.springframework.org/schema/aop"  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
    http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd
    http://www.springframework.org/schema/aop
    http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.1.xsd">

    <!-- 連線服務配置 -->
    <bean id="mqConnectionFactory"
        class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="xxx.xxx.xxx.xxx" />
        <property name="username" value="guest" />
        <property name="password" value="guest" />
        <property name="virtualHost" value="/" />
        <property name="channelCacheSize" value="50" />
    </bean>
    <!-- 建立rabbitAdmin 代理類 -->
    <rabbit:admin connection-factory="mqConnectionFactory" />

    <!-- 宣告訊息轉換器為SimpleMessageConverter -->
    <bean id="msgConverter"
        class="org.springframework.amqp.support.converter.SimpleMessageConverter" />

    <!-- queue 佇列宣告 -->
    <rabbit:queue id="test" name="test" durable="true" auto-delete="false" exclusive="false" />

    <!-- exchange queue binging key 繫結 -->
    <!-- durable=true,交換機持久化,rabbitmq服務重啟交換機依然存在,保證不丟失; durable=false,相反 -->
    <!-- auto-delete=true:無消費者時,佇列自動刪除; auto-delete=false:無消費者時,佇列不會自動刪除 -->
    <!-- 通過Binding來判定Queue、Exchange、routingKey -->
    <rabbit:direct-exchange id="test" name="test"
        durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="test" key="test_key" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <bean id="testMsgHandler" class="xxxx.testMsgHandler" />
    <bean id="testMsgAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
        <constructor-arg ref="testHandler" />
        <property name="defaultListenerMethod" value="handleTxMsg"></property>
        <property name="messageConverter" ref="msgConverter"></property>
    </bean>
    <
            
           

相關推薦

Spring整合訊息佇列RabbitMQ(訊息失敗處理)

1. RabbitMQ簡介 1.1. RabbitMQ RabbitMQ是由Erlang(愛立信公司)語言開發,實現Advanced Message Queuing Protocol (AMQP高階訊息佇列協議)的訊息中介軟體。訊息中介軟體主要用於元件之間的解耦,訊息的傳送者無需知道訊息使用者的存在,

(二)RabbitMQ訊息佇列-RabbitMQ訊息佇列架構與基本概念

沒錯我還是沒有講怎麼安裝和寫一個HelloWord,不過快了,這一章我們先了解下RabbitMQ的基本概念。 RabbitMQ架構 說是架構其實更像是應用場景下的架構(自己畫的有點醜,勿嫌棄) 從圖中可以看出RabbitMQ主要由Exchange和Qu

訊息佇列RabbitMQSpring整合

1.RabbitMQ簡介 RabbitMQ是流行的開源訊息佇列系統,用erlang語言開發。RabbitMQ是AMQP(高階訊息佇列協議)的標準實現。 官網:http://www.rabbitmq.com/ 2.Spring整合RabbitM

訊息佇列 RabbitMQSpring 整合使用

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSc

MQ訊息佇列--RabbitMQ整合Spring理論及例項講解

今天Boss叫我去他的小黑屋分配任務,出門就記得倆詞“MQ”、“訊息佇列”。從來都沒聽說過這讓我怎麼搞?對於這種情況我慣有的方法論就是:先搞清楚它是什麼、有什麼用、有什麼工具可用、怎麼用,然後就是……擼起袖子使勁幹吧! 1、什麼是訊息佇列 訊息是指在兩個

SpringBoot(八) Spring訊息佇列RabbitMQ

概述 1.大多數應用中,可以通過訊息服務中介軟體來提升系統非同步能力和拓展解耦能力。 2.訊息服務中的兩個重要概念:訊息代理(Message broker)和目的地(destination) 當訊息傳送者傳送訊息後,將由訊息代理接管,訊息代理保證訊息傳遞到指定目的地。 3.訊息佇列主要有兩種形式的目的

.NET 開源工作流: Slickflow流程引擎高階開發(七)--訊息佇列(RabbitMQ)的整合使用

前言:工作流流程過程中,除了正常的人工審批型別的節點外,事件型別的節點處理也尤為重要。比如比較常見的事件型別的節點有:Timer/Message/Signal等。本文重點闡述訊息型別的節點處理,以及實現訊息驅動流程過程中對訊息佇列(RabbitMQ)的整合使用方式。 1. 節點間訊息傳遞 1.1 Messag

訊息佇列 RabbitMQ

什麼叫訊息佇列 訊息(Message)是指在應用間傳送的資料。訊息可以非常簡單,比如只包含文字字串,也可以更復雜,可能包含嵌入物件。 訊息佇列(Message Queue)是一種應用間的通訊方式,訊息傳送後可以立即返回,由訊息系統來確保訊息的可靠傳遞。訊息

訊息佇列RabbitMQ應答模式

為了確保訊息不會丟失,RabbitMQ支援訊息應答。消費者傳送一個訊息應答,告訴RabbitMQ這個訊息已經接收並且處理完畢了。RabbitMQ就可以刪除它了。如果一個消費者掛掉卻沒有傳送應答,RabbitMQ會理解為這個訊息沒有處理完全,然後交給另一個消費者去重新處理。這樣,你就可以確認即使消費者偶爾掛掉也

訊息佇列rabbitmq在mac上的安裝

一、安裝rabbitMq.         在mac平臺上安裝rabbitMq,開啟終端,在終端上輸入以下命令: brew install rabbitmq        安裝rabbitMq需要一些時間

Python 訊息佇列rabbitmq使用之工作佇列使用多個worker接收訊息

前面已經介紹過怎麼安裝rabbitmq以及要使用的三方庫 因此這裡直接進入例項 1、釋出端程式碼 # new_task.py import pika # 匯入pika import sys

Python 訊息佇列rabbitmq使用之 更加細緻的 有選擇的 釋出訊息/接收訊息

1、釋出端程式碼 # new_topic_p.py import pika import sys connection = pika.BlockingConnection(pika.Connec

Python 訊息佇列rabbitmq使用之 實現一個RPC系統

1、服務端程式碼 # rpc_server.py import pika # 建立連線 connection = pika.BlockingConnection(pika.ConnectionP

使用訊息佇列RabbitMQ

RabbitMQ 即一個訊息佇列,主要是用來實現應用程式的非同步和解耦,同時也能起到訊息緩衝,訊息分發的作用。 RabbitMQ是實現AMQP(高階訊息佇列協議)的訊息中介軟體的一種,AMQP,即Advanced Message Queuing Protocol, 高階訊息

訊息佇列RabbitMQ入門與5種模式詳解

1.RabbitMQ概述 簡介: MQ全稱為Message Queue,訊息佇列是應用程式和應用程式之間的通訊方法; RabbitMQ是開源的,實現了AMQP協議的,採用Erlang(面向併發程式語言)編寫的,可複用的企業級訊息系統; AMQP(高階訊息佇列協議)

訊息佇列-RabbitMq(PHP)

首先進行安裝: 將composer.json檔案放在你的專案中 composer.json {"require":{"php-amqplib/php-amqplib":"2.5.*"}

在C#中使用訊息佇列RabbitMQ

http://www.cnblogs.com/qy1141/p/4054135.html     作用就是提高系統的併發性,將一些不需要及時響應客戶端且佔用較多資源的操作,放入佇列,再由另外一個執行緒,去非同步處理這些佇列,可極大的提高系統的併發能力。 2、安裝

訊息佇列RabbitMq的五種形式佇列

MQ全稱為Message Queue,訊息佇列是系統之間的通訊方法; RabbitMQ是開源的,實現了AMQP協議的,採用Erlang(面向併發程式語言)編寫的,可複用的企業級訊息系統; AMQP(高階訊息佇列協議)是一個非同步訊息傳遞所使用應用層協議規範,為面向訊息中介

初步對訊息佇列RabbitMQ的瞭解

RabbitMQ是流行的開源訊息佇列系統,用erlang語言開發,完整的實現了AMPQ(高階訊息佇列協議)。網站: http://www.rabbitmq.com/ erlang網站:http://www.erlang.org/ 中文站:http://www.erlang-cn.com/ 首先,先安裝下R

OpenStack中訊息佇列(RabbitMQ)分析

可以說OpenStack使用這種MOM模式的訊息佇列機制無疑是一個聰明的選擇。其鬆耦合性以及動態可擴充套件性都非常符合開源雲的要求。無論是開發還是執行,都會帶了很多好處。唯一的缺點就是它是一個single point failure,如果RabbitMQ出錯了,那整個OpenStack也就無法運行了。雖然R