1. 程式人生 > >SpringBoot(十三):SpringBoot整合RabbitMQ

SpringBoot(十三):SpringBoot整合RabbitMQ

如果對RabbitMQ不熟悉的,建議先看RabbitMQ系列教程。

一、環境準備

  • RabbitMQ 3.7.4
  • SpringBoot 1.5.10.RELEASE
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

application.yml

# Spring Boot settings for the RabbitMQ connection.
spring:
  rabbitmq:
    # Address and port of the RabbitMQ broker to connect to.
    host: 192.168.239.128
    port: 5672
# The entries below are commented out, so they are not applied.
#    username: test
#    password: 123456
#    virtual-host: /vhost_test
#    publisher-confirms: true

二、簡單佇列

image

RabbitMQConfiguration.java 宣告佇列

package cn.saytime.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the queue used by the simple-queue example.
 */
@Configuration
public class RabbitMQConfiguration {

    private static final String SIMPLE_QUEUE_NAME = "test_simple_queue";

    /**
     * Registers the {@code test_simple_queue} queue as a bean.
     *
     * <p>The queue is non-durable, non-exclusive, not auto-deleted and has no extra arguments.
     *
     * @return the queue definition
     */
    @Bean
    public Queue queue() {
        final boolean durable = false;
        final boolean exclusive = false;
        final boolean autoDelete = false;
        return new Queue(SIMPLE_QUEUE_NAME, durable, exclusive, autoDelete, null);
    }

}

消費者

package cn.saytime.listener.simple;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer for the simple-queue example; listens on {@code test_simple_queue}.
 */
@Component
@RabbitListener(queues = "test_simple_queue")
public class SimpleRecv {

    /**
     * Prints a received text message to standard output.
     *
     * @param message the message payload
     */
    @RabbitHandler
    public void process(String message) {
        final String line = "[x] rev : " + message;
        System.out.println(line);
    }

}

測試生產者

package cn.saytime;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQApplicationTests {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Publishes one text message with routing key {@code test_simple_queue}
     * and prints a confirmation line.
     */
    @Test
    public void testSimpleQueue() {
        final String payload = "Hello RabbitMQ !";
        amqpTemplate.convertAndSend("test_simple_queue", payload);

        final String confirmation = "[x] send " + payload + " ok";
        System.out.println(confirmation);
    }

}

執行test:

[x] send Hello RabbitMQ ! ok
[x] rev : Hello RabbitMQ !

三、工作佇列

image

However, “Fair dispatch” is the default configuration for spring-amqp

公平分發模式在 Spring AMQP 中是預設的(本文使用的 Spring AMQP 1.x),這也是日常工作中最常用的模式,輪詢模式用得較少。兩者的區別在於 prefetch:預設值是 1,如果設定為 0 就是輪詢模式。

3.1 公平分發模式

RabbitMQConfiguration.java 宣告佇列

package cn.saytime.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the queue used by the work-queue (fair dispatch) example.
 */
@Configuration
public class RabbitMQConfiguration {

    // Must match the queue name the work-queue listeners and the test producer use.
    private static final String QUEUE_WORK_NAME = "test_workfair_queue";

    /**
     * Registers the work queue as a bean.
     *
     * <p>The queue is non-durable, non-exclusive, not auto-deleted and has no extra arguments.
     *
     * @return the queue definition
     */
    @Bean
    public Queue workQueue(){
        return new Queue(QUEUE_WORK_NAME, false, false, false, null);
    }

}

消費者1

package cn.saytime.listener.workfair;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Work-queue consumer 1; listens on {@code test_workfair_queue}.
 */
@RabbitListener(queues = "test_workfair_queue")
@Component
public class WorkRecv {

    /**
     * Prints the received message, then sleeps for one second to simulate work.
     *
     * @param message the message payload
     */
    @RabbitHandler
    public void process(String message){
        System.out.println("[1] rev : " + message);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt status so the calling thread can still see the interruption.
            Thread.currentThread().interrupt();
        }
    }
}

消費者2

package cn.saytime.listener.workfair;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Work-queue consumer 2; listens on {@code test_workfair_queue}.
 */
@RabbitListener(queues = "test_workfair_queue")
@Component
public class WorkRecv2 {

    /**
     * Prints the received message, then sleeps for two seconds to simulate slower work.
     *
     * @param message the message payload
     */
    @RabbitHandler
    public void process(String message){
        System.out.println("[2] rev : " + message);
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            // Restore the interrupt status so the calling thread can still see the interruption.
            Thread.currentThread().interrupt();
        }
    }
}

測試生產者

package cn.saytime;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQApplicationTests {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Sends 20 numbered messages with routing key {@code test_workfair_queue},
     * pausing {@code index * 100} milliseconds after each send.
     */
    @Test
    public void testWorkFairQueue() {
        for (int index = 0; index < 20; index++) {
            final String payload = "Hello RabbitMQ " + index;
            // send the message
            amqpTemplate.convertAndSend("test_workfair_queue", payload);
            System.out.println(" [x] Sent '" + payload + "'");

            // The pause grows with the message index.
            final int pauseMillis = index * 100;
            try {
                Thread.sleep(pauseMillis);
            } catch (InterruptedException interrupted) {
                interrupted.printStackTrace();
            }
        }
    }

}

執行test:

 [x] Sent 'Hello RabbitMQ 0'
 [x] Sent 'Hello RabbitMQ 1'
[2] rev : Hello RabbitMQ 1
[1] rev : Hello RabbitMQ 0
 [x] Sent 'Hello RabbitMQ 2'
 [x] Sent 'Hello RabbitMQ 3'
 [x] Sent 'Hello RabbitMQ 4'
 [x] Sent 'Hello RabbitMQ 5'
[1] rev : Hello RabbitMQ 2
 [x] Sent 'Hello RabbitMQ 6'
[2] rev : Hello RabbitMQ 3
[1] rev : Hello RabbitMQ 4
 [x] Sent 'Hello RabbitMQ 7'
 [x] Sent 'Hello RabbitMQ 8'
[1] rev : Hello RabbitMQ 5
 [x] Sent 'Hello RabbitMQ 9'
[2] rev : Hello RabbitMQ 6
[1] rev : Hello RabbitMQ 7
 [x] Sent 'Hello RabbitMQ 10'
[1] rev : Hello RabbitMQ 8
 [x] Sent 'Hello RabbitMQ 11'
[2] rev : Hello RabbitMQ 9
[1] rev : Hello RabbitMQ 10
 [x] Sent 'Hello RabbitMQ 12'
[1] rev : Hello RabbitMQ 11
 [x] Sent 'Hello RabbitMQ 13'
[2] rev : Hello RabbitMQ 12
[1] rev : Hello RabbitMQ 13
 [x] Sent 'Hello RabbitMQ 14'
[1] rev : Hello RabbitMQ 14
 [x] Sent 'Hello RabbitMQ 15'
[1] rev : Hello RabbitMQ 15
 [x] Sent 'Hello RabbitMQ 16'
[2] rev : Hello RabbitMQ 16
 [x] Sent 'Hello RabbitMQ 17'
[1] rev : Hello RabbitMQ 17
 [x] Sent 'Hello RabbitMQ 18'
[2] rev : Hello RabbitMQ 18
 [x] Sent 'Hello RabbitMQ 19'
[1] rev : Hello RabbitMQ 19

公平分發模式測試正常。

3.2 輪詢分發模式

修改application.yml

spring:
  rabbitmq:
    host: 192.168.239.128
    port: 5672
#    username: test
#    password: 123456
#    virtual-host: /vhost_test
#    publisher-confirms: true
    # "listener" must sit at the same indentation as "host" and "port" to be a key of "rabbitmq".
    listener:
      simple:
        # Prefetch value this tutorial uses to switch to round-robin dispatch.
        prefetch: 0

執行test:

 [x] Sent 'Hello RabbitMQ 0'
 [x] Sent 'Hello RabbitMQ 1'
[1] rev : Hello RabbitMQ 0
[2] rev : Hello RabbitMQ 1
 [x] Sent 'Hello RabbitMQ 2'
 [x] Sent 'Hello RabbitMQ 3'
 [x] Sent 'Hello RabbitMQ 4'
 [x] Sent 'Hello RabbitMQ 5'
[1] rev : Hello RabbitMQ 2
 [x] Sent 'Hello RabbitMQ 6'
[2] rev : Hello RabbitMQ 3
[1] rev : Hello RabbitMQ 4
 [x] Sent 'Hello RabbitMQ 7'
 [x] Sent 'Hello RabbitMQ 8'
[1] rev : Hello RabbitMQ 5
 [x] Sent 'Hello RabbitMQ 9'
[2] rev : Hello RabbitMQ 6
[1] rev : Hello RabbitMQ 7
 [x] Sent 'Hello RabbitMQ 10'
[1] rev : Hello RabbitMQ 8
 [x] Sent 'Hello RabbitMQ 11'
[2] rev : Hello RabbitMQ 9
[1] rev : Hello RabbitMQ 10
 [x] Sent 'Hello RabbitMQ 12'
[1] rev : Hello RabbitMQ 11
 [x] Sent 'Hello RabbitMQ 13'
[2] rev : Hello RabbitMQ 12
[1] rev : Hello RabbitMQ 13
 [x] Sent 'Hello RabbitMQ 14'
[1] rev : Hello RabbitMQ 14
 [x] Sent 'Hello RabbitMQ 15'
[1] rev : Hello RabbitMQ 15
 [x] Sent 'Hello RabbitMQ 16'
[2] rev : Hello RabbitMQ 16
 [x] Sent 'Hello RabbitMQ 17'
[1] rev : Hello RabbitMQ 17
 [x] Sent 'Hello RabbitMQ 18'
[2] rev : Hello RabbitMQ 18
 [x] Sent 'Hello RabbitMQ 19'
[1] rev : Hello RabbitMQ 19

輪詢分發正常時,應可看到消費者1只消費偶數編號的訊息,消費者2只消費奇數編號的訊息。(注意:上面貼出的輸出與 3.1 節的結果幾乎相同,消費者1同時收到了 4、5 等相鄰編號的訊息,並未呈現嚴格的奇偶交替,請以實際執行結果為準。)

四、訂閱模式

image

定義交換機、佇列、以及佇列與交換機的繫結關係。

package cn.saytime.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Publish/subscribe wiring: one fanout exchange with an SMS queue and an e-mail queue bound to it.
 */
@Configuration
public class RabbitMQConfiguration {

    private static final String FANOUT_EXCHANGE_NAME = "test_exchange_fanout";
    private static final String SMS_QUEUE_NAME = "test_queue_fanout_sms";
    private static final String EMAIL_QUEUE_NAME = "test_queue_fanout_email";

    /**
     * Registers the fanout exchange {@code test_exchange_fanout}.
     *
     * @return the exchange definition
     */
    @Bean("fanoutExchange")
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE_NAME);
    }

    /**
     * Registers the SMS queue {@code test_queue_fanout_sms}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue fanoutSmsQueue() {
        return plainQueue(SMS_QUEUE_NAME);
    }

    /**
     * Registers the e-mail queue {@code test_queue_fanout_email}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue fanoutEmailQueue() {
        return plainQueue(EMAIL_QUEUE_NAME);
    }

    /**
     * Binds the SMS queue to the fanout exchange.
     *
     * @param fanoutExchange the exchange to bind to
     * @param fanoutSmsQueue the queue being bound
     * @return the binding definition
     */
    @Bean
    public Binding smsQueueExchangeBinding(FanoutExchange fanoutExchange, Queue fanoutSmsQueue) {
        final Binding smsBinding = BindingBuilder.bind(fanoutSmsQueue).to(fanoutExchange);
        return smsBinding;
    }

    /**
     * Binds the e-mail queue to the fanout exchange.
     *
     * @param fanoutExchange the exchange to bind to
     * @param fanoutEmailQueue the queue being bound
     * @return the binding definition
     */
    @Bean
    public Binding emailQueueExchangeBinding(FanoutExchange fanoutExchange, Queue fanoutEmailQueue) {
        final Binding emailBinding = BindingBuilder.bind(fanoutEmailQueue).to(fanoutExchange);
        return emailBinding;
    }

    /** Builds a non-durable, non-exclusive, non-auto-delete queue with no extra arguments. */
    private static Queue plainQueue(String name) {
        return new Queue(name, false, false, false, null);
    }

}

消費者1 sms

package cn.saytime.listener.ps;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * SMS consumer; listens on {@code test_queue_fanout_sms}.
 */
@Component
@RabbitListener(queues = "test_queue_fanout_sms")
public class SmsRecv {

    /**
     * Prints a received text message to standard output with an {@code [sms]} prefix.
     *
     * @param message the message payload
     */
    @RabbitHandler
    public void process(String message) {
        final String line = "[sms] rev : " + message;
        System.out.println(line);
    }

}

消費者2 email

package cn.saytime.listener.ps;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * E-mail consumer; listens on {@code test_queue_fanout_email}.
 */
@Component
@RabbitListener(queues = "test_queue_fanout_email")
public class EmailRecv {

    /**
     * Prints a received text message to standard output with an {@code [email]} prefix.
     *
     * @param message the message payload
     */
    @RabbitHandler
    public void process(String message) {
        final String line = "[email] rev : " + message;
        System.out.println(line);
    }

}

測試訊息:

package cn.saytime;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQApplicationTests {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Publishes one text message to the {@code test_exchange_fanout} exchange
     * with an empty routing key and prints a confirmation line.
     */
    @Test
    public void testFanoutQueue() {
        final String payload = "Hello, fanout message ";
        final String noRoutingKey = "";
        // send the message
        amqpTemplate.convertAndSend("test_exchange_fanout", noRoutingKey, payload);

        final String confirmation = " [x] Sent '" + payload + "'";
        System.out.println(confirmation);
    }

}

執行test結果:

 [x] Sent 'Hello, fanout message '
[sms] rev : Hello, fanout message 
[email] rev : Hello, fanout message 

五、路由模式

跟訂閱模式一樣,只不過Configuration裡面配置的是DirectExchange,並設定路由鍵。

繫結關係:

// Bind the queue to the exchange under the routing key "info".
BindingBuilder.bind(queue).to(exchange).with("info");

傳送訊息:

// One-way send to the exchange with the given routing key.
// convertSendAndReceive is the request/reply variant and waits for a reply, so it is not used for a plain send.
amqpTemplate.convertAndSend(exchange, routingkey, message);

六、主題模式

跟路由模式一樣,只不過路由鍵可以模糊匹配,配置的是TopicExchange.