1. 程式人生 > >spring+ActiveMQ+JMS+執行緒池實現簡單的分散式,多執行緒,多工的非同步任務處理系統

spring+ActiveMQ+JMS+執行緒池實現簡單的分散式,多執行緒,多工的非同步任務處理系統

前言:隨著系統的業務功能不斷增強,傳統的單機、單任務,單執行緒的執行模式已經逐漸的被淘汰,取而代之的是分散式,多工,多執行緒,當然,現在開源的這方面的框架也非常的多,大概的思想也都類似,下面就結合我這一年多的工作心得,分享一個簡單易實現的分散式,多工,多執行緒的非同步任務處理系統的基本實現。

1.系統部署圖

該系統主要由3部分構成,任務生產者叢集,訊息中介軟體叢集,任務消費者叢集,下面來分別說下這3部分的作用:

任務生產者叢集:顧名思義,主要用來產生訊息任務,並將這些訊息任務傳送到訊息中介軟體叢集中,任務生產者叢集可能使用的是不同的開發語言,開發框架以及不同的開發平臺。

訊息中介軟體叢集:訊息中介軟體叢集主要的作用是使生產者和消費者解耦,遮蔽各個異構系統之間的區別,以及保證訊息的傳送,超時重試和訊息任務的負載均衡。並確保同一訊息只被一個消費者消費。為了實現系統的高可用性,此處使用了叢集模式,實現master-slave

模式。

任務消費者叢集:這個是我們的業務核心,通過消費訊息中介軟體傳送過來的訊息,從而實現我們的業務功能需求。為了保證任務被及時的處理,我們會用到spring的執行緒池,來實現任務的非同步排程。

2.系統設計

(1)為了保證系統的高可用性,訊息中介軟體叢集我們採用的是主備結構,配置檔案如下:

<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="failover:(tcp://localhost:61616,tcp://localhost:61617)" />
	</bean>
為了減少訊息中介軟體由於頻繁連線導致的效能消耗,會使用連線池,配置檔案如下:
<!-- 通過往PooledConnectionFactory注入一個ActiveMQConnectionFactory可以用來將Connection,Session和MessageProducer池化 
		這樣可以大大減少我們的資源消耗, -->
	<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
		<property name="connectionFactory" ref="targetConnectionFactory" />
		<property name="maxConnections" value="20" />
	</bean>
(2),為了及時的監聽訊息,我們使用到了JMS中的MessageListener,當然,為了有更好的擴充套件性和靈活性,我們可以使用SessionAwareMessageListener以及MessageListenerAdapter來實現訊息驅動POJO,示例程式碼如下:
public class ConsumerSessionAwareMessageListener implements
		SessionAwareMessageListener<TextMessage> {
	
	private Destination destination;

	@Override
	public void onMessage(TextMessage message, Session session)
			throws JMSException {
		try {
			String receiveMessage = ((TextMessage) message).getText();
           // 建立訊息生產者,用來發送回復訊息到回覆佇列裡面
			MessageProducer producer = session.createProducer(destination);
			producer.send(session.createTextMessage("消費者回復訊息!"));
			System.out.println("消費者收到的訊息為:"+receiveMessage);
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
}
(3)非同步任務排程

對於接收到的訊息會採用非同步的任務排程結合線程池來處理,示例程式碼如下:

@SuppressWarnings({"unchecked", "rawtypes"})
public class ConsumerReceive implements MessageListener {

	private CustomerServiceStrategyI strategy;

	@Override
	@Async("mqExecutor")// 非同步的任務處理
	public void onMessage(Message message) {
		System.out.println("當前處理任務的執行緒為:" + Thread.currentThread().getName());
		if (message instanceof TextMessage) {
			strategy = new TextMessageStrategy();
			strategy.doService(message);
           message.acknowledge();// 客戶端訊息確認機制
		}
	}
}
非同步任務執行緒池的配置如下:
<task:annotation-driven/>
	<task:executor id="mqExecutor" pool-size="5-10" queue-capacity="20000" keep-alive="2000" rejection-policy="CALLER_RUNS"/>
注意:@Async("mqExecutor")這個註解表示該方法會通過非同步的方式來執行,會直接跳過主程式

(4)訊息確認機制

為了保證訊息或者是請求被至少處理一次,可以引入訊息的確認機制,JMS總共為我們提供了3種確認機制,分別如下:

Auto_acknowledge:JMS客戶端會自動向伺服器傳送確認訊息,如果伺服器沒有接收到這個確認訊息,就會認為該訊息未被傳送,並可能會試圖重新發送。

Client_acknowledge:Auto_acknowledge模式中,確認總是隱式的在onMessage處理器返回之後發生,而Client_acknowledge則是由客戶端控制何時傳送確認,這樣的話,可以保證接收訊息的客戶端能夠實現對“保證訊息傳送”更細粒度的控制。當然,這種方式需要客戶端來顯示的傳送,例如呼叫message.acknowledge();方法

Dups_OK_acknowledge:如果在會話上指定這種模式的話,JMS提供者可以將一條訊息向統一目的地傳送兩次以上,這與前面兩種模式的“一次且僅僅一次”的語義就不同了,用於可以接收重複訊息的程式。

(5)訊息策略

由於訊息生產者可能生產的訊息各不一樣,例如TextMessageMapMessage等,可以根據不同的訊息使用不同的策略,示例程式碼如下:

策略介面:為了更好的相容性,此處使用了泛型

package com.chhliu.myself.activemq.start.async;

public interface CustomerServiceStrategyI<P, V> {
	P doService(V message);
}

具體的策略類:
package com.chhliu.myself.activemq.start.async;

import javax.jms.JMSException;
import javax.jms.TextMessage;

public class TextMessageStrategy implements CustomerServiceStrategyI<User, TextMessage> {

	@Override
	public User doService(TextMessage message) {
		try {
			String receiveMessage = message.getText();
			System.out.println("消費者收到的訊息為:"+receiveMessage);
			return null;
		} catch (JMSException e) {
		}
		return null;
	}
}
(6)負載均衡

由於訊息消費者是以叢集模式在執行,那麼具體到每一條訊息,該有哪臺機器來消費了,這個就涉及到訊息的負載均衡,在系統中,可以利用JMS提供者從訊息源頭上來實現,具體的負載均衡演算法會因JMS提供商的不同而不同,但大概主流的幾種演算法如下:雜湊雜湊演算法,輪詢排程演算法,first-available均衡演算法,使用的時候,需要查閱JMS提供商的文件來確定。

通過上面的這幾步,就基本上實現了一個簡單的分散式,多工,多執行緒的非同步任務處理系統,整體執行結果如下:

================生產者建立了一條訊息==============
================生產者建立了一條訊息==============
當前處理任務的執行緒為:mqExecutor-2
消費者收到的訊息為:hello acticeMQ:my name is chhliu!fcb25e99-1181-48bb-963f-8d20a98829ab
當前處理任務的執行緒為:mqExecutor-1
消費者收到的訊息為:hello acticeMQ:my name is chhliu!12578be1-56a4-4b41-a8d4-112031617525
================生產者建立了一條訊息==============
================生產者建立了一條訊息==============
當前處理任務的執行緒為:mqExecutor-5
消費者收到的訊息為:hello acticeMQ:my name is chhliu!24d8f8c2-b172-4f7e-81e8-d87b73b7825b
當前處理任務的執行緒為:mqExecutor-3
消費者收到的訊息為:hello acticeMQ:my name is chhliu!00dc0934-c521-42da-83fb-9f541f667ec9
其實上面的這個簡單的系統,也可以當成簡單的RPC來使用,如果要實現更細粒度的RPC的話,可以引進Netty等來實現,當然這些都是後話了。