簡單記錄一下springboot+rabbitmq整合
1.Windows下安裝RabbitMQ需要以下幾個步驟
(1):下載erlang,原因在於RabbitMQ服務端程式碼是使用併發式語言erlang編寫的,下載地址:http://www.erlang.org/downloads,雙擊.exe檔案進行安裝就好一路下一步下一步這裡就不截圖了,然後安裝完成之後建立一個名為ERLANG_HOME的環境變數,其值指向erlang的安裝目錄,同時將%ERLANG_HOME%\bin加入到Path中,最後開啟命令列,輸入erl,如果出現erlang的版本資訊就表示erlang語言環境安裝成功
(2):下載RabbitMQ,下載地址:http://www.rabbitmq.com/,同樣雙擊.exe進行安裝就好(這裡需要注意一點,預設的安裝目錄是C:/Program Files/....,這個目錄中是存在空格符的,我們需要改變安裝目錄,RabbitMQ安裝目錄中是不允許有空格的);
(3):安裝RabbitMQ-Plugins,這個相當於是一個管理介面,方便我們在瀏覽器介面檢視RabbitMQ各個訊息佇列以及exchange的工作情況,安裝方法是:開啟命令列cd進入rabbitmq的sbin目錄(我的目錄是:E:\software\rabbitmq\rabbitmq_server-3.6.5\sbin),輸入:rabbitmq-plugins enable rabbitmq_management命令,稍等會會發現出現plugins安裝成功的提示,預設是安裝6個外掛,如果你在安裝外掛的過程中出現了下面的錯誤:
然後繼續雙擊 rabbitmq-server.bat 啟動
然後訪問管理介面如下: 我這裡省略掉了登陸 輸入guest guest預設賬號登陸
繼續省略N多行rabbitmq介紹. 如果不懂可百度 檢視簡介
新建兩個springboot 專案 一個訊息生產 一個訊息消費
訊息生成 A
spring boot 引入相關依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
訊息生產者
application.properties新增一下配置 這裡需要注意的是 rabbitmq 預設埠5672 ,15672是訪問管理介面的埠
還有使用者名稱密碼 需要去管理介面新建 使用者不然會提示無許可權呼叫,guest 預設只能訪問管理介面
spring.rabbitmq.host=192.168.1.107
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=test123
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
spring boot配置類,作用為指定佇列,交換器型別及繫結操作
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
//宣告佇列
@Bean
public Queue queue1() {
return new Queue("hello.queue1", true); // true表示持久化該佇列
}
@Bean
public Queue queue2() {
return new Queue("hello.queue2", true);
}
//宣告互動器
@Bean
TopicExchange topicExchange() {
return new TopicExchange("topicExchange");
}
//繫結
@Bean
public Binding binding1() {
return BindingBuilder.bind(queue1()).to(topicExchange()).with("key.1");
}
@Bean
public Binding binding2() {
return BindingBuilder.bind(queue2()).to(topicExchange()).with("key.#");
}
}
共聲明瞭2個佇列,分別是hello.queue1,hello.queue2,交換器型別為TopicExchange,並與hello.queue1,hello.queue2佇列分別繫結。
生產者類
import java.util.UUID;
import javax.annotation.PostConstruct;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Sender implements RabbitTemplate.ConfirmCallback, ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("訊息傳送成功:" + correlationData);
} else {
System.out.println("訊息傳送失敗:" + cause);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println(message.getMessageProperties().getCorrelationIdString() + " 傳送失敗");
}
//傳送訊息,不需要實現任何介面,供外部呼叫。
public void send(String msg){
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
System.out.println("開始傳送訊息 : " + msg.toLowerCase());
String response = rabbitTemplate.convertSendAndReceive("topicExchange", "key.1", msg, correlationId).toString();
System.out.println("結束髮送訊息 : " + msg.toLowerCase());
System.out.println("消費者響應 : " + response + " 訊息處理完成");
}
}
要點:
1.注入RabbitTemplate
2.實現RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback介面(非必須)。
ConfirmCallback介面用於實現訊息傳送到RabbitMQ交換器後接收ack回撥。ReturnCallback介面用於實現訊息傳送到RabbitMQ交換器,但無相應佇列與交換器繫結時的回撥。
3.實現訊息傳送方法。呼叫rabbitTemplate相應的方法即可。rabbitTemplate常用傳送方法有
rabbitTemplate.send(message); //發訊息,引數型別為org.springframework.amqp.core.Message
rabbitTemplate.convertAndSend(object); //轉換併發送訊息。 將引數物件轉換為org.springframework.amqp.core.Message後傳送
rabbitTemplate.convertSendAndReceive(message) //轉換併發送訊息,且等待訊息者返回響應訊息。
針對業務場景選擇合適的訊息傳送方式即可。
然後是訊息消費 B
application.properties新增一下配置
spring.rabbitmq.host=192.168.1.107
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=test123
spring.rabbitmq.listener.concurrency=2 //最小訊息監聽執行緒數
spring.rabbitmq.listener.max-concurrency=2 //最大訊息監聽執行緒數
消費者類
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class Receiver {
@RabbitListener(queues = "hello.queue1")
public String processMessage1(String msg) {
System.out.println(Thread.currentThread().getName() + " 接收到來自hello.queue1佇列的訊息:" + msg);
return msg.toUpperCase();
}
@RabbitListener(queues = "hello.queue2")
public void processMessage2(String msg) {
System.out.println(Thread.currentThread().getName() + " 接收到來自hello.queue2佇列的訊息:" + msg);
}
}
由於定義了2個佇列,所以分別定義不同的監聽器監聽不同的佇列。由於最小訊息監聽執行緒數和最大訊息監聽執行緒數都是2,所以每個監聽器各有2個執行緒實現監聽功能。
要點:
1.監聽器引數型別與訊息實際型別匹配。在生產者中傳送的訊息實際型別是String,所以這裡監聽器引數型別也是String。
2.如果監聽器需要有響應返回給生產者,直接在監聽方法中return即可。
執行測試
import java.util.Date;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.sam.demo.rabbitmq.Application;
import com.sam.demo.rabbitmq.sender.Sender;
@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = Application.class)
public class RabbitTests {
@Autowired
private Sender sender;
@Test
public void sendTest() throws Exception {
while(true){
String msg = new Date().toString();
sender.send(msg);
Thread.sleep(1000);
}
}
}
輸出:
開始傳送訊息 : wed mar 29 23:20:52 cst 2017
SimpleAsyncTaskExecutor-1 接收到來自hello.queue2佇列的訊息:Wed Mar 29 23:20:52 CST 2017
SimpleAsyncTaskExecutor-2 接收到來自hello.queue1佇列的訊息:Wed Mar 29 23:20:52 CST 2017
結束髮送訊息 : wed mar 29 23:20:52 cst 2017
消費者響應 : WED MAR 29 23:20:52 CST 2017 訊息處理完成
------------------------------------------------
訊息傳送成功:CorrelationData [id=340d14e6-cfcc-4653-9f95-29b37d50f886]
開始傳送訊息 : wed mar 29 23:20:53 cst 2017
SimpleAsyncTaskExecutor-1 接收到來自hello.queue1佇列的訊息:Wed Mar 29 23:20:53 CST 2017
SimpleAsyncTaskExecutor-2 接收到來自hello.queue2佇列的訊息:Wed Mar 29 23:20:53 CST 2017
結束髮送訊息 : wed mar 29 23:20:53 cst 2017
消費者響應 : WED MAR 29 23:20:53 CST 2017 訊息處理完成
------------------------------------------------
訊息傳送成功:CorrelationData [id=e4e01f89-d0d4-405e-80f0-85bb20238f34]
開始傳送訊息 : wed mar 29 23:20:54 cst 2017
SimpleAsyncTaskExecutor-2 接收到來自hello.queue1佇列的訊息:Wed Mar 29 23:20:54 CST 2017
SimpleAsyncTaskExecutor-1 接收到來自hello.queue2佇列的訊息:Wed Mar 29 23:20:54 CST 2017
結束髮送訊息 : wed mar 29 23:20:54 cst 2017
消費者響應 : WED MAR 29 23:20:54 CST 2017 訊息處理完成
------------------------------------------------
如果需要使用的其他的交換器型別,spring中都已提供實現,所有的交換器均實現org.springframework.amqp.core.AbstractExchange介面。
常用交換器型別如下:
Direct(DirectExchange):direct 型別的行為是"先匹配, 再投送". 即在繫結時設定一個 routing_key, 訊息的routing_key完全匹配時, 才會被交換器投送到繫結的佇列中去。
Topic(TopicExchange):按規則轉發訊息(最靈活)。
Headers(HeadersExchange):設定header attribute引數型別的交換機。
Fanout(FanoutExchange):轉發訊息到所有繫結佇列。