1. 程式人生 > >rocketmq-producer原理解析

rocketmq-producer原理解析

Producer隨機與一個NameServer建立長連線,從NameServer獲取topic的最新佇列情況。Producer會向提供topic服務的master建立長連線,且定時向master傳送心跳。
傳送訊息demo:

// 構造Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 初始化Producer,整個應用生命週期內,只需要初始化1次
producer.start();
// 構造Message
Message msg = new Message("TopicTest1",// topic
"TagA",// tag:給訊息打標籤,用於區分一類訊息,可為null "OrderID188",// key:自定義Key,可以用於去重,可為null ("Hello MetaQ").getBytes());// body:訊息內容 // 傳送訊息並返回結果 SendResult sendResult = producer.send(msg); // 清理資源,關閉網路連線,登出自己 producer.shutdown();

一:start()
在整個應用生命週期內,生產者只需要呼叫一次start方法

來初始化以下工作:

  • 如果沒有指定namesrv地址,將會自動定址
  • 啟動定時任務:從namsrv更新topic路由資訊、清理已經掛掉的broker、向所有線上的broker master傳送心跳…
  • 啟動負載均衡的服務:producer根據roundbin方式輪詢topic下的所有佇列來實現傳送方的負載均衡。

二:DefaultMQProducer
send的內部實現原理虛擬碼如下:

private SendResult sendDefaultImpl(Message msg,......) {
    // 檢查Producer的狀態是否是RUNNING
    this.makeSureStateOK();
    // 檢查msg是否合法:是否為null、topic,body是否為空、body是否超長
Validators.checkMessage(msg, this.defaultMQProducer); // 獲取topic路由資訊 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); // 從路由資訊中選擇一個訊息佇列 MessageQueue mq = topicPublishInfo.selectOneMessageQueue(lastBrokerName); // 將訊息傳送到該佇列上去 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout); }

1:producer初始化的時候會從namesrv獲取topic的路由資訊更新到本地快取,所以tryToFindTopicPublishInfo會先從本地快取取,如果沒取到再從namesrv獲取最新的路由資訊。

 private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }

        if (topicPublishInfo.isHaveTopicRouterInfo() || (topicPublishInfo != null &&    topicPublishInfo.ok())) {
            return topicPublishInfo;
        } else {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }

2.傳送訊息時,當沒有指定佇列或佇列選擇器時,呼叫selectOneMessageQueue:使用輪詢方式,返回一個佇列。當指定佇列選擇器時,通過selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);獲取佇列。
3.訊息傳送
CommunicationMode:
SYNC:同步(預設),
ASYNC:非同步(callback),
ONEWAY:單向,

private SendResult sendKernelImpl(final Message msg, 
                                      final MessageQueue mq, 
                                      final CommunicationMode communicationMode,
                                      ......){
     String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
     SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
     switch (communicationMode) {
         case ASYNC:
                  sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                                brokerAddr, // 1
                                mq.getBrokerName(), // 2
                                msg, // 3
                                requestHeader, // 4
                                timeout, // 5
                                communicationMode, // 6
                                sendCallback, // 7
                                topicPublishInfo, // 8
                                this.mQClientFactory, // 9
                                this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), //10
                                context, //
                                this);
                        break;
                    case ONEWAY:
                    case SYNC:
                       ....
                        break;

}

相關推薦

rocketmq-producer原理解析

Producer隨機與一個NameServer建立長連線,從NameServer獲取topic的最新佇列情況。Producer會向提供topic服務的master建立長連線,且定時向master傳送心跳。 傳送訊息demo: // 構造Producer De

RocketMQ原理解析-producer 2.如何傳送訊息

Producer輪詢某topic下的所有佇列的方式來實現傳送方的負載均衡 1)  Topic下的所有佇列如何理解: 比如broker1, broker2, borker3三臺broker機器都配置了Topic_A Broker1 的佇列為queue0 , queue1

RocketMQ原理解析-producer 3.如何傳送順序訊息

Rocketmq能夠保證訊息嚴格順序,但是Rocketmq需要producer保證順序訊息按順序傳送到同一個queue中,比如購買流程(1)下單(2)支付(3)支付成功,這三個訊息需要根據特定規則將這個三個訊息按順序傳送到一個queue 如何實現把順序訊息傳送到同一個qu

RocketMQ 實戰與原理解析

內容簡介 本書由雲棲社群官方出品。 本書作者是阿里資深資料專家,對 RocketMQ 有深入的研究,並有大量的實踐經驗。在寫這本書之前,作者不僅系統、深入地閱讀了 RocketMQ 的原始碼,而且還向 RocketMQ 的官方開發團隊深入瞭解了它的諸多設計細節。作者結合自己多年使用 Rocket

RocketMQ原理解析-consumer 5.push消費-順序消費訊息

順序消費服務ConsumeMessageConcurrentlyService構建的時候                    構建一個執行緒池來接收消費請求ConsumeRequest                    構建一個單執行緒的本地執行緒,用來稍後定時重新消

RocketMQ原理解析-consumer 2.消費端負載均衡

消費端負載均衡 消費端會通過RebalanceService執行緒,10秒鐘做一次基於topic下的所有佇列負載 消費端遍歷自己的所有topic,依次調rebalanceByTopic  根據topic獲取此topic下的所有queue  選擇一臺broker獲取基

RocketMQ原理解析-consumer 3.長輪詢

Rocketmq的訊息是由consumer端主動到broker拉取的, consumer向broker傳送拉訊息請求, PullMessageService服務通過一個執行緒將阻塞佇列LinkedBlockingQueue<PullRequest>中的PullR

RocketMQ原理解析-Remoting1. 通訊層實現

Rocketmq的通訊層是基於通訊框架netty 4.0.21.Final之上做了簡單的協議封裝,是強依賴。 一: NettyRemotingAbstract  Server與Client公用抽象類 ResponseFuture模式:         invokeSyn

Spring Cloud Stream應用與自定義RocketMQ原理解析

概述Spring Cloud Stream 簡介Spring Cloud Stream 是一個用來為微服務應用構建訊息驅動能力的框架。它可以基於Spring Boot 來建立獨立的,可用於生產的Spring 應用程式。他通過使用Spring Integration來連線訊息代

RocketMQ原理解析-broker 4.HA & master slave

在broker啟動的時候BrokerController如果是slave,配置了master地址更新,沒有配置所有broker會想namesrv註冊,從namesrv獲取haServerAddr,然後更新到HAClient 當HAClient的MasterAddress不

[Architect] Abp 框架原理解析(5) UnitOfWork

框架 方法 src options nalu res actions cnblogs 一個數 本節目錄 介紹 分析Abp源碼 實現UOW 介紹 UOW(全稱UnitOfWork)是指工作單元. 在Abp中,工作單元對於倉儲和應用服務方法默認開啟。並在一次請求中,共享

angularjs工作原理解析

body oot 分隔 復制 抖動 修改 重新 接收 裏的 個人覺得,要很好的理解AngularJS的運行機制,才能盡可能避免掉到坑裏面去。在這篇文章中,我將根據網上的資料和自己的理解對AngularJS的在啟動後,每一步都做了些什麽,做一個比較清楚詳細的解析。 首

USB Type-C工作原理解析

說明 是否 forms dfp 其他 耗時 def 左右 del 自從蘋果發布了新MacBook,USB Type-C接口就成為了熱議對象。我來從硬件角度解析下這個USB Type-C,以便大家更好的了解USB Type-C的工作原理。特色尺寸小,支持正反插,速度快(10G

LocationManager(一)-定位方式原理解析

一段時間 接入點 work use npr roi 無線網 服務器 輔助 參考資源:android 4種定位原理及實現——1 android使用不同的方法為應用提供位置信息。 定位的方式有三種:GPS地位(A-GPSAssistedGPS:輔助全球衛星定位系統,或者是同步G

移動端使用rem同時適應安卓ios手機原理解析,移動端響應式開發

size screen bsp 應用 屏幕 來看 比例 忽略 基礎 rem單位大家可能已經很熟悉,rem是隨著html的字體大小來顯示代表寬度的方法,我們怎樣進行移動端響應式開發呢 瀏覽器默認的字體大小為16px 及1rem 等於 16px 如果我們想要使1rem等於 12

短信轟炸工具原理解析

圖形驗證碼 led 可能 https encoding 驗證碼生成 dex alt create 溫馨提示:本文文章緊作為學習探討,不能用於破壞攻擊用途,後果自負。文章後面有Demo源碼下載,使用C#語言開發。   相信不少人都莫名奇妙收過一些註冊驗證碼短信、登錄驗證碼短

【數據壓縮】JPEG標準與原理解析

round 高頻 切割 基於 大小 image 生成 p s pan 轉載請註明出處:http://blog.csdn.net/luoshixian099/article/details/50392230 CSDN-勿在浮沙築高臺 為了滿足不同應用的需求,J

數據庫水平切分(拆庫拆表)的實現原理解析(轉)

數字 一個數據庫 java ins 結果 都對 不同 com 嚴重 第1章 引言 隨著互聯網應用的廣泛普及,海量數據的存儲和訪問成為了系統設計的瓶頸問題。對於一個大型的互聯網應用,每天幾十億的PV無疑對數據庫造成了相當 高的負載。對於系統的穩定性和擴展性造成了極大的問題。

遊戲外掛原理解析與制作 - [內存數值修改類 篇一]

tle lin 篩選 分享 自己的 src 但我 以及 先來   本章旨在講解外掛實現原理,未深入涉及至代碼層面。希望能與對這方面感興趣的朋友多多交流,畢竟理論是死的,套路是固定的,只有破解經驗是花大量時間和心血積累的。 對於單機遊戲而言,遊戲中絕大部分的參數(比如血、藍

圍棋人機大戰中阿爾法狗原理解析,左右互搏,青出於藍而勝於藍?

重新 公園 mas 人機大戰 規律 .com boa beyond 暫時 —阿爾法狗原理解析 這些天都在沒日沒夜地關註一個話題,谷歌人工智能程序AlphaGo(國內網友親切地稱為“阿 爾法狗”)擊敗歐洲職業圍棋冠軍樊麾二段,在圍棋遊戲中達到了人類職業棋手的水平。什麽!!19