1. 程式人生 > >訊息中介軟體activeMQ(3)

訊息中介軟體activeMQ(3)

訊息總是從生產者傳送到中介軟體再有中介軟體傳送給消費者。

傳送端特性分析:

producer預設是非同步傳送訊息。在沒有開啟事務的情況下,producer傳送持久化訊息是同步的,呼叫send會阻塞直到broker把訊息儲存到磁碟並返回確認。

訊息設定為持久:

MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);

訊息設定為非持久:

MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

官方提示非同步傳送有三種配置方式:

Configuring Async Send using a Connection URI
You can use the Connection Configuration URI to configure async sends as follows

cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");
Configuring Async Send at the ConnectionFactory Level
You can enable this feature on the ActiveMQConnectionFactory object using the property.

((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
Configuring Async Send at the Connection Level
Configuring the dispatchAsync setting at this level overrides the settings at the connection factory level.

You can enable this feature on the ActiveMQConnection object using the property.

((ActiveMQConnection)connection).setUseAsyncSend(true);

非同步傳送丟失訊息的場景是:生產者設定UseAsyncSend=true,使用producer.send(msg)持續傳送訊息。由於訊息不阻塞,生產者會認為所有send的訊息均被成功傳送至MQ。如果服務端突然宕機,此時生產者端記憶體中尚未被髮送至MQ的訊息都會丟失。

alwaysSyncSend/useAsyncSend

    用來設定producer(或者session)傳送訊息的方式:同步/非同步。

    alwaysSyncSend表示訊息將總會被同步傳送(request<->response),即訊息通過底層transport連結傳送之後,將會阻塞直到收到broker端的ProducerAck或者sentTimeout。同步傳送,是一種擔保能力強但傳送效率較低的方式,如果訊息傳送失敗,client端可以立即獲得異常。如果alwaysSyncSend值為true,則producer將使用同步傳送所有訊息(同時忽略useAsyncSend選項

)

    useAsyncSend表示訊息可以被非同步傳送(oneway),非同步傳送並不是訊息在client快取起來,而是訊息也將立即被transport傳送給broker,只是不會阻塞等待broker的ACK。訊息傳送給broker之後,producer.send方法將會立即返回。非同步傳送,通常配合broker端“Flow Control”手段使用,它可以最大化produer端的傳送效率。我們通常在訊息量比較密集的情況下使用非同步傳送,它可以很大的提升Producer效能;不過這也帶來了額外的問題,就是需要消耗較多的Client端記憶體,同時也會導致broker端效能消耗增加;此外它不能有效的確保訊息的傳送成功。在useAsyncSend的情況下,客戶端需要容忍訊息丟失的可能。

在不考慮事務的情況下:

producer傳送持久化訊息是同步傳送,傳送是阻塞的,直到收到確認。同步傳送肯定是有流量控制的。

producer預設是非同步傳送,非同步傳送不會等待broker的確認, 所以就需要考慮流量控制了:

ActiveMQConnectionFactory.setProducerWindowSize(int producerWindowSize)

ProducerWindowSize的含義:producer每傳送一個訊息,統計一下發送的位元組數,當位元組數達到ProducerWindowSize值時,需要等待broker的確認,才能繼續傳送。

windowSize(非同步傳送)

    視窗尺寸,用來約束在非同步傳送時producer端允許積壓的(尚未ACK)的訊息的尺寸,且只對非同步傳送有意義。每次傳送訊息之後,都將會導致memoryUsage尺寸增加(+message.size),當broker返回producerAck時,memoryUsage尺寸減少(producerAck.size,此size表示先前傳送訊息的大小)。可以通過如下2種方式設定:

  • 在brokerUrl中設定: "tcp://localhost:61616?jms.producerWindowSize=1048576",這種設定將會對所有的producer生效。

  • 在destinationUri中設定: "test-queue?producer.windowSize=1048576",此引數只會對使用此Destination例項的producer失效,將會覆蓋brokerUrl中的producerWindowSize值。

     此值越大,意味著消耗Client端的記憶體就越大。

    至此我們已經清楚,任何傳送到broker端的訊息,broker總會在儲存成功後回傳ProducerAck資訊,並且在ACK中包含訊息的size。當producer傳送訊息時,會首先檢測memoryUsage中是否有足夠的空間(根據message.size判斷),如果空間足夠,則訊息正常傳送;否則將會阻塞,直到收到producerACK且memoryUsage空間釋放足夠多。(注意,對於持久化訊息,只要broker儲存訊息成功即立即傳送ProcuerAck)

sendTimeout

    訊息傳送最大阻塞(等到ACK)時間。這個引數和syncSend方式配合使用,在同步傳送方式中,send方法將會阻塞直到broker反饋ACK,不過這種阻塞的時長無法預期,我們需要使用sendTimeout來設定最大的阻塞時間,如果阻塞超時,將會丟擲RequestTimedOutIOException。此值預設為0,表示永不超時。

  • 在brokerUrl中設定

//在brokerUrl中指定,單位:毫秒
jms.sendTimeout=30000
  • connection.setSendTimeout(long) 

    此外需要注意,如果指定了sendTimeout值 >0,無論何時,所有的訊息都將同步傳送。(忽略alwaysSyncSend、useAsyncSend、訊息持久化方式等)。如果sendTimeout <0,則效果和=0一致。

    這是一個非常重要而且有效的調優引數之一;它在producer端訊息傳送比較密集時,或者producer傳送速率與consumer消費速度失衡嚴重時,對維護broker例項效能穩定具有重要意義。比如,當producer短時間內大批量傳送訊息,而consumer消費速度相對較低時,會對broker端的儲存造成巨大沖擊,在極端情況下,可能導致OOM或者broker宕機。所以,我們通常開啟此引數,且指定相對合理的值。

    僅僅在producer指定windowSize,可以在Client端避免這些問題,不過這只是一個方面;但是從全域性來看,還不夠全面,我們還需要在broker端使用“Flow Control”策略來避免上述情況的發生。不過對於"持久化"型別的訊息,當broker儲存成功之後,總是立即傳送ack,所以我們不需要擔心會帶來潛在的危險。因為“windowSize”、“Flow Control”通常是針對“非持久化”訊息。

    “Flow Control”通常稱為“流量控制”,它的核心理念就是“按需交付”。如果每個Producer指定的windowSize為64M,但是我們有10個這樣的producer,那麼意味著broker端在最壞的情況下,需要承載640M資料,不過很不幸的是broker上可供cache的空間只有512M,那麼此後broker可能因為記憶體不足而強制掛起Producer的物理連線(connection),從而導致在connection上無法繼續傳送任何訊息。這會引入另外一個問題,當connection上還有當前通道(Queue或者Topic)的consumer時,會帶來死鎖;這導致consumer的ACK無法傳送-->進而導致訊息繼續消費,以致於形成“訊息無法消費”“也無法傳送”的死鎖情況。所以通常情況下,我們不能讓Consumer和Producer共享一個物理Connection。

  “Flow Control”所能做的就是:當broker記憶體不足時,讓producer暫停一下,直到訊息消費之後騰出足夠的空間。可以讓訊息密集傳送時,broker能夠協調Producer傳送的“頻率”,同時確保自己的效能處於可控範圍

參考文獻:

相關推薦

訊息中介軟體activeMQ3

訊息總是從生產者傳送到中介軟體再有中介軟體傳送給消費者。 傳送端特性分析: producer預設是非同步傳送訊息。在沒有開啟事務的情況下,producer傳送持久化訊息是同步的,呼叫send會阻塞直到broker把訊息儲存到磁碟並返回確認。 訊息設定為持久: Mess

訊息中介軟體activeMQ1

開篇只是提出了訊息中間的定義,點對點、訂閱兩種模式下的訊息傳輸特點和AactiveMQ中訊息傳遞和接收的流程。 基本概念: MOM 就是面向訊息中介軟體(Message-oriented middleware),是用於以分散式應用或系統中的非同步、鬆耦合、可靠、可擴充套件和安全通訊的一類軟體

訊息中介軟體ActiveMQHelloWorld入門例項

1、JMS訊息傳送模式 在點對點或佇列模型: 一個生產者向一個特定的佇列釋出訊息,一個消費者從該佇列中讀取訊息。這裡,生產者知道消費者的佇列,並直接將訊息傳送到消費者的佇列。這種模式被概括為:只有一個消費者將獲得訊息。生產者不需要在接收者消費該訊息期間處於執行狀態,接收

訊息中介軟體activeMQ6

訊息總是從生產者傳送到中介軟體再有中介軟體傳送給消費者。 訊息從broker傳送到消費者之後,為了使訊息能夠被正確的消費,引入了ACK機制來進行訊息的確認。 概念一: optimizeACK     "可優化的ACK",這是ActiveMQ對於consumer在訊息

ActiveMQ訊息中介軟體學習

同步通訊:客戶端向伺服器端發出請求,並一直等待伺服器端的響應。直到獲取到伺服器端返回的響應資訊,客戶端才能繼續執行。 MQ全稱為Message Queue, 訊息佇列(MQ)是一種應用程式對應用程式的通訊方法。應用程式通過讀寫出入佇列的訊息(針對應用程式的資料)來通訊,而

訊息中介軟體MQ模式、應用場景、常用協議

1.訊息中介軟體模式分類 1.1 點對點 1.2釋出/訂閱 2訊息中介軟體應用場景 2.1非同步通訊 有些業務不想也不需要立即處理訊息。訊息佇列提供了非同步處理機制,允許使用者把一個訊息放入佇列,但並不立即處理它。想向佇列中放入多少訊息就放多少,然後在需要的

訊息中介軟體MQJMS常識及簡單案例

1JMS概念 JMS即Java訊息服務(Java Message Service)應用程式介面,是一個Java平臺中關於面向訊息中介軟體(MOM)的API,用於在兩個應用程式之間,或分散式系統中傳送訊息,進行非同步通訊。Java訊息服務是一個與具體平臺無關的AP

訊息中介軟體MQ概念及常識

1.概念和用途:Message Queue顧名思義就是訊息佇列。打個比方去快餐店點餐,每個人點餐可能只要10s,但如果三個人同時向服務員點餐,服務員就可能會亂了,三個顧客還可能會吵起來,這件事就沒法30s內解決,那麼很簡單,排隊點餐就好辦了。所以MQ最核心的功能

訊息中介軟體Rabbitmq-使用詳解

https://blog.csdn.net/Dante_003/article/details/79377908Rabbitmq 是基於amqp(高階訊息佇列協議)實現的佇列技術,在他之上可以完成多種型別的訊息轉發模型。 下面列舉一些常用的訊息轉發場景,在rabbitmq中是

訊息中介軟體——RocketMQ 環境搭建完整版

每章一點正能量:自我控制是最強者的本能。——蕭伯納 前言 最近在學習訊息中介軟體——RocketMQ,打算把這個學習過程記錄下來。此章主要介紹環境搭建。此次主要是單機搭建(條件有限),包括在Windows、Linux環境下的搭建,以及console監控平臺搭建,最後加一demo驗證一下。 環境準備

訊息中介軟體——RabbitMQWindows/Linux環境搭建完整版

前言 最近在學習訊息中介軟體——RabbitMQ,打算把這個學習過程記錄下來。此章主要介紹環境搭建。此次主要是單機搭建(條件有限),包括在Windows、Linux環境下的搭建,以及RabbitMQ的監控平臺搭建。 環境準備 在搭建RabbitMQ之前,請先確保如下環境已經搭建完畢 Java環境(我

訊息中介軟體——RabbitMQ各大主流訊息中介軟體綜合對比介紹!

前言 訊息佇列已經逐漸成為企業IT系統內部通訊的核心手段。它具有低耦合、可靠投遞、廣播、流量控制、最終一致性等一系列功能,成為非同步RPC的主要手段之一。當今市面上有很多主流的訊息中介軟體,如老牌的ActiveMQ、RabbitMQ,炙手可熱的Kafka,阿里巴巴自主開發RocketMQ等。今天主要來

訊息中介軟體——RabbitMQ理解RabbitMQ核心概念和AMQP協議!

前言 本章學習,我們可以瞭解到以下知識點: 網際網路大廠為什麼選擇RabbitMQ? RabbiMQ的高效能之道是如何做到的? 什麼是AMQP高階協議? AMQP核心概念是什麼? RabbitMQ整體架構模型是什麼樣子的? RabbitMQ訊息是如何流轉的? 1. 初識RabbitMQ Rabbi

訊息中介軟體——RabbitMQ命令列與管控臺的基本操作!

前言 在前面的文章中我們介紹過RabbitMQ的搭建:RabbitMQ的安裝過以及各大主流訊息中介軟體的對比:,本章就主要來介紹下我們之前安裝的管控臺是如何使用以及如何通過命令列進行操作。 1. 命令列操作 1.1 基礎服務的命令操作 rabbitmqctl stop_app:關閉應用 rabbitm

訊息中介軟體——RabbitMQ快速入門生產者與消費者,SpringBoot整合RabbitMQ!

前言 本章我們來一次快速入門RabbitMQ——生產者與消費者。需要構建一個生產端與消費端的模型。什麼意思呢?我們的生產者傳送一條訊息,投遞到RabbitMQ叢集也就是Broker。 我們的消費端進行監聽RabbitMQ,當發現佇列中有訊息後,就進行消費。 1. 環境準備 本次整合主要採用Spring

訊息中介軟體——RabbitMQ理解Exchange交換機核心概念!

前言 來了解RabbitMQ一個重要的概念:Exchange交換機 1. Exchange概念 Exchange:接收訊息,並根據路由鍵轉發訊息所繫結的佇列。 藍色框:客戶端傳送訊息至交換機,通過路由鍵路由至指定的佇列。 黃色框:交換機和佇列通過路由鍵有一個繫結的關係。 綠色框:消費端通過監聽

訊息中介軟體——RabbitMQ高階特性全在這裡!

前言 前面我們介紹了RabbitMQ的安裝、各大訊息中介軟體的對比、AMQP核心概念、管控臺的使用、快速入門RabbitMQ。本章將介紹RabbitMQ的高階特性。分兩篇(上/下)進行介紹。 訊息如何保障100%的投遞成功? 冪等性概念詳解 在海量訂單產生的業務高峰期,如何避免訊息的重複消費的問題?

訊息中介軟體——RabbitMQ高階特性全在這裡!

前言 上一篇訊息中介軟體——RabbitMQ(七)高階特性全在這裡!(上)中我們介紹了訊息如何保障100%的投遞成功?,冪等性概念詳解,在海量訂單產生的業務高峰期,如何避免訊息的重複消費的問題?,Confirm確認訊息、Return返回訊息。這篇我們來介紹下下面內容。 自定義消費者 訊息的限流(防止

訊息中介軟體kafka0.9以及0.10版本學習及實踐

目錄 一、介紹 二、特點 三、何時需要訊息佇列 四、元件 五、訊息傳送的流程 六、原理 七、實踐 八、版本比較

循序漸進ActiveMQ3----MessageConsumer的訊息選擇器及mysql訊息持久化

MessageConsumer的訊息選擇器MessageConsumer是一個由Session建立的物件,用來從Destination接收訊息。看一下Session建立MessageConsumer的構造方法有哪些: public MessageConsumer create