1. 程式人生 > >rocketmq--特性之延遲訊息

rocketmq--特性之延遲訊息

1、訊息延遲級別定義     原始碼 MessageStoreConfig.java 是定義如下:     private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    可以在brocker配置     messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

    支援時間單位(下面有原始碼分析):     s 秒 m分鐘 h小時 d天

2、使用,傳送訊息時設定延遲級別即可。     Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());     // This message will be delivered to consumer 10 seconds later.     //設定延遲級別為3,就找配置的 messageDelayLevel 第三順位,就是延遲的時間。     message.setDelayTimeLevel(3);

原始碼分析:

1、預設配置 MessageStoreConfig.messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

2、延遲等級解析: ScheduleMessageService.parseDelayLevel 方法對延遲級別做解析

public boolean parseDelayLevel() {
	HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
	timeUnitTable.put("s", 1000L);
	timeUnitTable.put("m", 1000L * 60);
	timeUnitTable.put("h", 1000L * 60 * 60);
	timeUnitTable.put("d", 1000L * 60 * 60 * 24);

	String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
	try {
		String[] levelArray = levelString.split(" ");
		for (int i = 0; i < levelArray.length; i++) {
			String value = levelArray[i];
			String ch = value.substring(value.length() - 1);
			Long tu = timeUnitTable.get(ch);

			int level = i + 1;
			if (level > this.maxDelayLevel) {
				this.maxDelayLevel = level;
			}
			long num = Long.parseLong(value.substring(0, value.length() - 1));
			long delayTimeMillis = tu * num;
			//delayLevelTable這個變數是ConcurrentMap,存放的就是延遲級別對應的延遲毫秒數。
			this.delayLevelTable.put(level, delayTimeMillis);
		}
	} catch (Exception e) {
		log.error("parseDelayLevel exception", e);
		log.info("levelString String = {}", levelString);
		return false;
	}

	return true;
}

3、CommitLog.putMessage 方法進行儲存訊息到commitLog 中


public PutMessageResult putMessage(final MessageExtBrokerInner msg) {

	//省略程式碼....

	final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
	if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
		|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
		// Delay Delivery,延遲訊息
		if (msg.getDelayTimeLevel() > 0) {
			if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
				msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
			}
			
			//修改訊息的主題為 SCHEDULE_TOPIC_XXXX
			topic = ScheduleMessageService.SCHEDULE_TOPIC;
			//修改訊息的佇列id為  delayLevel - 1
			queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

			// Backup real topic, queueId,儲存真實的訊息主題和佇列id,用來做重發
			MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
			MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
			msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

			//設定訊息的主題和佇列id
			msg.setTopic(topic);
			msg.setQueueId(queueId);
		}
	}
	//省略程式碼....
}

通過putMessage 延遲訊息就被放存放到了主題為 SCHEDULE_TOPIC_XXXX 的commitlog中,後面再通過定時的方式對這這些訊息進行重新發送。

4、重新發送: 生成consumerQueue 時,延遲訊息使用“訊息計劃消費時間”當作tagCode(正常的訊息使用tag 的hashCode 做為tagCode),ScheduleMessageService 在輪詢 ConsumeQueue 時,可以使用 tagsCode 進行過濾。

CommitLog.checkMessageAndReturnSize 原始碼:

public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC,
        final boolean readBody) {

//省略程式碼...


	if (propertiesLength > 0) {
		byteBuffer.get(bytesContent, 0, propertiesLength);
		String properties = new String(bytesContent, 0, propertiesLength, MessageDecoder.CHARSET_UTF8);
		propertiesMap = MessageDecoder.string2messageProperties(properties);

		keys = propertiesMap.get(MessageConst.PROPERTY_KEYS);

		uniqKey = propertiesMap.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);

		String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);
		if (tags != null && tags.length() > 0) {
			//正常訊息tagsCode
			tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);
		}

		// Timing message processing
		{
			String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
			if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) {
				int delayLevel = Integer.parseInt(t);

				if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
					delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
				}

				if (delayLevel > 0) {
					//延遲訊息tagsCode,就是投遞時間
					tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
						storeTimestamp);
				}
			}
		}
	}


//省略程式碼....		
		
}

//計算投遞時間,如果設定的延遲級別找不到就預設延遲1s
public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {
	Long time = this.delayLevelTable.get(delayLevel);
	if (time != null) {
		return time + storeTimestamp;
	}

	return storeTimestamp + 1000;
}

5、延遲訊息投遞: ScheduleMessageService 內部類 DeliverDelayedMessageTimerTask,是一個定時任務,對主題 SCHEDULE_TOPIC_XXXX 每條消費佇列對應單獨一個定時任務進行輪詢,傳送 到達投遞時間“計劃消費時間”的訊息 

DeliverDelayedMessageTimerTask.executeOnTimeup  方法進行訊息投遞:


public void executeOnTimeup(){
	ConsumeQueue cq =
		ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
			delayLevel2QueueId(delayLevel));

	long failScheduleOffset = offset;

	if (cq != null) {
	
		//讀取指定偏移處的ConsumeQueue內容
		SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
		if (bufferCQ != null) {
			try {
				long nextOffset = offset;
				int i = 0;
				ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
				for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
					long offsetPy = bufferCQ.getByteBuffer().getLong();
					int sizePy = bufferCQ.getByteBuffer().getInt();
					long tagsCode = bufferCQ.getByteBuffer().getLong();

					if (cq.isExtAddr(tagsCode)) {
						if (cq.getExt(tagsCode, cqExtUnit)) {
							tagsCode = cqExtUnit.getTagsCode();
						} else {
							//can't find ext content.So re compute tags code.
							log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
								tagsCode, offsetPy, sizePy);
							long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
							tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
						}
					}

					long now = System.currentTimeMillis();
					
					//投遞時間戳
					long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

					nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

					long countdown = deliverTimestamp - now;

					if (countdown <= 0) {//tagscode 到達投遞時間
						MessageExt msgExt =
							ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);

						if (msgExt != null) {
							try {
								//把延遲訊息組裝成原始的訊息
								MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
								
								//傳送訊息
								PutMessageResult putMessageResult =
									ScheduleMessageService.this.defaultMessageStore
										.putMessage(msgInner);
										
								if (putMessageResult != null
									&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
									//傳送成功
									continue;
								} else {
									//傳送失敗,就直接跳過這個傳送失敗的訊息  訊息就丟失
									// XXX: warn and notify me
									log.error(
										"ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
										msgExt.getTopic(), msgExt.getMsgId());
									
									//重新發啟一個投遞任務,位置為 nextOffset
									ScheduleMessageService.this.timer.schedule(
										new DeliverDelayedMessageTimerTask(this.delayLevel,
											nextOffset), DELAY_FOR_A_PERIOD);
											
									//更新位置
									ScheduleMessageService.this.updateOffset(this.delayLevel,
										nextOffset);
									return;
								}
							} catch (Exception e) {
								/*
								 * XXX: warn and notify me
								 */
								log.error(
									"ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
										+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
										+ offsetPy + ",sizePy=" + sizePy, e);
							}
						}
					} else {
						
						//如果訊息沒有到達投遞時間,就安排下一下任務
						ScheduleMessageService.this.timer.schedule(
							new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
							countdown);
						ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
						return;
					}
				} // end of for

				nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
				ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
					this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
				ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
				return;
			} finally {

				bufferCQ.release();
			}
		} // end of if (bufferCQ != null)
		else {
			//如果 讀取指定偏移處的ConsumeQueue內容 為null,那麼就從最小的index重新發送訊息
			long cqMinOffset = cq.getMinOffsetInQueue();
			if (offset < cqMinOffset) {
				failScheduleOffset = cqMinOffset;
				log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
					+ cqMinOffset + ", queueId=" + cq.getQueueId());
			}
		}
	} // end of if (cq != null)

	ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
		failScheduleOffset), DELAY_FOR_A_WHILE);
}

總結: 1、傳送延遲訊息時,會把原始的訊息topic 修改為SCHEDULE_TOPIC_XXXX, tag為投遞的時間戳;儲存原始的時間訊息topic,tag,目標佇列 2、每一個延遲等級有一個佇列,把相同的延遲等級的訊息放到一個佇列中,如果都放到一個佇列那就不可以避免的要根據延遲等級排序,這樣就很影響效率 3、每一個SCHEDULE_TOPIC_XXXX 和佇列啟動一個DeliverDelayedMessageTimerTask 定時任務輪詢佇列裡面的訊息,tagcode就是投遞的時間戳,通過這個tagcode和當前時間對比如果小於當前時間戳就傳送訊息,如果大於說明時間還沒有到,就啟動下一次輪詢任務。 4、需要注意的是,如果傳送訊息失敗了這個訊息就丟失了,只是列印了一個error日誌