1. 程式人生 > >JMS訊息確認和事務

JMS訊息確認和事務


保證訊息傳送

保證訊息傳送有3個主要部分:訊息自主性,儲存並轉發以及底層訊息確認,下面具體看一下這些概念;

1.訊息自主性

訊息是自包含的自主性實體,在設計分散式訊息應用程式時,要將此作為頭條法則;當JMS客戶端傳送一條訊息時,它就完成了它的所有工作,一旦該資料被傳送出去,它就被認為是”安全的”,

而且不在受該客戶端的控制,類似JDBC客戶端和資料庫直接的約定;

2.儲存並轉發訊息傳送

可以將訊息標記為持久化的,這樣JMS伺服器負責儲存訊息,以確保在提供者發生故障或消費客戶端發生故障的情況下,訊息可以恢復正常;訊息可以集中儲存或本地儲存;

3.訊息確認

JMS規定了多種確認模式,這些確認是保證訊息傳送的關鍵部分;伺服器確認從JMS生產者接受訊息,而JMS消費者確認從伺服器接受訊息;確認協議允許JMS提供者監測一條訊息的整個過程,

以便了解是否成功的生產和消費了該訊息;

訊息確認

訊息確認協議是保證訊息傳送的關鍵所在,JMS主要定義了三種確認模式:auto_acknowledge,dups_ok_acknowledge以及client_acknowledge;

1.auto_acknowledge

auto_acknowledge是JMS提供的自動確認模式,下面分別從生產者和消費者角度來分析,以下使用ActiveMQ來作為訊息伺服器;

1.1生產者和訊息伺服器

生產者呼叫send()或者publish()方法傳送訊息,同時進行阻塞,直到從訊息伺服器接收到一個確認為止;底層確認對客戶端程式設計模型來說是不可見的,如果在操作期間發生故障,就會丟擲一個異常,同時認為該訊息沒有被傳送;訊息伺服器接收到訊息,如果是永續性訊息就會持久化到磁碟,如果是非永續性訊息就會存入記憶體,然後再通知生產者已經接收到訊息;

上圖中可能出現的異常

1.1.1.傳送訊息失敗

可能由於網路原因導致傳送訊息失敗,伺服器沒有感知,需要生產者做好異常檢測或者重發機制;

1.1.2.持久化失敗

生產者成功傳送訊息給伺服器,伺服器在持久化時失敗,伺服器會在通知的時候,把錯誤資訊返回給生產者,需要生產者做好異常檢測;

1.1.3.伺服器通知生產者失敗

成功接收訊息和持久化,在通知生產者時,出現網路異常導致失敗,伺服器會將此訊息刪除,生產者會從阻塞中返回並丟擲異常;

1.2訊息伺服器和消費者

消費者獲取到訊息之後,需要向伺服器傳送確認資訊,如果伺服器沒有接收到確認資訊,會認為該訊息未被傳送,會試圖重新傳送;如果接收到確認訊息,此訊息將會從持久化儲存器中刪除;

上圖中可能出現的異常

1.2.1.接收訊息失敗

對於Queue模型來說,是主動拉取訊息,在沒有成功拉取資料的情況下,伺服器自然不會刪除資料;對於Topic模型來說,訊息伺服器會推送給每個消費者一個訊息的副本,如果是持久訂閱者,一直到訊息伺服器接收到所有訊息預定接收者的確認時,才會認為完成傳送;如果是非持久訂閱,就不會關心某一個接收者是否接收到訊息;

1.2.2.消費者通知伺服器失敗

消費者成功接收到訊息,但是在處理完之後,通知伺服器失敗,導致伺服器沒有被刪除,訊息會被重發,消費者要做好冪等性處理;

1.2.3.刪除持久化失敗

消費者成功接收到訊息,伺服器成功接收通知資訊,在刪除持久化資料時失敗,導致資料沒有被刪除,訊息會再次被消費,消費者要做好冪等性處理;

1.3例項分析

1.3.1.準備ActiveMq作為伺服器

使用apache-activemq-5.15.4作為伺服器,使用mysql作為持久化儲存器,activemq.xml做如下配置:

1.3.2.準備訊息傳送器

使用如下例項做訊息傳送器,本例項使用Queue模型進行分析

1.3.3.準備訊息接收器

這裡使用的是訊息監聽器的方式,有訊息自動呼叫onMessage方法,當然也可以直接迴圈使用qReceiver.receive()方法;其實監聽器方式本質上也是有一個consumer thread去不停的讀取訊息,具體可以檢視類TcpTransport;

1.3.4.QSender測試分析

執行QSender傳送一條訊息,QSender阻塞等待伺服器返回通知資訊,接收到成功通知,Qsender停止阻塞,執行其他的邏輯,結果如下:

檢視mysql資料庫

在傳送器中send()方法會丟擲一個JMSException異常,此異常是伺服器返回異常的包裝類,可以檢視ActiveMQConnection部分原始碼:

在傳送訊息的時候,可以指定一個超時時間,在指定時間內沒有接收到伺服器的通知訊息,直接認為獲取通知資訊失敗,丟擲超時異常;正常情況下,生產者會接收到Response,此類中有方法isException()方法,判定是否有異常,如果有異常會將異常包裝成JMSException,拋給生產者;

1.3.5.QReceiverListener測試與分析

執行QReceiverListener,接收器會啟動一個consumer thread專門去讀取訊息,讀取到訊息之後經過一系列處理之後,會呼叫onMessage()方法,此方法中需要讀取訊息,並進行業務邏輯處理,處理完之後會自動給伺服器傳送確認訊息;確認訊息非常重要,用來決定伺服器是否會刪除訊息,不刪除的話,訊息會被重複消費,結果如下:

一次成功接收訊息,重發標識為false;

檢視mysql資料庫

具體可以看一下ActiveMQMessageConsumer中的部分程式碼:

執行之後日誌如下:

接收到如上6條訊息之後,不再重複,訊息被刪除;

2.dups_ok_acknowledge

指示JMS生產者可以將一條訊息向同一目的地傳送兩次以上;dups_ok_acknowledge模式基於以下假設:用於確保”一次而且僅僅一次”傳送而必須的處理,會在提供者級別上導致額外開銷,還會影響系統的效能和訊息吞吐量,允許接受重複訊息的應用程式,可以使用dups_ok_acknowledge模式來避免這種開銷;

在ActiveMQ中表示並不是沒接收一條訊息就確認,而是可以接收一個批次後才確認,具體可以檢視afterMessageIsConsumed()方法中的部分程式碼:

大致分成了三種確認方式:沒接受一條訊息確認一次,等接收一個批次再確認以及手動指定確認;

3.client_acknowledge

此模式可以控制何時傳送確認訊息,具體使用message.acknowledge()方法,當然只有在client_acknowledge模式下才有效,其他2個模式直接忽略;


簡單模擬一下,在接收到訊息之後直接確認,後續處理業務發生錯誤,這種情況下訊息不會被重發;

事務性訊息

一個事務性發送,其中一組訊息要麼能夠全部保證到達伺服器,要麼都不到達伺服器,生產者、消費者與訊息伺服器直接都支援事務性;

1.事務性發送

從生產者角度的來看,JMS提供者為這組訊息提供了快取記憶體,直到執行commit()命令,如果發生了故障或者執行rollback(),這些訊息會丟失;

2.事務性接收

從接收者的角度來看,這些訊息會盡快的傳送給接收者,但是他們一直由JMS提供者儲存,知道接收者在會話物件上執行commit()為止;如果發生故障或者執行rollback(),提供者會重新發送這些訊息,這些訊息會被標誌為重新傳送;

3.事務性發送和接收

如果事務性生產者和事務性消費者由同一會話建立,那麼他們就能夠組合在單個事務中;這樣一來,JMS客戶端就可以作為單獨的工作單元生產和消費訊息;

4.例項分析

QSender做如下改動:

指定QueueSession為事務性會話,傳送完之後執行commit(),失敗執行rollback();

QReceiver做如下改動:

在接收完end結束標誌之後,執行commit()方法,高速伺服器接收完成;當然這裡使用非事務性消費者也是可以接收訊息的,事務的範圍僅限於生產者或消費者與訊息伺服器的會話;可以發現JMS的事務和JDBC提供的事務很像,本質上提供的是本地事務;不過如果要跨越多個會話、佇列、主題和資料庫之間協調單個事務,那僅僅本地事務是不夠的,這時候需要分散式事務;

5.分散式事務

允許多個資源參與到一個事務中,這些資源可以是資料庫,JMS等等;JMS規範提供了下列JMS物件的XA版本:XAConnection、XAConnectionFactory、XAQueueConnection、XAQueueConnectionFactory、XAQueueSession、XASession、XATopicConnection、XATopicConnectionFactory、XATopicSession;具體的訊息伺服器去實現這些介面,讓JMS也可以參與到全域性事務中。

總結

本文介紹了一下JMS的訊息確認模式和本地事務,並以ActiveMQ作為伺服器來做測試和分析,大體上了解了JMS的確認機制;重點介紹了一下本地事務,至於分散式事務一筆帶過,其實在處理分散式事務的問題,MQ應用廣泛實現最終一致性,這個可以深入分析一下。