1. 程式人生 > >RocketMQ原理學習--死信訊息實現原理

RocketMQ原理學習--死信訊息實現原理

        上一篇部落格《RocketMQ原理學習--失敗訊息實現原理》中我們瞭解到RocketMQ對於失敗訊息的處理原理,當訊息一直失敗的情況下RocketMQ是如何處理的,這篇部落格我們通過分析原始碼簡單瞭解一下。

        RocketMQ對於失敗次數超過16次的訊息設定為死信訊息,訊息最終被放到DLQ死信佇列中,需要人工進行干預處理。處理程式碼還是在SendMessageProcessor的consumerSendMsgBack方法中,簡單來說就是判斷重試次數超過16或者延時級別小於0,則將訊息設定為新的死信topic為:%DLQ%+consumerGroup

private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
        throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final ConsumerSendMsgBackRequestHeader requestHeader =
            (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);

		//省略部分程式碼

		//最大重試次數為16
        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
            || delayLevel < 0) {
			//死信佇列 %DLQ%+consumerGroup
            newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;

            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                DLQ_NUMS_PER_GROUP,
                PermName.PERM_WRITE, 0
            );
            if (null == topicConfig) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("topic[" + newTopic + "] not exist");
                return response;
            }
        } else {
            if (0 == delayLevel) {
                delayLevel = 3 + msgExt.getReconsumeTimes();
            }

            msgExt.setDelayTimeLevel(delayLevel);
        }

        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(newTopic);
        msgInner.setBody(msgExt.getBody());
        msgInner.setFlag(msgExt.getFlag());
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

        msgInner.setQueueId(queueIdInt);
        msgInner.setSysFlag(msgExt.getSysFlag());
        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

        String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
        MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);

		//訊息被持久化到死信佇列中
        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        //省略部分程式碼
        return response;
    }

總結:死信訊息需要人為進行處理干預,可以通過RocketMQ控制檯等重新發送