Springboot + rabbitMQ實現延遲佇列(消費者)
阿新 • • 發佈:2019-01-03
由於太長了,所以分成兩篇寫,接上一篇講解了訊息的定義和傳送,這裡繼續講解消費者
由於可能每條訊息所處理的邏輯可能不一樣,例如:常規訂單30分鐘不支付則取消訂單,團購訂單一天拼團不成功則取消等等,為了避免在消費者監聽類中使用大量if else,這裡使用策略模式來處理(由於spring的bean的初始化的時候建立,如果用Java常規的反射獲取類,則在具體策略類用注入別的bean的時候,會拿不到值,所以需要通過applicationContext獲取類)
1.訊息消費者類-MessageConsumer -- 使用註解的方式監聽,這裡包括消費確認
@Component @RabbitListener(queues = QueueContent.MESSAGE_QUEUE_NAME) public class MessageConsumer { static Logger logger = LoggerFactory.getLogger(MessageConsumer.class); @RabbitHandler public void handler(String msg,Channel channel, Message message) throws IOException { if (!StringUtils.isEmpty(msg)) { MessagePojo messagePojo = JSONUtil.toBean(msg,MessagePojo.class); Action action = Action.RETRY; try { //這裡使用策略模式和springboot的結合使用, Strategy s = (Strategy)SpringContextUtil.getBean(messagePojo.getClassName()); s.doJob(messagePojo.getParams()); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); logger.info("[MessageConsumer延時訊息消費時間]"+DateUtil.datetoString(new Date()) + JSON.toJSONString(messagePojo) + ",訊息ID:" + messagePojo.getMessageId()); action = Action.ACCEPT; } catch (Exception e) { logger.error("確認消費異常",e); //記錄下這條訊息 redisService.hmSet("failedMsg",messagePojo.getMessageId(),msg); action = Action.RETRY; }finally { // 通過finally塊來保證Ack/Nack會且只會執行一次 if (action == Action.ACCEPT) { channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } else if (action == Action.RETRY) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } else { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } } } } }
public enum Action {
ACCEPT, // 處理成功
RETRY, // 可以重試的錯誤
REJECT, // 無需重試的錯誤
}
2.定義策略介面-Strategy
public interface Strategy {
public void doJob(Map<String, Object> params) throws Exception;
}
3.定義具體策略實現類---A B
@Component("A") public class A implements Strategy { /** * * @param params 介面所需引數 */ @Override public void doJob(Map<String, Object> params) { System.out.println("用A方法處理"); } } @Component("B") public class B implements Strategy { /** * * @param params 介面所需引數 */ @Override public void doJob(Map<String, Object> params) { System.out.println("用B方法處理"); } }
4.獲取上下文工具類,在spring容器中根據類名獲取具體類---SpringContextUtil
@Component public class SpringContextUtil implements ApplicationContextAware { // Spring應用上下文環境 private static ApplicationContext applicationContext; /** * 實現ApplicationContextAware介面的回撥方法。設定上下文環境 * * @param applicationContext */ public void setApplicationContext(ApplicationContext applicationContext) { SpringContextUtil.applicationContext = applicationContext; } /** * @return ApplicationContext */ public static ApplicationContext getApplicationContext() { return applicationContext; } /** * 獲取物件 * * @param name * @return Object * @throws BeansException */ public static Object getBean(String name) throws BeansException { return applicationContext.getBean(name); } }
5.寫一個controller測試
@RestController
public class MessageController {
@Autowired
private MessageProvider provider;
Logger logger = LoggerFactory.getLogger(MessageController.class);
@RequestMapping(value="/send_message",produces = "text/json;charset=UTF-8")
@ResponseBody
public String send_message(MessagePojo pojo){
try {
provider.sendMessage(pojo);
return JSON.toJSONString(pojo);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}
測試結果:延遲消費和具體方法處理具體業務也實現了