1. 程式人生 > >SpringBoot整合RabbitMQ解耦合二

SpringBoot整合RabbitMQ解耦合二

如何讓工具類更加通用、與業務模組解耦合,是架構師和產品研發人員一直研究的課題。不管是MVC分層設計、SOA面向服務設計,還是為解決高併發而採用的讀寫分離、前後臺分離、叢集計算,都是解決這類問題的統籌方法。以下實現一個Spring+RabbitMQ整合工具,讓使用Spring開發的專案更容易整合RabbitMQ,無論是傳送RPC同步呼叫還是非同步呼叫。

在這裡插入圖片描述

使用步驟:

一、準備Maven POM配置

在這裡插入圖片描述

二、準備SpringBoot application.properties配置檔案

在這裡插入圖片描述

三、匯入介面IMsgProcess

package com.test.util;

/**
 * Callback contract for handling a single message payload.
 * <p>
 * Implementations contain the business logic; the messaging utilities only
 * depend on this interface.
 */
public interface IMsgProcess {

	/**
	 * Handles one payload.
	 *
	 * @param obj the payload to handle
	 * @return the outcome of the handling as a boxed {@code Boolean}
	 */
	Boolean process(Object obj);
}

四、匯入訊息監聽器

package com.test.util;

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Listener for messages arriving on the {@code test_rpc} queue.
 * <p>
 * Each message body is deserialized and handed to the injected {@link IMsgProcess}.
 * When the message carries a non-empty {@code replyTo} property it is treated as a
 * synchronous (RPC) request and the processing result is published to that queue as
 * the text {@code "true"} or {@code "false"}; otherwise the message is treated as
 * asynchronous and nothing is sent back.
 *
 * @author java
 */
@Component
public class RabbitmqListener {
	/** Business callback that handles the deserialized payload. */
	@Autowired
	private IMsgProcess serv;
	/** Template used to publish the reply of a synchronous request. */
	@Autowired
	private AmqpTemplate template;

	/**
	 * Handles one message from the {@code test_rpc} queue.
	 * <p>
	 * Any exception is caught and printed, so this method never propagates a failure
	 * to the listener container.
	 *
	 * @param msg the received message; its body is passed to {@link #toObject(byte[])}
	 */
	@RabbitListener(queues="test_rpc")
	public void receive(Message msg)
	{
		try
		{
			// toObject returns null when the body cannot be deserialized; the
			// callback is still invoked in that case and decides how to treat null.
			Object obj = toObject(msg.getBody());
			boolean rtn = processSafely(obj);

			// A non-empty replyTo marks the message as a synchronous request.
			MessageProperties inProps = msg.getMessageProperties();
			String replyTo = (inProps == null) ? null : inProps.getReplyTo();
			if(replyTo != null && !"".equals(replyTo))
			{
				String corrId = inProps.getCorrelationIdString();
				MessageProperties mprop = new MessageProperties();
				mprop.setCorrelationIdString(corrId);
				mprop.setReplyTo(replyTo);
				System.out.println("訊息處理結果="+rtn+",回覆佇列replyTo="+replyTo+",關聯ID corrId="+corrId);
				// Explicit charset so the reply bytes do not depend on the platform default.
				byte[] body = String.valueOf(rtn).getBytes(java.nio.charset.StandardCharsets.UTF_8);
				// Empty exchange name + routing key = reply queue name.
				template.send("", replyTo, new Message(body,mprop));
				// Log the deserialized object; the raw body is Java-serialization
				// bytes and is not readable as text.
				System.out.println("接收到同步訊息="+obj);
			}
			else
			{
				System.out.println("接收到非同步訊息="+obj);
			}
		}
		catch(Exception e)
		{
			e.printStackTrace();
		}
	}

	/**
	 * Invokes the business callback and normalizes its outcome to a primitive.
	 * <p>
	 * A {@code null} result or a runtime exception from the callback is reported as
	 * {@code false}, so that {@link #receive(Message)} can still publish a reply for
	 * a synchronous request instead of leaving the requester without an answer.
	 *
	 * @param obj the deserialized payload (may be {@code null})
	 * @return {@code true} only when the callback returned {@code Boolean.TRUE}
	 */
	private boolean processSafely(Object obj)
	{
		try
		{
			return Boolean.TRUE.equals(serv.process(obj));
		}
		catch(RuntimeException e)
		{
			e.printStackTrace();
			return false;
		}
	}

	/**
	 * Deserializes a Java-serialized object from a byte array.
	 *
	 * @param data the serialized bytes
	 * @return the deserialized object, or {@code null} when deserialization fails
	 *         (the failure is printed)
	 */
	public Object toObject(byte[] data)
	{
		// NOTE(review): native Java deserialization of broker payloads is unsafe if
		// untrusted parties can publish to the queue; consider an ObjectInputFilter
		// or a data format such as JSON.
		try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data)))
		{
			return ois.readObject();
		}
		catch(Exception e)
		{
			e.printStackTrace();
		}
		return null;
	}
}

五、匯入訊息傳送工具類

可以傳送非同步訊息,也可以傳送同步訊息

package com.test.util;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * Utility for publishing Java-serialized objects to RabbitMQ, either
 * asynchronously ({@link #sendObject(Object, Integer)}) or as a blocking
 * request/reply call ({@link #sendSyncObject(Object)}).
 * <p>
 * Every call opens its own connection and channel, declares the
 * {@code test_rpc_exg} direct exchange and the durable {@code test_rpc} queue,
 * binds them, publishes, and closes the connection again.
 */
@Component
public class SendMsg {
	/** Direct exchange the messages are published to. */
	private static final String EXCHANGE_NAME = "test_rpc_exg";
	/** Durable queue bound to {@link #EXCHANGE_NAME}. */
	private static final String QUEUE_NAME = "test_rpc";
	/** Routing key used for both the binding and the publish. */
	private static final String ROUTING_KEY = "java.io.File";

	// The initial values act as fallbacks when the instance is not populated
	// through the @Value placeholders.
	@Value("${spring.rabbitmq.host}")
	private String host = "localhost";
	@Value("${spring.rabbitmq.port}")
	private String port = "5672";
	@Value("${spring.rabbitmq.username}")
	private String userName = "admin2";
	@Value("${spring.rabbitmq.password}")
	private String pwd = "admin2";

	/**
	 * Serializes an object with Java serialization.
	 *
	 * @param obj the object to serialize
	 * @return the serialized bytes, or {@code null} when serialization fails
	 *         (the failure is printed)
	 */
	public byte[] toBytes(Object obj)
	{
		ByteArrayOutputStream baos = new ByteArrayOutputStream();
		try (ObjectOutputStream oos = new ObjectOutputStream(baos))
		{
			oos.writeObject(obj);
			// Flush before reading the buffer so no bytes remain inside the
			// ObjectOutputStream.
			oos.flush();
			return baos.toByteArray();
		}
		catch(Exception e)
		{
			e.printStackTrace();
		}
		return null;
	}

	/**
	 * Publishes an object asynchronously and then pauses the calling thread.
	 * <p>
	 * Failures are printed and not propagated; after a failure the pause is skipped.
	 *
	 * @param obj        the object to send
	 * @param waitSecond seconds to sleep after a successful publish; {@code null}
	 *                   means 1 second, values below 1 mean no pause
	 */
	public void sendObject(Object obj,Integer waitSecond)
	{
		Connection con = null;
		Channel channel = null;
		try
		{
			System.out.println("host="+host+",port="+port);
			byte[] payload = toBytes(obj);
			if(payload == null)
			{
				// Serialization failed and was already reported by toBytes.
				return;
			}
			con = openConnection();
			channel = con.createChannel();
			declareTopology(channel);
			channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, payload);
		}
		catch(Exception e)
		{
			e.printStackTrace();
			return;
		}
		finally
		{
			// Runs on success and on failure so the connection never leaks.
			closeResources(channel, con);
		}

		int seconds = (waitSecond == null) ? 1 : waitSecond;
		if(seconds > 0)
		{
			try
			{
				// long arithmetic avoids int overflow for large waits
				Thread.sleep(seconds * 1000L);
			}
			catch(InterruptedException e)
			{
				// Keep the interrupt visible to the caller.
				Thread.currentThread().interrupt();
			}
		}
	}

	/**
	 * Publishes an object as a synchronous request and blocks until a reply arrives.
	 * <p>
	 * A server-named reply queue is declared for the call and passed to the receiver
	 * through the {@code replyTo} property together with a random correlation id.
	 * The first message delivered on the reply queue is taken as the answer.
	 *
	 * @param obj the object to send
	 * @return {@code true} only when the reply body is the text {@code "true"};
	 *         {@code false} for any other reply, on failure, or when the waiting
	 *         thread is interrupted
	 */
	public boolean sendSyncObject(Object obj) {
		Connection con = null;
		Channel channel = null;
		try
		{
			System.out.println("host="+host+",port="+port);
			byte[] payload = toBytes(obj);
			if(payload == null)
			{
				// Serialization failed and was already reported by toBytes.
				return false;
			}
			con = openConnection();
			channel = con.createChannel();
			declareTopology(channel);

			String replyTo = channel.queueDeclare().getQueue();
			String corrId = java.util.UUID.randomUUID().toString();
			AMQP.BasicProperties prop = new AMQP.BasicProperties.Builder()
					.correlationId(corrId).replyTo(replyTo).build();
			channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, prop, payload);

			final BlockingQueue<byte[]> response = new ArrayBlockingQueue<byte[]>(1);
			Consumer csum = new DefaultConsumer(channel){
				@Override
				public void handleDelivery(java.lang.String consumerTag,
						Envelope envelope, AMQP.BasicProperties properties,
						byte[] body)
				{
					// The correlation id of the reply is intentionally not
					// checked: the reply queue was declared for this call only.
					response.offer(body);
				}
			};
			channel.basicConsume(replyTo,true,csum);

			// Blocks the current thread until the consumer hands over a reply.
			byte[] result = response.take();
			String str = new String(result, java.nio.charset.StandardCharsets.UTF_8);
			System.out.println("result="+str);
			return "true".equals(str);
		}
		catch(InterruptedException e)
		{
			// Keep the interrupt visible to the caller.
			Thread.currentThread().interrupt();
			e.printStackTrace();
		}
		catch(Exception e)
		{
			e.printStackTrace();
		}
		finally
		{
			// Runs on success and on failure so the connection never leaks.
			closeResources(channel, con);
		}
		return false;
	}

	/** Opens a new broker connection from the configured host, port and credentials. */
	private Connection openConnection()
			throws java.io.IOException, java.util.concurrent.TimeoutException
	{
		ConnectionFactory cf = new ConnectionFactory();
		cf.setHost(host);
		cf.setPort(Integer.parseInt(port));
		cf.setUsername(userName);
		cf.setPassword(pwd);
		return cf.newConnection();
	}

	/** Declares the direct exchange and the durable queue, and binds them together. */
	private void declareTopology(Channel channel) throws java.io.IOException
	{
		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
		boolean durable = true;
		boolean exclusive = false;
		boolean autoDelete = false;
		channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, null);
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
	}

	/** Closes the channel and then the connection, printing (not propagating) close failures. */
	private void closeResources(Channel channel, Connection con)
	{
		if(channel != null)
		{
			try
			{
				if(channel.isOpen())
				{
					channel.close();
				}
			}
			catch(Exception e)
			{
				e.printStackTrace();
			}
		}
		if(con != null)
		{
			try
			{
				if(con.isOpen())
				{
					con.close();
				}
			}
			catch(Exception e)
			{
				e.printStackTrace();
			}
		}
	}

}

六、實現訊息處理類

必須實現IMsgProcess介面
在這裡插入圖片描述

七、傳送訊息

在這裡插入圖片描述

程式碼下載
https://pan.baidu.com/s/18obEXV-uZs9d9lpgpXuhzQ