1. 程式人生 > >RabbitMq 集成 spring boot 消息隊列 入門Demo

RabbitMq 集成 spring boot 消息隊列 入門Demo

spring boot rabbitmq 入門集成


spring boot 集成 RabbitMq還是很方便的。現在來一個簡單的例子來集成rabbitmq。入門demo。

主要概念:

其中比較重要的概念有 4 個,分別為:虛擬主機,交換機,隊列,和綁定。


虛擬主機:一個虛擬主機持有一組交換機、隊列和綁定。為什麽需要多個虛擬主機呢?很簡單,RabbitMQ當中,用戶只能在虛擬主機的粒度進行權限控制。 因此,如果需要禁止A組訪問B組的交換機/隊列/綁定,必須為A和B分別創建一個虛擬主機。每一個RabbitMQ服務器都有一個默認的虛擬主機“/”。

交換機:Exchange 用於轉發消息,但是它不會做存儲 ,如果沒有 Queue bind 到 Exchange 的話,它會直接丟棄掉 Producer 發送過來的消息。 這裏有一個比較重要的概念:路由鍵 。消息到交換機的時候,交互機會轉發到對應的隊列中,那麽究竟轉發到哪個隊列,就要根據該路由鍵。

綁定:也就是交換機需要和隊列相綁定,這其中如上圖所示,是多對多的關系。


首先是配制文件。

#spring.application.name=spring-boot-rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest


發送者:

package com.basic.rabbitmq.send;

import com.basic.rabbitmq.configuration.RabbitMqConfig2;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import java.util.Date;

/**
 * Created by sdc on 2017/6/17.
 */
@Service("helloSender")
public class HelloSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

//    private Rabbitt

    public void send() {
        String contenxt = "order_queue_message";
        this.amqpTemplate.convertAndSend(RabbitMqConfig2.QUEUE_EXCHANGE_NAME,"order_queue_routing",contenxt);
//        this.amqpTemplate.conver
    }

}


配制信息:

package com.basic.rabbitmq.configuration;

import com.basic.rabbitmq.receiver.Receiver;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;

/**
 * Created by sdc on 2017/6/17.
 */
@Configuration
public class RabbitMqConfig2 {

    public static final String QUEUE_NAME = "order_queue";

    public static final String QUEUE_EXCHANGE_NAME = "topic_exchange_new";

    public static final  String routing_key = "order_queue_routing";

    @Bean
    public Queue queue() {
        //是否持久化
        boolean durable = false;
        //僅創建者可以使用該隊列,斷開後自動刪除
        boolean exclusive = false;
        //當所有消費者都斷開連接後,是否刪除隊列
        boolean autoDelete = false;
        return new Queue(QUEUE_NAME, durable, exclusive, autoDelete);
    }

    @Bean
    public TopicExchange exchange() {
        //是否持久化
        boolean durable = false;
        //當所有消費者都斷開連接後,是否刪除隊列
        boolean autoDelete = false;
        return  new TopicExchange(QUEUE_EXCHANGE_NAME, durable, autoDelete);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with(routing_key);
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1",5672);

        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        /** 如果要進行消息回調,則這裏必須要設置為true */
        connectionFactory.setPublisherConfirms(true); // 必須要設置
//        connectionFactory.setPublisherReturns();
        return connectionFactory;
    }

    @Bean
    SimpleMessageListenerContainer container() {

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
//        container.setQueueNames(QUEUE_NAME);
        container.setQueues(queue());
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置確認模式手工確認
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                byte[] body = message.getBody();
                System.out.println("收到消息 : " + new String(body));
                channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認消息成功消費
//                channel.basicAck(); //應答
//                channel.basicReject();//拒絕
//                channel.basicRecover(); //恢復
//                channel.basicQos();
//                channel.addConfirmListener(new ConfirmListener() {
//                    @Override
//                    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
//                        //失敗重發
//                    }
//
//                    @Override
//                    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
//                        //確認ok
//                    }
//                });
            }
        });
        return  container;
    }

//    @Bean
//    MessageListenerAdapter listenerAdapter(Receiver receiver) {
//        return new MessageListenerAdapter(receiver, "receiveMessage");
//    }

}


測試類:

package com.rabbit.test;

import com.basic.rabbitmq.send.HelloSender;
import com.basic.system.Application;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;

/**
 * Created by sdc on 2017/6/17.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class RabbitMqTest {

    @Autowired
    public HelloSender helloSender;

    @Test
    public void helloword() throws  Exception {
        helloSender.send();
    }


}


這只是一個demo,學習的時候會測試各種的事情,在這基礎上更改就可以了,心中的疑慮測試沒了就可以寫一些項目了。

本文出自 “10093778” 博客,請務必保留此出處http://10103778.blog.51cto.com/10093778/1944218

RabbitMq 集成 spring boot 消息隊列 入門Demo