RocketMQ 4.2.0 - Parameter Configuration Analysis - Producer Configuration - compressMsgBodyOverHowmuch
阿新 • Published: 2019-01-25
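This parameter analysis series is not meant to be exhaustive; I write up each parameter as I come across it.
According to the RocketMQ 3.2.4 developer manual, compressMsgBodyOverHowmuch is "the message body size above which compression starts (the Consumer decompresses automatically on receipt), in bytes". The default is 1024*4, i.e. 4096.
The parameter is defined in both FiltersrvConfig.java and DefaultMQProducer.java.
The former is read when DefaultRequestProcessor.java calls into FiltersrvController; the latter is read by DefaultMQProducerImpl.java.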
Usage in DefaultRequestProcessor.java
// org.apache.rocketmq.filtersrv.processor.DefaultRequestProcessor.java
private ByteBuffer messageToByteBuffer(final MessageExt msg) throws IOException {
    int sysFlag = MessageSysFlag.clearCompressedFlag(msg.getSysFlag());
    if (msg.getBody() != null) {
        if (msg.getBody().length >= this.filtersrvController.getFiltersrvConfig().getCompressMsgBodyOverHowmuch()) {
            byte[] data = UtilAll.compress(msg.getBody(), this.filtersrvController.getFiltersrvConfig().getZipCompressLevel());
            if (data != null) {
                msg.setBody(data);
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
            }
        }
    }
    // The rest of this method is omitted here ...
}
Usage in DefaultMQProducerImpl.java
// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.java
private boolean tryToCompressMessage(final Message msg) {
    if (msg instanceof MessageBatch) {
        // batch does not support compressing right now
        return false;
    }
    byte[] body = msg.getBody();
    if (body != null) {
        if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
            try {
                byte[] data = UtilAll.compress(body, zipCompressLevel);
                if (data != null) {
                    msg.setBody(data);
                    return true;
                }
            } catch (IOException e) {
                log.error("tryToCompressMessage exception", e);
                log.warn(msg.toString());
            }
        }
    }
    return false;
}
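Note that both call sites compare with >=, so a body of exactly 4096 bytes is already compressed under the default setting. The following is a minimal stand-alone sketch of that threshold check using only the JDK. The class CompressThresholdSketch and the method maybeCompress are hypothetical names made up for illustration; they are not part of RocketMQ.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;

// Hypothetical illustration class, not RocketMQ code.
class CompressThresholdSketch {

    // Same value as the default quoted above: 1024 * 4 bytes.
    static final int DEFAULT_THRESHOLD = 1024 * 4;

    // Returns compressed bytes when body.length >= threshold,
    // otherwise returns the very same array untouched.
    static byte[] maybeCompress(byte[] body, int threshold, int level) {
        if (body == null || body.length < threshold) {
            return body;
        }
        Deflater deflater = new Deflater(level);
        ByteArrayOutputStream out = new ByteArrayOutputStream(body.length);
        // Closing the stream finishes the deflate stream before the finally block runs.
        try (DeflaterOutputStream dos = new DeflaterOutputStream(out, deflater)) {
            dos.write(body);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            deflater.end(); // release native zlib resources
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] below = new byte[DEFAULT_THRESHOLD - 1]; // one byte under the threshold
        byte[] at = new byte[DEFAULT_THRESHOLD];        // exactly at the threshold
        System.out.println("below threshold, untouched: "
                + (maybeCompress(below, DEFAULT_THRESHOLD, 5) == below));
        System.out.println("at threshold, size after compression: "
                + maybeCompress(at, DEFAULT_THRESHOLD, 5).length);
    }
}
```

The sketch returns the original array when nothing is compressed, whereas the producer code above returns a boolean and replaces the body on the message; the boundary behaviour is the part it is meant to show.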
As both snippets show, the actual compression is done by the method
UtilAll.compress(final byte[] src, final int level)
so let's look at its definition:
// org.apache.rocketmq.common.UtilAll.java
public static byte[] compress(final byte[] src, final int level) throws IOException {
    byte[] result = src;
    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(src.length);
    java.util.zip.Deflater defeater = new java.util.zip.Deflater(level);
    DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(byteArrayOutputStream, defeater);
    try {
        deflaterOutputStream.write(src);
        deflaterOutputStream.finish();
        deflaterOutputStream.close();
        result = byteArrayOutputStream.toByteArray();
    } catch (IOException e) {
        defeater.end();
        throw e;
    } finally {
        try {
            byteArrayOutputStream.close();
        } catch (IOException ignored) {
        }
        defeater.end();
    }
    return result;
}
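The manual says the Consumer decompresses automatically, but the decompression code is not shown in this post. As a rough sketch of what the round trip looks like with the same JDK classes, assuming the receiving side simply inflates the zlib stream, something like the following works. DeflateRoundTripSketch and its methods are hypothetical names for illustration, not RocketMQ code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

// Hypothetical illustration class, not RocketMQ code.
class DeflateRoundTripSketch {

    // Compress with java.util.zip.Deflater at the given level (0-9).
    static byte[] compress(byte[] src, int level) {
        Deflater deflater = new Deflater(level);
        ByteArrayOutputStream out = new ByteArrayOutputStream(src.length);
        try (DeflaterOutputStream dos = new DeflaterOutputStream(out, deflater)) {
            dos.write(src);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            deflater.end();
        }
        return out.toByteArray();
    }

    // Inflate a zlib stream produced by compress() back into the original bytes.
    static byte[] uncompress(byte[] src) {
        ByteArrayOutputStream out = new ByteArrayOutputStream(src.length);
        try (InflaterInputStream in = new InflaterInputStream(new ByteArrayInputStream(src))) {
            byte[] buf = new byte[1024];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 500; i++) {
            sb.append("hello rocketmq ");
        }
        byte[] body = sb.toString().getBytes(StandardCharsets.UTF_8);
        byte[] packed = compress(body, 5);
        byte[] restored = uncompress(packed);
        System.out.println(body.length + " bytes -> " + packed.length
                + " bytes, round trip ok: " + Arrays.equals(body, restored));
    }
}
```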
The default level here is 5 (defined in FiltersrvConfig.java).
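To get a feel for what the level argument does, here is a small JDK-only sketch that measures the compressed size of the same input at different levels. CompressLevelSketch and compressedSize are hypothetical names for illustration. Level 0 means "no compression" in java.util.zip.Deflater, so the output ends up slightly larger than the input because of the zlib framing.

```java
import java.util.zip.Deflater;

// Hypothetical illustration class, not RocketMQ code.
class CompressLevelSketch {

    // Returns the number of bytes Deflater produces for src at the given level (0-9).
    static int compressedSize(byte[] src, int level) {
        Deflater deflater = new Deflater(level);
        try {
            deflater.setInput(src);
            deflater.finish(); // no more input will follow
            byte[] buf = new byte[1024];
            int total = 0;
            while (!deflater.finished()) {
                total += deflater.deflate(buf);
            }
            return total;
        } finally {
            deflater.end();
        }
    }

    public static void main(String[] args) {
        byte[] body = new byte[8192];
        for (int i = 0; i < body.length; i++) {
            body[i] = (byte) (i % 16); // repetitive sample payload
        }
        for (int level : new int[] {0, 1, 5, 9}) {
            System.out.println("level " + level + ": " + compressedSize(body, level) + " bytes");
        }
    }
}
```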
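To sum up: once compressMsgBodyOverHowmuch is configured, RocketMQ checks the size of each message body against it, and when the body length is greater than or equal to the configured value, it compresses the body with the JDK's built-in java.util.zip.Deflater.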