1. 程式人生 > >Spring Boot與RabbitMQ的整合訊息確認

Spring Boot與RabbitMQ的整合訊息確認

訊息生產者和消費者
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import java.io.IOException;
import java.util.Date;
import java.util.UUID;

/**
 * Created by yangliu on 2018/4/8.
 */
@Controller
@RequestMapping("/rabbitMq")
public class TestController {
    private Logger logger= LoggerFactory.getLogger(TestController.class);
    @Autowired
    RabbitAdmin rabbitAdmin;

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.getRabbitTemplate().setConfirmCallback(new MsgSendConfirmCallBack());
        rabbitAdmin.getRabbitTemplate().setReturnCallback(new MsgSendReturnCallback());
        return rabbitAdmin;
    }


    @RequestMapping("/sendMq")
    @ResponseBody
    public String send(String name) throws Exception {
        String context = "hello "+name+" --" + new Date();
        String sendStr;
        for(int i=1;i<=100;i++){
            sendStr="第["+i+"]個 hello  --" + new Date();
            logger.debug("HelloSender: " + sendStr);
            sendMessage("myqueue",sendStr);
            //Thread.sleep(1000);
        }
        return context;
    }

    /**
     * 方式一:動態宣告exchange和queue它們的繫結關係  rabbitAdmin
     * @param exchangeName
     * @param queueName
     */
    protected void declareBinding(String exchangeName, String queueName) {
        if (rabbitAdmin.getQueueProperties(queueName) == null) {
            /*  queue 佇列宣告
            durable=true,交換機持久化,rabbitmq服務重啟交換機依然存在,保證不丟失; durable=false,相反
            auto-delete=true:無消費者時,佇列自動刪除; auto-delete=false:無消費者時,佇列不會自動刪除
            排他性,exclusive=true:首次申明的connection連線下可見; exclusive=false:所有connection連線下*/
            Queue queue = new Queue(queueName, true, false, false, null);
            rabbitAdmin.declareQueue(queue);
            TopicExchange directExchange = new TopicExchange(exchangeName);
            rabbitAdmin.declareExchange(directExchange);//宣告exchange
            Binding binding = BindingBuilder.bind(queue).to(directExchange).with(queueName);    //將queue繫結到exchange
            rabbitAdmin.declareBinding(binding);      //宣告繫結關係
            rabbitAdmin.getRabbitTemplate().setMandatory(true);
            rabbitAdmin.getRabbitTemplate().setConfirmCallback(new MsgSendConfirmCallBack());//訊息確認
            rabbitAdmin.getRabbitTemplate().setReturnCallback(new MsgSendReturnCallback());//確認後回撥
        } else {
            rabbitAdmin.getRabbitTemplate().setQueue(queueName);
            rabbitAdmin.getRabbitTemplate().setExchange(queueName);
            rabbitAdmin.getRabbitTemplate().setRoutingKey(queueName);
        }
    }

    /**
     * 傳送訊息
     * @param queueName
     * @param message
     * @throws Exception
     */
    public void sendMessage(String queueName, String message) throws Exception {
        declareBinding(queueName, queueName);
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        rabbitAdmin.getRabbitTemplate().convertAndSend(queueName, queueName, message,correlationId);
        logger.debug("[rabbitmq-sendMessage]queueName:{} ,uuid:{},msg:{}",queueName,correlationId.getId(),message);
    }

    /**
     * 消費者
     * @param connectionFactory
     * @return
     */
    @Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        Queue queue = new Queue("myqueue", true, false, false, null);
        container.setQueues(queue);
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設定確認模式手工確認
        container.setMessageListener(new ChannelAwareMessageListener() {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {

                    byte[] body = message.getBody();
                    try {
                        //業務邏輯
                        logger.info("消費 receive msg : " + new String(body));
                        // 訊息的標識,false只確認當前一個訊息收到,true確認所有consumer獲得的訊息
                        //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //手動確認確認訊息成功消費
                    } catch (Exception e) {
                        logger.info("消費失敗: " + new String(body));
                        // ack返回false,並重新回到佇列,api裡面解釋得很清楚
                        try {
                            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                        } catch (IOException e1) {
                            e1.printStackTrace();
                        }
                    }

        }
        });
        return container;
    }


    /*

    //訊息的標識,false只確認當前一個訊息收到,true確認所有consumer獲得的訊息
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    //ack返回false,並重新回到佇列,api裡面解釋得很清楚
    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    //拒絕訊息
    channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

    如果訊息沒有到exchange,則confirm回撥,ack=false
    如果訊息到達exchange,則confirm回撥,ack=true
    exchange到queue成功,則不回撥return
    exchange到queue失敗,則回撥return(需設定mandatory=true,否則不回回調,訊息就丟了)
    */
}

失敗後return回撥:

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
public class MsgSendReturnCallback implements RabbitTemplate.ReturnCallback {
    @Override  
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        System.out.println("確認後回撥return--message:"+new String(message.getBody())+",replyCode:"+replyCode+",replyText:"
                +replyText+",exchange:"+exchange+",routingKey:"+routingKey);
    }
}  

確認後回撥: 

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;

public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (ack) {
                System.out.println("訊息確認成功cause"+cause);
            } else {
                //處理丟失的訊息
                System.out.println("訊息確認失敗:"+correlationData.getId()+"#cause"+cause);
            }
        }
    }

相關推薦

Spring BootRabbitMQ整合訊息確認

訊息生產者和消費者import com.rabbitmq.client.Channel;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.

Spring Boot 構建應用——整合訊息中介軟體 RabbitMQ

RabbitMQ 是訊息中介軟體的一種,實現了 AMQP 標準。訊息中介軟體的工作過程可以用生產者-消費者模型來表示。生產者發訊息到訊息佇列中去,消費者監聽指定的訊息佇列,並且當訊息佇列收到訊息之後,接收訊息佇列傳來的訊息,並且給予相應的處理。訊息佇列常用於分散

6、Spring Boot MyBatis整合

1.6 Spring Boot 與MyBatis整合 簡介 詳細介紹如何在Spring Boot中整合MyBatis,並通過註解方式實現對映。 完整原始碼: 1.6.1 建立 spring-boot-mybatis 專案 pom檔案如下 <?xml version="1

7、Spring Boot Redis 整合

1.7 Spring Boot 與 Redis 整合 簡介 繼續上篇的MyBatis操作,詳細介紹在Spring Boot中使用RedisCacheManager作為快取管理器,整合業務於一體。 完整原始碼: Spring-Boot-Demos 1.7.1 建立 spring-boot-r

Spring BootReact整合

前言 前不久學習了Web開發,用React寫了前端,Spring Boot搭建了後端,然而沒有成功地把兩個工程結合起來,造成前端與後端之間需要跨域通訊,帶來了一些額外的工作。 這一次成功地將前端工程與後端結合在一個Project中,記錄一下,也希望能幫到那些和我一樣的入門小白。 環境 Win

Spring BootQuartz整合

1.匯入依賴包 <!-- quartz定時器 --> <dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artif

spring bootjunit整合測試

  先建立一個REST介面 package com.laoxu.gamedog.controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframew

spring boot mybatis整合

本專案使用的環境: 開發工具:Intellij IDEA 2018.1.3 springboot: 2.0.5.RELEASE jdk:1.8.0_161 maven:3.5.3 首先先建立springboot專案 選擇需要的模組 填寫包名 1.開始在專案

Activiti學習之spring boot activiti整合

1、新建spring boot 專案 使用IDEA新建一個spring boot 專案,專案結構如下: 2、增加activiti、jpa、mysql等依賴 <dependencies> <dependency&

如何使用Spring BootRabbitMQ結合實現延遲佇列

背景 何為延遲佇列? 顧名思義,延遲佇列就是進入該佇列的訊息會被延遲消費的佇列。而一般的佇列,訊息一旦入隊了之後就會被消費者馬上消費。 延遲佇列能做什麼?延遲佇列多用於需要延遲工作的場景。最常見的是以下兩種場景:延遲消費。比如: 使用者生成訂單之後

Spring BootLog4j2整合之java.lang.IllegalStateException: Logback configuration error detected:

引言: 一個問題的分析與解決過程是表與裡的過程,是一個大膽猜測與小心求證的過程,spring boot與log4j2的整合過程中,我將描述一下分析這個問題的思路和過程。 我一直強調一點: 重要的不是解決問題的結論,而是解決問題的思路和方法,即使在解決完問題之後,

spring boot Mybatis整合(*)

業務層 tomcat ng- quest map big selectall esp 連接 在pom.xml文件中加入數據庫、spring-mybatis整合 <!-- spring boot 整合mybatis --> <de

springRabbitMQ整合 消費者消費不到訊息 重啟才能消費到的問題解決

RabbitMQ是當前一個挺火的訊息佇列中介軟體 相比ActiveMQ 訊息更不容易丟失 我之前用的是ActiveMQ 後邊有的時候會莫名其妙的收不到訊息 專案緊後邊也沒時間排查 經朋友的推薦下 換了RabbitMQ 後邊用著也沒啥問題 今天 的Rabbit

Spring Boot訊息 JMS、AMQP、RabbitMQ簡單概述

一、概述 1. 大多應用中,可通過訊息服務中介軟體來提升系統非同步通訊、擴充套件解耦能力 2. 訊息服務中兩個重要概念: 訊息代理(message broker)和目的地(destination) 當訊息傳送者傳送訊息以後,將由訊息代理接管,訊息代理保證訊息傳遞到指定目

spring bootjdbcTemplate的整合案例2

database bean n) ret struct mapping rri ott mode 簡單入門了spring boot後,接下來寫寫跟數據庫打交道的案例。博文采用spring的jdbcTemplate工具類與數據庫打交道。 下面是搭建的springbo

spring boot shiro的簡單整合使用

scheduler div turn map 用戶 ttr algorithm pen enc shrio官網:https://shiro.apache.org/ Apache Shiro是一個功能強大且易於使用的Java安全框架,可執行身份驗證,授權,加密和會話管理。借助

022 springRabbitmq整合

ring config resource 進行 virt vat gte urn address 一 .概述   本次我們使用spring幫助我們完成Rabbitmq的使用. 二 .環境的搭建   本次使用springboot的jar文件幫助整合rabbitmq,但是

Apache Shiro(三)——Spring Boot Shiro的 整合

在瞭解了Apache Shiro的架構、認證、授權之後,我們來看一下Shiro與Web的整合。下面以Spring Boot為例,介紹一下Spring Boot 與 Shiro的 整合。 一、建立一個Spring Boot專案 可以使用IDEA快速建立一個Spring Boot專

Spring bootRedis的整合使用

關於Redis的安裝與叢集部署,可以參考《Linux下Redis的叢集部署》 一、Redis的單機使用 (1) 新建gradle專案,依賴如下: dependencies { compile 'org.springframework.boot:spring-boot-starte

Spring Boot整理——spring boot 常用元件整合

一、專案搭建         首先建立的Spring Boot工程(war包),然後匯入相關依賴: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/200