1. 程式人生 > >Redis延時佇列方案

Redis延時佇列方案

總體方案

建立一個定時任務,每一次執行完後間隔一定時間就會掃描快取,快取中一旦添加了任務,就會被掃描到,然後傳送到訊息佇列,監聽器一旦監聽到訊息就會進行處理,如果處理失敗,則再次生成任務(次數加1,時間戳會根據規則增加),到達規定次數後則不在執行

具體細節

首先建立一個執行完後間隔’${webhook.fixedDelay}’後執行下一次,從快取中取出資料,一旦取到資料就傳送到訊息佇列中,並且刪除掉快取裡面的資料

@Scheduled(fixedDelayString = "${webhook.fixedDelay}")
    public void process() {
        // ZRANGEBYSCORE 取score小於等於當前時間的資料
Date date = new Date(); Set<String> list = stringRedisTemplate.opsForZSet().rangeByScore(key, 0, date.getTime()); if (list == null || list.size() == 0) { return; } try { // 傳送佇列 for (String json : list) { amqpTemplate.
convertAndSend(routingKey, json); } // ZREMRANGEBYSCORE() 刪除score小於等於當前時間的資料 stringRedisTemplate.opsForZSet().removeRangeByScore(key, 0, date.getTime()); } catch (Exception e) { // TODO: handle exception logger.error("佇列傳送失敗", e); } }

監聽器一旦監聽到訊息就會進行處理,如果處理失敗,則再次生成任務(次數加1,時間戳會根據規則增加),到達規定次數後則不在執行

@RabbitListener(queues = "${webhook.queue}")
    // 引數中使用@Header獲取mesage
    public void helloReply(String json) {
        logger.debug("task:{}", json);
        Task task = gson.fromJson(json, Task.class);
        try {
            handler.handler(task.getData());
        } catch (Exception e) {
            if (webhookUtil.getInterval().size() > task.getTimes()) {
                Task next = new Task();
                next.setTimes(task.getTimes() + 1);
                next.setTimestamp(task.getTimestamp() + webhookUtil.getInterval().get(task.getTimes())*1000);
                next.setData(task.getData());
                webhookUtil.addNext(next);
                if (logger.isDebugEnabled()) {
                    logger.debug("處理失敗,新增到下次執行:" + gson.toJson(task), e);
                }
            } else {
                logger.error("處理{}次失敗,停止處理", webhookUtil.getInterval().size(), e);
            }
        }

    }

測試

向快取中新增一條資料,定時任務會到時間自動檢測到這條資料

public boolean add(String data) {
        Task task = new Task();
        task.setData("data");
        task.setTimes(0);
        task.setTimestamp(new Date().getTime() + getInterval().get(task.getTimes())*1000);
        try {
            stringRedisTemplate.opsForZSet().add(key, gson.toJson(task), task.getTimestamp());
            return true;
        } catch (Exception e) {
            logger.error("redis新增失敗", e);
            return false;
        }
    }

配置檔案

spring-mq.xml

    <!-- 連線服務配置  -->  
    <rabbit:connection-factory id="connectionFactory"
        addresses="${rabbitmq.addresses}" username="${rabbitmq.username}"
        password="${rabbitmq.password}" channel-cache-size="${rabbitmq.channel.cache.size}" />

    <rabbit:admin connection-factory="connectionFactory" />

    <!-- 將queue和routingKey進行繫結 --><!-- queue 佇列宣告 --> 
    <rabbit:queue name="${webhook.queue}" />

    <!-- exchange queue binging key 繫結 -->
    <!-- direct方式:根據routingKey將訊息傳送到所有繫結的queue中 -->
    <rabbit:direct-exchange name="${rabbitmq.direct.exchange}">
        <rabbit:bindings>
            <rabbit:binding queue="${webhook.queue}" key="${webhook.routing.key}" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- 消費者配置開始 -->
    <rabbit:annotation-driven />

    <bean id="rabbitListenerContainerFactory"
        class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
        <property name="messageConverter" ref="messageConverter" />
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="concurrentConsumers" value="3" />
        <property name="maxConcurrentConsumers" value="10" />
    </bean>
    <!-- 消費者配置結束 -->

    <!-- 將生產者生產的資料轉換為json存入訊息佇列 -->
    <bean id="messageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

    <!-- spring template宣告 -->  
    <rabbit:template id="amqpTemplate" message-converter="messageConverter"
        connection-factory="connectionFactory" reply-timeout="2000" retry-template="retryTemplate" 
        exchange="${rabbitmq.direct.exchange}"  />

    <!--傳送失敗後重新發送的模板  -->
    <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
        <property name="backOffPolicy">
            <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
                <property name="initialInterval" value="500" />
                <property name="multiplier" value="10.0" />
                <property name="maxInterval" value="10000" />
            </bean>
        </property>
    </bean>

webhook.properties

#資料存入快取中的名字
webhook.key=eisp_webhook_key
#訊息佇列的key值
webhook.routing.key=eisp_webhook_routing_key
#訊息佇列的佇列名
webhook.queue=webhook_queue
#下次請求的時間間隔(單位:秒)
webhook.interval=5, 20,30
#定時器每次執行完後的間隔時間(單位:毫秒)
webhook.fixedDelay=1000

其中’webhook.interval=5, 20,30’是以陣列的方式注入

@Value("#{'${webhook.interval}'.split(',')}")
    private List<Integer> interval;