spring+ActiveMQ+JMS+執行緒池實現簡單的分散式,多執行緒,多工的非同步任務處理系統
前言:隨著系統的業務功能不斷增強,傳統的單機、單任務,單執行緒的執行模式已經逐漸的被淘汰,取而代之的是分散式,多工,多執行緒,當然,現在開源的這方面的框架也非常的多,大概的思想也都類似,下面就結合我這一年多的工作心得,分享一個簡單易實現的分散式,多工,多執行緒的非同步任務處理系統的基本實現。
1.系統部署圖
該系統主要由3部分構成,任務生產者叢集,訊息中介軟體叢集,任務消費者叢集,下面來分別說下這3部分的作用:
任務生產者叢集:顧名思義,主要用來產生訊息任務,並將這些訊息任務傳送到訊息中介軟體叢集中,任務生產者叢集可能使用的是不同的開發語言,開發框架以及不同的開發平臺。
訊息中介軟體叢集:訊息中介軟體叢集主要的作用是使生產者和消費者解耦,遮蔽各個異構系統之間的區別,以及保證訊息的傳送,超時重試和訊息任務的負載均衡。並確保同一訊息只被一個消費者消費。為了實現系統的高可用性,此處使用了叢集模式,實現master-slave
任務消費者叢集:這個是我們的業務核心,通過消費訊息中介軟體傳送過來的訊息,從而實現我們的業務功能需求。為了保證任務被及時的處理,我們會用到spring的執行緒池,來實現任務的非同步排程。
2.系統設計
(1)為了保證系統的高可用性,訊息中介軟體叢集我們採用的是主備結構,配置檔案如下:
為了減少訊息中介軟體由於頻繁連線導致的效能消耗,會使用連線池,配置檔案如下:<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="failover:(tcp://localhost:61616,tcp://localhost:61617)" /> </bean>
(2),為了及時的監聽訊息,我們使用到了JMS中的MessageListener,當然,為了有更好的擴充套件性和靈活性,我們可以使用SessionAwareMessageListener以及MessageListenerAdapter來實現訊息驅動POJO,示例程式碼如下:<!-- 通過往PooledConnectionFactory注入一個ActiveMQConnectionFactory可以用來將Connection,Session和MessageProducer池化 這樣可以大大減少我們的資源消耗, --> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> <property name="connectionFactory" ref="targetConnectionFactory" /> <property name="maxConnections" value="20" /> </bean>
public class ConsumerSessionAwareMessageListener implements
SessionAwareMessageListener<TextMessage> {
private Destination destination;
@Override
public void onMessage(TextMessage message, Session session)
throws JMSException {
try {
String receiveMessage = ((TextMessage) message).getText();
// 建立訊息生產者,用來發送回復訊息到回覆佇列裡面
MessageProducer producer = session.createProducer(destination);
producer.send(session.createTextMessage("消費者回復訊息!"));
System.out.println("消費者收到的訊息為:"+receiveMessage);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
(3)非同步任務排程
對於接收到的訊息會採用非同步的任務排程結合線程池來處理,示例程式碼如下:
@SuppressWarnings({"unchecked", "rawtypes"})
public class ConsumerReceive implements MessageListener {
private CustomerServiceStrategyI strategy;
@Override
@Async("mqExecutor")// 非同步的任務處理
public void onMessage(Message message) {
System.out.println("當前處理任務的執行緒為:" + Thread.currentThread().getName());
if (message instanceof TextMessage) {
strategy = new TextMessageStrategy();
strategy.doService(message);
message.acknowledge();// 客戶端訊息確認機制
}
}
}
非同步任務執行緒池的配置如下:<task:annotation-driven/>
<task:executor id="mqExecutor" pool-size="5-10" queue-capacity="20000" keep-alive="2000" rejection-policy="CALLER_RUNS"/>
注意:@Async("mqExecutor")這個註解表示該方法會通過非同步的方式來執行,會直接跳過主程式
(4)訊息確認機制
為了保證訊息或者是請求被至少處理一次,可以引入訊息的確認機制,JMS總共為我們提供了3種確認機制,分別如下:
Auto_acknowledge:JMS客戶端會自動向伺服器傳送確認訊息,如果伺服器沒有接收到這個確認訊息,就會認為該訊息未被傳送,並可能會試圖重新發送。
Client_acknowledge:Auto_acknowledge模式中,確認總是隱式的在onMessage處理器返回之後發生,而Client_acknowledge則是由客戶端控制何時傳送確認,這樣的話,可以保證接收訊息的客戶端能夠實現對“保證訊息傳送”更細粒度的控制。當然,這種方式需要客戶端來顯示的傳送,例如呼叫message.acknowledge();方法
Dups_OK_acknowledge:如果在會話上指定這種模式的話,JMS提供者可以將一條訊息向統一目的地傳送兩次以上,這與前面兩種模式的“一次且僅僅一次”的語義就不同了,用於可以接收重複訊息的程式。
(5)訊息策略
由於訊息生產者可能生產的訊息各不一樣,例如TextMessage、MapMessage等,可以根據不同的訊息使用不同的策略,示例程式碼如下:
策略介面:為了更好的相容性,此處使用了泛型
package com.chhliu.myself.activemq.start.async;
public interface CustomerServiceStrategyI<P, V> {
P doService(V message);
}
具體的策略類:
package com.chhliu.myself.activemq.start.async;
import javax.jms.JMSException;
import javax.jms.TextMessage;
public class TextMessageStrategy implements CustomerServiceStrategyI<User, TextMessage> {
@Override
public User doService(TextMessage message) {
try {
String receiveMessage = message.getText();
System.out.println("消費者收到的訊息為:"+receiveMessage);
return null;
} catch (JMSException e) {
}
return null;
}
}
(6)負載均衡
由於訊息消費者是以叢集模式在執行,那麼具體到每一條訊息,該有哪臺機器來消費了,這個就涉及到訊息的負載均衡,在系統中,可以利用JMS提供者從訊息源頭上來實現,具體的負載均衡演算法會因JMS提供商的不同而不同,但大概主流的幾種演算法如下:雜湊雜湊演算法,輪詢排程演算法,first-available均衡演算法,使用的時候,需要查閱JMS提供商的文件來確定。
通過上面的這幾步,就基本上實現了一個簡單的分散式,多工,多執行緒的非同步任務處理系統,整體執行結果如下:
================生產者建立了一條訊息==============
================生產者建立了一條訊息==============
當前處理任務的執行緒為:mqExecutor-2
消費者收到的訊息為:hello acticeMQ:my name is chhliu!fcb25e99-1181-48bb-963f-8d20a98829ab
當前處理任務的執行緒為:mqExecutor-1
消費者收到的訊息為:hello acticeMQ:my name is chhliu!12578be1-56a4-4b41-a8d4-112031617525
================生產者建立了一條訊息==============
================生產者建立了一條訊息==============
當前處理任務的執行緒為:mqExecutor-5
消費者收到的訊息為:hello acticeMQ:my name is chhliu!24d8f8c2-b172-4f7e-81e8-d87b73b7825b
當前處理任務的執行緒為:mqExecutor-3
消費者收到的訊息為:hello acticeMQ:my name is chhliu!00dc0934-c521-42da-83fb-9f541f667ec9
其實上面的這個簡單的系統,也可以當成簡單的RPC來使用,如果要實現更細粒度的RPC的話,可以引進Netty等來實現,當然這些都是後話了。