1. 程式人生 > >SpringBoot(四)springboot整合ActiveMQ

SpringBoot(四)springboot整合ActiveMQ

       訊息佇列中介軟體是分散式系統中重要的元件,主要解決應用耦合,非同步訊息,流量削鋒等問題。實現高效能,高可用,可伸縮和最終一致性架構。是大型分散式系統不可缺少的中介軟體。訊息形式支援點對點和訂閱-釋出。

ActiveMQ是什麼

  1. ActiveMQ是訊息佇列技術,為解決高併發問題而生
  2. ActiveMQ生產者消費者模型(生產者和消費者可以跨平臺、跨系統)
  3. ActiveMQ支援如下兩種訊息傳輸方式
  • 點對點模式,生產者生產了一個訊息,只能由一個消費者進行消費
  • 釋出/訂閱模式,生產者生產了一個訊息,可以由多個消費者進行消費

SpringBoot整合ActiveMQ

      1. ActiveMQ下載啟動

          http://activemq.apache.org/download-archives.html ,本文用的是windows版的5.15.3版本,下載下來是壓縮包,自行解壓一個到目錄下,CMD進入到解壓目錄下的bin目錄下,執行 activemq.bat start 啟動。                                                                               如果能成功訪問http://localhost:8161/admin(使用者名稱和密碼預設為admin),則啟動成功。

      2.  建立兩個個springboot專案,分別作為訊息提供者(provider)和消費者(consumer),新增依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--訊息佇列連線池-->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.15.0</version>
</dependency>

      3.  在兩個專案中的application.properires配置訊息佇列,並在啟動類新增@EnableJms開啟訊息佇列。

 application.properties

# failover:(tcp://localhost:61616,tcp://localhost:61617)
# tcp://localhost:61616
spring.activemq.broker-url=tcp://localhost:61616
#true 表示使用內建的MQ,false則連線伺服器
spring.activemq.in-memory=false
#true表示使用連線池;false時,每傳送一條資料建立一個連線
spring.activemq.pool.enabled=true
#連線池最大連線數
spring.activemq.pool.max-connections=10
#空閒的連線過期時間,預設為30秒
spring.activemq.pool.idle-timeout=30000
#強制的連線過期時間,與idleTimeout的區別在於:idleTimeout是在連線空閒一段時間失效,而expiryTimeout不管當前連線的情況,只要達到指定時間就失效。預設為0,never
spring.activemq.pool.expiry-timeout=0

  啟動類(provider),consumer同樣

@SpringBootApplication
@EnableJms //啟動訊息佇列
public class ProviderApplication {
	public static void main(String[] args) {
		SpringApplication.run(ProviderApplication.class, args);
	}
}

      4.  prover專案結構圖

    BeanConfig定義訊息佇列

import javax.jms.Queue;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/*
 * @author uv
 * @date 2018/9/15 14:21
 */
@Configuration
public class BeanConfig {

    //定義存放訊息的佇列
    @Bean
    public Queue queue() {
        return new ActiveMQQueue("ActiveMQQueue");
    }
}

     ProviderController

import javax.jms.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/*
 * @author uv
 * @date 2018/9/15 14:54
 *
 */
@RestController
public class ProviderController {

    //注入存放訊息的佇列,用於下列方法一
    @Autowired
    private Queue queue;

    //注入springboot封裝的工具類
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @RequestMapping("send")
    public void send(String name) {
        //方法一:新增訊息到訊息佇列
        jmsMessagingTemplate.convertAndSend(queue, name);
        //方法二:這種方式不需要手動建立queue,系統會自行建立名為test的佇列
        //jmsMessagingTemplate.convertAndSend("test", name);
    }
}

      5.  啟動provider,向訊息佇列新增資料,本次新增5條資料

  • Number Of Pending Messages:訊息佇列中待處理的訊息
  • Number Of Consumers:消費者的數量
  • Messages Enqueued:累計進入過訊息佇列的總量
  • Messages Dequeued:累計消費過的訊息總量

      6. consumer專案結構圖

application.properties 和 ConsumerApplication 同 provider類似,如下為不同的ConsumerService:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

/*
 * @author uv
 * @date 2018/9/15 18:36
 *
 */
@Component
public class ConsumerService {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    // 使用JmsListener配置消費者監聽的佇列,其中name是接收到的訊息
    @JmsListener(destination = "ActiveMQQueue")
    // SendTo 會將此方法返回的資料, 寫入到 OutQueue 中去.
    @SendTo("SQueue")
    public String handleMessage(String name) {
        System.out.println("成功接受Name" + name);
        return "成功接受Name" + name;
    }
}

      7. 啟動consumer,控制檯輸出如下

      訊息接收成功,檢視 http://localhost:8161/admin/queues.jsp ,如下圖所示,訊息佇列中不再有未處理的訊息,由於consumer的啟動,消費者的數量為1,Messages Dequeued(累計消費過的訊息總量)的數值也變成了5;另外消費者接收到5條訊息處理後,返回到OutQueue 5條訊息,下圖可以看出來。

      8. ActiveMQ的持久化