1. 程式人生 > >RocketMQ4.2.0-引數配置解析-Producer配置-compressMsgBodyOverHowmuch

RocketMQ4.2.0-引數配置解析-Producer配置-compressMsgBodyOverHowmuch

引數解析系列不會寫太多,看到哪兒寫到哪兒

根據RocketMQ3.2.4的開發手冊定義,compressMsgBodyOverHowmuch 是指 “訊息 Body 超過多大開始壓縮(Consumer
收到訊息會自動解壓縮),單位位元組”,預設是1024*4即4096

此引數在 FiltersrvConfig.java和DefaultMQProducer.java 中均有定義

前者是DefaultRequestProcessor.java呼叫 FiltersrvController時使用,後者是 DefaultMQProducerImpl.java使用。

DefaultMQProducerImpl.java

//org.apache.rocketmq.filtersrv.processor.DefaultRequestProcessor.java
    private ByteBuffer messageToByteBuffer(final MessageExt msg) throws IOException {
        int sysFlag = MessageSysFlag.clearCompressedFlag(msg.getSysFlag());
        if (msg.getBody() != null) {
            if (msg.getBody().length >= this.filtersrvController.getFiltersrvConfig().getCompressMsgBodyOverHowmuch()) {
                byte[] data = UtilAll.compress(msg.getBody(), this.filtersrvController.getFiltersrvConfig().getZipCompressLevel());
                if (data != null) {
                    msg.setBody(data);
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                }
            }
        }
//此方法還有程式碼未寫完 略。。。
}

DefaultMQProducerImpl.java的呼叫
//org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.java
    private boolean tryToCompressMessage(final Message msg) {
        if (msg instanceof MessageBatch) {
            //batch dose not support compressing right now
            return false;
        }
        byte[] body = msg.getBody();
        if (body != null) {
            if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
                try {
                    byte[] data = UtilAll.compress(body, zipCompressLevel);
                    if (data != null) {
                        msg.setBody(data);
                        return true;
                    }
                } catch (IOException e) {
                    log.error("tryToCompressMessage exception", e);
                    log.warn(msg.toString());
                }
            }
        }

        return false;
    }

由上可知 完成壓縮訊息的是 
UtilAll.compress(final byte[] src, final int level) 
方法
再看其定義
//org.apache.rocketmq.common.UtilAll.java
 public static byte[] compress(final byte[] src, final int level) throws IOException {
        byte[] result = src;
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(src.length);
        java.util.zip.Deflater defeater = new java.util.zip.Deflater(level);
        DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(byteArrayOutputStream, defeater);
        try {
            deflaterOutputStream.write(src);
            deflaterOutputStream.finish();
            deflaterOutputStream.close();
            result = byteArrayOutputStream.toByteArray();
        } catch (IOException e) {
            defeater.end();
            throw e;
        } finally {
            try {
                byteArrayOutputStream.close();
            } catch (IOException ignored) {
            }

            defeater.end();
        }

        return result;
    }
此處預設level=5(在FiltersrvConfig.java中有定義

綜上所述,當配置引數compressMsgBodyOverHowmuch後,RocketMQ按照規則判定訊息body大小時,使用 jdk自帶的java.util.zip.Deflater 對body進行壓縮處理。