1. 程式人生 > >MetaQ原始碼閱讀及與Spring結合使用

MetaQ原始碼閱讀及與Spring結合使用

MetaQ (全稱 Metamorphosis )是一個高效能、高可用、可擴充套件的分散式訊息中介軟體 ,MetaQ 具有訊息儲存順序寫、吞吐量大和支援本地和XA 事務等特性,適用於大吞吐量、順序訊息、廣播和日誌資料傳輸等場景, MetaQ在阿里巴巴各個子公司被廣泛應用,每天轉發 250 億 + 條訊息。主要應用於非同步解耦, Mysql 資料複製,收集日誌等場景 。。
我是做移動網際網路廣告系統的,在工作中有很多場景使用到了MetaQ,例如:廣告的儲存、效果資料的上報,多機房扣費等都需要依賴MetaQ,由於公司已經使用MeatQ作為訊息中介軟體的時間很久了,已經有了模板,所以很多的時候就是直接拿來使用,對裡面為什麼做這樣那樣的封裝沒有去深入的瞭解,剛好這段時間有空就去看了看原始碼,給自己總結沉澱一下,做到不僅知道怎麼用,還要知道為什麼這樣做。
一、生產者
傳送訊息是由生產者MessageProduce觸發,MessageProduce從MessageSessionFactory中創建出來具體實現如下:

 MetaClientConfig metaClientConfig = new MetaClientConfig();
 ZKConfig zkConfig = new ZkUtils.ZKConfig()
 zkConfig.zkConnect = "127.0.0.1:2181";
 metaClientConfig.setZkConfig(zkConfig)
 MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig );
 // create producer,強烈建議使用單例
MessageProducer producer = sessionFactory.createProducer(); // publish topic final String topic = "meta-test"; producer.publish(topic); SendResult sendResult = producer.sendMessage(new Message(topic, "xxxx".getBytes()));

我們可以看出MessageProduce 是通過工廠建立的 ,MetaMessageSessionFactory需要一個引數就MetaClientConfig這個類,MetaClientConfig裡面是什麼了?MetaClientConfig中有個工具類ZkUtils,通過名字就知道是和zk互動的類,
和zk互動我們知道建立一個客戶端需要幾個引數:
1. zkConnect (zk的ip地址)
2. zkSessionTimeoutMs(zk的會話超時時間)
3. zkConnectionTimeoutMs(zk的連線超時時間)
4. zkSyncTimeMs(zk心跳時間)

我們知道Sping IOC容器就是用來建立發現維護類與類之間的關係的,MetaQ團隊當然也想到了這個,那他是這麼實現的呢?
在com.taobao.metamorphosis.client.extension.spring 中有如下幾個類:
1. AbstractMetaqMessageSessionFactory
2. DefaultMessageListener
3. JavaSerializationMessageBodyConverter
4. MessageBodyConverter
5. MessageBuilder
6. MessageListenerContainer
7. MetaqMessage
8. MetaqMessageSessionFactoryBean
9. MetaqTemplate
10. MetaqTopic
11. XAMetaqMessageSessionFactoryBean

先來看看MetaqTemplate這個類,這個類提供傳送訊息的方法,

    public SendResult send(MessageBuilder builder) throws InterruptedException {
        Message msg = builder.build(this.messageBodyConverter);
        final String topic = msg.getTopic();
        MessageProducer producer = this.getOrCreateProducer(topic);
        try {
            return producer.sendMessage(msg);
        }
        catch (MetaClientException e) {
            return new SendResult(false, null, -1, ExceptionUtils.getFullStackTrace(e));
        }
    }

我們發現使用send方法的時候還要MessageBodyConverter的類,這個類是用來做什麼的呢?:

    /**
     * Convert a message object to byte array.
     * 
     * @param body
     * @return
     * @throws MetaClientException
     */
    public byte[] toByteArray(T body) throws MetaClientException;

    /**
     * Convert a byte array to message object.
     * 
     * @param bs
     * @return
     * @throws MetaClientException
     */
    public T fromByteArray(byte[] bs) throws MetaClientException;

可以看到這裡定義了兩個方法 用來把訊息轉換為二進位制,以及從二進位制中恢復訊息,我們知道資料在網路上傳輸都是二進位制的方式進行傳輸的,這個介面很方便我們做擴充套件,靈活的實現自己的轉換規則,比如採用其他序列化協議,如protobufs,hessian等等,當然如果你不想實現自己的訊息轉換類,這裡提供了一個實現類:JavaSerializationMessageBodyConverter

public class JavaSerializationMessageBodyConverter implements MessageBodyConverter<Serializable> {
    JavaSerializer serializer = new JavaSerializer();
    JavaDeserializer deserializer = new JavaDeserializer();


    @Override
    public byte[] toByteArray(Serializable body) throws MetaClientException {
        try {
            return this.serializer.encodeObject(body);
        }
        catch (IOException e) {
            throw new MetaClientException(e);

        }
    }


    @Override
    public Serializable fromByteArray(byte[] bs) throws MetaClientException {
        try {
            return (Serializable) this.deserializer.decodeObject(bs);
        }
        catch (IOException e) {
            throw new MetaClientException(e);

        }
    }

}

JavaSerializationMessageBodyConverter 實現了MessageBodyConverter ,對訊息體進行序列化和反序列化。
send方法中還呼叫了getOrCreateProducer我們來看看這個方法:

 public MessageProducer getOrCreateProducer(final String topic) {
        if (!this.shareProducer) {
            FutureTask<MessageProducer> task = this.producers.get(topic);
            if (task == null) {
                task = new FutureTask<MessageProducer>(new Callable<MessageProducer>() {

                    @Override
                    public MessageProducer call() throws Exception {
                        MessageProducer producer = MetaqTemplate.this.messageSessionFactory.createProducer();
                        producer.publish(topic);
                        if (!StringUtils.isBlank(MetaqTemplate.this.defaultTopic)) {
                            producer.setDefaultTopic(MetaqTemplate.this.defaultTopic);
                        }
                        return producer;
                    }

                });
                FutureTask<MessageProducer> oldTask = this.producers.putIfAbsent(topic, task);
                if (oldTask != null) {
                    task = oldTask;
                }
                else {
                    task.run();
                }
            }

            try {
                MessageProducer producer = task.get();
                return producer;
            }
            catch (ExecutionException e) {
                throw ThreadUtils.launderThrowable(e.getCause());
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        else {
            if (this.sharedProducer == null) {
                synchronized (this) {
                    if (this.sharedProducer == null) {
                        this.sharedProducer = this.messageSessionFactory.createProducer();
                        if (!StringUtils.isBlank(this.defaultTopic)) {
                            this.sharedProducer.setDefaultTopic(this.defaultTopic);
                        }
                    }
                }
            }
            this.sharedProducer.publish(topic);
            return this.sharedProducer;
        }
        throw new IllegalStateException("Could not create producer for topic '" + topic + "'");
    }

看到熟悉的 messageSessionFactory ,建立生產者的時候就需要這個工廠類來建立,我們在回過頭來看看MetaqTemplate這個類:

    private MessageSessionFactory messageSessionFactory;
    private String defaultTopic;
    private MessageBodyConverter<?> messageBodyConverter;
    private boolean shareProducer = false;
    private volatile MessageProducer sharedProducer;

有好幾個屬性,我們只要傳入一個MessageSessionFactory 及MessageBodyConverter物件即可:
到此我們就可以建立MetaqTemplate這個類了:

先來建立MessageSessionFactory ,這裡使用MetaqMessageSessionFactoryBean這個實現類:

<!--  message session factory -->  
    <bean id="sessionFactory" class="com.taobao.metamorphosis.client.extension.spring.MetaqMessageSessionFactoryBean">  
        <property name="zkConnect" value="127.0.0.1:2181"/>  
        <property name="zkSessionTimeoutMs" value="30000"/>  
        <property name="zkConnectionTimeoutMs" value="30000"/>  
        <property name="zkSyncTimeMs" value="5000"/>  
</bean>  

這樣我們就建立了一個工廠類了,然後我們需要建立一個訊息轉換類,這裡使用預設實現類:
JavaSerializationMessageBodyConverter

<!--  message body converter using java serialization. -->  
   <bean id="messageBodyConverter"    
  class="com.taobao.metamorphosis.client.extension.spring.JavaSerializationMessageBodyConverter"/>  

我們需要建立MetaqTemplate元素都準備好了,可以建立MetaqTemplate類了:

<!--  template to send messages. -->  
   <bean id ="metaqTemplate" class="com.taobao.metamorphosis.client.extension.spring.MetaqTemplate">      
       <property name="messageSessionFactory" ref="sessionFactory"/>  
       <property name="messageBodyConverter" ref="messageBodyConverter"/>  
   </bean>  

可以傳送訊息了:

final String topic = "date";  
final SendResult sendResult =  
template.send(MessageBuilder.withTopic(topic).withBody(new Date()); 

二、消費者
看完生產者我在來看看消費者,接受訊息是由消費者MessageConsume觸發,MessageConsume從MessageSessionFactory中創建出來具體實現如下:

   MetaClientConfig metaClientConfig = new MetaClientConfig();
   ZKConfig zkConfig = new ZkUtils.ZKConfig()
   zkConfig.zkConnect = "127.0.0.1:2181";
   metaClientConfig.setZkConfig(zkConfig)
   MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig );
        // subscribed topic
        final String topic = "meta-test";
        // consumer group
        final String group = "meta-example";
        // create consumer,強烈建議使用單例
        MessageConsumer consumer = sessionFactory.createConsumer(new ConsumerConfig(group));
        // subscribe topic
        consumer.subscribe(topic, 1024 * 1024, new MessageListener() {

            public void recieveMessages(Message message) {
                System.out.println("Receive message " + new String(message.getData()));
            }


            public Executor getExecutor() {
                // Thread pool to process messages,maybe null.
                return null;
            }
        });
        // complete subscribe
        consumer.completeSubscribe();

    }

消費者也是通過MetaMessageSessionFactory 去建立的,然後呼叫subscribe 實現訊息的訂閱接受及處理,我們來看看這個類MessageListenerContainer:

@Override
    public void afterPropertiesSet() throws Exception {
        log.info("Start to initialize message listener container.");
        if (this.subscribers != null) {
            Set<MessageConsumer> consumers = new HashSet<MessageConsumer>();
            for (Map.Entry<MetaqTopic, ? extends DefaultMessageListener<?>> entry : this.subscribers.entrySet()) {
                final MetaqTopic topic = entry.getKey();
                final DefaultMessageListener<?> listener = entry.getValue();
                if (topic == null) {
                    throw new IllegalArgumentException("Topic is null");
                }
                if (StringUtils.isBlank(topic.getTopic())) {
                    throw new IllegalArgumentException("Blank topic");
                }
                MessageConsumer consumer = this.getMessageConsumer(topic);
                if (consumer == null) {
                    throw new IllegalStateException("Get or create consumer failed");
                }
                log.info("Subscribe topic=" + topic.getTopic() + " with group=" + topic.getGroup());
                if (listener.getMessageBodyConverter() == null) {
                    listener.setMessageBodyConverter(this.messageBodyConverter);
                }
                consumer.subscribe(topic.getTopic(), topic.getMaxBufferSize(), listener);
                consumers.add(consumer);
            }
            for (MessageConsumer consumer : consumers) {
                consumer.completeSubscribe();
            }
        }
        log.info("Initialize message listener container successfully.");
    }

可以看到這個類在初始完成後會建立一個消費者,然後呼叫消費者的subscribe方法訂閱和處理訊息,建立這個類需要下面這個幾個類:MetaqTopic 、DefaultMessageListener或者其子類,下面我來分別看看這個兩個類:

MetaqTopic 主要有如下幾個屬性:

    private ConsumerConfig consumerConfig = new ConsumerConfig();
    private String topic;
    private int maxBufferSize = 1024 * 1024;

我們知道建立消費者的時候需要指定topic及每次消費的大小,MetaqTopic 這個就是用來指定這些屬性值的

再來看看 DefaultMessageListener:

    @Override
    public void recieveMessages(Message message) throws InterruptedException {
        if (this.messageBodyConverter != null) {
            try {
                T body = (T) this.messageBodyConverter.fromByteArray(message.getData());
                this.onReceiveMessages(new MetaqMessage<T>(message, body));
            }
            catch (Exception e) {
                log.error("Convert message body from byte array failed,msg id is " + message.getId() + " and topic is "
                        + message.getTopic(), e);
                message.setRollbackOnly();
            }
        }
        else {
            this.onReceiveMessages(new MetaqMessage<T>(message, null));
        }
    }

    public abstract void onReceiveMessages(MetaqMessage<T> msg);

這個類實現了recieveMessages處理訊息的方法,在方法中我們呼叫了MessageBodyConverter 這個類轉換訊息,然後呼叫了onReceiveMessages 這個方法,需要我們自己來實現真正的訊息處理,也就是我們需要實現DefaultMessageListener這個類中onReceiveMessages 方法來處理訊息就可以了。

這裡簡單進行一個實現:

import com.taobao.metamorphosis.client.extension.spring.DefaultMessageListener;  
import com.taobao.metamorphosis.client.extension.spring.MetaqMessage;  
import java.util.Date;  


/** 
 * Process date messages listener. 
 *  
 * @author dennis 
 *  
 */  
public class DateMessageListener extends DefaultMessageListener<Date> {  

    @Override  
    public void onReceiveMessages(MetaqMessage<Date> msg) {  
        Date date = msg.getBody();  
        System.out.println("receive date message:" + date);  
    }  

}  

這樣我們所需的要素就都有了,現在看看怎麼用spring來配置:

<!--  topics to be subscribed. -->  
    <bean id = "dateTopic" class="com.taobao.metamorphosis.client.extension.spring.MetaqTopic">  
        <!-- consumer group -->  
        <property name="group" value="testGroup"/>  
        <!--  topic -->  
        <property name="topic" value="date"/>  
        <!--  max buffer size to fetch messages -->  
        <property name="maxBufferSize" value="16384"/>  
    </bean>  
 <!--  message listener -->  
    <bean id= "messageListener" class="com.taobao.metamorphosis.example.spring.DateMessageListener">  
        <!--  threads to process these messages. -->  
        <property name="processThreads" value="10"/>  
    </bean>  
<!--  listener container to subscribe topics -->  
  <bean id ="listenerContainer" class="com.taobao.metamorphosis.client.extension.spring.MessageListenerContainer">   
       <property name="messageSessionFactory" ref="sessionFactory"/>  
       <property name="messageBodyConverter" ref="messageBodyConverter"/>  
       <property name="subscribers">  
           <map>  
               <entry key-ref="dateTopic" value-ref="messageListener"/>  
           </map>  
       </property>  
  </bean>  

只要配置好這些後就可以通過我們實現的監聽器DateMessageListener 自動處理訊息了。

寫到這差不多就整理完了,程式碼比較多,只找了幾個關鍵的地方進行分析,著重點落在了這麼結合Spring使用。由於能力有限制,寫到不到位的地方多多見諒。。

我是一隻小蝸牛,雖然速度慢,但我一直在努力向前爬。。

相關推薦

MetaQ原始碼閱讀Spring結合使用

MetaQ (全稱 Metamorphosis )是一個高效能、高可用、可擴充套件的分散式訊息中介軟體 ,MetaQ 具有訊息儲存順序寫、吞吐量大和支援本地和XA 事務等特性,適用於大吞吐量、順序訊息、廣播和日誌資料傳輸等場景, MetaQ在阿里巴巴各個子公司

Spring原始碼閱讀——BeanFactoryPostProcessorBeanPostProcessor

摘要 Spring IoC容器允許BeanFactoryPostProcessor在容器例項化任何bean之前讀取bean的定義(配置元資料),並可以修改它。 BeanFactoryPostProcessor: BeanFactory的後置處理器(處理的物件是BeanFact

[轉]結合原始碼淺析Struts2Spring整合的原理

文章的結構如下: 一、回顧Struts2與Spring整合的配置方法 二、(重點)對關鍵配置的分析 -------------------------------------------------------- 一、回顧Struts2與Spring整合的配置

spring原始碼閱讀--下載編譯

這裡簡單介紹如何下載編譯spring原始碼專案,並且匯入到idea或者eclipse中。 首先是下載spring的原始碼壓縮包,可取去github上面下載,地址:https://github.com/spring-projects/spring-framew

MogileFSspring結合

art pro exists ren tput col 常用api ava 從服務器 一、通過Maven添加MogileFS的Java客戶端驅動包 <dependency>   <groupId>fm.last</groupId>   

OpenCV學習筆記(31)KAZE 演算法原理原始碼分析(五)KAZE的原始碼優化SIFT的比較

  KAZE系列筆記: 1.  OpenCV學習筆記(27)KAZE 演算法原理與原始碼分析(一)非線性擴散濾波 2.  OpenCV學習筆記(28)KAZE 演算法原理與原始碼分析(二)非線性尺度空間構建 3.  Op

Kafka簡單入門Spring結合實踐

Kafka簡單入門與Spring結合實踐 一、【安裝部署kafka伺服器環境(centos7.0)】:  1.【注意】新版的kafka已經內建了一個zookeeper環境 2.【安裝與執行】: 可以在kafka官網 http://kafka.ap

quartz詳解5-quartzspring結合

1. spring中quartz依賴 2. quartz配置作業的兩種方式及配置檔案 慕課網_《Java定時任務排程工具詳解之Quartz篇》學習總結 1. spring中quartz依賴

Kafka的安裝Spring Boot的整合

安裝JDK 下載jdk-8u202-ea-bin-b03-linux-x64-07_nov_2018.tar.gz 解壓 配置 $ vi /etc/profile,在最後加入下面兩行   export JAVA_HOME=/usr/local/bigdata/jdk

Kafka的安裝Spring Boot的集成

gin 消費者 ole 輸入 beginning bin tis sed 解壓 安裝JDK 下載jdk-8u202-ea-bin-b03-linux-x64-07_nov_2018.tar.gz 解壓 配置 $ vi /etc/profile,在最後加入下面兩行   

【Mongodb】morphiaspring結合使用詳解

Morphia簡介   隨著網際網路web2.0網站的興起,傳統的關係資料庫在應付web2.0網站,特別是超大規模和高併發的SNS型別的web2.0純動態網站已經顯得力不從心,暴露了很多難以克服的問題,而非關係型的資料庫則由於其本身的特點得到了非常迅速的發展。

MybtisSpring結合使用Log4J列印日誌/Sql到控制檯

首先說明一下,專案採用SpringMVC+Mybatis的架構,日誌工具還是最常用的log4j2,整合框架之後發現無法列印SQL語句,只有報錯時才會列印。 然後開始看Mybatis的官方文件,關於日誌這一塊是怎麼處理的,下面是官方文件關於日誌的說明: Logging

hiberntespring結合時,配置使hibernate的session的生命週期延遲,以及配置httpsession的時間

  hibernate的session <filter> <filter-name>OpenSessionInViewFilter</filter-name> <filter-class> org.

JSON資料處理echarts結合使用時所需要的問題

data部分傳入的資料要求是陣列 如果是json資料 需要做出解析後變成資料才可以使用 1. var str = “[‘研發部’,’研發部’,’研發部’,’研發部’]”; var strDat

QT原始碼閱讀——QTHTML互動程式設計

QT中通過QWebKit元件處理HTML,其中最重要的就是QWebView了~~~ 通過例子formextractor我們可以窺之一二: /*******************************************************************

原始碼閱讀SourceInsight靜態反彙編IDA Pro

1.原始碼閱讀SI     1.1呼叫關係圖(正向和方向關係調用)                  Sourceinsight可以方便的檢視函式呼叫關係,點選圖示   開啟如下:                  點選函式的末尾可以展開下一級呼叫關係,如上圖滑鼠彈出”加

使用JDBCTemplate實現Spring結合,方法公用 ——Spring配置(applicationContext.xml)

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema

spring結合的策略模式

策略模式是23種設計模式之一,客戶端通過制定多個演算法並且封裝,使得不同場景可以使用不同的策略演算法。使得程式降低了耦合提高程式碼的複用性。接下來通過一個簡單的例項來說明在實戰中如何使用(即使是業務邏輯也是可以用設計模式的)。 例子很簡單,就是通過同一個儲存的

Redis的安裝過程SSM結合使用

Redis安裝過程 下載安裝包,解壓,Mac系統將資料夾放在/usr/local資料夾下 開啟終端,先安裝伺服器端server 終端$ cd /usr/local/redis/ 去檔案下進入redis.config檔案 找到requirepass刪除

Redis搭建哨兵模式並且spring結合

1.啟動一個主redis ./redis-server ../redis.conf配置redis.conf設定守護執行緒為開啟 2.啟動一個從redis ./redis-server ../redis.conf./redis-cli -h 127.0.0.1 -p 638