1. 程式人生 > >淘淘商城44-使用訊息佇列activeMQ更新solr索引庫--解決同步索引問題

淘淘商城44-使用訊息佇列activeMQ更新solr索引庫--解決同步索引問題

目錄

3.注意

1.為什麼要使用訊息對列acitveMQ

在我們的後臺,增加、修改、刪除商品時,這裡以修改為例。

修改商品時,修改的是資料庫中的資料。但是使用者在商品搜尋時,搜尋的是solr索引庫中的資料,所以使用者並不會搜到新修改商品的資訊。後臺雖然有一個匯入所有商品資訊到索引庫的,但是每次匯入的資訊過大,而我們只需要修改索引庫中的一個商品資訊,所以我們需要編寫一個,修改單個商品索引的功能。

我們有三種方案:

1.1方案1

在taotao-manager-service中,新增商品的業務邏輯中,新增一個同步索引庫的業務邏輯。

缺點:業務邏輯耦合度非常高,業務拆分不明確

1.2方案2

業務邏輯在taotao-search中實現,呼叫服務在taotao-manager實現。業務邏輯分開。

缺點:

服務之間的耦合度變高,啟動有先後順序。

隨著呼叫的服務會越來越多,服務之間的呼叫越來越複雜,難以管理。

1.3方案3

使用訊息佇列

實現效果:

後臺需要增刪改商品資訊很大的時候,只需要taotao-manager-service在資料庫修改商品資訊後,向MQ傳送一個被修改商品的id,然後直接返回提示,修改成功。然後在taotao-search-service寫一個監聽器,專門接收MQ傳送的訊息,修改索引庫即可。

存在的問題:

1、如果MQ掛了,所有相關的服務都掛了

2、MQ有效能的瓶頸,儘量減少訊息的內容的大小

2.使用訊息佇列activeMQ修改索引庫

需要在商品的新增/修改的時候,同步索引庫。將資料從資料庫中查詢出來匯入到索引庫更新。

訊息的傳送方為:所在工程taotao-manager-service

訊息的接收方為:所在工程taotao-search-service

兩個工程都需要依賴activmq:

<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-all</artifactId>
</dependency>

2.1producer

在taotao-manager-service工程中傳送訊息。

功能分析:

        當商品新增完成後傳送一個TextMessage,包含一個商品id即可。

        接收端接收到商品id通過資料庫查詢到商品的資訊(搜尋的結果商品的資訊)再同步索引庫。

2.1.1applicationContext-activemq.xml配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
	http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
	http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
	http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
	
	<bean id="targetConnection" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="tcp://192.168.25.130:61616"></property>
	</bean>
	<!-- 通用的connectionfacotry 指定真正使用的連線工廠 -->
	<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
		<property name="targetConnectionFactory" ref="targetConnection"></property>
	</bean>
	<!-- 接收和傳送訊息時使用的類 -->
	<bean class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="connectionFactory"></property>
	</bean>
	<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
		<constructor-arg name="name" value="item-change-topic"></constructor-arg>
	</bean> 
</beans>

2.1.2添加發送訊息邏輯

首先注入訊息模板物件與傳送訊息的目標

@Autowired
private JmsTemplate jmsTemplate;
@Resource(name="topicDestination")
private Destination topicDestination;

添加發送訊息的程式碼

// 傳送一個商品新增訊息
		jmsTemplate.send(topicDestination, new MessageCreator() {
			@Override
			public Message createMessage(Session session) throws JMSException {
				TextMessage textMessage = session.createTextMessage(itemId + "");
				return textMessage;
			}

		});

2.2consumer

在taotao-search-service中消費訊息。

需要加入activmq的依賴。

2.2.1功能分析

  1. 接收訊息。需要建立MessageListener介面的實現類。
  2. 取訊息,取商品id。
  3. 根據商品id查詢資料庫。
  4. 建立一SolrInputDocument物件。
  5. 使用SolrServer物件寫入索引庫。
  6. 返回成功,返回TaotaoResult。

2.2.2dao層

  1. 根據商品id查詢商品資訊。
  2. 建立一SolrInputDocument物件。
  3. 使用SolrServer物件寫入索引庫。
  4. 返回成功,返回TaotaoResult。

在taotao-search-service的com.taotao.search.mapper編寫mapper介面與mapper對映檔案,用於根據id查詢資料庫,返回searchItem

mapper介面

/**
	 * 根據id查詢SearchItem
	 * @param itemId
	 * @return
	 */
	public SearchItem getItemById(Long itemId);

mapper對映檔案

<select id="getItemById" parameterType="long" resultType="com.taotao.common.pojo.SearchItem">
		SELECT
			a.id,
			a.title,
			a.sell_point,
			a.price,
			a.image,
			b. NAME category_name,
			c.item_desc
		FROM
			tb_item a
		JOIN tb_item_cat b ON a.cid = b.id
		JOIN tb_item_desc c ON a.id = c.item_id
		WHERE a.status = 1
		  AND a.id=#{itemId}
	</select>

索引庫dao

用於更新索引庫

在taotao-search-service的com.taotao.search.dao的SearchDao中

/**
	 * 商品增改時,更新索引庫
	 * @param itemId
	 * @return
	 * @throws Exception
	 */
	public TaotaoResult updateSearchItemById(Long itemId) throws Exception {
		//修改、新增
		//1.呼叫mapper中的方法
		SearchItem searchItem = searchItemMapper.getItemById(itemId);
		//2.建立solrinputdocument
		SolrInputDocument document = new SolrInputDocument();
		//3.向文件中新增域
		document.addField("id", searchItem.getId());
		document.addField("item_title", searchItem.getTitle());
		document.addField("item_sell_point", searchItem.getSell_point());
		document.addField("item_price", searchItem.getPrice());
		document.addField("item_image", searchItem.getImage());
		document.addField("item_category_name", searchItem.getCategory_name());
		document.addField("item_desc", searchItem.getItem_desc());
		//4.新增文件到索引庫中
		solrServer.add(document);
		//5.提交
		solrServer.commit();
		return TaotaoResult.ok();
	}

2.2.3service層

引數:商品ID

業務邏輯:

  1. 呼叫searchDao的方法。
  2. 返回成功,返回TaotaoResult。

在taotao-search-interface的com.taotao.search.service包下SearchService

/**
	 * 根據id更新商品索引庫
	 * @param itemId
	 * @return
	 * @throws Exception
	 */
	TaotaoResult updateSearchItemById(Long itemId) throws Exception;

在taotao-search-service的com.taotao.search.service.impl包下SearchServiceImpl實現介面

/**
	 * 根據id更新商品索引庫
	 */
	@Override
	public TaotaoResult updateSearchItemById(Long itemId) throws Exception {
		return searchDao.updateSearchItemById(itemId);
	}
	

2.2.4listener

在taotao-search-service下建立com.taotao.search.listener包,在com.taotao.search.listener包下建立listener用於接收訊息佇列中的id訊息。

package com.taotao.search.listener;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;

import com.taotao.search.service.SearchService;

/**
 * 接收訊息的監聽器
 * 
 * @author Administrator
 *
 */
public class ItemChangeMessageListener implements MessageListener {
	// 注入service 直接呼叫 方法更新即可
	@Autowired
	private SearchService searchService;

	@Override
	public void onMessage(Message message) {
		// 判斷訊息型別是否為textmessage
		if (message instanceof TextMessage) {
			// 如果是 獲取商品的id
			TextMessage message2 = (TextMessage) message;
			try {
				String id = message2.getText();
				// 通過商品的id查詢資料 需要開發mapper 通過id查詢商品(搜尋時)的資料
				// 更新索引庫
				Long itemId = Long.parseLong(id);
				searchService.updateSearchItemById(itemId);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}

}

2.2.5配置applicationContext-activemq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
	http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
	http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
	http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
	
	<bean id="targetConnection" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="tcp://47.100.54.177:61616"></property>
	</bean>
	<!-- 通用的connectionfacotry 指定真正使用的連線工廠 -->
	<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
		<property name="targetConnectionFactory" ref="targetConnection"></property>
	</bean>
	<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
		<constructor-arg name="name" value="item-change-topic"></constructor-arg>
	</bean>
	<!-- messagelistener的初始化 -->
	<bean id="itemChangeMessageListener" class="com.taotao.search.listener.ItemChangeMessageListener"></bean>
	<!-- 設定預設的監聽容器 -->
	<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="connectionFactory"></property>
		<property name="destination" ref="topicDestination"></property>
		<property name="messageListener" ref="itemChangeMessageListener"></property>
	</bean>
</beans>

3.注意

使用訊息佇列activeMQ更新solr索引庫,用於管理員更新商品時,使用者可以搜到最新的商品資訊,就分析和編寫完畢。

但是有一點遺憾的地方,這裡並不能用作商品的刪除、下架、上架

因為我當初在編寫這個三個功能的時候偷懶了,將三個功能放在一起,根據方法名判斷刪除、下架、上架,而且引數為List型別。

有一種可行的方法我將List<Long> ids,用逗號分割轉為String,最後加上delete、reshelf、instock這樣的標識

比如:"15214,41358,123231,delete"、"152745,86358,123231,reshelf",在searchDao做字串split,判斷最末尾是delete還是reshelf等,執行solrServer.delete()、solrServer.add()。但是感覺這樣太亂了,耦合死了,看著就難受。

這個坑一個以後解決吧!!!哎

相關推薦

商城44-使用訊息佇列activeMQ更新solr索引--解決同步索引問題

目錄 3.注意 1.為什麼要使用訊息對列acitveMQ 在我們的後臺,增加、修改、刪除商品時,這裡以修改為例。 修改商品時,修改的是資料庫中的資料。但是使用者在商品搜尋時,搜尋的是solr索引庫中的資料,所

商城24_ActiveMq訊息佇列02_activeMq與spring的整合

一、新增依賴 <!-- activeMQ --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</a

商城24_ActiveMq訊息佇列01

一、什麼是訊息佇列MQ? 舉個例子:們去銀行視窗辦理業務,經常會遇到有好多人都在辦業務,這個時候呢,就需要排隊,等待視窗喊號:                   001號顧客請到1號視窗辦理業務

java B2B2C Springcloud多租戶電子商城系統-訊息佇列ActiveMQ

什麼是ActiveMQ? ActiveMQ 是Apache出品,最流行的,能力強勁的開源訊息匯流排。ActiveMQ 是一個完全支援JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的

java面試—訊息佇列ActiveMQ

1.如何使用ActiveMq解決分散式事物     在應用中,都會有使用者註冊功能:收集使用者錄入資訊,儲存到資料庫—向用戶的手機或郵箱發驗證碼······     在傳統集中式架構,實現功能:開啟一個本地事物,往本地資料庫中插入一條使

Java訊息佇列--ActiveMq筆記

1、下載安裝ActiveMQ     ActiveMQ官網下載地址:http://activemq.apache.org/download.html   ActiveMQ 提供了Windows 和Linux、Unix 等幾個版本,樓主這裡選擇了Linux 版本下進行開發。

訊息佇列ActiveMQ初步

安裝ActiveMQ 官網地址:http://activemq.apache.org/ 解壓後基本目錄結構: bin存放的是指令碼檔案 conf存放的是基本配置檔案 data存放的是日誌檔案 docs存放的是說明文件 examples存放的是簡單的例項

Java訊息佇列總結只需一篇解決ActiveMQ、RabbitMQ、ZeroMQ、Kafka

一、訊息佇列概述 訊息佇列中介軟體是分散式系統中重要的元件,主要解決應用解耦,非同步訊息,流量削鋒等問題,實現高效能,高可用,可伸縮和最終一致性架構。目前使用較多的訊息佇列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ 二、

java B2B2C Springboot電子商城系統-訊息佇列之 RabbitMQ

常見的訊息佇列 需要JAVA Spring Cloud大型企業分散式微服務雲構建的B2B2C電子商務平臺原始碼請加企鵝求求:二一四七七七五六三三 目前業界有四款常用的訊息佇列,它們分別是RabbitMQ、RocketMQ、ActiveMQ和Kafka。 RabbitMQ Rabbit

Java訊息佇列--ActiveMq 初體驗

1、下載安裝ActiveMQ   ActiveMQ 提供了Windows 和Linux、Unix 等幾個版本,樓主這裡選擇了Linux 版本下進行開發。   下載完安裝包,解壓之後的目錄:    從它的目錄來說,還是很簡單的:  bin

訊息佇列-ActiveMQ學習筆記(一)-JMS介紹與環境搭建

一、介紹JMS(來自於百度百科)        JMS即Java訊息服務(Java Message Service)應用程式介面,是一個Java平臺中關於面向訊息中介軟體(MOM)的API,用於在兩個

訊息佇列-ActiveMQ

1 業務需求描述 舉例描述: 再警情通報的業務時通過傳送訊息介面可以選擇 警情聯絡,和船情通報兩種訊息 傳送方式可分為 一對一發送:部門對部門、個人對個人 一對多傳送:部門對多部門、個人對多人 2 功能實現設計 基於上述需求描述,在訊息傳輸功能實現上選用activemq進行警情聯絡訊息傳輸功能

訊息佇列ActiveMQ的使用

-----------------ActiveMQ----------------- 一、ActiveMQ核心概念1、ActiveMQ是訊息佇列技術,為解決高併發問題而生!2、ActiveMQ生產者消費者模型(生產者和消費者可以跨平臺、跨系統)有中間平臺3、ActiveMQ

Java訊息佇列--ActiveMq 實戰

https://www.cnblogs.com/jaycekon/p/6225058.htmlActiveMQ 提供了Windows 和Linux、Unix 等幾個版本,樓主這裡選擇了Linux 版本下進行開發從它的目錄來說,還是很簡單的: bin存放的是指令碼檔案conf存

java訊息佇列ActiveMQ的簡單使用

activeMQ 是學習java訊息佇列的實現專案,使用jfinal + jfinal-ext + activeMQ + quartz快速構建。 1.訊息佇列 訊息佇列,其實是一種基於資料結構實現的服務。而java語言中的實現,有apache的activeMQ,比較主流。

訊息佇列ActiveMQ+Spring整合

訊息佇列MQ簡介 訊息佇列技術是分散式應用間交換資訊的一種技術。使用訊息佇列可以很好的將任務以非同步的方式進行處理或者進行資料傳送和儲存等。例如當你頻繁地向資料庫中插入資料、頻繁的向搜尋引擎

訊息佇列activeMQ的啟動和關閉(學習筆記之二)

1、activeMQ是一個使用java開發的訊息中介軟體2、在windows和linux解壓縮3、cd 到apache-activemq-5.11.1目錄下的bin目錄,執行activemq start和activemq stop即可4、預設埠為81615.客戶端表格的欄位含

訊息佇列-ActiveMQ的使用(Windows系統)

   2.下載完成後,解壓到本地目錄(我放在了D盤),我的電腦是Windows系統64位的,所以進入 D:\apache-activemq-5.15.2-bin\apache-activemq-5.15.2\bin\win64 的資料夾,找到activemq.bat,如

訊息佇列--ActiveMq(一) 下載安裝

windows: 在本地試一下,我的是jdk1.7 我下載了 apache-activemq-5.12.0 解壓目錄: bin 存放的是指令碼檔案 裡面分32 位 64 位,可以選擇啟動 conf 存放的是基本配置檔案(可以通過修改jetty

訊息佇列-ActiveMQ學習筆記(三)-釋出-訂閱訊息模式實現

釋出-訂閱訊息模式與點對點模式類似,只不過在session建立訊息佇列時,由session.createQuene()變為session.createTopic()。 訊息釋出者程式碼: package com.feiyang.activemq2; import java