1. 程式人生 > >ActiveMQ學習總結(10)——ActiveMQ採用Spring註解方式傳送和監聽

ActiveMQ學習總結(10)——ActiveMQ採用Spring註解方式傳送和監聽

對於ActiveMQ訊息的傳送,原聲的api操作繁瑣,而且如果不進行二次封裝,開啟關閉會話以及各種建立操作也是夠夠的了。那麼,Spring提供了一個很方便的去收發訊息的框架,spring jms。整合Spring後,程式碼不僅變得非常優雅,而且易用性和擴充套件性更好。

1. maven依賴

<!-- activemq --><dependency><groupId>org.apache.xbean</groupId><artifactId>xbean-spring</artifactId><version>3.16
</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-jms</artifactId><version>${springframework.version}</version></dependency><dependency><groupId>org.apache.activemq</groupId><artifactId
>activemq-all</artifactId><version>${activemq.version}</version></dependency>

2.名稱空間引入

<?xml version="1.0" encoding="UTF-8"?>
<!-- 查詢最新的schemaLocation 訪問 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.1.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">

3. Xml配置

	<amq:connectionFactory id="jmsConnectionFactory" brokerURL="tcp://${activemq.ip}:61616" userName="${activemq.username}" password="${activemq.password}"/><bean id="jmsConnectionFactoryExtend" class="org.springframework.jms.connection.CachingConnectionFactory"><constructor-arg ref="jmsConnectionFactory"/><property name="sessionCacheSize" value="100"/></bean><!-- 訊息處理器 --><bean id="jmsMessageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"/><!-- ====Producer side start==== --><!-- 定義JmsTemplate的Queue型別 --><bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"><constructor-arg ref="jmsConnectionFactoryExtend"/><!-- 非pub/sub模型(釋出/訂閱),即佇列模式 --><property name="pubSubDomain" value="false"/><property name="messageConverter" ref="jmsMessageConverter"></property></bean><!-- 定義JmsTemplate的Topic型別 --><bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"><constructor-arg ref="jmsConnectionFactoryExtend"/><!-- pub/sub模型(釋出/訂閱) --><property name="pubSubDomain" value="true"/><property name="messageConverter" ref="jmsMessageConverter"></property></bean><jms:listener-container destination-type="queue" container-type="default" connection-factory="jmsConnectionFactoryExtend" acknowledge="auto" concurrency="5-10"><jms:listener destination="testqueue" ref="queueReciver"/></jms:listener-container>

第一個是配置我們的mq連線,ip+埠號,帳號密碼的資訊。

第二個是引入spring的mq連線池。可以配置快取的連線數。

第三個是訊息處理器,Spring預設提供了基於Jdk Serializable的訊息處理和MappingJackson2MessageConventer,其實這兩個挺常用,在Spring Redis中,在Spring MVC中,都有著這幾種conventer的身影。

下面是兩個傳送訊息的模版類,類似於之前講過的RedisTemplate。向其注入上面定義的訊息處理器,程式碼中我們會用到。(其實類中已經判斷如果不進行注入就設定一個預設的,但是自己注入的話,方便我們控制)

listener-container是Spring提供的一個監聽器容器,用於統一控制我們的監聽類來接收處理訊息。這裡面有一些配置,schema有說明。可以配置響應模式,消費者數量等。開啟多消費者,有助於加快佇列處理速度。

4.註解方式的實現

如果要用註解的方式,就不需要在xml中自己定義訊息監聽容器了。只需要加入以下的程式碼:

<bean id="jmsListenerContainerFactory" class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
        <property name="connectionFactory" ref="jmsConnectionFactoryExtend"/>
    </bean>
    
    <!-- 監聽註解支援 -->
    <jms:annotation-driven/>

這樣,配置我們消費處理類上的@listener註解,即可監聽對應的queue或者topic訊息。

5.生產者程式碼

佇列訊息:

@Resource
@Component("queueSender")
public class QueueSender {
	@Resource(name = "jmsQueueTemplate")
	private JmsTemplate jmsQueueTemplate;// 通過@Qualifier修飾符來注入對應的bean
	public void send(String destination, final Object message) {
		jmsQueueTemplate.send(destination, new MessageCreator() {
			@Override
			public Message createMessage(Session session) throws JMSException {
				return jmsQueueTemplate.getMessageConverter().toMessage(message, session);
			}
		});
	}
}

訂閱訊息:

@Component
public class TopicSender {
	@Resource(name="jmsTopicTemplate")
	private JmsTemplate jmsTemplate;
	/**
	 * 傳送一條訊息到指定的佇列(目標)
	 * @param queueName 佇列名稱
	 * @param message 訊息內容
	 */
	publicvoid publish(String destination,final Object message){
		jmsTemplate.send(destination, new MessageCreator() {
			@Override
			public Message createMessage(Session session) throws JMSException {
				return jmsTemplate.getMessageConverter().toMessage(message, session);
			}
		});
	}
}

6.消費者程式碼

package cn.test.activemq.consumer.queue;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.listener.adapter.MessageListenerAdapter;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.stereotype.Component;
import cn.test.MqBean;
import cn.test.activemq.message.types.QueueDefination;
/**
 * @author Han
 */
@Component("spqueueconsumertest")
public class SpringQueueReciverTest extends MessageListenerAdapter{
	private static final Logger log = LoggerFactory.getLogger(SpringQueueReciverTest.class);
	@JmsListener(destination=QueueDefination.TEST_QUEUE,concurrency="5-10")
	public void onMessagehehe(Message message, Session session) throws JMSException {
		try {
			MqBean bean = (MqBean) getMessageConverter().fromMessage(message);
			System.out.println(bean.getName());
			System.out.println(session);
			message.acknowledge();
			message.acknowledge();
		} catch (MessageConversionException | JMSException e) {
			e.printStackTrace();
		}
	}
}

上面的@JmsListener(destination=QueueDefination.TEST_QUEUE,concurrency="5-10")是在用註解方式監聽的時候加入。如果用xml配置容易,可以忽略。

附上MqBean

public class MqBean implements Serializable{
	private Integer age;
	private String name;
	public Integer getAge() {
		return age;
	}
	publicvoid setAge(Integer age) {
		this.age = age;
	}
	public String getName() {
		return name;
	}
	publicvoid setName(String name) {
		this.name = name;
	}
}

執行效果截圖:

相關推薦

ActiveMQ學習總結10——ActiveMQ採用Spring註解方式傳送

對於ActiveMQ訊息的傳送,原聲的api操作繁瑣,而且如果不進行二次封裝,開啟關閉會話以及各種建立操作也是夠夠的了。那麼,Spring提供了一個很方便的去收發訊息的框架,spring jms。整

ActiveMQ學習筆記10----ActiveMQ容錯的連線

1. Failover Protocol   前面講述的都是Client配置連線到指定的broker上,但是,如果Broker的連線失敗怎麼辦呢?此時,Client有兩個選項:要麼立刻死掉,要麼連線到其他的Broker上。 2. Failover Protocol的配置方式   預設的情況下,這種協議用

ActiveMQ學習總結1——ActiveMQ快速入門

1.下載ActiveMQ 2.執行ActiveMQ 解壓縮apache-activemq-5.5.1-bin.zip,然後雙擊apache-activemq-5.5.1\bin\activemq.bat執行ActiveMQ程式。 3.建立Eclipse專案並執行

ActiveMQ學習總結8——訊息佇列設計精要

訊息佇列已經逐漸成為企業IT系統內部通訊的核心手段。它具有低耦合、可靠投遞、廣播、流量控制、最終一致性等一系列功能,成為非同步RPC的主要手段之一。 當今市面上有很多主流的訊息中介軟體,如老牌的ActiveMQ、RabbitMQ,炙手可熱的Kafka,阿里巴巴自主開發的No

ActiveMQ學習總結——Topic主題模式示例

和佇列模式相似,分別編寫生產者和訂閱者。 生產者: package com.jms.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.j

ActiveMQ學習總結——spring整合ActiveMQ

本文有兩篇參考文獻,因此有兩個例項,專案結構如下圖所示:3.例項1資訊傳送者:HelloSender.javapackage edu.sjtu.erplab.springactivemq;import javax.jms.JMSException;import javax.jms.Session;import

ActiveMQ學習筆記7----ActiveMQ支援的傳輸協議

1. 連線到ActiveMQ   Connector: Active提供的,用來實現連線通訊的功能,包括:client-to-broker,broker-to-broker.ActiveMQ允許客戶端使用多種協議來連線。   1.1 配置Transport Connecto   在conf/active

ActiveMQ學習筆記8----ActiveMQ的訊息儲存持久化

1. 概述   ActiveMQ不僅支援persistent和non-persistent兩種方式,還支援訊息的恢復(recovery)方式。 2. PTP   Queue的儲存是很簡單的,其實就是FIFO的Queue      2. PUB/SUB   對於持久化訂閱主題,每一個消費者都將獲得一

ActiveMQ學習筆記11----ActiveMQ的動態網路連線

1. 多播協議multicast   ActiveMQ使用Multicast協議將一個Service和其他的Broker是我Service裡連線起來。IP Multicast是一個被用於網路中傳輸資料到其他一組接收者的技術。 Ip muiticast傳統的概念稱為組地址,組地址是ip地址在ActiveMQ

ActiveMQ學習筆記12----ActiveMQ的叢集

1. Queue consumer cluster   ActiveMQ支援Consumer對訊息的高可靠性的負載均衡消費,如果一個Consumer死掉,該訊息會轉發到其他的Consumer消費的Queue。如果一個Consumer獲得訊息比其他Consumer快,那麼他將獲得更多的訊息。因此推薦Activ

Redis學習總結10——快取雪崩、快取穿透、快取併發、快取預熱、快取演算法的概念及解決思路總結

一、快取雪崩 概念: 可能是因為資料未載入到快取中,或者快取同一時間大面積的失效,從而導致所有請求都去查資料庫,導致資料庫CPU和記憶體負載過高,甚至宕機。 解決思路: 1.1、加鎖計數(即限制併發的數量,可以用semphore)或者起一定數量的佇列來避免快取失效時大

Servlet學習總結10監聽器

監聽器概念         Servlet監聽器是Servlet規範中定義的一種特殊類,用於監聽ServletContext、HttpSession和ServletRequest等域物件的建立與銷燬事件,以及監聽這些域物件中屬性發生修改的事件。 監聽物件: Servle

Spring Boot學習總結10——SpringBoot打包成Docker映象

<build><!--定義jar檔名,可以自定義--><finalName>${project.name}-${project.version}</finalN

敏捷開發系列學習總結10——到底什麼是敏捷開發?

1,提要 軟體開發是一個系統工程,包括最初的可行性分析、再到設計、開發、測試、維護等整個生命週期。在這個過程中某些階段的失誤或說是變化,都可能增加整個軟體專案的風險。 如何在保證效率的基礎上還能安計劃

Docker學習總結10——10分鐘玩轉Docker

1、前言 進入雲端計算的時代,各大雲提供商AWS,阿里雲紛紛推出針對Docker的服務,現在Docker是十分火爆,那麼Docker到底是什麼,讓我們來體驗一下。 2、Docker是什麼 Dock

Nginx學習總結10——Nginx前後端分離將多個請求轉發到多個Tomcat,負載均衡反向代理

一、談談“渲染” 相信好多人都挺聽過“渲染”這個詞,但不清楚它是什麼意思?前端開發以為這是後端的活兒,後端開發以為是前端的事兒,推著推著就不了了之。其實渲染很簡單,不說概念,直接舉例: 1、 後端渲染:以JSP為例,可以分成三步 a、編寫標籤或Java程式碼(可以稱之為模板

Windows學習總結10——Windows系統中常用的CMD命令詳解

1.ping命令 ping是電腦網路故障診斷中的常用的命令,它的作用是用來檢查網路是否通暢或者網路連線速度。我們來看一下PING命令的具體表述。 日常的診斷過程中我們最常用到的就是診斷連線是否通

App後臺開發運維和架構實踐學習總結10——基於Java-JWT前後端token認證實戰使用詳解

一、什麼是JWT?瞭解JWT,認知JWT首先jwt其實是三個英語單詞JSON Web Token的縮寫。通過全名你可能就有一個基本的認知了。token一般都是用來認證的,比如我們系統中常用的使用者登入token可以用來認證該使用者是否登入。jwt也是經常作為一種安全的token使用。JWT的定義:JWT是一種

Mysql學習總結10——MySql觸發器使用講解

  觸發器(TRIGGER)是由事件來觸發某個操作。這些事件包括INSERT語句、UPDATE語句和DELETE語句。當資料庫系統執行這些事件時,就會啟用觸發器執行相應的操作。MySQL從5.0.2版本開始支援觸發器。在本文中將講解的內容包括: 觸發器的含義和作用 如何建

Linux學習總結 網絡配置-NAT方式靜態IP配置篇

開機啟動 log scrip 分享 如何快速 報錯 網絡服務 重新 查找 一:DHCP 服務驗證 1: 之前我們在裏面已經裝好了centos,當時我們選擇的組網方式為NAT方式,那麽我們怎麽樣如何快速實現虛擬機系統訪問外網呢?能不能直接上網呢,來我們試一下ping baid