1. 程式人生 > >springboot2.x簡單詳細教程--訊息佇列介紹及整合ActiveMQ (第十三章)

springboot2.x簡單詳細教程--訊息佇列介紹及整合ActiveMQ (第十三章)

一、JMS介紹和使用場景及基礎程式設計模型


    簡介:講解什麼是小寫佇列,JMS的基礎知識和使用場景
    1、什麼是JMS: Java訊息服務(Java Message Service),Java平臺中關於面向訊息中介軟體的介面

    2、JMS是一種與廠商無關的 API,用來訪問訊息收發系統訊息,它類似於JDBC(Java Database Connectivity)。這裡,JDBC 是可以用來訪問許多不同關係資料庫的 API

    3、使用場景:

        1)跨平臺 

        2)多語言 

        3)多專案

        4)解耦

        5)分散式事務(業務涉及多個表比如:轉賬

        6)流量控制(訪問量)

        7)最終一致性
        8)RPC呼叫
            上下游對接,資料來源變動->通知下屬

4.案例分析

1)網購支付流程:不用訊息中介軟體服務(使用使用者流量少的情況

2)當用戶非常多的時候顯然我們的支付閘道器容易崩潰,同時支付閘道器之後的系統等待時間可能過長,所以需要用到訊息服務

分析:支付閘道器將資訊放到訊息服務之後
不需要等待積分或是訂單系統的執行提高了系統的併發性和承受的流量(積分訂單系統訂閱訊息

5、概念    
        JMS提供者:Apache ActiveMQ、RabbitMQ、Kafka、Notify、MetaQ、RocketMQ
        JMS生產者(Message Producer)
        JMS消費者(Message Consumer)
        JMS訊息
        JMS佇列
        JMS主題

        JMS訊息通常有兩種型別:點對點(Point-to-Point)、釋出/訂閱(Publish/Subscribe)

3)訊息點對點型別(單個系統

4)釋出/訂閱(Publish/Subscribe)

微信支付就是這種模式


   6、程式設計模型
        MQ中需要用的一些類
        ConnectionFactory :連線工廠,JMS 用它建立連線
        Connection :JMS 客戶端到JMS Provider 的連線
        Session: 一個傳送或接收訊息的執行緒
        Destination :訊息的目的地;訊息傳送給誰.
        MessageConsumer / MessageProducer: 訊息接收者,消費者

 

二、ActiveMQ5.x訊息佇列基礎介紹和安裝
    


    簡介:介紹ActiveMQ5.x訊息佇列基礎特性和本地快速安裝
        特點:
            1)支援來自Java,C,C ++,C#,Ruby,Perl,Python,PHP的各種跨語言客戶端和協議
            2)支援許多高階功能,如訊息組,虛擬目標,萬用字元和複合目標
            3) 完全支援JMS 1.1和J2EE 1.4,支援瞬態,持久,事務和XA訊息
            4) Spring支援,ActiveMQ可以輕鬆嵌入到Spring應用程式中,並使用Spring的XML配置機制進行配置
            5) 支援在流行的J2EE伺服器(如TomEE,Geronimo,JBoss,GlassFish和WebLogic)中進行測試
            6) 使用JDBC和高效能日誌支援非常快速的持久化(重要訊息放到mysql或是日誌作為備份
            ...

        1、下載地址:http://activemq.apache.org/activemq-5153-release.html


        2、快速開始:http://activemq.apache.org/getting-started.html
        3、如果我們是32位的機器,就雙擊win32目錄下的activemq.bat,如果是64位機器,則雙擊win64目錄下的activemq.bat
        4、bin目錄裡面啟動 選擇對應的系統版本和位數,activeMQ start 啟動


        5、啟動後訪問路徑http:// 127.0.0.1:8161/

          127.0.0.1是activeMq安裝的伺服器,預設埠號8161

  

 

        6、使用者名稱和密碼預設都是admin驗證這是訊息服務的保護措施

登入後的頁面:ActiveMQ控制檯

1)Home | Queues | Topics | Subscribers | Connections | Network | Scheduled | Send
Support

主頁| 佇列| 主題| 訂閱者| 連線| 網路| 預定| 傳送
支援

2)Queues建立

    面板說明:    
        Name:佇列名稱。
        Number Of Pending Messages:等待消費的訊息個數。
        Number Of Consumers:當前連線的消費者數目
        Messages Enqueued:進入佇列的訊息總個數,包括出佇列的和待消費的,這個數量只增不減。
        Messages Dequeued:已經消費的訊息數量。

3)向訊息佇列傳送訊息

結果


        7、官方案例集合
            https://github.com/spring-projects/spring-boot/tree/master/spring-boot-samples
    


三、SpringBoot2.x整合ActiveMQ實戰之點對點訊息(p2p)
   訊息 消費者和訊息生產者可以在一個工程也可以在不同的工程


    簡介:SpringBoot2.x整合ActiveMQ實戰之點對點訊息

    1、官網地址:https://docs.spring.io/spring-boot/docs/2.1.0.BUILD-SNAPSHOT/reference/htmlsingle/#boot-features-activemq

    2、加入依賴
        <!-- 整合訊息佇列ActiveMQ -->
     

        <dependency>  
            <groupId>org.springframework.boot</groupId>  
            <artifactId>spring-boot-starter-activemq</artifactId>  
        </dependency>  
        
        <!-- 如果配置執行緒池則加入 -->
        <dependency>  
            <groupId>org.apache.activemq</groupId>  
            <artifactId>activemq-pool</artifactId>  
        </dependency>

     3、application.properties配置檔案配置
     

#整合jms測試,安裝在別的機器,防火牆和埠號記得開放
spring.activemq.broker-url=tcp://127.0.0.1:61616
#叢集配置,61616:開啟對外連線的預設埠號
#spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617)

spring.activemq.user=admin
spring.activemq.password=admin
#下列配置要增加依賴:訊息佇列連線池 max-connections,50,100,500根據需要寫
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=100

      4、springboot啟動類 @EnableJms,開啟支援jms

1)service介面

package com.itcast.demo.service;

import javax.jms.Destination;

/**
 * 
* <p>Title: ProducerService</p>  
* <p>Description: </p>  
* @author shenlan  
* @date 2018年11月4日
 */

public interface ProducerService {
    /**
     * 指定訊息佇列還有訊息
     * @param destination
     * @param message
     */
	public void sendMessage(Destination destination,final String message );
	/**
	 * 
	 * 使用預設訊息佇列傳送訊息
	 * @param message
	 */
	public void sendMessage(final String message );
}

    2)實現

package com.itcast.demo.service.impl;

import javax.jms.Destination;
import javax.jms.Queue;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

import com.itcast.demo.service.ProducerService;


@Service
public class ProducerServiceImpl implements ProducerService{
	 //用來發送訊息到broker的物件
	@Autowired
   private JmsMessagingTemplate jmsTemplate ;
	
	@Autowired
	private Queue queue;//程式入口有了
	//傳送訊息,destination是傳送到的佇列,message是待發送的訊息
	@Override
	public void sendMessage(Destination destination, String message) {
		jmsTemplate.convertAndSend(destination,message);
		
	}

	//傳送訊息,destination是傳送到的佇列,message是待發送的訊息
	@Override
	public void sendMessage(String message) {
		jmsTemplate.convertAndSend(this.queue,message);
	}

}

3)controller

package com.itcast.demo.controller;

import javax.jms.Destination;


import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.itcast.demo.domain.JsonData;
import com.itcast.demo.service.ProducerService;

/**
 * 功能描述:模擬微信支付回撥
 *
 *
 *@作者 llj  小D
 */
@RestController
@RequestMapping("/api/v1")
public class OrderController {
	
	@Autowired
	private ProducerService producerService;
	/**
	 * 功能描述:微信支付回撥介面
	 * @param msg 支付資訊
	 * @return
	 */
	@GetMapping("order")
	public Object order(String msg){//實際開發msg是html需要document解析
		
		Destination destination = new ActiveMQQueue("order.queue");
		
		producerService.sendMessage(destination, msg);
	
       return JsonData.buildSuccess();
	}
	
	
	
	@GetMapping("common")
	public Object common(String msg){
		producerService.sendMessage(msg);	
       return JsonData.buildSuccess();
	}
	
	
	
	
}

 4)控制檯建立佇列

5)啟動工程訪問

6)重新整理

7)再發送

訊息變成兩條

     5、消費者:實時監聽對應的佇列()
          @JmsListener(destination = "order.queue")
     1)     建立一個jms包下面建立OrderConsumer 類

package com.itcast.demo.jms;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component  //spring可以掃描
public class OrderConsumer {
    /**
     * 
     * @JmsListener監聽order.queue佇列有沒有訊息進來
     * @param text
     */
	@JmsListener(destination="order.queue")
	public void receiveQueue(String text){
		System.out.println("OrderConsumer收到的報文為:"+text);
	}
}

2)啟動工程之前寫的報文都收到了

3)重新整理控制檯

4)再新增訊息

6.測一下預設的佇列comment.queue

1)和OrderConsumer 類一樣

package com.itcast.demo.jms;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class CommonConsumer {
    
	@JmsListener(destination="comment.queue")
	public void receiveQueue(String text){
		System.out.println("CommonConsumer收到的報文為:"+text);
	}
	
}

2)

3)

 

四、SpringBoot整合ActiveMQ實戰之釋出訂閱模式(pub/sub)


    簡介:SpringBoot整合ActiveMQ實戰之釋出訂閱模式(pub/sub),及同時支援點對點和釋出訂閱模型

        1、需要加入配置檔案,支援釋出訂閱模型,預設只支援點對點
            #default point to point
            spring.jms.pub-sub-domain=true

    注意點:
        
        1、預設消費者並不會消費訂閱釋出型別的訊息,這是由於springboot預設採用的是p2p模式進行訊息的監聽
           1) 修改配置:spring.jms.pub-sub-domain=true
         2)在程式入口處加入topic物件,好處是啟動程式new 一次可以多次使用

package com.itcast.demo;

import javax.jms.Queue;
import javax.jms.Topic;
import javax.servlet.MultipartConfigElement;

import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.MultipartConfigFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@EnableJms
public class Application {
	//交個spring進行管理,方便後續進行注入
    @Bean
	public Queue queue(){
		return new ActiveMQQueue("comment.queue");
	}
   //主題物件交給spring管理
    @Bean
   	public Topic topic(){
   		return new ActiveMQTopic("video.topic");
   	}
	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}
	
}

    3)ProducerService介面新增如下程式碼

    /**
	 * 
	 * 訊息釋出者
	 * @param message
	 */
	public void publish(String msg) ;

  4)實現類ProducerServiceImpl

    //=======釋出訂閱相關程式碼=========
	
	@Autowired
	private Topic topic;
	
	 @Override
	public void publish(String msg) {
		this.jmsTemplate.convertAndSend(this.topic, msg);
		
	}

5)controller

    @GetMapping("topic")
	public Object topic(String msg){
		producerService.publish(msg);
       return JsonData.buildSuccess();
	}
	

6)啟動程式,訪問

7)

8)TopicSub  :模擬一個訂閱者

package com.itcast.demo.jms;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class OrderConsumer {
    /**
     * 
     * @JmsListener監聽order.queue佇列有沒有訊息進來
     * @param text
     */
	@JmsListener(destination="order.queue")
	public void receiveQueue(String text){
		System.out.println("OrderConsumer收到的報文為:"+text);
	}
}

9)啟動訪問



控制檯

10)模擬三個訂閱者

package com.itcast.demo.jms;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class TopicSub {
    //模擬出三個訂閱者
	
	@JmsListener(destination="video.topic")
	public void receive1(String text){
		System.out.println("video.topic 消費者:receive1="+text);
	}
	
	
	@JmsListener(destination="video.topic")
	public void receive2(String text){
		System.out.println("video.topic 消費者:receive2="+text);
	}
	@JmsListener(destination="video.topic")
	public void receive3(String text){
		System.out.println("video topic 消費者:receive3="+text);
	}
	
	
}

 11)訪問

12)此時存在一個問題:點對點模式沒有生效

我們測點對點時,並沒有消費者

 

13)@JmsListener如果不指定獨立的containerFactory的話是只能消費queue訊息
       也就是隻支援一種模式

 

            //需要給topic定義獨立的JmsListenerContainer
            

          a)  在配置檔案裡面,註釋掉 #spring.jms.pub-sub-domain=true

         b)在程式入口加入

@Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory   activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setPubSubDomain(true);//支援訂閱模式
        bean.setConnectionFactory(activeMQConnectionFactory);
        return bean;
    }

c)   修改訂閱者container:containerFactory="jmsListenerContainerTopic"

package com.itcast.demo.jms;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class TopicSub {
    //模擬出三個訂閱者
	
	@JmsListener(destination="video.topic",containerFactory="jmsListenerContainerTopic")
	public void receive1(String text){
		System.out.println("video.topic 消費者:receive1="+text);
	}
	
	
	@JmsListener(destination="video.topic",containerFactory="jmsListenerContainerTopic")
	public void receive2(String text){
		System.out.println("video.topic 消費者:receive2="+text);
	}
	@JmsListener(destination="video.topic",containerFactory="jmsListenerContainerTopic")
	public void receive3(String text){
		System.out.println("video topic 消費者:receive3="+text);
	}
	
	
}

14)order點對點模式,

15)釋出訂閱模式