Apache ActiveMQ實戰(1)-基本安裝配置與消息類型

分類:IT技術 時間:2016-10-09

ActiveMQ簡介




ActiveMQ是一種開源的,實現了JMS1.1規範的,面向消息(MOM)的中間件,為應用程序提供高效的、可擴展的、穩定的和安全的企業級消息通信。ActiveMQ使用Apache提供的授權,任何人都可以對其實現代碼進行修改。


ActiveMQ的設計目標是提供標準的,面向消息的,能夠跨越多語言和多系統的應用集成消息通信中間件。ActiveMQ實現了JMS標準並提供了很多附加的特性。這些附加的特性包括,JMX管理(Java Management Extensions,即java管理擴展),主從管理(master/salve,這是集群模式的一種,主要體現在可靠性方面,當主中介(代理)出現故障,那麽從代理會替代主代理的位置,不至於使消息系統癱瘓)、消息組通信(同一組的消息,僅會提交給一個客戶進行處理)、有序消息管理(確保消息能夠按照發送的次序被接受者接收)。


ActiveMQ 支持JMS規範,ActiveMQ完全實現了JMS1.1規範。


JMS規範提供了同步消息和異步消息投遞方式、有且僅有一次投遞語義(指消息的接收者對一條消息必須接收到一次,並且僅有一次)、訂閱消息持久接收等。如果僅使用JMS規範,表明無論您使用的是哪家廠商的消息代理,都不會影響到您的程序。


ActiveMQ整體架構


ActiveMQ主要涉及到5個方面:


  • 傳輸協議
消息之間的傳遞,無疑需要協議進行溝通,啟動一個ActiveMQ打開了一個監聽端口, ActiveMQ提供了廣泛的連接模式,其中主要包括SSL、STOMP、XMPP;ActiveMQ默認的使用 的協議是openWire,端口號:61616;

  • 消息域
ActiveMQ主要包含Point-to-Point (點對點),Publish/Subscribe Model (發布/訂閱者),其中在 Publich/Subscribe 模式下又有Nondurable subscription和       durable subscription (持久 化訂閱)2種消息處理方式

  • 消息存儲
在消息傳遞過程中,部分重要的消息可能需要存儲到數據庫或文件系統中,當中介崩潰時,信息不 回丟失

  • Cluster  (集群)
最常見到 集群方式包括Network of brokers和Master Slave;

  • Monitor (監控)
ActiveMQ一般由jmx來進行監控;




ActiveMQ的安裝配置


  1. 通過http://activemq.apache.org/download.html 下載:apache-activemq-5.13.3-bin.tar.gz 
  2. 把下載的該文件通過tar –zxvf apache-activemq-5.13.3-bin.tar.gz解壓在當前目錄
  3. 通過修改$ACTIVEMQ_HOME/conf/activemq.xml文件可以修改其配置
一般修改的其實也只有以下幾個段:、


 <destinationPolicy>
            <policyMap>
              <policyEntries>

我們在此段増加配置如下:


<destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" >
                <pendingmessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="50000"/>
                  </pendingMessageLimitStrategy>
               </policyEntry>
               <policyEntry queue=">" producerFlowControl="false" optimizedDispatch="true" memoryLimit=“2mb">
               </policyEntry>
              </policyEntries>
            </policyMap>
</destinationPolicy>

此處,我們使用的是”>”通配符,上述配置為每個隊列、每個Topic配置了一個最大2mb的隊列,並且使用了”optimizedDispatch=true”這個策略,該策略會啟用優化了的消息分發器,直接減少消息來回時的上下文以加快消息分發速度。


找到下面這一段


 <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
 </persistenceAdapter>

為確保擴展配置既可以處理大量連接也可以處理海量消息隊列,我們可以使用JDBC或更新更快的KahaDB消息存儲。默認情況下ActiveMQ使用KahaDB消息存儲。


ActiveMQ支持三種持久化策略:AMQ、KAHADB、JDBC


  • AMQ 它是一種文件存儲形式,具有寫入快、容易恢復的特點,采用這種方式持久化消息會被存儲在一個個文件中,每個文件默認大小為32MB,如果一條消息超過32MB,那麽這個值就需要設大。當一個文件中所有的消息被“消費”掉了,那麽這文件會被置成“刪除”標誌,並且在下一個清除開始時被刪除掉。


  • KAHADB,相比AMQ來説,KAHADB速度沒有AMQ快,可是KAHADB具有極強的垂直和橫向擴展能力,恢復時間比AMQ還要短,因此從5.4版後ActiveMQ默認使用KAHADB作為其持久化存儲。而且在作MQ的集群時使用KAHADB可以做到Cluster+Master Slave的這樣的完美高可用集群方案。


  • JDBC,即ActiveMQ默認可以支持把數據持久化到DB中,如:mysql、Oracle等。

找到下面這一段

<systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="90" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
</systemUsage>

此處為ActiveMQ的內存配置,從5.10版後ActiveMQ在<memoryUsage>中引入了一個percentOfJvmHeap的配置,該百分比為:


$ACTIVEMQ_HOME/bin/env中配置的JVM堆大小的百分比,如$ACTIVEMQ_HOME/bin/env 中:

# Set jvm memory configuration (minimal/maximum amount of memory)
ACTIVEMQ_OPTS_MEMORY="-Xms2048M -Xmx2048M"

那麽此處的percentOfJvmHeap=90即表示:MQ消息隊列一共會用到2048M*0.9的內存。

全部配完後我們可以通過以下命令啟動ActiveMQ

$cd $ACTIVEMQ_HOME
$ ./activemq console  

這種方式為前臺啟動activemq,用於開發模式便於調試時的信息輸出。


你也可以使用:

以後臺進程的方式啟動activemq。


啟動後在瀏覽器內輸入http://192.168.0.101:8161/admin/ 輸入管理員帳號(默認為admin/admin)即可登錄activemq的console界面



  • 啟動後的ActiveMQ的數據位於:$ACTIVEMQ_HOME/data/目錄內


  • 啟動後的ActiveMQ運行日誌位於:$ACTIVEMQ_HOME/data/目錄內的activemq.log文件


  • 如果需要改ActiveMQ的日誌配置可以通過修改$ACTIVEMQ_HOME/conf/log4j.properties



ActiveMQ與Spring集成



在Spring中建立一個activemq.xml文件,使其內容如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
	<!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
	<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">

		<property name="brokerURL" value=http://blog.csdn.net/lifetragedy/article/details/"tcp://192.168.0.101:61616" />
		
		
		
	
	
	
		
		
	



其中:


<property name="alwaysSessionAsync" value=http://blog.csdn.net/lifetragedy/article/details/“true" />


對於一個connection如果只有一個session,該值有效,否則該值無效,默認這個參數的值為true。


<property name="useAsyncSend" value=http://blog.csdn.net/lifetragedy/article/details/"true" />


將該值開啟官方說法是可以取得更高的發送速度(5倍)。


<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 設置消息隊列的名字 -->
<constructor-arg value=http://blog.csdn.net/lifetragedy/article/details/"ymk.queue?consumer.prefetchSize=100" />
</bean>


在此我們申明了一個隊列,並用它用於後面的實驗代碼。


consumer.prefetchSize則代表我們在此使用“消費者”預分配協議,在消費者內在足夠時可以使這個值更大以獲得更好的吞吐性能。


工程中的pom.xml文件主要內容如下:

。。。。。。
<properties>
	<activemq_version>5.13.3</activemq_version>
</properties>
。。。。。。
<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-all</artifactId>
	<version>${activemq_version}</version>
</dependency>

<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-pool</artifactId>
	<version>${activemq_version}</version>
</dependency>


ActiveMQ與Spring集成-發送端代碼


package webpoc;

public class AMQSender {
	public static void sendWithAuto(ApplicationContext context) {
		ActiveMQConnectionFactory factory = null;
		Connection conn = null;
		Destination destination = null;
		Session session = null;
		MessageProducer producer = null;
		try {
			destination = (Destination) context.getBean("destination");
			factory = (ActiveMQConnectionFactory) context.getBean("targetConnectionFactory");
			conn = factory.createConnection();
			session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
			producer = session.createProducer(destination);
			Message message = session.createTextMessage("...Hello JMS!");
			producer.send(message);
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			try {
				producer.close();
				producer = null;
			} catch (Exception e) {
			}
			try {
				session.close();
				session = null;
			}
			} catch (Exception e) {
			}
			try {
				conn.stop();
			} catch (Exception e) {
			}
			try {
				conn.close();
			} catch (Exception e) {
			}
		}
	}
	public static void main(String[] args) {
		final ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/spring/activemq.xml");
		sendWithAuto(context);
	}

}


ActiveMQ與Spring集成-接收端代碼


package webpoc;

public class TranQConsumer extends Thread implements MessageListener {

	private Connection conn = null;
	private Destination destination = null;
	private Session session = null;
	public void run() {
		receive();
	}
	public void receive() {
		ConnectionFactory factory = null;
		Connection conn = null;
		try {
			final ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/spring/activemq.xml");
			factory = (ActiveMQConnectionFactory) context.getBean("targetConnectionFactory");
			conn = factory.createConnection();
			conn.start();
			session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
			destination = (Destination) context.getBean("destination");
			MessageConsumer consumer = session.createConsumer(destination);
			consumer.setMessageListener(this);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	public void onMessage(Message message) {

		try {
			TextMessage tm = (TextMessage) message;
			System.out.println("TranQConsumer receive message: " + tm.getText());
		} catch (Exception e) {
			e.printStackTrace();
		}

	}
	public static void main(String[] args) {
		TranQConsumer tranConsumer = new TranQConsumer();
		tranConsumer.start();
	}
}


ActiveMQ與Spring集成-示例講解


上述例子非常的簡單。


它其實是啟動了一個Message Listener用來監聽ymk.queue中的消息,如果有消息到達,接收端代碼就會把消息“消費”掉。


而發送端代碼也很簡單,它每次向ymk.queue隊列發送一個文本消息。


這邊所謂的MQ消費大家可以這樣理解:


用戶sender向MQ的KAHADB中插入一條數據。


用戶receiver把這條數據select後,再delete,這個select一下後再delete就是一個“消費”動作。


簡單消息與事務型消息


我們可以註意到上述的例子中我們的代碼中有這樣的一段: 

session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

它代表的是我們的MQ消費端消費模式為“自動”,即一旦消費端從MQ中取到一條消息,這條消息會自動從隊列中刪除。


ActiveMQ是一個分布式消息隊列,它自然支持“事務”型消息,我們可以舉一個例子


系統A和系統B是有一個事務的系統間“服務集成”,我們可以把它想成如下場景:


系統A先會do sth…然後發送消息給系統B,系統B拿到消息後do sth,如果在其中任意一個環節發生了Exception,那麽代表系統A與系統B之間的消息調用這一過程為“失敗”。


失敗要重發,重發的話那原來那條消息必須還能重新拿得到。


此時我們就需要使用事務性的消息了。而事務性的消息是在:


生產端和消費端在創建session時,需要:


session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);


下面來看一個實際例子。


事務型消息發送端(生產端)


此處其它代碼與普通式消息發送代碼相似,只在以下幾處有不同,首先在取得session時會聲明事務開啟“true”。

session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

然後在發送時會有一個動作:

producer.send(message);
System.out.println("send......" + Thread.currentThread().getId());
session.commit();

相應的在catch(Exception)時需要

catch (Exception e) {
						e.printStackTrace();
	try {
		session.rollback();
	} catch (Exception ex) {
	}
} 


事務型消息接收端(消費端)


在我們的接收端的createSession時也需要把它設為“事務開啟”,此時請註意,生產和消費是在一個事務邊界中的。

session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

然後在接收時會有一個動作:

try {
	TextMessage tm = (TextMessage) message;
	System.out.println("TranQConsumer receive message: " + tm.getText());
	session.commit();

} catch (Exception e) {
	e.printStackTrace();
	try {
		session.rollback();
	} catch (Exception ex) {
	}
}

註意:


  1. 如果在消費端的onMessage中沒有session.commit(),那麽這條消息可以正常被接收,但不會被消費,換句話説客戶端只要不commit這條消息,這條消息可以被客戶端無限消費下去,直到commit(從MQ所persistent的DB中被刪除)。
  2. 如果在消費斷遇到任何Exception時session.rollback()了,ActiveMQ會按照默認策略每隔1s會重發一次,重發6次如果還是失敗,則進入ActiveMQ的ActiveMQ.DLQ隊列,重發策略這個值可以設(稍後會給出)。
  3. 如果在生產端的try{}塊裏發生錯誤,導致回滾(沒有commit),會怎麽樣?消費隊列永遠拿不到這條被rollback的消息,因為這條數據還沒被插入KAHADB中呢。
  4. 再如果,消費端拿到了消息不commit也不rollback呢?那消費端重啟後會再次拿到這條消息(因為始終取where status=‘未消費’取不到的原因,對吧?)

事務型消息的重發機制

<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">

	<property name="brokerURL" value=http://blog.csdn.net/lifetragedy/article/details/"tcp://192.168.0.101:61616" />
	
	
	
	



	
	





以上例子申明了對於destination這個隊列的重發機制為間隔100毫秒重發一次。


事務型消息的演示








點對點,應答式消息


所謂點對點應答式消息和事務無關,它主要實現的是如:


生產端:我發給你一個消息了,在你收到並處理後請回復!因為我要根據你的回復內容再做處理


消費端:我收到你的消息了,我處理完了請查收我給你的回復


生產端:收到你的消息,88




點對點,應答式消息核心代碼-配置部分


<!-- 發送消息的目的地(一個隊列) -->
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 設置消息隊列的名字 -->
<constructor-arg value=http://blog.csdn.net/lifetragedy/article/details/"ymk.queue?consumer.prefetchSize=100" />









其實也沒啥花頭,就是多了一個隊列(不要打我)


。。。。。。


關鍵在於代碼,代碼,不要只重視表面嗎。。。要看內含的LA。。。


這兩個隊列其實:


一個Request
一個應答(也可以使用temp隊列來做應答隊列)


點對點,應答式消息核心代碼-設計部分


我們設立兩個程序:


  • 發送端(生產端)內含一個MessageListener,用來收消費端的返回消息
  • 服務端(消費端)內含一個MessageListener,用來收生產端發過來的消息然後再異步返回


而溝通生產端和消費端的這根“消息鏈”是兩個東西:


  • JMSCorrelationID
  • JMSReplyTo


JMSCorrelationID:
它就是一個隨機不可重復的數字,以String型傳入API,也可以是GUID,它主要是被用來標示MQ 中每一條不同的消息用的一個唯一ID


JMSReplyTo
它就是一個生產端用來接收消費端返回消息的地址


點對點,應答式消息核心代碼-生產端部分代碼


String correlationId = RandomStringUtils.randomNumeric(5);

consumer = session.createConsumer(replyDest);
message.setJMSReplyTo(replyDest);
message.setJMSCorrelationID(correlationId);
consumer.setMessageListener(this);


  • RandomStringUtils
import org.apache.commons.lang.RandomStringUtils;


  • replyDest
replyDest = (Destination) context.getBean("replyDestination");


來看位於客戶端(生產端)的messageListener吧

public void onMessage(Message message) {
	TextMessage tm = (TextMessage) message;
	try {
		System.out.println("Client接收Server端消息:" + tm.getText());
	} catch (Exception e) {
		e.printStackTrace();
	}
}


其余部分代碼(沒啥花頭,就是sender裏帶了一個messageListener):

producer.send(message);


點對點,應答式消息核心代碼-生產端所有代碼


package webpoc.mq.dual;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang.RandomStringUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;


public class Client implements MessageListener {


	public void onMessage(Message message) {
		TextMessage tm = (TextMessage) message;
		try {
			System.out.println("Client接收Server端消息:" + tm.getText());
		} catch (Exception e) {
			e.printStackTrace();
		}

	}
public void start(ApplicationContext context) {
		ConnectionFactory factory = null;
		Connection conn = null;
		Destination destination = null;
		Destination replyDest = null;
		Session session = null;
		MessageProducer producer = null;
		MessageConsumer consumer = null;
		try {
			destination = (Destination) context.getBean("destination");
			replyDest = (Destination) context.getBean("replyDestination");
			factory = (ActiveMQConnectionFactory) context.getBean("connectionFactory");
			conn = factory.createConnection();
			conn.start();
			session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
			producer = session.createProducer(destination);
			TextMessage message = session.createTextMessage("...Hello JMS!");
			String correlationId = RandomStringUtils.randomNumeric(5);

			consumer = session.createConsumer(replyDest);
			message.setJMSReplyTo(replyDest);
			message.setJMSCorrelationID(correlationId);

			consumer.setMessageListener(this);

		} catch (Exception e) {
			String errorMessage = "JMSException while queueing HTTP JMS Message";
			e.printStackTrace();
		}
}
	public void send(ApplicationContext context) {
		ConnectionFactory factory = null;
		Connection conn = null;
		Destination destination = null;
		Destination replyDest = null;
		Session session = null;
		MessageProducer producer = null;
		try {
			destination = (Destination) context.getBean("destination");
			replyDest = (Destination) context.getBean("replyDestination");
			factory = (ActiveMQConnectionFactory) context.getBean("connectionFactory");
			conn = factory.createConnection();
			conn.start();
			session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
			producer = session.createProducer(destination);
			TextMessage message = session.createTextMessage("...Hello JMS!");
			String correlationId = RandomStringUtils.randomNumeric(5);

			message.setJMSReplyTo(replyDest);
			message.setJMSCorrelationID(correlationId);
			producer.send(message);
			System.out.println("send 1 message");
		} catch (Exception e) {
			String errorMessage = "JMSException while queueing HTTP JMS Message";
			e.printStackTrace();
		}
	}

	public static void main(String[] args) {
		final ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/spring/activemq_dual.xml");
		//sendWithAuto(context);
		Client c = new Client();
		c.start(context);
		c.send(context);

}


點對點,應答式消息核心代碼-消費端部分代碼


public void onMessage(Message message) {
		System.out.println("on message");
		try {
			TextMessage response = this.session.createTextMessage();
			if (message instanceof TextMessage) {
				TextMessage txtMsg = (TextMessage) message;
				String messageText = txtMsg.getText();
				response.setText("服務器收到消息:" + messageText);
				System.out.println(response.getText());
			}
			response.setJMSCorrelationID(message.getJMSCorrelationID());
			producer.send(message.getJMSReplyTo(), response);

		} catch (Exception e) {
			e.printStackTrace();
		}

	}

}

  1. 此處的send()方法內有兩個參數,註意其用法
  2. 然後為這個消費端也加一個messageListener如:

session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(replyDest);
consumer = session.createConsumer(destination);
consumer.setMessageListener(this);


點對點,應答式消息核心代碼-全部代碼


package webpoc.mq.dual;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang.RandomStringUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Server implements MessageListener {
	private ConnectionFactory factory = null;
	private Connection conn = null;
	private Destination destination = null;
	Destination replyDest = null;
	private Session session = null;
	private MessageProducer producer = null;
	private MessageConsumer consumer = null;

	@Override
	public void onMessage(Message message) {
		System.out.println("on message");
		try {
			// 若有消息傳送到服務時,先創建一個文本消息  
			TextMessage response = this.session.createTextMessage();
			// 若從客戶端傳送到服務端的消息為文本消息  
			if (message instanceof TextMessage) {
				// 先將傳送到服務端的消息轉化為文本消息  
				TextMessage txtMsg = (TextMessage) message;
				// 取得文本消息的內容  
				String messageText = txtMsg.getText();
				// 將客戶端傳送過來的文本消息進行處理後,設置到回應消息裏面  
				response.setText("服務器收到消息:" + messageText);
				System.out.println(response.getText());
			}
			// 設置回應消息的關聯ID,關聯ID來自於客戶端傳送過來的關聯ID  
			response.setJMSCorrelationID(message.getJMSCorrelationID());
			System.out.println("replyto===" + message.getJMSReplyTo());
			// 生產者發送回應消息,目的由客戶端的JMSReplyTo定義,內容即剛剛定義的回應消息  
			producer.send(message.getJMSReplyTo(), response);

		} catch (Exception e) {
			e.printStackTrace();
		}

	}


public void receive(ApplicationContext context) {

		try {
			destination = (Destination) context.getBean("destination");
			replyDest = (Destination) context.getBean("replyDestination");
			factory = (ActiveMQConnectionFactory) context.getBean("connectionFactory");
			conn = factory.createConnection();
			conn.start();
			session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
			producer = session.createProducer(replyDest);
			consumer = session.createConsumer(destination);
			consumer.setMessageListener(this);
		} catch (Exception e) {
			String errorMessage = "JMSException while queueing HTTP JMS Message";
			e.printStackTrace();
		}
	}

	public static void main(String[] args) {
		final ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/spring/activemq_dual.xml");
		Server s = new Server();
		s.receive(context);

	}

}


點對點,應答式消息核心代碼-演示







Tags: 應用程序 master 點對點 可靠性 中間件

文章來源:


ads
ads

相關文章
ads

相關文章

ad