rocketmq-producer原理解析
Producer隨機與一個NameServer建立長連線,從NameServer獲取topic的最新佇列情況。Producer會向提供topic服務的master建立長連線,且定時向master傳送心跳。
傳送訊息demo:
// 構造Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 初始化Producer,整個應用生命週期內,只需要初始化1次
producer.start();
// 構造Message
Message msg = new Message("TopicTest1",// topic
"TagA",// tag:給訊息打標籤,用於區分一類訊息,可為null
"OrderID188",// key:自定義Key,可以用於去重,可為null
("Hello MetaQ").getBytes());// body:訊息內容
// 傳送訊息並返回結果
SendResult sendResult = producer.send(msg);
// 清理資源,關閉網路連線,登出自己
producer.shutdown();
一:start()
在整個應用生命週期內,生產者只需要呼叫一次start方法
- 如果沒有指定namesrv地址,將會自動定址
- 啟動定時任務:從namsrv更新topic路由資訊、清理已經掛掉的broker、向所有線上的broker master傳送心跳…
- 啟動負載均衡的服務:producer根據roundbin方式輪詢topic下的所有佇列來實現傳送方的負載均衡。
二:DefaultMQProducer
send的內部實現原理虛擬碼如下:
private SendResult sendDefaultImpl(Message msg,......) {
// 檢查Producer的狀態是否是RUNNING
this.makeSureStateOK();
// 檢查msg是否合法:是否為null、topic,body是否為空、body是否超長
Validators.checkMessage(msg, this.defaultMQProducer);
// 獲取topic路由資訊
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
// 從路由資訊中選擇一個訊息佇列
MessageQueue mq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);
// 將訊息傳送到該佇列上去
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
}
1:producer初始化的時候會從namesrv獲取topic的路由資訊更新到本地快取,所以tryToFindTopicPublishInfo會先從本地快取取,如果沒取到再從namesrv獲取最新的路由資訊。
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || (topicPublishInfo != null && topicPublishInfo.ok())) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
2.傳送訊息時,當沒有指定佇列或佇列選擇器時,呼叫selectOneMessageQueue:使用輪詢方式,返回一個佇列。當指定佇列選擇器時,通過selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);獲取佇列。
3.訊息傳送
CommunicationMode:
SYNC:同步(預設),
ASYNC:非同步(callback),
ONEWAY:單向,
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
......){
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
switch (communicationMode) {
case ASYNC:
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr, // 1
mq.getBrokerName(), // 2
msg, // 3
requestHeader, // 4
timeout, // 5
communicationMode, // 6
sendCallback, // 7
topicPublishInfo, // 8
this.mQClientFactory, // 9
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), //10
context, //
this);
break;
case ONEWAY:
case SYNC:
....
break;
}
相關推薦
rocketmq-producer原理解析
Producer隨機與一個NameServer建立長連線,從NameServer獲取topic的最新佇列情況。Producer會向提供topic服務的master建立長連線,且定時向master傳送心跳。 傳送訊息demo: // 構造Producer De
RocketMQ原理解析-producer 2.如何傳送訊息
Producer輪詢某topic下的所有佇列的方式來實現傳送方的負載均衡 1) Topic下的所有佇列如何理解: 比如broker1, broker2, borker3三臺broker機器都配置了Topic_A Broker1 的佇列為queue0 , queue1
RocketMQ原理解析-producer 3.如何傳送順序訊息
Rocketmq能夠保證訊息嚴格順序,但是Rocketmq需要producer保證順序訊息按順序傳送到同一個queue中,比如購買流程(1)下單(2)支付(3)支付成功,這三個訊息需要根據特定規則將這個三個訊息按順序傳送到一個queue 如何實現把順序訊息傳送到同一個qu
RocketMQ 實戰與原理解析
內容簡介 本書由雲棲社群官方出品。 本書作者是阿里資深資料專家,對 RocketMQ 有深入的研究,並有大量的實踐經驗。在寫這本書之前,作者不僅系統、深入地閱讀了 RocketMQ 的原始碼,而且還向 RocketMQ 的官方開發團隊深入瞭解了它的諸多設計細節。作者結合自己多年使用 Rocket
RocketMQ原理解析-consumer 5.push消費-順序消費訊息
順序消費服務ConsumeMessageConcurrentlyService構建的時候 構建一個執行緒池來接收消費請求ConsumeRequest 構建一個單執行緒的本地執行緒,用來稍後定時重新消
RocketMQ原理解析-consumer 2.消費端負載均衡
消費端負載均衡 消費端會通過RebalanceService執行緒,10秒鐘做一次基於topic下的所有佇列負載 消費端遍歷自己的所有topic,依次調rebalanceByTopic 根據topic獲取此topic下的所有queue 選擇一臺broker獲取基
RocketMQ原理解析-consumer 3.長輪詢
Rocketmq的訊息是由consumer端主動到broker拉取的, consumer向broker傳送拉訊息請求, PullMessageService服務通過一個執行緒將阻塞佇列LinkedBlockingQueue<PullRequest>中的PullR
RocketMQ原理解析-Remoting1. 通訊層實現
Rocketmq的通訊層是基於通訊框架netty 4.0.21.Final之上做了簡單的協議封裝,是強依賴。 一: NettyRemotingAbstract Server與Client公用抽象類 ResponseFuture模式: invokeSyn
Spring Cloud Stream應用與自定義RocketMQ原理解析
概述Spring Cloud Stream 簡介Spring Cloud Stream 是一個用來為微服務應用構建訊息驅動能力的框架。它可以基於Spring Boot 來建立獨立的,可用於生產的Spring 應用程式。他通過使用Spring Integration來連線訊息代
RocketMQ原理解析-broker 4.HA & master slave
在broker啟動的時候BrokerController如果是slave,配置了master地址更新,沒有配置所有broker會想namesrv註冊,從namesrv獲取haServerAddr,然後更新到HAClient 當HAClient的MasterAddress不
[Architect] Abp 框架原理解析(5) UnitOfWork
框架 方法 src options nalu res actions cnblogs 一個數 本節目錄 介紹 分析Abp源碼 實現UOW 介紹 UOW(全稱UnitOfWork)是指工作單元. 在Abp中,工作單元對於倉儲和應用服務方法默認開啟。並在一次請求中,共享
angularjs工作原理解析
body oot 分隔 復制 抖動 修改 重新 接收 裏的 個人覺得,要很好的理解AngularJS的運行機制,才能盡可能避免掉到坑裏面去。在這篇文章中,我將根據網上的資料和自己的理解對AngularJS的在啟動後,每一步都做了些什麽,做一個比較清楚詳細的解析。 首
USB Type-C工作原理解析
說明 是否 forms dfp 其他 耗時 def 左右 del 自從蘋果發布了新MacBook,USB Type-C接口就成為了熱議對象。我來從硬件角度解析下這個USB Type-C,以便大家更好的了解USB Type-C的工作原理。特色尺寸小,支持正反插,速度快(10G
LocationManager(一)-定位方式原理解析
一段時間 接入點 work use npr roi 無線網 服務器 輔助 參考資源:android 4種定位原理及實現——1 android使用不同的方法為應用提供位置信息。 定位的方式有三種:GPS地位(A-GPSAssistedGPS:輔助全球衛星定位系統,或者是同步G
移動端使用rem同時適應安卓ios手機原理解析,移動端響應式開發
size screen bsp 應用 屏幕 來看 比例 忽略 基礎 rem單位大家可能已經很熟悉,rem是隨著html的字體大小來顯示代表寬度的方法,我們怎樣進行移動端響應式開發呢 瀏覽器默認的字體大小為16px 及1rem 等於 16px 如果我們想要使1rem等於 12
短信轟炸工具原理解析
圖形驗證碼 led 可能 https encoding 驗證碼生成 dex alt create 溫馨提示:本文文章緊作為學習探討,不能用於破壞攻擊用途,後果自負。文章後面有Demo源碼下載,使用C#語言開發。 相信不少人都莫名奇妙收過一些註冊驗證碼短信、登錄驗證碼短
【數據壓縮】JPEG標準與原理解析
round 高頻 切割 基於 大小 image 生成 p s pan 轉載請註明出處:http://blog.csdn.net/luoshixian099/article/details/50392230 CSDN-勿在浮沙築高臺 為了滿足不同應用的需求,J
數據庫水平切分(拆庫拆表)的實現原理解析(轉)
數字 一個數據庫 java ins 結果 都對 不同 com 嚴重 第1章 引言 隨著互聯網應用的廣泛普及,海量數據的存儲和訪問成為了系統設計的瓶頸問題。對於一個大型的互聯網應用,每天幾十億的PV無疑對數據庫造成了相當 高的負載。對於系統的穩定性和擴展性造成了極大的問題。
遊戲外掛原理解析與制作 - [內存數值修改類 篇一]
tle lin 篩選 分享 自己的 src 但我 以及 先來 本章旨在講解外掛實現原理,未深入涉及至代碼層面。希望能與對這方面感興趣的朋友多多交流,畢竟理論是死的,套路是固定的,只有破解經驗是花大量時間和心血積累的。 對於單機遊戲而言,遊戲中絕大部分的參數(比如血、藍
圍棋人機大戰中阿爾法狗原理解析,左右互搏,青出於藍而勝於藍?
重新 公園 mas 人機大戰 規律 .com boa beyond 暫時 —阿爾法狗原理解析 這些天都在沒日沒夜地關註一個話題,谷歌人工智能程序AlphaGo(國內網友親切地稱為“阿 爾法狗”)擊敗歐洲職業圍棋冠軍樊麾二段,在圍棋遊戲中達到了人類職業棋手的水平。什麽!!19