1. 程式人生 > >Netty構建分散式訊息佇列實現原理淺析(十六)

Netty構建分散式訊息佇列實現原理淺析(十六)

 在本人的上一篇部落格文章:Netty構建分散式訊息佇列(AvatarMQ)設計指南之架構篇 中,重點向大家介紹了AvatarMQ主要構成模組以及目前存在的優缺點。最後以一個生產者、消費者傳遞訊息的例子,具體演示了AvatarMQ所具備的基本訊息路由功能。而本文的寫作目的,是想從開發、設計的角度,簡單的對如何使用Netty,構建分散式訊息佇列背後的技術細節、原理,進行一下簡單的分析和說明。

  首先,在一個企業級的架構應用中,究竟何時需引入訊息佇列呢?本人認為,最經常的情況,無非這幾種:做業務解耦、事件訊息廣播、訊息流控處理。其中,對於業務解耦是作為訊息佇列,要解決的一個首要問題。所謂業務解耦,就是說在一個業務流程處理上,只關注具體的流程,盡到通知的責任即可,不必等待訊息處理的結果。

  總得來看,企業級系統模組通訊的方式通常情況下,無非兩種。

  同步方式:REST、RPC方式實現;非同步方式:訊息中介軟體(訊息佇列)方式實現。

  同步方式的優點:可以基於http協議之上,無需中介軟體代理,系統架構相對而言比較簡單。缺點是:客戶端和服務端緊密耦合,並且要實時線上通訊,否則會導致訊息傳送失敗。

  非同步方式的優點:客戶端和服務端互相解耦,雙方可以不產生依賴。缺點是:由於引入了訊息中介軟體,在程式設計的時候會增加難度係數。此外,訊息中介軟體的可靠性、容錯性、健壯性往往成為這類架構的決定性因素。

  舉一個本人工作中的例子向大家說明一下:移動業務中的產品訂購中心,每當一個使用者通過某些渠道(營業廳、自助終端等等)開通、訂購了某個套餐之後,如果這些套餐涉及第三方平臺派單的話,產品訂購中心會向第三方平臺發起訂購請求操作。試想一下,如果遇到高峰受理時間段,由於業務受理量的激增,導致一些外圍系統的響應速度降低(比如業務閘道器響應速度不及時、網路延時等等原因),終端使用者開通一個套餐花在主流程的時間會延長很多,這個會造成極不好的使用者體驗,最終可能導致受理失敗。在上述的場景裡面,我們就可以很好的引入一個訊息佇列進行業務的解耦,具體來說,產品訂購中心只要“通知”第三方平臺,我們的套餐開通成功了,並不一定非要同步阻塞地等待其真正的開通處理完成。正因為如此,訊息佇列逐漸成為當下系統模組通訊的主要方式手段。

  當今在Java的訊息佇列通訊領域,有很多主流的訊息中介軟體,比如RabbitMQ、ActiveMQ、以及炙手可熱Kafka。其中ActiveMQ是基於JMS的標準之上開發定製的一套訊息佇列系統,效能穩定,訪問介面也非常友好,但是這類的訊息佇列在訪問吞吐量上有所折扣;另外一個方面,比如Kafka這樣,以高效吞吐量著稱的訊息佇列系統,但是在穩定性和可靠性上,能力似乎還不夠,因此更多的是用在服務日誌傳輸、短訊息推送等等對於可靠性不高的業務場景之中。總結起來,不管是ActiveMQ還是Kafka,其框架的背後涉及到很多非同步網路通訊、多執行緒、高併發處理方面的專業技術知識。但本文的重點,也不在於介紹這些訊息中介軟體背後的技術細節,而是想重點闡述一下,如何透過上述訊息佇列的基本原理,在必要的時候,開發定製一套符合自身業務要求的訊息佇列系統時,能夠獲得更加全面的視角去設計、考量這些問題。

  因此本人用心開發實現了一個,基於Netty的訊息佇列系統:AvatarMQ。當然,在設計、實現AvatarMQ的時候,我會適當參考這些成熟訊息中介軟體中用到的很多重要的思想理念。

  當各位從github上面下載到AvatarMQ的原始碼的時候,可以發現,其中的包結構如下所示:

     

  現在對每個包的主要功能進行一下簡要說明(下面省略字首com.newlandframework.avatarmq)。

  broker:訊息中介軟體的伺服器模組,主要負責訊息的路由、負載均衡,對於生產者、消費者進行訊息的應答回覆處理(ACK),AvatarMQ中的中心節點,是連線生產者、消費者的橋樑紐帶。

  consumer:訊息中介軟體中的消費者模組,負責接收生產者過來的訊息,在設計的時候,會對消費者進行一個叢集化管理,同一個叢集標識的消費者,會構成一個大的消費者叢集,作為一個整體,接收生產者投遞過來的訊息。此外,還提供消費者接收訊息相關的API給客戶端進行呼叫。

  producer:訊息中介軟體中的生產者模組,負責生產特定主題(Topic)的訊息,傳遞給對此主題感興趣的消費者,同時提供生產者生產訊息的API介面,給客戶端使用。

  core:AvatarMQ中訊息處理的核心模組,負責訊息的記憶體儲存、應答控制、對訊息進行多執行緒任務分派處理。

  model:主要定義了AvatarMQ中的資料模型物件,比如MessageType訊息型別、MessageSource訊息源頭等等模型物件的定義。

  msg:主要定義了具體的訊息型別對應的結構模型,比如消費者訂閱訊息SubscribeMessage、消費者取消訂閱訊息UnSubscribeMessage,訊息伺服器應答給生產者的應答訊息ProducerAckMessage、訊息伺服器應答給消費者的應答訊息ConsumerAckMessage。

  netty:主要封裝了Netty網路通訊相關的核心模組程式碼,比如訂閱訊息事件的路由分派策略、訊息的編碼、解碼器等等。

  serialize:利用Kryo這個優秀高效的物件序列化、反序列框架對訊息物件進行序列化網路傳輸。

  spring:Spring的容器管理類,負責把AvatarMQ中的訊息伺服器模組:Broker,進行容器化管理。這個包裡面的AvatarMQServerStartup是整個AvatarMQ訊息伺服器的啟動入口。

  test:這個就不用多說了,就是針對AvatarMQ進行訊息路由傳遞的測試demo。

  AvatarMQ執行原理示意圖:

  首先是訊息生產者客戶端(AvatarMQ Producer)傳送帶有主題的訊息給訊息轉發伺服器(AvatarMQ Broker),訊息轉發伺服器確認收到生產者的訊息,傳送ACK應答給生產者,然後把訊息繼續投遞給消費者(AvatarMQ Consumer)。同時broker伺服器接收來自消費者的訂閱、取消訂閱訊息,併發送ACK應該給對應的消費者,整個訊息系統就是這樣周而復始的工作。

  現在再來看一下,AvatarMQ中的核心模組的組成,如下圖所示:

  

  Producer Manage:訊息的生產者,其主要程式碼在(com.newlandframework.avatarmq.producer)包之下,其主要程式碼模組關鍵部分簡要說明如下:

複製程式碼
package com.newlandframework.avatarmq.producer;

import com.newlandframework.avatarmq.core.AvatarMQAction;
import com.newlandframework.avatarmq.model.MessageSource;
import com.newlandframework.avatarmq.model.MessageType;
import com.newlandframework.avatarmq.model.RequestMessage;
import com.newlandframework.avatarmq.model.ResponseMessage;
import com.newlandframework.avatarmq.msg.Message;
import com.newlandframework.avatarmq.msg.ProducerAckMessage;
import com.newlandframework.avatarmq.netty.MessageProcessor;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @filename:AvatarMQProducer.java
 * @description:AvatarMQProducer功能模組
 * @author tangjie<https://github.com/tang-jie>
 * @blog http://www.cnblogs.com/jietang/
 * @since 2016-8-11
 */
public class AvatarMQProducer extends MessageProcessor implements AvatarMQAction {

    private boolean brokerConnect = false;
    private boolean running = false;
    private String brokerServerAddress;
    private String topic;
    private String defaultClusterId = "AvatarMQProducerClusters";
    private String clusterId = "";
    private AtomicLong msgId = new AtomicLong(0L);
    
    //連線訊息轉發伺服器broker的ip地址,以及生產出來訊息附帶的主題資訊
    public AvatarMQProducer(String brokerServerAddress, String topic) {
        super(brokerServerAddress);
        this.brokerServerAddress = brokerServerAddress;
        this.topic = topic;
    }
    
    //沒有連線上訊息轉發伺服器broker就傳送的話,直接應答失敗
    private ProducerAckMessage checkMode() {
        if (!brokerConnect) {
            ProducerAckMessage ack = new ProducerAckMessage();
            ack.setStatus(ProducerAckMessage.FAIL);
            return ack;
        }

        return null;
    }
    
    //啟動訊息生產者
    public void start() {
        super.getMessageConnectFactory().connect();
        brokerConnect = true;
        running = true;
    }
    
    //連線訊息轉發伺服器broker,設定生產者訊息處理鉤子,用於處理broker過來的訊息應答
    public void init() {
        ProducerHookMessageEvent hook = new ProducerHookMessageEvent();
        hook.setBrokerConnect(brokerConnect);
        hook.setRunning(running);
        super.getMessageConnectFactory().setMessageHandle(new MessageProducerHandler(this, hook));
    }
    
    //投遞訊息API
    public ProducerAckMessage delivery(Message message) {
        if (!running || !brokerConnect) {
            return checkMode();
        }

        message.setTopic(topic);
        message.setTimeStamp(System.currentTimeMillis());

        RequestMessage request = new RequestMessage();
        request.setMsgId(String.valueOf(msgId.incrementAndGet()));
        request.setMsgParams(message);
        request.setMsgType(MessageType.AvatarMQMessage);
        request.setMsgSource(MessageSource.AvatarMQProducer);
        message.setMsgId(request.getMsgId());

        ResponseMessage response = (ResponseMessage) sendAsynMessage(request);
        if (response == null) {
            ProducerAckMessage ack = new ProducerAckMessage();
            ack.setStatus(ProducerAckMessage.FAIL);
            return ack;
        }

        ProducerAckMessage result = (ProducerAckMessage) response.getMsgParams();
        return result;
    }
    
    //關閉訊息生產者
    public void shutdown() {
        if (running) {
            running = false;
            super.getMessageConnectFactory().close();
            super.closeMessageConnectFactory();
        }
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public String getClusterId() {
        return clusterId;
    }

    public void setClusterId(String clusterId) {
        this.clusterId = clusterId;
    }
}
複製程式碼

  Consumer Clusters Manage / Message Routing:訊息的消費者叢集管理以及訊息路由模組,其主要模組在包(com.newlandframework.avatarmq.consumer)之中。其中訊息消費者物件,對應的核心程式碼主要功能描述如下:

複製程式碼
package com.newlandframework.avatarmq.consumer;

import com.google.common.base.Joiner;
import com.newlandframework.avatarmq.core.AvatarMQAction;
import com.newlandframework.avatarmq.core.MessageIdGenerator;
import com.newlandframework.avatarmq.core.MessageSystemConfig;
import com.newlandframework.avatarmq.model.MessageType;
import com.newlandframework.avatarmq.model.RequestMessage;
import com.newlandframework.avatarmq.msg.SubscribeMessage;
import com.newlandframework.avatarmq.msg.UnSubscribeMessage;
import com.newlandframework.avatarmq.netty.MessageProcessor;

/**
 * @filename:AvatarMQConsumer.java
 * @description:AvatarMQConsumer功能模組
 * @author tangjie<https://github.com/tang-jie>
 * @blog http://www.cnblogs.com/jietang/
 * @since 2016-8-11
 */
public class AvatarMQConsumer extends MessageProcessor implements AvatarMQAction {

    private ProducerMessageHook hook;
    private String brokerServerAddress;
    private String topic;
    private boolean subscribeMessage = false;
    private boolean running = false;
    private String defaultClusterId = "AvatarMQConsumerClusters";
    private String clusterId = "";
    private String consumerId = "";
    
    //連線的訊息伺服器broker的ip地址以及關注的生產過來的訊息鉤子
    public AvatarMQConsumer(String brokerServerAddress, String topic, ProducerMessageHook hook) {
        super(brokerServerAddress);
        this.hook = hook;
        this.brokerServerAddress = brokerServerAddress;
        this.topic = topic;
    }
    
    //向訊息伺服器broker傳送取消訂閱訊息
    private void unRegister() {
        RequestMessage request = new RequestMessage();
        request.setMsgType(MessageType.AvatarMQUnsubscribe);
        request.setMsgId(new MessageIdGenerator().generate());
        request.setMsgParams(new UnSubscribeMessage(consumerId));
        sendSyncMessage(request);
        super.getMessageConnectFactory().close();
        super.closeMessageConnectFactory();
        running = false;
    }
    
    //向訊息伺服器broker傳送訂閱訊息
    private void register() {
        RequestMessage request = new RequestMessage();
        request.setMsgType(MessageType.AvatarMQSubscribe);
        request.setMsgId(new MessageIdGenerator().generate());

        SubscribeMessage subscript = new SubscribeMessage();
        subscript.setClusterId((clusterId.equals("") ? defaultClusterId : clusterId));
        subscript.setTopic(topic);
        subscript.setConsumerId(consumerId);

        request.setMsgParams(subscript);

        sendAsynMessage(request);
    }
    
    public void init() {
        super.getMessageConnectFactory().setMessageHandle(new MessageConsumerHandler(this, new ConsumerHookMessageEvent(hook)));
        Joiner joiner = Joiner.on(MessageSystemConfig.MessageDelimiter).skipNulls();
        consumerId = joiner.join((clusterId.equals("") ? defaultClusterId : clusterId), topic, new MessageIdGenerator().generate());
    }
    
    //連線訊息伺服器broker
    public void start() {
        if (isSubscribeMessage()) {
            super.getMessageConnectFactory().connect();
            register();
            running = true;
        }
    }

    public void receiveMode() {
        setSubscribeMessage(true);
    }

    public void shutdown() {
        if (running) {
            unRegister();
        }
    }

    public String getBrokerServerAddress() {
        return brokerServerAddress;
    }

    public void setBrokerServerAddress(String brokerServerAddress) {
        this.brokerServerAddress = brokerServerAddress;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public boolean isSubscribeMessage() {
        return subscribeMessage;
    }

    public void setSubscribeMessage(boolean subscribeMessage) {
        this.subscribeMessage = subscribeMessage;
    }

    public String getDefaultClusterId() {
        return defaultClusterId;
    }

    public void setDefaultClusterId(String defaultClusterId) {
        this.defaultClusterId = defaultClusterId;
    }

    public String getClusterId() {
        return clusterId;
    }

    public void setClusterId(String clusterId) {
        this.clusterId = clusterId;
    }
}
複製程式碼

  訊息的叢集管理模組,主要程式碼是ConsumerContext.java、ConsumerClusters.java。先簡單說一下消費者叢集模組ConsumerClusters,主要負責定義消費者叢集的行為,以及負責訊息的路由。主要的功能描述如下所示:

複製程式碼
package com.newlandframework.avatarmq.consumer;

import com.newlandframework.avatarmq.model.RemoteChannelData;
import com.newlandframework.avatarmq.model.SubscriptionData;
import com.newlandframework.avatarmq.netty.NettyUtil;
import io.netty.channel.Channel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.collections.Predicate;

/**
 * @filename:ConsumerClusters.java
 * @description:ConsumerClusters功能模組
 * @author tangjie<https://github.com/tang-jie>
 * @blog http://www.cnblogs.com/jietang/
 * @since 2016-8-11
 */
public class ConsumerClusters {
    
    //輪詢排程(Round-Robin Scheduling)位置標記
    private int next = 0;
    private final String clustersId;
    private final ConcurrentHashMap<String/*生產者訊息的主題*/, SubscriptionData/*訊息對應的topic資訊資料結構*/> subMap
            = new ConcurrentHashMap<String, SubscriptionData>();

    private final ConcurrentHashMap<String/*消費者標識編碼*/, RemoteChannelData/*對應的消費者的netty網路通訊管道資訊*/> channelMap
            = new ConcurrentHashMap<String, RemoteChannelData>();

    private final List<RemoteChannelData> channelList = Collections.synchronizedList(new ArrayList<RemoteChannelData>());

    public ConsumerClusters(String clustersId) {
        this.clustersId = clustersId;
    }

    public String getClustersId() {
        return clustersId;
    }

    public ConcurrentHashMap<String, SubscriptionData> getSubMap() {
        return subMap;
    }

    public ConcurrentHashMap<String, RemoteChannelData> getChannelMap() {
        return channelMap;
    }
    
    //新增一個消費者到消費者叢集
    public void attachRemoteChannelData(String clientId, RemoteChannelData channelinfo) {
        if (findRemoteChannelData(channelinfo.getClientId()) == null) {
            channelMap.put(clientId, channelinfo);
            subMap.put(channelinfo.getSubcript().getTopic(), channelinfo.getSubcript());
            channelList.add(channelinfo);
        } else {
            System.out.println("consumer clusters exists! it's clientId:" + clientId);
        }
    }
    
    //從消費者叢集中刪除一個消費者
    public void detachRemoteChannelData(String clientId) {
        channelMap.remove(clientId);

        Predicate predicate = 
            
           

相關推薦

Netty構建分散式訊息佇列實現原理淺析

 在本人的上一篇部落格文章:Netty構建分散式訊息佇列(AvatarMQ)設計指南之架構篇 中,重點向大家介紹了AvatarMQ主要構成模組以及目前存在的優缺點。最後以一個生產者、消費者傳遞訊息的例子,具體演示了AvatarMQ所具備的基本訊息路由功能。而本文的寫作

osgEarth的Rex引擎原理分析請求合併佇列_mergeQueue

目標:(十四)中的33 請求合併佇列_mergeQueue是在幀迴圈的更新遍歷時構建的。這個是有分頁資料庫DatabasePager的更新遍歷實現的,而不是依靠場景樹節點的更新遍歷。 osgEarthDrivers/engine_rex/Loader.cpp bool PagerLoa

高校新聞管理與釋出系統的設計與實現——論文隨筆

一、基本資訊 標題:高校新聞管理與釋出系統的設計與實現 時間:2016-04 出版源:湖北大學 領域分類:系統架構和設計 二、研究背景 問題定義:由於資訊獲取渠道較之以往發生了太大的改變,無論是學校還是學生對新聞資訊的獲取和傳播速度已大大區別於前,新聞資訊的影響力也大大提高 相關工作:設計一個新聞資訊管

學生工作管理系統的設計與實現--文獻隨筆

一、基本資訊 標題:學生工作管理系統的設計與實現 時間:2017 出版源:天津大學 關鍵詞:學生工作; 高校管理; 資訊系統; 系統實現; 二、研究背景 問題定義:隨著我國社會的發展,高校也越來越多,每個高校的學生管理水平也不同,但隨著時間的推移,很多高校的原有學生資訊管理也出現了較為明顯的問題,一是學

分散式訊息佇列kafka原理簡介

kafka原理簡介 Kafka是由LinkedIn開發的一個分散式的訊息系統,使用Scala編寫,它以可水平擴充套件和高吞吐率而被廣泛使用。目前越來越多的開源分散式處理系統如Cloudera、Apache Storm、Spark都支援與Kafka整合

Java併發:執行緒池實現原理 Java併發:阻塞佇列BlockingQueue Java併發:阻塞佇列BlockingQueue Java併發程式設計:執行緒池的使用

一、總覽 執行緒池類ThreadPoolExecutor的相關類需要先了解:  (圖片來自:https://javadoop.com/post/java-thread-pool#%E6%80%BB%E8%A7%88) Executor:位於最頂層,只有一個 execute(Runnab

H5下拉重新整理和上拉載入實現原理淺析 轉載

前言 在移動端H5網頁中,下拉重新整理和上拉載入更多資料的互動方式出現頻率很高,開源社群也有很多類似的解決方案,如iscroll,pulltorefresh.js庫等。下面是對這兩種常見互動基本實現原理的闡述。 實現原理 下拉重新整理 實現下拉重新整理主要分為三步

分散式訊息通訊Kafka原理分析

本章重點: 1.訊息的儲存原理 2.Partition的副本機制原理 3.副本資料同步原理 訊息的檔案儲存機制 通過如

訊息中介軟體--RabbitMQ學習---高階特性之死信佇列

死信佇列:DLX,Dead- Letter- Exchange 利用DLX,當訊息在一個佇列中變成死信( dead message)之後它能被重新 publish到另一個 Exchange,這個 Exchange就是DLX 死信佇列訊息變成死信有一下幾種情況

入門RabbitMQ訊息佇列結合SSH框架配置篇

使用RabbitMQ訊息佇列,因為訊息佇列的非同步思想,解耦,以及允許短暫的不一致性,就像我現在把東西放在桌子上,你可以去拿,別人也可以去拿,而我不用等人拿完我便放東西上去,這樣就保證了我(生產者)和接收者沒有什麼聯絡,而且接受者可以隨時去拿。我們要使用RabbitMQ,安裝

通過兩個佇列實現一個棧C語言

stackBy2Queue.h檔案 #pragma once #define max_size 1000 typedef char DataType; typedef struct Queue { DataType data[max_size

兩個佇列實現一個棧C語言

本題的思路是先建立queue1和queue2,入棧時直接向queue1裡入佇列,出棧時需要先從queue1中出佇列的同時把數依次進入queue2,直到queue1中到最後一個數為止,然後將queue1中的數出佇列,再將queue2中的數倒回queue1,這樣就實

柯南君:看大資料時代下的IT架構8訊息佇列之RabbitMQ--案例topic起航

public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv)

訊息佇列-ActiveMQ的使用Windows系統

   2.下載完成後,解壓到本地目錄(我放在了D盤),我的電腦是Windows系統64位的,所以進入 D:\apache-activemq-5.15.2-bin\apache-activemq-5.15.2\bin\win64 的資料夾,找到activemq.bat,如

訊息佇列和事件迴圈Event Loop

  產生原因 為什麼會有訊息佇列和事件迴圈呢?首先最關鍵的一點在於JS是個單執行緒,並且主執行緒非常繁忙,既要處理 DOM,又要計算樣式,還要處理佈局,同時還需要處理 JavaScript 任務以及各種輸入事件。要讓這麼多不同型別的任務在主執行緒中有條不紊地執行,這就需要一個系統來統籌排程這些任

《Linux內核設計與實現》讀書筆記- 頁高速緩存和頁回寫

第一次 源碼 進行 lose 減少 文件緩存 掩碼 recycle 創建 主要內容: 緩存簡介 頁高速緩存 頁回寫 1. 緩存簡介 在編程中,緩存是很常見也很有效的一種提高程序性能的機制。 linux內核也不例外,為了提高I/O性能,也引入了緩存機

Android項目實戰:QQ空間實現—— 展示說說中的評論內容並有相應點擊事件

con toast short demo append 集合 obj parent 自帶 原文:Android項目實戰(十六):QQ空間實現(一)—— 展示說說中的評論內容並有相應點擊事件大家都玩QQ空間客戶端,對於每一個說說,我們都可以評論,那麽,對於某一條評論:

用Vue來實現音樂播放器:滾動列表的實現

com 作用 efault nor 大小 -s stylus BE ack 滾動列表是一個基礎組件 他是基於scroll組件實現的 在base文件夾下面創建一個list-view文件夾 裏面有list-view.vue組件 <template>

Spring原始碼解析——AOP原理——獲取攔截器鏈——MethodInterceptor

   *     3)、目標方法執行    ;  *         容器中儲存了元件的代理物件(cglib增強後的物件),這個物件裡面儲存了詳細

redis原始碼分析與思考——集合型別的命令實現(t_set.c)

    集合型別是用來儲存多個字串的,與列表型別不一樣,集合中不允許有重複的元素,也不能以索引的方式來通過下標獲取值,集合中的元素還是無序的。在普通的集合上增刪查改外,集合型別還實現了多個集合的取交集、並集、差集,集合的命令如下表所示: 集合命