1. 程式人生 > >淘淘商城24_ActiveMq訊息佇列02_activeMq與spring的整合

淘淘商城24_ActiveMq訊息佇列02_activeMq與spring的整合

一、新增依賴

<!-- activeMQ -->
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-all</artifactId>
		</dependency>

二、配置檔案

applicationContext-activeMq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
	http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
	http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd 
	http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
	http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">


	<!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
	<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="tcp://192.168.1.104:61616" />
	</bean>
	<!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
	<bean id="connectionFactory"
		class="org.springframework.jms.connection.SingleConnectionFactory">
		<!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->
		<property name="targetConnectionFactory" ref="targetConnectionFactory" />
	</bean>
	
	<!-- 配置生產者 -->
	<!-- Spring提供的JMS工具類,它可以進行訊息傳送、接收等 -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 -->
		<property name="connectionFactory" ref="connectionFactory" />
	</bean>
	
	<!--這個是佇列目的地,點對點的 -->
	<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg>
            <!-- 傳送訊息的名稱 -->
			<value>spring-queue</value>
		</constructor-arg>
	</bean>
	
	<!--這個是主題目的地,一對多的 -->
	<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
		<constructor-arg value="topic" />
	</bean>
	
	<!-- 接收訊息 -->
	<!-- 配置監聽器 -->
	<bean id="myMessageListener" class="com.taotao.listener.MyMessageListener" />
	<!-- 訊息監聽容器 -->
	<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="connectionFactory" />
		<property name="destination" ref="queueDestination" />
		<property name="messageListener" ref="myMessageListener" />
	</bean>
	
</beans>

三、程式碼測試

1. 傳送訊息

第一步:初始化一個spring容器

第二步:從容器中獲得JMSTemplate物件。

第三步:從容器中獲得一個Destination物件

第四步:使用JMSTemplate物件傳送訊息,需要知道Destination

@Test
	public void testQueueProducer() throws Exception {
		// 第一步:初始化一個spring容器
		ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activeMq.xml");
		// 第二步:從容器中獲得JMSTemplate物件。
		JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
		// 第三步:從容器中獲得一個Destination物件
		Queue queue = (Queue) applicationContext.getBean("queueDestination");
		// 第四步:使用JMSTemplate物件傳送訊息,需要知道Destination
		jmsTemplate.send(queue, new MessageCreator() {
			@Override
			public Message createMessage(Session session) throws JMSException {
				TextMessage textMessage = session.createTextMessage("spring activemq testsss");
				return textMessage;
			}
		});
	}

2. 接收訊息

第一步:把Activemq相關的jar包新增到工程中

第二步:建立一個MessageListener的實現類。

MessageListener.java

public class MyMessageListener implements MessageListener {
	@Override
	public void onMessage(Message message) {
		try {
			TextMessage textMessage = (TextMessage) message;
			//獲取訊息內容
			String text = textMessage.getText();
			System.out.println(text);
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
}

 

@Test
	public void testQueueConsumer() throws Exception {
		//初始化spring容器
		ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activeMq.xml");
		//等待
		System.in.read();
	}

3、測試

先執行consumer程式碼,執行起來以後,再執行”testQueueProducers”這個程式,就可以看到執行結果了

四、在專案中的應用-----資料庫與索引庫的同步

1. 思想:

taotao-manager工程中新增 一個商品,然後會將這個商品的itemId傳送給activeMq,Mq接收到這個itemId,然後將itemId再次傳送給taotao-search工程,

2. 生產者taotao-manager-service ,依賴和配置檔案本文開始已新增

商品新增功能

/**
	 * 新增商品資訊
	 */
	@Override
	public TaotaoResult saveItem(TbItem tbItem, String desc,String paramData) {
		//資料補全
		tbItem.setCreated(new Date());
		tbItem.setUpdated(new Date());
		tbItem.setStatus((byte)1);//'商品狀態,1-正常,2-下架,3-刪除
		final long itemId = IDUtils.genItemId();//生成隨機id
		tbItem.setId(itemId);
		itemMapper.saveItem(tbItem);
		
		TaotaoResult result = saveItemDesc(itemId, desc);
		if (result.getStatus()!=200) {
			try {
				throw new Exception();
			} catch (Exception e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		
		TaotaoResult  itemResult = saveItemParamItem(itemId, paramData);
		if (itemResult.getStatus()!=200) {
			try {
				throw new Exception();
			} catch (Exception e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		
		//向activeMq傳送訊息,將商品的id傳送給activeMq
		jmsTemplate.send(destination, new MessageCreator() {
			@Override
			public Message createMessage(Session session) throws JMSException {
				TextMessage textMessage = session.createTextMessage(itemId+"");//傳送的訊息
				return textMessage;
			}
		});
		
		return TaotaoResult.ok();
	}

3. 消費者taotao-search

3.1. 配置檔案 applicationContext-activeMq.xml(taotao-search-service)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
	http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
	http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd 
	http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
	http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">


   <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
	<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="tcp://192.168.1.101:61616" />
	</bean>
	
	<!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
	<bean id="connectionFactory"
		class="org.springframework.jms.connection.SingleConnectionFactory">
		<!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->
		<property name="targetConnectionFactory" ref="targetConnectionFactory" />
	</bean>
	
	<!--這個是佇列目的地,點對點的 -->
	<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg>
			<value>spring-queue</value>
		</constructor-arg>
	</bean>
	
	<!--這個是主題目的地,一對多的 -->
	<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
		<constructor-arg value="item-change-topic" />
	</bean>
	
	<!-- 配置監聽器 -->
	<bean id="myMessageListener" class="com.taotao.search.listener.MyMessageListener" />
	<!-- 訊息監聽容器 -->
	<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="connectionFactory" />
		<property name="destination" ref="topicDestination" />
		<property name="messageListener" ref="myMessageListener" />
	</bean>

</beans>

3.2. pom新增依賴taotao-search-service

<!-- activeMQ -->
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>
            </dependency>

3.3. Mapper層

3.4 SearchItemMapper.xml

3.5 Service層SearchItemService.java

3.6 SearchItemServiceImpl.java

/**
	 * 新增商品到索引庫
	 */
	@Override
	public TaotaoResult addDocument(long itemId) {
		try {
			//1.查詢出taotao-manager新增的商品
			SearchItem searchItem = searchItemMapper.getSearchItemById(itemId);
			//2.建立一個SolrInputDocument物件
			SolrInputDocument document = new SolrInputDocument();
			//3.將查詢到的資料新增到索引庫
			//根據索引庫域的定義,將欄位寫入到索引庫
			document.setField("id", searchItem.getId());
			document.setField("item_title", searchItem.getTitle());
			document.setField("item_category_name", searchItem.getCatName());
			document.setField("item_image", searchItem.getImage());
			document.setField("item_desc", searchItem.getItem_desc());
			document.setField("item_price", searchItem.getPrice());
			document.setField("item_sell_point", searchItem.getSell_point());
			
			solrServer.add(document);
			solrServer.commit();
		} catch (Exception e) {
			e.printStackTrace();
		} 
		return TaotaoResult.ok();
	}

3.7 配置監聽 MyMessageListener.java

建立一個監聽類

package com.taotao.search.listener;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;

import com.taotao.search.service.SearchItemService;

/**
 * 配置一個監聽類
 * @author fengjinzhu
 *
 */
public class MyMessageListener implements MessageListener {

	@Autowired
	private SearchItemService searchItemService;
	
	@Override
	public void onMessage(Message message) {
		try {
			TextMessage textMessage = (TextMessage) message;
			//獲取訊息內容,生產者傳送的訊息內容
			String itemId = textMessage.getText();
			//等待一秒鐘
			Thread.sleep(1000);
			searchItemService.addDocument(Long.valueOf(itemId));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	

}