1. 程式人生 > >Spring ActiveMQ 整合(四): JMS 事務管理

Spring ActiveMQ 整合(四): JMS 事務管理

1.為什麼要用事務?

       訊息事務是在生產者producer到broker或broker到consumer過程中同一個session中發生的,保證幾條訊息在傳送過程中的原子性。        可以在connection的createSession方法中指定一個布林值開啟,如果訊息確認機制是事務確認,那麼在傳送message的過程中session就會開啟事務(實際上broker的),不用使用者顯示呼叫 beginTransaction,這時所有通過session傳送的訊息都被快取下來,使用者呼叫session.commit時會發送所有訊息,當傳送出現異常時使用者可以呼叫rollback進行回滾操作,只有在開啟事務狀態下有效。

為什麼commit之後,不會有持久的訊息重新傳送呢?

       原因在於commit操作會自動將為簽收確認的訊息進行簽收確認,如果是當前接收但未簽收確認的訊息,都會被確認處理。因而在commit之後不會有持久化的訊息出現。


2.activeMQ支援的事務:

ActiveMQ有支援兩種事務,

  • JMS transactions - the commit() / rollback() methods on a Session (which is like doing commit() / rollback() on a JDBC connection)
  • XA Transactions - where the 
    XASession
     acts as an XAResource by communicating with the Message Broker, rather like a JDBC Connection takes place in an XA transaction by communicating with the database.

在支援事務的session中,producer傳送message時在message中帶有transaction ID。broker收到message後判斷是否有transaction ID,如果有就把message儲存在transaction store中,等待commit或者rollback訊息。所以ActiveMq的事務是針對broker而不是producer的,不管session是否commit,broker都會收到message。

如果producer傳送模式選擇了persistent,那麼message過期後會進入死亡佇列。在message進入死亡佇列之前,ActiveMQ會刪除message中的transaction ID,這樣過期的message就不在事務中了,不會儲存在transaction store中,會直接進入死亡佇列。具體刪除transaction ID的地方是在:org.apache.activemq.util.BrokerSupport的doResend,將transaction ID儲存在了originalTransactionID中,刪除了transaction ID。

       在下面的介紹中我用的是JMS transactions.

JMS transactions事務的配置:

      ①建立JMS事務,並引入關聯連結事務。

         ②.設定一個jmsTamplat,並關聯監聽容器。

<!-- jms事務 -->
	<bean id="jmsTransactionManager"
		class="org.springframework.jms.connection.JmsTransactionManager">
		<property name="connectionFactory" ref="connectionFactory" />
	</bean>
	<tx:annotation-driven transaction-manager="jmsTransactionManager" />


<!-- 訊息監聽容器 訊息接收監聽器用於非同步接收訊息 -->
	<bean id="jmsContainerOne" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="connectionFactory" />
		<property name="destination" ref="destinationOne" />
		<property name="messageListener" ref="consumerMessageListenerOfOne" />
		<!-- <property name="sessionTransacted" value="true"/> -->  <!-- 給listener新增事務,只負責接收訊息的回滾 (有了transactionManager就不用這個了,這個功能不全) 設定後好像並沒有起作用 不知道為啥 -->
		<!-- <property name="transactionManager" ref="jtaTransactionManager"/> --> <!-- 接收訊息和資料庫訪問處於同一事務中 jta -->
		<property name="transactionManager" ref="jmsTransactionManager" /> <!--jms事務 -->
		<property name="sessionAcknowledgeMode" value="4"></property>   <!-- 應答模式是 INDIVIDUAL_ACKNOWLEDGE http://blog.csdn.net/yueding_h/article/details/54944254 -->
		<!-- ActiveMQ:設定多個並行的消費者 -->
		<property name="concurrency" value="2-3" />
	</bean>

上面配置檔案配置完成後,在接收著那邊接受訊息失敗後,進行事務回滾。
session.rollback();
   具體實現:
 public void onMessage(Message message, Session session) {   
		  TextMessage textMsg = (TextMessage) message;
	    	try {
	    			System.out.println(1);
	    			String endStr = textMsg.getText();
		            Integer endInt = Integer.parseInt(endStr);
		            System.out.println("訊息:==="+endInt);
	    		    //只要被確認後   就會出隊,接受失敗沒有確認成功,會在原佇列裡面
	                    textMsg.acknowledge();

	        } catch (Exception e) {  
	        	try {
					session.rollback();
					System.out.println("測試回滾");
                                      e.printStackTrace();
	                             System.out.println("異常資訊是:===:" + e.getMessage());
	                  }
	    }

  這就介紹完了。   另外,activeMQ還有一種JtaTransactionManager 事務控制。   事務控制: (這裡讓接收訊息和資料庫訪問處於同一事務中)我們就可以配置一個外部的事務管理同時配置一個支援外部事務管理的訊息監聽容器(如DefaultMessageListenerContainer) 。要配置這樣一個參與分散式事務管理的訊息監聽容器,我們可以配置一個JtaTransactionManager,當然底層的JMS ConnectionFactory需要能夠支援分散式事務管理, 並正確地註冊我們的JtaTransactionManager。這樣訊息監聽器進行訊息接收和對應的資料庫訪問就會處於同一資料庫控制下,當訊息接收失敗或資料庫訪問失敗都會進行事務回滾操作。     這種事務控制,我在配置的時候,失敗了,所以選用了JMS事務控制。
   待我詳細瞭解JtaTransactionManager 後,再說吧。