1. 程式人生 > >RocketMQ(八):訊息傳送

RocketMQ(八):訊息傳送

匠心零度 轉載請註明原創出處,謝謝!

RocketMQ網路部署圖

  • NameServer:在系統中是做命名服務,更新和發現 broker服務。
  • Broker-Master:broker 訊息主機伺服器。
  • Broker-Slave: broker 訊息從機伺服器。
  • Producer: 訊息生產者。
  • Consumer: 訊息消費者。

說明: rocketmq系列都將會以rocketmq-4.1.0-incubating進行介紹。

在閱讀原始碼時做了一定的註釋,公眾號【匠心零度】回覆:rocketmq,可獲得基於rocketmq4.1.0加詳細中文程式碼註釋 。歡迎大家 star、fork !

廝大說過訊息中介軟體的本質訊息中介軟體大道至簡:一發一存一消費 ,今天主要來討論下,就是RocketMQ網路部署圖中用顏色標記的部分。

往期rocketmq系列文章

訊息傳送概述

上面的圖大概就是producer傳送message到broker的核心邏輯了。

問題思考:

把broker相關資訊快取到客戶端減少了與namesrv的互動,但是也降低了broker變化的實時性了,如何忽然有一臺broker不可用了會怎麼樣呢?(後續看看rocketmq的處理),為什麼producer傳送會那麼快呢?本質是由於netty的writeAndFlush?producer如何做到非同步傳送?同步傳送?oneway傳送的呢?如果傳送失敗會怎麼處理呢?

訊息傳送一般流程分析

由於傳送還涉及到定時傳送,順序傳送,批量傳送等情況,本篇考慮到篇幅問題就是一般的傳送邏輯講解,後面繼續分享其他情況。

如何在本地除錯之前文章也分享過了,在此就不提了,傳送的邏輯相對於儲存以及消費來說是最簡單的(直接根據一條線不斷的跟下去基本就差不多了),而儲存最複雜,其次消費(這些過程可能一條線不好找,後續分享)。

同步傳送寫法

producer.start

    /**
     * Start this producer instance.
     * </p>
     *
     * <strong>
     * Much internal initializing procedures are carried out to make this instance prepared, thus, it's a must to invoke
     * this method before sending or querying messages.
     * </strong>
     * </p>
     *
     * @throws
MQClientException if there is any unexpected error. */
@Override public void start() throws MQClientException { this.defaultMQProducerImpl.start(); }

主要做了下列事情(核心事情):

  • 一些配置檢查。
  • 構建與namesrv通訊的netty客戶端。
  • 預設每30s與namesrv交換獲取broker相關資訊。
  • 預設每30s去掉失效的broker資訊以及傳送心跳到所有broker上面。

構建Message物件

producer是以Message物件進行傳送的,看看Message構造:

    public Message() {
    }

    public Message(String topic, byte[] body) {
        this(topic, "", "", 0, body, true);
    }

    public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
        this.topic = topic;
        this.flag = flag;
        this.body = body;

        if (tags != null && tags.length() > 0)
            this.setTags(tags);

        if (keys != null && keys.length() > 0)
            this.setKeys(keys);

        this.setWaitStoreMsgOK(waitStoreMsgOK);
    }

    public Message(String topic, String tags, byte[] body) {
        this(topic, tags, "", 0, body, true);
    }

    public Message(String topic, String tags, String keys, byte[] body) {
        this(topic, tags, keys, 0, body, true);
    }

備註: 主要就是topic、tags、以及body真實內容等。

send傳送

SendResult sendResult = producer.send(msg);

進行傳送處理。下面我們重點看看send如何處理。

傳送send核心分析

傳送的幾種方式:同步 非同步 oneway(應該選擇哪種,需要自己根據情況進行判斷)

以同步傳送為例子,預設超時時間為3s,

SendResult sendResult = producer.send(msg);

這個就是傳送的觸發方法,我們一直跟進去就行了,第一初步感受:通過跟蹤進去第一感覺就是涉及到了JUC相關使用,大量運用享元模式(本質一個map進行快取)以及netty使用。

核心邏輯:

程式碼就不大量複製了,需要的github裡面獲取基於rocketmq4.1.0加詳細中文程式碼註釋 。歡迎大家 star、fork !

  • 判斷服務是否可用? 不可用直接結束流程。

  • 訊息的驗證:

  • 獲取topic路由資訊

    快取中有就獲取,沒有就namesrv互動一次(也可能2次)由於topic資訊在broker服務端不一定存在,如果不存在就用預設的(TBW102)。

封裝請求頭資訊:

// Namesrv 根據Topic獲取Broker Name、佇列數(包含讀佇列與寫佇列)
 public static final int GET_ROUTEINTO_BY_TOPIC = 105;

namesrv服務端接受到這個請求的處理情況。

最後得到的路由資訊類似下面的:

  • 傳送模式是sync 會有3次其他1次

    //傳送模式是sync 會有3次其他1次
    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
  • 選擇一個queue

    如何選擇傳送那個broker的那個queueid上面?(客戶端自己負載),由於broker相關資訊快取在客戶端裡面,問題來了(由於30s會同步一次資訊,那麼在30s之內broker出現問題會怎麼樣呢? )rocketmq是這樣處理的:sendLatencyFaultEnable開關是否開啟

    1.開啟–> 有多長時間內不可用情況

    2.不開啟(預設)–>直接隨機一個(如果帶了lastBrokerName不為空 儘量換不是這個broker的,如果都沒有又是隨機一個)

  • 呼叫sendKernelImpl傳送訊息 傳送訊息核心

    根據broker的name獲取到ip地址,如果通道沒有建立並且儲存。

    設定設定UNIQ_id,裡面保護客戶端ip地址資訊。

    傳送的時候 會有鉤子函式提供執行(禁止訊息鉤子 ,傳送訊息鉤子(executeSendMessageHookBefore、executeSendMessageHookAfter)。

    構建SendMessageRequestHeader,包括生成訊息時間戳,所以各各機器時間最好一致,(這樣後期也可以查下broker接受訊息花了多少時間)。

  • 根據傳送訊息模式,選擇傳送方式

    下面這次主要看同步傳送情況。

    如果1情況執行nettywriteAndFlush傳送成功者跳出來,到達3情況進行等等最多等待3s。這裡什麼時候喚醒呢? 其實是在broker情況響應客戶端的時候進行喚醒的:

    備註: 這裡使用CountDownLatch非同步轉同步的。

    如果是2情況表示傳送失敗,直接喚醒3情況不進行阻塞了(最後拋異常表示傳送失敗)

  • 更新broker可用時間

  • retryAnotherBrokerWhenNotStoreOK情況判斷

    如果設定為retryAnotherBrokerWhenNotStoreOK為true之後,在傳送失敗的時候,會選擇換一個broker。

  • 如下異常continue,進行傳送訊息重試

客戶端傳送流程大概到這裡就分析完成了。

如果讀完覺得有收穫的話,歡迎點贊、關注、加公眾號【匠心零度】,查閱更多精彩歷史!!!

加入知識星球,一起探討!

相關推薦

RocketMQ()訊息傳送

匠心零度 轉載請註明原創出處,謝謝! RocketMQ網路部署圖 NameServer:在系統中是做命名服務,更新和發現 broker服務。 Broker-Master:broker 訊息主機伺服器。 Broker-Slave: brok

iOS進階—Runtime原始碼解析訊息傳送

GitHub參考 PS:參考GitHub分享的objc-runtime-master程式碼 1、OC中的方法呼叫,實際上objc_msgSend函式呼叫 2、objc_msgSend的執行過程大致可以分為三個部分: 訊息傳送 動態方法解析 訊息轉發 Ru

RocketMQ()高效能探祕之執行緒池

  上一篇文章講了如何設計和實現高併發高效能的應用,從根本上說明了一些道理。且以rocketmq的mappedFile的實現作為一個突破點,講解了rocketmq是如何具體實現高效能的。從中我們也知道,mappedFile只是其利用的作業系統的一個特性小點。   今天,我們就來說說,rockmq實現高效能的第

簡單的spring boot + flowable(activiti) + rocketmq 流程包括申請、稽核、定期提醒、rocketmq訊息傳送

一、配置 1、application.yml配置 server: port: 18090 spring: application: name: service-flowable datasource: driver-class

史上最簡單的SpringCloud教程 | 第 訊息匯流排(Spring Cloud Bus)

最新Finchley版本請訪問: https://www.fangzhipeng.com/springcloud/2018/08/30/sc-f8-bus/ 或者 http://blog.csdn.net/forezp/article/details/81041062

RocketMQ原理學習---Producer訊息傳送

        上一篇部落格RocketMQ原理學習-- Name Server中我們介紹了Name Server提供的相關功能,這篇部落格我們來介紹一下生產者訊息傳送相關的內容。 訊息傳送示例: public class Producer {

Kafka、RabbitMQ、RocketMQ訊息中介軟體的對比 —— 訊息傳送效能和區別

分散式系統中,我們廣泛運用訊息中介軟體進行系統間的資料交換,便於非同步解耦。現在開源的訊息中介軟體有很多,前段時間我們自家的產品 RocketMQ (MetaQ的核心) 也順利開源,得到大家的關注。 那麼,訊息中介軟體效能究竟哪家強? 帶著這個疑問,我們中介軟體測

RocketMQ訊息傳送方式

同步傳送 簡單來說,同步傳送就是指 producer 傳送訊息後,會在接收到 broker 響應後才繼續發下一條訊息的通訊方式。 由於這種同步傳送的方式確保了訊息的可靠性,同時也能及時得到訊息傳送的結果,故而適合一些傳送比較重要的訊息場景,比如說重要的通知郵件、營銷簡訊等等。在實

Kafka、RabbitMQ、RocketMQ訊息中介軟體的對比 —— 訊息傳送效能(轉自阿里中介軟體)

引言分散式系統中,我們廣泛運用訊息中介軟體進行系統間的資料交換,便於非同步解耦。現在開源的訊息中介軟體有很多,前段時間我們自家的產品 RocketMQ (MetaQ的核心) 也順利開源,得到大家的關注。那麼,訊息中介軟體效能究竟哪家強?帶著這個疑問,我們中介軟體測試組對常見的三類訊息產品(Kafka、Rabb

Rocketmq 4.3.2訊息傳送邏輯--------sendDefaultImpl方法研究

訊息傳送邏輯 makeSureStateOK():判斷服務(serviceState)是否可用,不可用就直接退出 checkMessage: 判斷訊息是否符合要求:是否為空,topic(還要判斷topic是否符合命名規則),body是否為空,訊息長度是否為0或者大於預設訊息長度。 be

RocketMQ原理學習---生產者事物訊息傳送

        上一篇部落格《RocketMQ原理學習---生產者普通訊息傳送》我們已經對生產者傳送普通訊息有了簡單的瞭解,這篇部落格我們來學習一下RocketMQ在傳送事物訊息時做了什麼處理操作。 一、生產者傳送訊息    

RocketMQ訊息傳送之pull和push

RocketMQ學習(五)——RocketMQ訊息傳送之pull和push import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.Defa

Scala簡單使用Actor的訊息傳送與接收求和

從Scala的諸多介紹當中,就看到了不少特別指出Scala中的Actor能夠實現並行程式設計的強大功能,它是基於事件模型的併發機制。或者說,Scala是運用訊息(message)的傳送、接收來實現多執行緒的。使用Scala能夠更容易地實現多執行緒應用的開發。  說到並行與訊息傳送、接收,我記起了上學期“平行

Kafka、RabbitMQ、RocketMQ 訊息中介軟體的對比 | 訊息傳送效能篇

摘要: 訊息中介軟體效能究竟哪家強? 帶著這個疑問,我們訊息佇列測試小組對常見的三類訊息產品(Kafka、RabbitMQ、RocketMQ)做了效能比較。 阿里雲訊息佇列測試小組 出品 分散式系統中,我們廣泛運用訊息中介軟體進行系統間的資料交換,便於非同步解耦。現在

RabbitMQ學習筆記RabbitMQ的訊息確認

來源: https://blog.csdn.net/chenxyt/article/details/79259838 一、概述     前文說到RabbitMQ的交換機、佇列、訊息的持久化並不能100%的保證訊息不會丟失。首先從生產者端,持久化的訊息在

RocketMQ原始碼解析-Producer訊息傳送

首先以預設的非同步訊息傳送模式作為例子。DefaultMQProducer中的send()方法會直接呼叫DefaultMQProducerImpl的send()方法,在DefaultMQProducerImpl會直接呼叫sendDefaultImpl()方法。 public

RabbitMQ學習筆記二rabbitmq傳送接收訊息Helloworld(Java版)

一 引入rabbitmq java client 前面我們已經在本地(windows下)安裝配置好了RabbitMQ server。現在我們引入rabbitmq Java client。 在eclipse中建立一個maven專案,在pom.xml檔案中加

從零開始微信機器人(一)wxpy簡介(登入、訊息傳送、註冊回覆)

在過去的幾個月中,由於在新生群中回答問題費時費力,同時又有許多重複而又有固定答案的回答,我受到一些知乎文章的啟發,維護了一個基於itchat的群聊機器人。從剛開始接入圖靈機器人時只會尬聊的機器人,之後又加入了api.ai的按照訊息內容自動回覆,而後再加入了回覆表情功能

rocketmq問題彙總-如何將特定訊息傳送至特定queue,消費者從特定queue消費

業務描述 由於業務需要這樣一種場景,將訊息按照id(業務id)尾號傳送到對應的queue中,並啟動10個消費者(單jvm,10個消費者組),從對應的queue中叢集消費,如下圖1所示(假設有兩

RocketMQ使用心得1-訊息傳送

去年末接觸RocketMQ,主要因為官方標註訊息必達,專案為支付相關,通過Restful api接收訂單存庫+推送MQ,再由消費端支付。後期發現即使訊息必達,但訊息不一定100%傳送成功,那麼如何判斷訊息傳送成功又成為了一個坑。 訊息傳送結果SendStatus分4種SEN