1. 程式人生 > >簡單記錄一下springboot+rabbitmq整合

簡單記錄一下springboot+rabbitmq整合

1.Windows下安裝RabbitMQ需要以下幾個步驟

   (1):下載erlang,原因在於RabbitMQ服務端程式碼是使用併發式語言erlang編寫的,下載地址:http://www.erlang.org/downloads,雙擊.exe檔案進行安裝就好一路下一步下一步這裡就不截圖了,然後安裝完成之後建立一個名為ERLANG_HOME的環境變數,其值指向erlang的安裝目錄,同時將%ERLANG_HOME%\bin加入到Path中,最後開啟命令列,輸入erl,如果出現erlang的版本資訊就表示erlang語言環境安裝成功

   (2):下載RabbitMQ,下載地址:http://www.rabbitmq.com/,同樣雙擊.exe進行安裝就好(這裡需要注意一點,預設的安裝目錄是C:/Program Files/....,這個目錄中是存在空格符的,我們需要改變安裝目錄,RabbitMQ安裝目錄中是不允許有空格的);

   (3):安裝RabbitMQ-Plugins,這個相當於是一個管理介面,方便我們在瀏覽器介面檢視RabbitMQ各個訊息佇列以及exchange的工作情況,安裝方法是:開啟命令列cd進入rabbitmq的sbin目錄(我的目錄是:E:\software\rabbitmq\rabbitmq_server-3.6.5\sbin),輸入:rabbitmq-plugins enable rabbitmq_management命令,稍等會會發現出現plugins安裝成功的提示,預設是安裝6個外掛,如果你在安裝外掛的過程中出現了下面的錯誤:        


然後繼續雙擊  rabbitmq-server.bat 啟動    

然後訪問管理介面如下: 我這裡省略掉了登陸 輸入guest  guest預設賬號登陸

繼續省略N多行rabbitmq介紹.  如果不懂可百度 檢視簡介  

新建兩個springboot 專案  一個訊息生產  一個訊息消費

訊息生成 A

spring boot 引入相關依賴

<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>

訊息生產者

application.properties新增一下配置  這裡需要注意的是 rabbitmq 預設埠5672   ,15672是訪問管理介面的埠

還有使用者名稱密碼 需要去管理介面新建 使用者不然會提示無許可權呼叫,guest 預設只能訪問管理介面

spring.rabbitmq.host=192.168.1.107
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=test123
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

spring boot配置類,作用為指定佇列,交換器型別及繫結操作

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    
    //宣告佇列
    @Bean
    public Queue queue1() {
        return new Queue("hello.queue1", true); // true表示持久化該佇列
    }
    
    @Bean
    public Queue queue2() {
        return new Queue("hello.queue2", true);
    }
    
    //宣告互動器
    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange("topicExchange");
    }

    //繫結
    @Bean
    public Binding binding1() {
        return BindingBuilder.bind(queue1()).to(topicExchange()).with("key.1");
    }
    
    @Bean
    public Binding binding2() {
        return BindingBuilder.bind(queue2()).to(topicExchange()).with("key.#");
    }
    
}

共聲明瞭2個佇列,分別是hello.queue1,hello.queue2,交換器型別為TopicExchange,並與hello.queue1,hello.queue2佇列分別繫結。

生產者類

import java.util.UUID;

import javax.annotation.PostConstruct;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Sender implements RabbitTemplate.ConfirmCallback, ReturnCallback {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {  
            System.out.println("訊息傳送成功:" + correlationData);  
        } else {  
            System.out.println("訊息傳送失敗:" + cause);  
        }  
        
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println(message.getMessageProperties().getCorrelationIdString() + " 傳送失敗");
        
    }

    //傳送訊息,不需要實現任何介面,供外部呼叫。
    public void send(String msg){
        
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        
        System.out.println("開始傳送訊息 : " + msg.toLowerCase());
        String response = rabbitTemplate.convertSendAndReceive("topicExchange", "key.1", msg, correlationId).toString();
        System.out.println("結束髮送訊息 : " + msg.toLowerCase());
        System.out.println("消費者響應 : " + response + " 訊息處理完成");
    }
}

要點:

1.注入RabbitTemplate

2.實現RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback介面(非必須)。
ConfirmCallback介面用於實現訊息傳送到RabbitMQ交換器後接收ack回撥。ReturnCallback介面用於實現訊息傳送到RabbitMQ交換器,但無相應佇列與交換器繫結時的回撥。

3.實現訊息傳送方法。呼叫rabbitTemplate相應的方法即可。rabbitTemplate常用傳送方法有

rabbitTemplate.send(message);   //發訊息,引數型別為org.springframework.amqp.core.Message
rabbitTemplate.convertAndSend(object); //轉換併發送訊息。 將引數物件轉換為org.springframework.amqp.core.Message後傳送
rabbitTemplate.convertSendAndReceive(message) //轉換併發送訊息,且等待訊息者返回響應訊息。   

針對業務場景選擇合適的訊息傳送方式即可。

然後是訊息消費 B

application.properties新增一下配置

spring.rabbitmq.host=192.168.1.107
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=test123

spring.rabbitmq.listener.concurrency=2  //最小訊息監聽執行緒數
spring.rabbitmq.listener.max-concurrency=2 //最大訊息監聽執行緒數

消費者類

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Receiver {
    
    @RabbitListener(queues = "hello.queue1")
    public String processMessage1(String msg) {
        System.out.println(Thread.currentThread().getName() + " 接收到來自hello.queue1佇列的訊息:" + msg);
        return msg.toUpperCase();
    }
    
    @RabbitListener(queues = "hello.queue2")
    public void processMessage2(String msg) {
        System.out.println(Thread.currentThread().getName() + " 接收到來自hello.queue2佇列的訊息:" + msg);
    }
}

由於定義了2個佇列,所以分別定義不同的監聽器監聽不同的佇列。由於最小訊息監聽執行緒數和最大訊息監聽執行緒數都是2,所以每個監聽器各有2個執行緒實現監聽功能。

要點:

1.監聽器引數型別與訊息實際型別匹配。在生產者中傳送的訊息實際型別是String,所以這裡監聽器引數型別也是String。

2.如果監聽器需要有響應返回給生產者,直接在監聽方法中return即可。

執行測試

import java.util.Date;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.sam.demo.rabbitmq.Application;
import com.sam.demo.rabbitmq.sender.Sender;

@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = Application.class)
public class RabbitTests {
    
    @Autowired
    private Sender sender;
    
    @Test
    public void sendTest() throws Exception {
        while(true){
            String msg = new Date().toString();
            sender.send(msg);
            Thread.sleep(1000);
        }
    }
}

輸出:

開始傳送訊息 : wed mar 29 23:20:52 cst 2017
SimpleAsyncTaskExecutor-1 接收到來自hello.queue2佇列的訊息:Wed Mar 29 23:20:52 CST 2017
SimpleAsyncTaskExecutor-2 接收到來自hello.queue1佇列的訊息:Wed Mar 29 23:20:52 CST 2017
結束髮送訊息 : wed mar 29 23:20:52 cst 2017
消費者響應 : WED MAR 29 23:20:52 CST 2017 訊息處理完成
------------------------------------------------
訊息傳送成功:CorrelationData [id=340d14e6-cfcc-4653-9f95-29b37d50f886]
開始傳送訊息 : wed mar 29 23:20:53 cst 2017
SimpleAsyncTaskExecutor-1 接收到來自hello.queue1佇列的訊息:Wed Mar 29 23:20:53 CST 2017
SimpleAsyncTaskExecutor-2 接收到來自hello.queue2佇列的訊息:Wed Mar 29 23:20:53 CST 2017
結束髮送訊息 : wed mar 29 23:20:53 cst 2017
消費者響應 : WED MAR 29 23:20:53 CST 2017 訊息處理完成
------------------------------------------------
訊息傳送成功:CorrelationData [id=e4e01f89-d0d4-405e-80f0-85bb20238f34]
開始傳送訊息 : wed mar 29 23:20:54 cst 2017
SimpleAsyncTaskExecutor-2 接收到來自hello.queue1佇列的訊息:Wed Mar 29 23:20:54 CST 2017
SimpleAsyncTaskExecutor-1 接收到來自hello.queue2佇列的訊息:Wed Mar 29 23:20:54 CST 2017
結束髮送訊息 : wed mar 29 23:20:54 cst 2017
消費者響應 : WED MAR 29 23:20:54 CST 2017 訊息處理完成
------------------------------------------------

如果需要使用的其他的交換器型別,spring中都已提供實現,所有的交換器均實現org.springframework.amqp.core.AbstractExchange介面。

常用交換器型別如下:

Direct(DirectExchange):direct 型別的行為是"先匹配, 再投送". 即在繫結時設定一個 routing_key, 訊息的routing_key完全匹配時, 才會被交換器投送到繫結的佇列中去。

Topic(TopicExchange):按規則轉發訊息(最靈活)。

Headers(HeadersExchange):設定header attribute引數型別的交換機。

Fanout(FanoutExchange):轉發訊息到所有繫結佇列。