1. 程式人生 > >訊息中介軟體系列六,rabbit與spring整合實戰

訊息中介軟體系列六,rabbit與spring整合實戰

本專案是rabbit和spring整合的實戰學習專案,模擬電商下單和庫存管理的過程,看過前面幾篇部落格的同學,相信這篇部落格對你不會再難了。一些和本章學習不太相關的內容不會做過多說明,需要的朋友可以下載原始碼自己檢視執行:rabbit與spring整合實戰原始碼

生產者訂單系統

一、pom檔案引入相關包

rabbit和spring整合以下兩個包是必須的

    <!-- RabbitMQ Java client (supplies com.rabbitmq.client.Channel used by the consumer) -->
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.0.0</version>
    </dependency>
    <!-- Spring AMQP RabbitMQ support (supplies CachingConnectionFactory and RabbitTemplate used in applicationContext.xml) -->
    <dependency>
      <groupId>org.springframework.amqp</groupId>
      <artifactId>spring-rabbit</artifactId>
      <version>2.0.0.RELEASE</version>
    </dependency>

其他包根據專案需要引進;引入的其他包有興趣檢視的可以下載原始碼檢視。

二、配置檔案

web.xml和spring-mvc.xml不是部落格重點,不再貼出,有興趣的下載原始碼檢視;下面說明applicationContext.xml的內容:

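首先,在配置檔案的<beans>根元素中增加rabbit名稱空間:xmlns:rabbit="http://www.springframework.org/schema/rabbit",並在xsi:schemaLocation中加入 http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsd(詳見下方完整的applicationContext.xml)。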

1、連線工廠配置

	<!-- RabbitMQ connection factory; the broker host is given as the constructor argument -->
	<bean id="rabbitConnectionFactory"
		  class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
		<constructor-arg value="127.0.0.1"/>
		<property name="port" value="5672"/>
		<property name="username" value="guest"/>
		<property name="password" value="guest"/>
		<property name="channelCacheSize" value="8"/>
		<!-- Publisher confirms have to be switched on at the CachingConnectionFactory level -->
		<property name="publisherConfirms" value="true"/>
	</bean>

2、<rabbit:admin>

配置<rabbit:admin>之後,Spring才會根據配置檔案在RabbitMQ上自動宣告(建立)佇列、交換器和繫結等資訊。

<!-- Admin bound to rabbitConnectionFactory; needed so that the queues, exchanges and bindings declared in this file get created -->
<rabbit:admin connection-factory="rabbitConnectionFactory"/>

4、宣告佇列

durable:是否持久化

<!-- Queue consumed by the depot (inventory) system; durable="true" marks the queue as persistent -->
<rabbit:queue name="depot_queue" durable="true"/>

5、宣告交換器

name:交換器名稱,durable:是否持久化

	<!-- Durable direct exchange; routing key "amount.depot" is bound to depot_queue -->
	<rabbit:direct-exchange name="depot-amount-exchange" durable="true"
          xmlns="http://www.springframework.org/schema/rabbit">
		<rabbit:bindings>
			<rabbit:binding queue="depot_queue" key="amount.depot"/>
		</rabbit:bindings>
	</rabbit:direct-exchange>

6、佇列和交換器進行繫結

queue:佇列名稱,key:繫結的路由鍵;<rabbit:bindings>必須寫在交換器宣告(例如<rabbit:direct-exchange>)的內部。

		<!-- Placed inside the exchange element: binds depot_queue using routing key "amount.depot" -->
		<rabbit:bindings>
			<rabbit:binding queue="depot_queue" key="amount.depot"/>
		</rabbit:bindings>

7、生產者端要宣告RabbitTemplate

	<!-- RabbitTemplate: the message template the producer publishes through -->
	<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
		<constructor-arg ref="rabbitConnectionFactory"/>
		<!-- Callback that receives the broker's publisher confirm for each message -->
		<property name="confirmCallback" ref="confirmCallback"/>
		<!-- NOTE(review): no sendReturnCallback bean is shown in this article, and the complete
		     applicationContext.xml listing omits this property. Confirm the bean exists in the project. -->
		<property name="returnCallback" ref="sendReturnCallback"/>
	</bean>

完整的applicationContext.xml檔案如下:

<?xml version="1.0" encoding="UTF-8"?>
<!-- For the latest schemaLocation values see http://www.springframework.org/schema/ -->
<!-- Producer (order system) context: component scan, RabbitMQ connection, queue/exchange declarations and the RabbitTemplate. -->
<beans xmlns="http://www.springframework.org/schema/beans"
	   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	   xmlns:mvc="http://www.springframework.org/schema/mvc"
	   xmlns:tx="http://www.springframework.org/schema/tx"
	   xmlns:jee="http://www.springframework.org/schema/jee"
	   xmlns:p="http://www.springframework.org/schema/p"
	   xmlns:aop="http://www.springframework.org/schema/aop"
	   xmlns:context="http://www.springframework.org/schema/context"
	   xmlns:task="http://www.springframework.org/schema/task"
	   xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	   xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
    http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.0.xsd
    http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
    http://www.springframework.org/schema/tx  http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
    http://www.springframework.org/schema/aop  http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
    http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.0.xsd
    http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsd">

     <!-- Component scan of com.dongnaoedu, skipping @Controller classes
          (presumably those are registered by spring-mvc.xml; confirm) -->
     <context:component-scan base-package="com.dongnaoedu">
     	<context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
     </context:component-scan>

	<!-- RabbitMQ connection factory; the broker host is the constructor argument -->
	<bean id="rabbitConnectionFactory"
		  class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
		<constructor-arg value="127.0.0.1"/>
		<property name="username" value="guest"/>
		<property name="password" value="guest"/>
		<property name="channelCacheSize" value="8"/>
		<property name="port" value="5672"></property>
		<!-- Publisher confirms have to be switched on at the CachingConnectionFactory level -->
		<property name="publisherConfirms" value="true"/>
	</bean>
	<!-- Admin needed so that the queue/exchange/binding declarations below get created -->
	<rabbit:admin connection-factory="rabbitConnectionFactory"/>

	<!-- Durable queue read by the depot (inventory) system -->
	<rabbit:queue name="depot_queue" durable="true"/>

	<!-- Durable direct exchange; routing key "amount.depot" is bound to depot_queue -->
	<rabbit:direct-exchange name="depot-amount-exchange"
          xmlns="http://www.springframework.org/schema/rabbit" durable="true">
		<rabbit:bindings>
			<rabbit:binding queue="depot_queue" key="amount.depot" ></rabbit:binding>
		</rabbit:bindings>
	</rabbit:direct-exchange>

	<!-- RabbitTemplate: the message template the producer publishes through -->
	<!-- NOTE(review): the rabbitTemplate snippet earlier in this article also sets a returnCallback
	     (ref sendReturnCallback); it is absent here. Confirm which variant is intended. -->
	<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
		<constructor-arg ref="rabbitConnectionFactory"></constructor-arg>
		<!-- Callback that receives the broker's publisher confirm for each message -->
		<property name="confirmCallback" ref="confirmCallback"/>
	</bean>
</beans>

三、其他程式碼

1、controller

package com.dongnaoedu.controller;

import com.dongnaoedu.service.ProcessOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

/**
 * Web entry point of the order system: exposes the order page and the
 * order-confirmation endpoint.
 */
@Controller
public class OrderController {

    /** Response body sent when the order was processed without an exception. */
    private static final String SUCCESS = "suc";
    /** Response body sent when processing the order threw an exception. */
    private static final String FAILURE = "failure";

    private final Logger logger = LoggerFactory.getLogger(OrderController.class);

    @Autowired
    private ProcessOrder processOrder;

    /**
     * Handles {@code /order}.
     *
     * @return the string {@code "index"} (presumably resolved as a view name; confirm
     *         against spring-mvc.xml)
     */
    @RequestMapping("/order")
    public String userReg() {
        return "index";
    }

    /**
     * Handles {@code /confirmOrder} by delegating to {@link ProcessOrder#processOrder}.
     *
     * @param goodsId value of the {@code goodsId} request parameter
     * @param amount  value of the {@code amount} request parameter
     * @return {@code "suc"} when processing completed, {@code "failure"} when it threw
     */
    @RequestMapping("/confirmOrder")
    @ResponseBody
    public String confirmOrder(@RequestParam("goodsId") String goodsId,
                               @RequestParam("amount") int amount) {
        String outcome = SUCCESS;
        try {
            processOrder.processOrder(goodsId, amount);
        } catch (Exception e) {
            // Any failure is logged with its stack trace and reported to the client as "failure".
            logger.error("訂單確認異常!", e);
            outcome = FAILURE;
        }
        return outcome;
    }
}

2、ProcessOrder

package com.dongnaoedu.service;

import com.dongnaoedu.rpc.DepotService;
import com.dongnaoedu.rpc.RpcProxy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import sun.security.x509.IPAddressName;

import java.net.InetSocketAddress;

/**
 * Order-side service: records the order, then asks the depot (inventory) side to
 * change the stock through the {@link IProDepot} implementation qualified as "mq".
 */
@Service
public class ProcessOrder {
    private static final Logger logger = LoggerFactory.getLogger(ProcessOrder.class);

    @Autowired
    @Qualifier("mq")
    private IProDepot proDepot;

    /**
     * Processes one order and forwards the stock change.
     *
     * @param goodsId id of the ordered goods
     * @param amount  ordered quantity, passed on unchanged to {@code proDepot.processDepot}
     */
    public void processOrder(String goodsId,int amount){
        try {
            // Short pause before the stock change (presumably simulates storing the order).
            Thread.sleep(80);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers/executors can still observe the
            // interruption; processing continues as it did before this fix.
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while processing order for goods [{}]", goodsId, e);
        }
        logger.info("--------------------[{}]訂單入庫完成,準備變動庫存!", goodsId);
        proDepot.processDepot(goodsId,amount);
    }
}

3、MqMode

生產者可通過rabbitTemplate呼叫send方法傳送訊息,引數分別為exchange交換器,routingKey路由鍵,Message物件。

package com.dongnaoedu.service;

import com.dongnaoedu.vo.GoodTransferVo;
import com.google.gson.Gson;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

@Service
@Qualifier("mq")
public class MqMode  implements IProDepot {

    private final static String DEPOT_RK = "amount.depot";
    private final static String DEPOT_EXCHANGE = "depot-amount-exchange";

    @Autowired
    RabbitTemplate rabbitTemplate;

    private static Gson gson = new Gson();

    public void processDepot(String goodsId, int amount) {
        GoodTransferVo goodTransferVo = new GoodTransferVo();
        goodTransferVo.setGoodsId(goodsId);
        goodTransferVo.setChangeAmount(amount);
        goodTransferVo.setInOrOut(false);
        String goods = gson.toJson(goodTransferVo);
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);//設定訊息屬性以便進行訊息持久化,投遞模式設定為2,
        rabbitTemplate.send(DEPOT_EXCHANGE, DEPOT_RK,new Message(goods.getBytes(), messageProperties));
    }
}

4、ConfirmCallback

訊息的確認回撥,必須實現RabbitTemplate.ConfirmCallback介面

package com.dongnaoedu.service.callback;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.stereotype.Service;

@Service
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {
    private Logger logger = LoggerFactory.getLogger(ConfirmCallback.class);

    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            logger.info("訊息確認傳送給mq成功");
        } else {
            //處理失敗的訊息
            logger.info("訊息傳送給mq失敗,考慮重發:"+cause);
        }
    }
}

消費者庫存系統

一、配置檔案

其他配置檔案可下載原始碼檢視。 applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- For the latest schemaLocation values see http://www.springframework.org/schema/ -->
<!-- Consumer (depot system) context: component scan, RabbitMQ connection, queue/exchange declarations and the listener container. -->
<beans xmlns="http://www.springframework.org/schema/beans"
	   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	   xmlns:mvc="http://www.springframework.org/schema/mvc"
	   xmlns:tx="http://www.springframework.org/schema/tx"
	   xmlns:jee="http://www.springframework.org/schema/jee"
	   xmlns:p="http://www.springframework.org/schema/p"
	   xmlns:aop="http://www.springframework.org/schema/aop"
	   xmlns:context="http://www.springframework.org/schema/context"
	   xmlns:task="http://www.springframework.org/schema/task"
	   xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	   xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
    http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.0.xsd
    http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
    http://www.springframework.org/schema/tx  http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
    http://www.springframework.org/schema/aop  http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
    http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.0.xsd
    http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsd">

     <!-- Component scan of com.dongnaoedu, skipping @Controller classes
          (presumably those are registered by spring-mvc.xml; confirm) -->
     <context:component-scan base-package="com.dongnaoedu">
     	<context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
     </context:component-scan>

	<!-- RabbitMQ connection factory; the broker host is the constructor argument -->
	<bean id="rabbitConnectionFactory"
		  class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
		<constructor-arg value="127.0.0.1"/>
		<property name="username" value="guest"/>
		<property name="password" value="guest"/>
		<property name="channelCacheSize" value="8"/>
		<property name="port" value="5672"></property>
	</bean>
	<!-- Admin needed so that the queue/exchange/binding declarations below get created -->
	<rabbit:admin connection-factory="rabbitConnectionFactory"/>

    <!-- Same durable queue that the producer context declares -->
    <rabbit:queue name="depot_queue" durable="true"/>

	<!-- Durable direct exchange; routing key "amount.depot" is bound to depot_queue -->
	<rabbit:direct-exchange name="depot-amount-exchange"
          xmlns="http://www.springframework.org/schema/rabbit" durable="true">
        <rabbit:bindings>
            <rabbit:binding queue="depot_queue" key="amount.depot" ></rabbit:binding>
        </rabbit:bindings>
	</rabbit:direct-exchange>

	<!-- Messages must be acknowledged manually by the listener (ProcessDepot calls basicAck/basicNack itself) -->
	<!-- NOTE(review): processDepot already implements ChannelAwareMessageListener, so the
	     method attribute below looks redundant. Confirm before removing it. -->
    <rabbit:listener-container connection-factory="rabbitConnectionFactory"
							   acknowledge="manual">
        <rabbit:listener queues="depot_queue" ref="processDepot"
                         method="onMessage" />
    </rabbit:listener-container>
</beans>  

二、其他原始碼

1、ProcessDepot

package com.dongnaoedu.mq;

import com.dongnaoedu.service.DepotManager;
import com.dongnaoedu.vo.GoodTransferVo;
import com.google.gson.Gson;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
 * Listener for {@code depot_queue}. The listener container is configured with
 * {@code acknowledge="manual"}, so every delivery must be explicitly acked or
 * nacked here; a delivery that is neither stays unacknowledged on the broker.
 */
@Service
public class ProcessDepot  implements ChannelAwareMessageListener {

    private static final Logger logger = LoggerFactory.getLogger(ProcessDepot.class);

    private static final Gson gson = new Gson();

    @Autowired
    private DepotManager depotManager;

    /**
     * Applies the stock change carried by {@code message} and answers the broker.
     * <ul>
     *   <li>payload cannot be parsed: nack without requeue (redelivery cannot help);</li>
     *   <li>stock update throws: nack with requeue, so the broker redelivers;</li>
     *   <li>otherwise: ack.</li>
     * </ul>
     * As before, no exception is propagated to the container; failures are logged.
     *
     * @param message the delivered message; its body is expected to be the JSON form
     *                of a {@link GoodTransferVo}
     * @param channel the channel the message arrived on, used to ack/nack
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();

            GoodTransferVo goodTransferVo = decode(message);
            if (goodTransferVo == null) {
                // Previously a parse error was only logged and the delivery was never
                // answered. Reject it for good: requeue=false.
                channel.basicNack(deliveryTag, false, false);
                logger.info(">>>>>>>>>>>>>>訊息無法解析,拒絕訊息且不重新入隊");
                return;
            }

            try {
                depotManager.operDepot(goodTransferVo);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                // deliveryTag: delivery identifier; multiple=false: only this message;
                // requeue=true: ask the broker to redeliver.
                // NOTE: a message that always fails will be redelivered indefinitely.
                channel.basicNack(deliveryTag, false, true);
                logger.info(">>>>>>>>>>>>>>庫存處理失敗,拒絕訊息,要求Mq重新派發");
                return;
            }

            // Ack outside the processing try-block so that a failing ack is not
            // followed by a nack for the same delivery tag.
            channel.basicAck(deliveryTag, false);
            logger.info(">>>>>>>>>>>>>>庫存處理完成,應答Mq服務");
        } catch (Exception e) {
            // Typically an I/O failure while acking/nacking; keep the stack trace.
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Decodes the message body as UTF-8 JSON.
     *
     * @return the parsed object, or {@code null} when the body is not valid JSON or
     *         deserializes to {@code null}
     */
    private GoodTransferVo decode(Message message) {
        // Explicit charset: new String(byte[]) would use the platform default.
        String msg = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        logger.info(">>>>>>>>>>>>>>接收到訊息:{}", msg);
        try {
            return gson.fromJson(msg, GoodTransferVo.class);
        } catch (RuntimeException e) {
            logger.error(e.getMessage(), e);
            return null;
        }
    }
}

2、DepotManager

package com.dongnaoedu.service;

import com.dongnaoedu.vo.GoodTransferVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * Routes a stock-change request to the matching {@link Depot} operation.
 */
@Service
public class DepotManager {

    @Autowired
    private Depot depot;

    /**
     * Applies the change described by {@code goodTransferVo}.
     *
     * @param goodTransferVo the change to apply; {@code isInOrOut()} {@code true} adds
     *                       stock via {@code inDepot}, {@code false} removes it via
     *                       {@code outDepot}
     */
    public void operDepot(GoodTransferVo goodTransferVo) {
        final boolean stockIn = goodTransferVo.isInOrOut();
        final String goodsId = goodTransferVo.getGoodsId();
        final int quantity = goodTransferVo.getChangeAmount();

        if (!stockIn) {
            depot.outDepot(goodsId, quantity);
            return;
        }
        depot.inDepot(goodsId, quantity);
    }

}

3、Depot

package com.dongnaoedu.service;

import com.dongnaoedu.rpc.DepotService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

/**
 * In-memory stock store keyed by goods id. Updates go through the atomic
 * {@link ConcurrentHashMap} operations {@code merge} and {@code compute}
 * (Java 8+). On older JDKs, use {@code synchronized} methods doing a plain
 * get/put instead.
 */
@Service
public class Depot {

    private static final Logger logger = LoggerFactory.getLogger(Depot.class);

    private final ConcurrentHashMap<String,Integer> goodsData =
            new ConcurrentHashMap<String, Integer>();

    /** Seeds the initial stock for goods 001 to 004 once the bean is constructed. */
    @PostConstruct
    public void initDepot(){
        goodsData.put("001",1000);
        goodsData.put("002",500);
        goodsData.put("003",600);
        goodsData.put("004",700);
    }

    /**
     * Increases the stock of a goods item.
     *
     * @param goodsId  goods id; an unknown id is created with {@code addAmout} as its stock
     * @param addAmout quantity to add
     */
    public void inDepot(String goodsId,int addAmout){
        logger.info("+++++++++++++++++增加商品:{}庫存,數量為:{}", goodsId, addAmout);
        // merge: stores addAmout when the key is absent, otherwise current + addAmout.
        int newValue = goodsData.merge(goodsId, addAmout, Integer::sum);
        logger.info("+++++++++++++++++商品:{}庫存,數量變為:{}", goodsId, newValue);
    }

    /**
     * Decreases the stock of a goods item.
     *
     * @param goodsId     goods id; an unknown id is created with stock 0
     * @param reduceAmout quantity to remove
     */
    public void outDepot(String goodsId,int reduceAmout){
        logger.info("-------------------減少商品:{}庫存,數量為:{}", goodsId, reduceAmout);
        // NOTE(review): the result is not checked against zero, so stock can become
        // negative. Confirm whether overselling should be rejected.
        int newValue = goodsData.compute(goodsId,
                (id, current) -> current == null ? 0 : current - reduceAmout);
        logger.info("-------------------商品:{}庫存,數量變為:{}", goodsId, newValue);
    }

    /**
     * Returns the current stock of a goods item.
     *
     * @param goodsId goods id
     * @return the stored quantity, or 0 for an id that has no entry (previously this
     *         threw a NullPointerException while unboxing the missing value)
     */
    public int getGoodsAmount(String goodsId){
        return goodsData.getOrDefault(goodsId, 0);
    }
}

補充:原始碼的rpc呼叫部分不在這裡解釋,有興趣的朋友可以自行研究。原始碼連結:rabbit與spring整合實戰原始碼