Redis延時佇列方案
阿新 • • 發佈:2018-12-30
總體方案
建立一個定時任務,每一次執行完後間隔一定時間就會掃描快取,快取中一旦添加了任務,就會被掃描到,然後傳送到訊息佇列,監聽器一旦監聽到訊息就會進行處理,如果處理失敗,則再次生成任務(次數加1,時間戳會根據規則增加),到達規定次數後則不在執行
具體細節
首先建立一個執行完後間隔’${webhook.fixedDelay}’後執行下一次,從快取中取出資料,一旦取到資料就傳送到訊息佇列中,並且刪除掉快取裡面的資料
@Scheduled(fixedDelayString = "${webhook.fixedDelay}")
public void process() {
// ZRANGEBYSCORE 取score小於等於當前時間的資料
Date date = new Date();
Set<String> list = stringRedisTemplate.opsForZSet().rangeByScore(key, 0, date.getTime());
if (list == null || list.size() == 0) {
return;
}
try {
// 傳送佇列
for (String json : list) {
amqpTemplate. convertAndSend(routingKey, json);
}
// ZREMRANGEBYSCORE() 刪除score小於等於當前時間的資料
stringRedisTemplate.opsForZSet().removeRangeByScore(key, 0, date.getTime());
} catch (Exception e) {
// TODO: handle exception
logger.error("佇列傳送失敗", e);
}
}
監聽器一旦監聽到訊息就會進行處理,如果處理失敗,則再次生成任務(次數加1,時間戳會根據規則增加),到達規定次數後則不在執行
@RabbitListener(queues = "${webhook.queue}")
// 引數中使用@Header獲取mesage
public void helloReply(String json) {
logger.debug("task:{}", json);
Task task = gson.fromJson(json, Task.class);
try {
handler.handler(task.getData());
} catch (Exception e) {
if (webhookUtil.getInterval().size() > task.getTimes()) {
Task next = new Task();
next.setTimes(task.getTimes() + 1);
next.setTimestamp(task.getTimestamp() + webhookUtil.getInterval().get(task.getTimes())*1000);
next.setData(task.getData());
webhookUtil.addNext(next);
if (logger.isDebugEnabled()) {
logger.debug("處理失敗,新增到下次執行:" + gson.toJson(task), e);
}
} else {
logger.error("處理{}次失敗,停止處理", webhookUtil.getInterval().size(), e);
}
}
}
測試
向快取中新增一條資料,定時任務會到時間自動檢測到這條資料
public boolean add(String data) {
Task task = new Task();
task.setData("data");
task.setTimes(0);
task.setTimestamp(new Date().getTime() + getInterval().get(task.getTimes())*1000);
try {
stringRedisTemplate.opsForZSet().add(key, gson.toJson(task), task.getTimestamp());
return true;
} catch (Exception e) {
logger.error("redis新增失敗", e);
return false;
}
}
配置檔案
spring-mq.xml
<!-- 連線服務配置 -->
<rabbit:connection-factory id="connectionFactory"
addresses="${rabbitmq.addresses}" username="${rabbitmq.username}"
password="${rabbitmq.password}" channel-cache-size="${rabbitmq.channel.cache.size}" />
<rabbit:admin connection-factory="connectionFactory" />
<!-- 將queue和routingKey進行繫結 --><!-- queue 佇列宣告 -->
<rabbit:queue name="${webhook.queue}" />
<!-- exchange queue binging key 繫結 -->
<!-- direct方式:根據routingKey將訊息傳送到所有繫結的queue中 -->
<rabbit:direct-exchange name="${rabbitmq.direct.exchange}">
<rabbit:bindings>
<rabbit:binding queue="${webhook.queue}" key="${webhook.routing.key}" />
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 消費者配置開始 -->
<rabbit:annotation-driven />
<bean id="rabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
<property name="messageConverter" ref="messageConverter" />
<property name="connectionFactory" ref="connectionFactory" />
<property name="concurrentConsumers" value="3" />
<property name="maxConcurrentConsumers" value="10" />
</bean>
<!-- 消費者配置結束 -->
<!-- 將生產者生產的資料轉換為json存入訊息佇列 -->
<bean id="messageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
<!-- spring template宣告 -->
<rabbit:template id="amqpTemplate" message-converter="messageConverter"
connection-factory="connectionFactory" reply-timeout="2000" retry-template="retryTemplate"
exchange="${rabbitmq.direct.exchange}" />
<!--傳送失敗後重新發送的模板 -->
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="500" />
<property name="multiplier" value="10.0" />
<property name="maxInterval" value="10000" />
</bean>
</property>
</bean>
webhook.properties
#資料存入快取中的名字
webhook.key=eisp_webhook_key
#訊息佇列的key值
webhook.routing.key=eisp_webhook_routing_key
#訊息佇列的佇列名
webhook.queue=webhook_queue
#下次請求的時間間隔(單位:秒)
webhook.interval=5, 20,30
#定時器每次執行完後的間隔時間(單位:毫秒)
webhook.fixedDelay=1000
其中’webhook.interval=5, 20,30’是以陣列的方式注入
@Value("#{'${webhook.interval}'.split(',')}")
private List<Integer> interval;