SpringBoot(四)springboot整合ActiveMQ
訊息佇列中介軟體是分散式系統中重要的元件,主要解決應用耦合,非同步訊息,流量削鋒等問題。實現高效能,高可用,可伸縮和最終一致性架構。是大型分散式系統不可缺少的中介軟體。訊息形式支援點對點和訂閱-釋出。
ActiveMQ是什麼
- ActiveMQ是訊息佇列技術,為解決高併發問題而生
- ActiveMQ生產者消費者模型(生產者和消費者可以跨平臺、跨系統)
- ActiveMQ支援如下兩種訊息傳輸方式
- 點對點模式,生產者生產了一個訊息,只能由一個消費者進行消費
- 釋出/訂閱模式,生產者生產了一個訊息,可以由多個消費者進行消費
SpringBoot整合ActiveMQ
1. ActiveMQ下載啟動
http://activemq.apache.org/download-archives.html ,本文用的是windows版的5.15.3版本,下載下來是壓縮包,自行解壓一個到目錄下,CMD進入到解壓目錄下的bin目錄下,執行 activemq.bat start 啟動。 如果能成功訪問http://localhost:8161/admin(使用者名稱和密碼預設為admin),則啟動成功。
2. 建立兩個個springboot專案,分別作為訊息提供者(provider)和消費者(consumer),新增依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <!--訊息佇列連線池--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.15.0</version> </dependency>
3. 在兩個專案中的application.properires配置訊息佇列,並在啟動類新增@EnableJms開啟訊息佇列。
application.properties
# failover:(tcp://localhost:61616,tcp://localhost:61617)
# tcp://localhost:61616
spring.activemq.broker-url=tcp://localhost:61616
#true 表示使用內建的MQ,false則連線伺服器
spring.activemq.in-memory=false
#true表示使用連線池;false時,每傳送一條資料建立一個連線
spring.activemq.pool.enabled=true
#連線池最大連線數
spring.activemq.pool.max-connections=10
#空閒的連線過期時間,預設為30秒
spring.activemq.pool.idle-timeout=30000
#強制的連線過期時間,與idleTimeout的區別在於:idleTimeout是在連線空閒一段時間失效,而expiryTimeout不管當前連線的情況,只要達到指定時間就失效。預設為0,never
spring.activemq.pool.expiry-timeout=0
啟動類(provider),consumer同樣
@SpringBootApplication
@EnableJms //啟動訊息佇列
public class ProviderApplication {
public static void main(String[] args) {
SpringApplication.run(ProviderApplication.class, args);
}
}
4. prover專案結構圖
BeanConfig定義訊息佇列
import javax.jms.Queue;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/*
* @author uv
* @date 2018/9/15 14:21
*/
@Configuration
public class BeanConfig {
//定義存放訊息的佇列
@Bean
public Queue queue() {
return new ActiveMQQueue("ActiveMQQueue");
}
}
ProviderController
import javax.jms.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/*
* @author uv
* @date 2018/9/15 14:54
*
*/
@RestController
public class ProviderController {
//注入存放訊息的佇列,用於下列方法一
@Autowired
private Queue queue;
//注入springboot封裝的工具類
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@RequestMapping("send")
public void send(String name) {
//方法一:新增訊息到訊息佇列
jmsMessagingTemplate.convertAndSend(queue, name);
//方法二:這種方式不需要手動建立queue,系統會自行建立名為test的佇列
//jmsMessagingTemplate.convertAndSend("test", name);
}
}
5. 啟動provider,向訊息佇列新增資料,本次新增5條資料
- Number Of Pending Messages:訊息佇列中待處理的訊息
- Number Of Consumers:消費者的數量
- Messages Enqueued:累計進入過訊息佇列的總量
- Messages Dequeued:累計消費過的訊息總量
6. consumer專案結構圖
application.properties 和 ConsumerApplication 同 provider類似,如下為不同的ConsumerService:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;
/*
* @author uv
* @date 2018/9/15 18:36
*
*/
@Component
public class ConsumerService {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
// 使用JmsListener配置消費者監聽的佇列,其中name是接收到的訊息
@JmsListener(destination = "ActiveMQQueue")
// SendTo 會將此方法返回的資料, 寫入到 OutQueue 中去.
@SendTo("SQueue")
public String handleMessage(String name) {
System.out.println("成功接受Name" + name);
return "成功接受Name" + name;
}
}
7. 啟動consumer,控制檯輸出如下
訊息接收成功,檢視 http://localhost:8161/admin/queues.jsp ,如下圖所示,訊息佇列中不再有未處理的訊息,由於consumer的啟動,消費者的數量為1,Messages Dequeued(累計消費過的訊息總量)的數值也變成了5;另外消費者接收到5條訊息處理後,返回到OutQueue 5條訊息,下圖可以看出來。
8. ActiveMQ的持久化