1. 程式人生 > >rabbitMQ實現可靠訊息投遞

rabbitMQ實現可靠訊息投遞

    RabbitMQ訊息的可靠性主要包括兩方面,一方面是通過實現消費的重試機制(通過@Retryable來實現重試,可以設定重試次數和重試頻率,但是要保證冪等性),另一方面就是實現訊息生產者的可靠投遞(注意消費單冪等),下面主要講下生產者實現的可靠訊息投遞。

    rabbitTemplate的傳送流程是這樣的:

    1 傳送資料並返回(不確認rabbitmq伺服器已成功接收)

    2 非同步的接收從rabbitmq返回的ack確認資訊

    3 收到ack後呼叫confirmCallback函式 注意:在confirmCallback中是沒有原message的,所以無法在這個函式中呼叫重發,confirmCallback只有一個通知的作用 在這種情況下,如果在2,3步中任何時候切斷連線,我們都無法確認資料是否真的已經成功傳送出去,從而造成資料丟失的問題。

    最完美的解決方案只有1種: 使用rabbitmq的事務機制。 但是在這種情況下,rabbitmq的效率極低,每秒鐘處理的message在幾百條左右。實在不可取。

    第二種解決方式,使用同步的傳送機制,也就是說,客戶端傳送資料,rabbitmq收到後返回ack,再收到ack後,send函式才返回。程式碼類似這樣:

建立channel
send message
wait for ack(or 超時)
close channel
返回成功or失敗

    同樣的,由於每次傳送message都要重新建立連線,效率很低。

    基於上面的分析,我們使用一種新的方式來做到資料的不丟失。

    在rabbitTemplate非同步確認的基礎上

    1 在redis中快取已傳送的message

    2 通過confirmCallback或者被確認的ack,將被確認的message從本地刪除

    3 定時掃描本地的message,如果大於一定時間未被確認,則重發

    當然了,這種解決方式也有一定的問題: 想象這種場景,rabbitmq接收到了訊息,在傳送ack確認時,網路斷了,造成客戶端沒有收到ack,重發訊息。(相比於丟失訊息,重發訊息要好解決的多,我們可以在consumer端做到冪等)。 自動重試的程式碼如下:

    

package cn.chinotan.service.reliabletransmission;

/**
 * @program: test
 * @description: rabbitMq常量
 * @author: xingcheng
 * @create: 2018-08-12 12:30
 **/
public class MyConstant {

    public static final String MY_EXCHANGE = "my_exchange";
    
    public static final String ERROR_EXCHANGE = "error_exchange";

    public static final String MY_QUEUE_THREE = "my_queue_three";

    public final static String KEY_PREFIX = "test:rabbitMq:";

    /**
     * consumer失敗後等待時間(mils)
     */
    public static final int ONE_MINUTE = 1 * 60 * 1000;

    /**
     * MQ訊息retry時間
     */
    public static final int RETRY_TIME_INTERVAL = ONE_MINUTE;

    /**
     * MQ訊息有效時間
     */
    public static final int VALID_TIME = ONE_MINUTE;

}
package cn.chinotan.service.reliabletransmission;

import java.io.Serializable;

/**
 * @program: test
 * @description: 包裝訊息
 * @author: xingcheng
 * @create: 2018-09-24 15:32
 **/
public class MessageWithTime implements Serializable {

    private String id;
    private long time;
    private String message;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public long getTime() {
        return time;
    }

    public void setTime(long time) {
        this.time = time;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}
package cn.chinotan.service.reliabletransmission;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * rabbitMQ配置
 */
@Configuration
public class ReliableRabbitConfig {

    @Bean
    public DirectExchange myExchange() {
        return new DirectExchange(MyConstant.MY_EXCHANGE, true, false);
    }

    @Bean
    public Queue myQueueOne() {
        return new Queue(MyConstant.MY_QUEUE_THREE, true);
    }

    @Bean
    public Binding queueOneBinding() {
        return BindingBuilder.bind(myQueueOne()).to(myExchange()).withQueueName();
    }
}

package cn.chinotan.service.reliabletransmission;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.util.Map;
import java.util.UUID;

/**
 * @program: test
 * @description: rabbitService
 * @author: xingcheng
 * @create: 2018-09-24 14:28
 **/
@Service
public class RabbitMQService {

    Logger logger = LoggerFactory.getLogger(RabbitMQService.class);

    @Autowired
    StringRedisTemplate redisTemplate;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public Boolean send(String exchange, String routingKey, Object message) {
        try {
            String key = StringUtils.join(MyConstant.KEY_PREFIX, UUID.randomUUID().toString().replace("-", "").toLowerCase());

            // 傳送前儲存訊息和時間和id到redis快取中
            MessageWithTime messageWithTime = new MessageWithTime();
            messageWithTime.setId(key);
            messageWithTime.setMessage(JSONObject.toJSONString(message));
            messageWithTime.setTime(System.currentTimeMillis());
            redisTemplate.opsForValue().set(key, JSONObject.toJSONString(messageWithTime));

            // 非同步回撥通知
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                if (ack) {
                    logger.info("message send success--id:[{}]", correlationData.getId());
                    // 傳送成功後,刪除redis快取
                    redisTemplate.delete(correlationData.getId());
                } else {
                    // 傳送失敗後列印日誌,進行重試
                    logger.error("message send fail--id:[{}]", correlationData.getId());
                }
            });

            CorrelationData correlationData = new CorrelationData(key);
            rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
        } catch (Exception e) {
            logger.error("傳送訊息異常{}", e);
            return false;
        }

        return true;
    }

    Boolean send(String exchange, String routingKey, MessageWithTime message) {
        try {
            // 非同步回撥通知
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                if (ack) {
                    logger.info("message send success--id:[{}]", correlationData.getId());
                    // 傳送成功後,刪除redis快取
                    redisTemplate.delete(correlationData.getId());
                } else {
                    // 傳送失敗後列印日誌,進行重試
                    logger.error("message send fail--id:[{}]", correlationData.getId());
                }
            });

            CorrelationData correlationData = new CorrelationData(message.getId());
            Map map = JSON.parseObject(message.getMessage(), Map.class);
            rabbitTemplate.convertAndSend(exchange, routingKey, map, correlationData);
        } catch (Exception e) {
            logger.error("傳送訊息異常{}", e);
            return false;
        }

        return true;
    }

}
package cn.chinotan.service.reliabletransmission;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Map;

/**
 * 生產者
 */
@Service
public class ReliableProducr {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReliableProducr.class);

    @Autowired
    private RabbitMQService rabbitMQService;

    public Boolean send(Map msg) {
        return rabbitMQService.send(MyConstant.MY_EXCHANGE, MyConstant.MY_QUEUE_THREE, msg);
    }

    public Boolean send(MessageWithTime msg) {
        return rabbitMQService.send(MyConstant.MY_EXCHANGE, MyConstant.MY_QUEUE_THREE, msg);
    }
}

package cn.chinotan.service.reliabletransmission;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.context.WebApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;

/**
 * @program: test
 * @description: 可靠投遞監聽器
 * @author: xingcheng
 * @create: 2018-09-24 16:05
 **/
@WebListener
public class ReliableTransContextListener implements ServletContextListener {

    Logger logger = LoggerFactory.getLogger(ReliableTransContextListener.class);

    private WebApplicationContext springContext;

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        logger.info("ReliableTransContextListener init start...........");
        springContext = WebApplicationContextUtils.getWebApplicationContext(sce.getServletContext());
        if (springContext != null) {
            RetryCache retryCache = (RetryCache) springContext.getBean("retryCache");
            new Thread(() -> retryCache.startRetry()).start();
        }
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
    }
}
package cn.chinotan.service.reliabletransmission;

import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Set;

/**
 * @program: test
 * @description: 快取重試
 * @author: xingcheng
 * @create: 2018-09-24 16:12
 **/
@Component("retryCache")
public class RetryCache {

    private boolean stop = false;

    Logger logger = LoggerFactory.getLogger(RetryCache.class);

    @Autowired
    private ReliableProducr producr;

    @Autowired
    private StringRedisTemplate redisTemplate;
    
    private final String STAR = "*";

    public void startRetry() {
        while (!stop) {
            try {
                Thread.sleep(MyConstant.RETRY_TIME_INTERVAL);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            long now = System.currentTimeMillis();

            Set<String> keys = redisTemplate.keys(StringUtils.join(MyConstant.KEY_PREFIX, STAR));
            if (keys != null && !keys.isEmpty()) {
                List<String> list = redisTemplate.opsForValue().multiGet(keys);
                list.forEach(value -> {
                    MessageWithTime messageWithTime = JSON.parseObject(value, MessageWithTime.class);

                    if (null != messageWithTime) {
                        if (messageWithTime.getTime() + 3 * MyConstant.VALID_TIME < now) {
                            logger.error("send message {} failed after 3 min ", messageWithTime);
                            redisTemplate.delete(messageWithTime.getId());
                        } else if (messageWithTime.getTime() + MyConstant.VALID_TIME < now) {
                            Boolean send = producr.send(messageWithTime);
                            logger.info("進行重新投遞訊息");
                            if (!send) {
                                logger.error("retry send message failed {}", messageWithTime);
                            }
                        }
                    }
                });
            }
        }
    }

}
package cn.chinotan.service.reliabletransmission;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;

/**
 * queueThree消費者
 */
@Component
public class MyQueueThreeConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(MyQueueThreeConsumer.class);

    /**
     * 消費者做好冪等
     *
     * @param content
     */
    @RabbitListener(queues = MyConstant.MY_QUEUE_THREE)
    @RabbitHandler
    public void process(Map content) {
        LOGGER.info("消費者,queueThree開始執行 {}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        LOGGER.info("消費者,queueThree消費內容:[{}]", JSON.toJSONString(content));
    }
}
import cn.chinotan.service.reliabletransmission.MyConstant;
import cn.chinotan.service.reliabletransmission.RabbitMQService;
import cn.chinotan.service.reliabletransmission.ReliableProducr;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;
import java.util.Map;

/**
 * @program: test
 * @description: 可靠投遞測試
 * @author: xingcheng
 * @create: 2018-09-24 15:57
 **/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = MyApplication.class)
public class ReliableTransmissionTest {

    @Autowired
    private ReliableProducr producr;

    @Autowired
    private RabbitMQService rabbitMQService;

    /**
     * 正常情況測試
     * @throws Exception
     */
    @Test
    public void reliableTransmissionTest() throws Exception {
        Map<String, String> map = new HashMap<>();
        map.put("name", "xingheng");
        producr.send(map);
    }

    /**
     * 異常情況測試
     * @throws Exception
     */
    @Test
    public void reliableTransmissionFailTest() throws Exception {
        Map<String, String> map = new HashMap<>();
        map.put("name", "xingheng");
        rabbitMQService.send(MyConstant.ERROR_EXCHANGE, MyConstant.MY_QUEUE_THREE, map);
    }
}

注意事項:

1.配置中要開啟發布者確認,類似這樣:

spring:
  rabbitmq:
    publisher-confirms: true

2.如果要測試異常情況只需要將訊息傳送到一個不存在的交換機即可

3.注意消費端冪等

簡單測試結果:

在重試一次後,會將它傳送到正確的交換機,於是傳送成功