springboot2.x簡單詳細教程--訊息佇列介紹及整合ActiveMQ (第十三章)
阿新 • • 發佈:2018-11-24
一、JMS介紹和使用場景及基礎程式設計模型
簡介:講解什麼是小寫佇列,JMS的基礎知識和使用場景
1、什麼是JMS: Java訊息服務(Java Message Service),Java平臺中關於面向訊息中介軟體的介面2、JMS是一種與廠商無關的 API,用來訪問訊息收發系統訊息,它類似於JDBC(Java Database Connectivity)。這裡,JDBC 是可以用來訪問許多不同關係資料庫的 API
3、使用場景:
1)跨平臺
2)多語言
3)多專案
4)解耦
5)分散式事務(業務涉及多個表比如:轉賬)
6)流量控制(訪問量)
7)最終一致性
8)RPC呼叫
上下游對接,資料來源變動->通知下屬
4.案例分析
1)網購支付流程:不用訊息中介軟體服務(使用使用者流量少的情況)
2)當用戶非常多的時候顯然我們的支付閘道器容易崩潰,同時支付閘道器之後的系統等待時間可能過長,所以需要用到訊息服務
分析:支付閘道器將資訊放到訊息服務之後 ,不需要等待積分或是訂單系統的執行,提高了系統的併發性和承受的流量(積分訂單系統訂閱訊息)
5、概念
JMS提供者:Apache ActiveMQ、RabbitMQ、Kafka、Notify、MetaQ、RocketMQ
JMS生產者(Message Producer)
JMS消費者(Message Consumer)
JMS訊息
JMS佇列
JMS主題
JMS訊息通常有兩種型別:點對點(Point-to-Point)、釋出/訂閱(Publish/Subscribe)
3)訊息點對點型別(單個系統)
4)釋出/訂閱(Publish/Subscribe)
微信支付就是這種模式
6、程式設計模型
MQ中需要用的一些類
ConnectionFactory :連線工廠,JMS 用它建立連線
Connection :JMS 客戶端到JMS Provider 的連線
Session: 一個傳送或接收訊息的執行緒
Destination :訊息的目的地;訊息傳送給誰.
MessageConsumer / MessageProducer: 訊息接收者,消費者
二、ActiveMQ5.x訊息佇列基礎介紹和安裝
簡介:介紹ActiveMQ5.x訊息佇列基礎特性和本地快速安裝
特點:
1)支援來自Java,C,C ++,C#,Ruby,Perl,Python,PHP的各種跨語言客戶端和協議
2)支援許多高階功能,如訊息組,虛擬目標,萬用字元和複合目標
3) 完全支援JMS 1.1和J2EE 1.4,支援瞬態,持久,事務和XA訊息
4) Spring支援,ActiveMQ可以輕鬆嵌入到Spring應用程式中,並使用Spring的XML配置機制進行配置
5) 支援在流行的J2EE伺服器(如TomEE,Geronimo,JBoss,GlassFish和WebLogic)中進行測試
6) 使用JDBC和高效能日誌支援非常快速的持久化(重要訊息放到mysql或是日誌作為備份)
...
1、下載地址:http://activemq.apache.org/activemq-5153-release.html
2、快速開始:http://activemq.apache.org/getting-started.html
3、如果我們是32位的機器,就雙擊win32目錄下的activemq.bat,如果是64位機器,則雙擊win64目錄下的activemq.bat
4、bin目錄裡面啟動 選擇對應的系統版本和位數,activeMQ start 啟動
5、啟動後訪問路徑http:// 127.0.0.1:8161/
127.0.0.1是activeMq安裝的伺服器,預設埠號8161
6、使用者名稱和密碼預設都是admin驗證這是訊息服務的保護措施
登入後的頁面:ActiveMQ控制檯
1)Home | Queues | Topics | Subscribers | Connections | Network | Scheduled | Send
Support
主頁| 佇列| 主題| 訂閱者| 連線| 網路| 預定| 傳送
支援
2)Queues建立
面板說明:
Name:佇列名稱。
Number Of Pending Messages:等待消費的訊息個數。
Number Of Consumers:當前連線的消費者數目
Messages Enqueued:進入佇列的訊息總個數,包括出佇列的和待消費的,這個數量只增不減。
Messages Dequeued:已經消費的訊息數量。
3)向訊息佇列傳送訊息
結果
7、官方案例集合
https://github.com/spring-projects/spring-boot/tree/master/spring-boot-samples
三、SpringBoot2.x整合ActiveMQ實戰之點對點訊息(p2p)
訊息 消費者和訊息生產者可以在一個工程也可以在不同的工程
簡介:SpringBoot2.x整合ActiveMQ實戰之點對點訊息
1、官網地址:https://docs.spring.io/spring-boot/docs/2.1.0.BUILD-SNAPSHOT/reference/htmlsingle/#boot-features-activemq
2、加入依賴
<!-- 整合訊息佇列ActiveMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- 如果配置執行緒池則加入 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
3、application.properties配置檔案配置
#整合jms測試,安裝在別的機器,防火牆和埠號記得開放
spring.activemq.broker-url=tcp://127.0.0.1:61616
#叢集配置,61616:開啟對外連線的預設埠號
#spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617)
spring.activemq.user=admin
spring.activemq.password=admin
#下列配置要增加依賴:訊息佇列連線池 max-connections,50,100,500根據需要寫
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=100
4、springboot啟動類 @EnableJms,開啟支援jms
1)service介面
package com.itcast.demo.service;
import javax.jms.Destination;
/**
*
* <p>Title: ProducerService</p>
* <p>Description: </p>
* @author shenlan
* @date 2018年11月4日
*/
public interface ProducerService {
/**
* 指定訊息佇列還有訊息
* @param destination
* @param message
*/
public void sendMessage(Destination destination,final String message );
/**
*
* 使用預設訊息佇列傳送訊息
* @param message
*/
public void sendMessage(final String message );
}
2)實現
package com.itcast.demo.service.impl;
import javax.jms.Destination;
import javax.jms.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import com.itcast.demo.service.ProducerService;
@Service
public class ProducerServiceImpl implements ProducerService{
//用來發送訊息到broker的物件
@Autowired
private JmsMessagingTemplate jmsTemplate ;
@Autowired
private Queue queue;//程式入口有了
//傳送訊息,destination是傳送到的佇列,message是待發送的訊息
@Override
public void sendMessage(Destination destination, String message) {
jmsTemplate.convertAndSend(destination,message);
}
//傳送訊息,destination是傳送到的佇列,message是待發送的訊息
@Override
public void sendMessage(String message) {
jmsTemplate.convertAndSend(this.queue,message);
}
}
3)controller
package com.itcast.demo.controller;
import javax.jms.Destination;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.itcast.demo.domain.JsonData;
import com.itcast.demo.service.ProducerService;
/**
* 功能描述:模擬微信支付回撥
*
*
*@作者 llj 小D
*/
@RestController
@RequestMapping("/api/v1")
public class OrderController {
@Autowired
private ProducerService producerService;
/**
* 功能描述:微信支付回撥介面
* @param msg 支付資訊
* @return
*/
@GetMapping("order")
public Object order(String msg){//實際開發msg是html需要document解析
Destination destination = new ActiveMQQueue("order.queue");
producerService.sendMessage(destination, msg);
return JsonData.buildSuccess();
}
@GetMapping("common")
public Object common(String msg){
producerService.sendMessage(msg);
return JsonData.buildSuccess();
}
}
4)控制檯建立佇列
5)啟動工程訪問
6)重新整理
7)再發送
訊息變成兩條
5、消費者:實時監聽對應的佇列()
@JmsListener(destination = "order.queue")
1) 建立一個jms包下面建立OrderConsumer 類
package com.itcast.demo.jms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component //spring可以掃描
public class OrderConsumer {
/**
*
* @JmsListener監聽order.queue佇列有沒有訊息進來
* @param text
*/
@JmsListener(destination="order.queue")
public void receiveQueue(String text){
System.out.println("OrderConsumer收到的報文為:"+text);
}
}
2)啟動工程之前寫的報文都收到了
3)重新整理控制檯
4)再新增訊息
6.測一下預設的佇列comment.queue
1)和OrderConsumer 類一樣
package com.itcast.demo.jms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class CommonConsumer {
@JmsListener(destination="comment.queue")
public void receiveQueue(String text){
System.out.println("CommonConsumer收到的報文為:"+text);
}
}
2)
3)
四、SpringBoot整合ActiveMQ實戰之釋出訂閱模式(pub/sub)
簡介:SpringBoot整合ActiveMQ實戰之釋出訂閱模式(pub/sub),及同時支援點對點和釋出訂閱模型
1、需要加入配置檔案,支援釋出訂閱模型,預設只支援點對點
#default point to point
spring.jms.pub-sub-domain=true
注意點:
1、預設消費者並不會消費訂閱釋出型別的訊息,這是由於springboot預設採用的是p2p模式進行訊息的監聽
1) 修改配置:spring.jms.pub-sub-domain=true
2)在程式入口處加入topic物件,好處是啟動程式new 一次可以多次使用
package com.itcast.demo;
import javax.jms.Queue;
import javax.jms.Topic;
import javax.servlet.MultipartConfigElement;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.MultipartConfigFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
@EnableJms
public class Application {
//交個spring進行管理,方便後續進行注入
@Bean
public Queue queue(){
return new ActiveMQQueue("comment.queue");
}
//主題物件交給spring管理
@Bean
public Topic topic(){
return new ActiveMQTopic("video.topic");
}
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
3)ProducerService介面新增如下程式碼
/**
*
* 訊息釋出者
* @param message
*/
public void publish(String msg) ;
4)實現類ProducerServiceImpl
//=======釋出訂閱相關程式碼=========
@Autowired
private Topic topic;
@Override
public void publish(String msg) {
this.jmsTemplate.convertAndSend(this.topic, msg);
}
5)controller
@GetMapping("topic")
public Object topic(String msg){
producerService.publish(msg);
return JsonData.buildSuccess();
}
6)啟動程式,訪問
7)
8)TopicSub :模擬一個訂閱者
package com.itcast.demo.jms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class OrderConsumer {
/**
*
* @JmsListener監聽order.queue佇列有沒有訊息進來
* @param text
*/
@JmsListener(destination="order.queue")
public void receiveQueue(String text){
System.out.println("OrderConsumer收到的報文為:"+text);
}
}
9)啟動訪問
控制檯
10)模擬三個訂閱者
package com.itcast.demo.jms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class TopicSub {
//模擬出三個訂閱者
@JmsListener(destination="video.topic")
public void receive1(String text){
System.out.println("video.topic 消費者:receive1="+text);
}
@JmsListener(destination="video.topic")
public void receive2(String text){
System.out.println("video.topic 消費者:receive2="+text);
}
@JmsListener(destination="video.topic")
public void receive3(String text){
System.out.println("video topic 消費者:receive3="+text);
}
}
11)訪問
12)此時存在一個問題:點對點模式沒有生效
我們測點對點時,並沒有消費者
13)@JmsListener如果不指定獨立的containerFactory的話是只能消費queue訊息
也就是隻支援一種模式
//需要給topic定義獨立的JmsListenerContainer
a) 在配置檔案裡面,註釋掉 #spring.jms.pub-sub-domain=true
b)在程式入口加入
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setPubSubDomain(true);//支援訂閱模式
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}
c) 修改訂閱者container:containerFactory="jmsListenerContainerTopic"
package com.itcast.demo.jms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class TopicSub {
//模擬出三個訂閱者
@JmsListener(destination="video.topic",containerFactory="jmsListenerContainerTopic")
public void receive1(String text){
System.out.println("video.topic 消費者:receive1="+text);
}
@JmsListener(destination="video.topic",containerFactory="jmsListenerContainerTopic")
public void receive2(String text){
System.out.println("video.topic 消費者:receive2="+text);
}
@JmsListener(destination="video.topic",containerFactory="jmsListenerContainerTopic")
public void receive3(String text){
System.out.println("video topic 消費者:receive3="+text);
}
}
14)order點對點模式,
15)釋出訂閱模式