1. 程式人生 > >第5篇 RabbitMQ整合SpringBoot實現Direct模式

第5篇 RabbitMQ整合SpringBoot實現Direct模式

直接上程式碼。專案結構如下,pom 需要增加對 RabbitMQ 的支援。


Pom檔案如下

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Spring Boot + RabbitMQ (spring-boot-starter-amqp) demo. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.haibo</groupId>
    <artifactId>spring-rabbit-hello</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>spring-rabbit-hello</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- RabbitMQ / Spring AMQP support -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <!-- NOTE(review): points at a Nexus instance on localhost; presumably specific to the author's machine. -->
    <repositories>
        <repository>
            <id>nexus</id>
            <name>nexus</name>
            <url>http://localhost:8081/nexus/content/groups/public/</url>
        </repository>
    </repositories>
</project>

package com.haibo;


import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * rabbitmq 的配置類
 * 趙海波 on 2017/6/6.
 */
@Configuration
public class RabbitMQConfig10 {
    /**
     * 訊息交換機的名字
     */
public static final String EXCHANGE = "my-mq-exchange";
    /**
     * 佇列key1
     */
public static final String ROUTINGKEY1 = "queue_one_key1";
    /**
     * 佇列key2
     */
public static final String ROUTINGKEY2 = "queue_one_key2";

    /**
     * 配置連結資訊         * @return
     */
@Bean
public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("139.199.11.69:5672");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true); // 必須要設定
return connectionFactory;
    }

    /**
     * 配置訊息交換機
     * 針對消費者配置
     * FanoutExchange: 將訊息分發到所有的繫結佇列,無routingkey的概念
     * HeadersExchange :通過新增屬性key-value匹配
     * DirectExchange:按照routingkey分發到指定佇列
     * TopicExchange:多關鍵字匹配
     */
@Bean
public DirectExchange defaultExchange() {
        return new DirectExchange(EXCHANGE, true, false);
    }

    /**
     * 配置訊息佇列1
     * 針對消費者配置
     *
     * @return
*/
@Bean
public Queue queue() {
        return new Queue("queue_one", true); //佇列持久
}

    /**
     * 將訊息佇列1與交換機繫結
     * 針對消費者配置
     *
     * @return
*/
@Bean
public Binding binding() {
        return BindingBuilder.bind(queue()).to(defaultExchange()).with(RabbitMQConfig10.ROUTINGKEY1);
    }

    /**
     * 配置訊息佇列2
     * 針對消費者配置
     *
     * @return
*/
@Bean
public Queue queue1() {
        return new Queue("queue_one1", true); //佇列持久
}

    /**
     * 將訊息佇列2與交換機繫結
     * 針對消費者配置
     *
     * @return
*/
@Bean
public Binding binding1() {
        return BindingBuilder.bind(queue1()).to(defaultExchange()).with(RabbitMQConfig10.ROUTINGKEY2);
    }

    /**
     * 接受訊息的監聽,這個監聽會接受訊息佇列1的訊息
     * 針對消費者配置
     *
     * @return
*/
@Bean
public SimpleMessageListenerContainer messageContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
        container.setQueues(queue());
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設定確認模式手工確認
container.setMessageListener(new ChannelAwareMessageListener() {
            public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
                byte[] body = message.getBody();
                System.out.println("收到訊息 : " + new String(body));
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); //確認訊息成功消費
}
        });
        return container;
    }

    /**
     * @return
*/
@Bean
public SimpleMessageListenerContainer messageContainer2() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
        container.setQueues(queue1());
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設定確認模式手工確認
container.setMessageListener(new ChannelAwareMessageListener() {
            public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
                byte[] body = message.getBody();
                System.out.println("queue1 收到訊息 : " + new String(body));
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); //確認訊息成功消費
}
        });
        return container;
    }

    @Bean
public SimpleMessageListenerContainer messageContainer3() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
        container.setQueues(queue1());
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設定確認模式手工確認
container.setMessageListener(new ChannelAwareMessageListener() {
            public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
                byte[] body = message.getBody();
                System.out.println("========queue2 收到訊息 : " + new String(body));
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); //確認訊息成功消費
}
        });
        return container;
    }

    public static void main(String[] args) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(new RabbitMQConfig10().connectionFactory());
        container.setQueues(new RabbitMQConfig10().queue1());
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設定確認模式手工確認       
container.setMessageListener(new ChannelAwareMessageListener() {
            public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
                byte[] body = message.getBody();
                System.out.println("queue1 收到訊息 : " + new String(body));
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
                //確認訊息成功消費
}
        });
    }
}

生產者類如下

package com.haibo;

import java.util.UUID;

import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.amqp.rabbit.support.CorrelationData;  
import org.springframework.web.bind.annotation.RequestMapping;  
import org.springframework.web.bind.annotation.RestController;

/**
 * 測試RabbitMQ傳送訊息的Controller
 */
@RestController
public class SendController10 implementsRabbitTemplate.ConfirmCallback{


    private RabbitTemplate rabbitTemplate;
    /**
     * 配置傳送訊息的rabbitTemplate,因為是構造方法,所以不用註解Spring也會自動注入(應該是新版本的特性)
     * @param rabbitTemplate
*/
public SendController10(RabbitTemplate rabbitTemplate){
        this.rabbitTemplate = rabbitTemplate;
        //設定消費回撥
this.rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * 向訊息佇列1中傳送訊息
     * @param msg
* @return
*/
@RequestMapping("send1")
    public String send1(String msg){
        String uuid = UUID.randomUUID().toString();
        CorrelationData correlationId = new CorrelationData(uuid);
        rabbitTemplate.convertAndSend(RabbitMQConfig10.EXCHANGE, RabbitMQConfig10.ROUTINGKEY1, msg,
                correlationId);
        return null;
    }
    /**
     * 向訊息佇列2中傳送訊息
     * @param msg
* @return
*/
@RequestMapping("send2")
    public String send2(String msg){
        String uuid = UUID.randomUUID().toString();
        CorrelationData correlationId = new CorrelationData(uuid);
        rabbitTemplate.convertAndSend(RabbitMQConfig10.EXCHANGE, RabbitMQConfig10.ROUTINGKEY2, msg,
                correlationId);
        return null;
    }

    /**
     * 訊息的回撥,主要是實現RabbitTemplate.ConfirmCallback介面
     * 注意,訊息回撥只能代表成功訊息傳送到RabbitMQ伺服器,不能代表訊息被成功處理和接受
     */
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println(" 回撥id:" + correlationData);
        if (ack) {
            System.out.println("訊息成功消費");
        } else {
            System.out.println("訊息消費失敗:" + cause+"\n重新發送");
        }
    }
}

測試如下 

http://localhost:8000/send1?msg=123

http://localhost:8000/send2?msg=123