1. 程式人生 > >(二)ActiveMQ 點對點系統模型

(二)ActiveMQ 點對點系統模型

        ActiveMQ 客戶端編寫支援Java, C, C++ 等多種語言,筆者使用Java 語言來實現。測試模型為:一個生產者生產訊息,兩個消費者消費訊息。

1. 引入jar 包

  引入activemq-all-5.13.1.jar, 解壓apache-activemq-5.13.1-bin.zip 壓縮包,裡面有。 如果用maven3, 那麼新增Dependency

<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-all</artifactId>
  <version>5.13.1</version>
</dependency>

2. 源程式

      【1. 生產者一】

package org.zgf.learn.learn.jms.activemq.p2p;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * JMS 生產者
 */
public class JMSProducer {
	
	//設定預設的使用者名稱、密碼、連線地址
	private static final String USE = ActiveMQConnection.DEFAULT_USER;
	private static final String PASSWORD= ActiveMQConnection.DEFAULT_PASSWORD;
	private static final String BOOKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;
	
	public static void main(String[] args) throws Exception{
		//1. 建立JMS 連線工程
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USE, PASSWORD,BOOKERURL);
		//2. 建立JMS 連線
		Connection connection = connectionFactory.createConnection();
		//3. 啟動JMS 連線
		connection.start();
		//4. 建立JMS 會話,需要開啟事務,提交方式為自動提交
		Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
		//5. 建立JMS 訊息佇列
		Destination destination = session.createQueue("p2pMsgQueue1");
		
		//6. 建立JMS 訊息生產者
		MessageProducer messageProducer = session.createProducer(destination);
		
		//7. 建立JMS 訊息(建立最簡單的訊息, 文字訊息)
		TextMessage textMessage =  session.createTextMessage("hello,world! [" + System.currentTimeMillis() + "]");
		
		//8. JMS 訊息生產者 傳送 JMS訊息
		messageProducer.send(textMessage);
		
		//9. 提交事務
		session.commit();
		
		System.out.println("【生成者】訊息傳送成功");
		closeConn(connection,session);
	}
	
	private static void closeConn(Connection connection, Session session){
		if(null != session){
			try {
				session.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}finally {
				if(connection != null){
					try {
						connection.close();
					} catch (JMSException e) {
						e.printStackTrace();
					}
				}
			}
		}
	}
}


【2. 消費者一】

package org.zgf.learn.learn.jms.activemq.p2p;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * JMS 消費者一
 */
public class JMSCustumer1 {
    
    //設定預設的使用者名稱、密碼、連線地址
    private static final String USE = ActiveMQConnection.DEFAULT_USER;
    private static final String PASSWORD= ActiveMQConnection.DEFAULT_PASSWORD;
    private static final String BOOKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    
    public static void main(String[] args) throws Exception{
        //1. 建立JMS 連線工程
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USE, PASSWORD,BOOKERURL);
        //2. 建立JMS 連線
        Connection connection = connectionFactory.createConnection();
        //3. 啟動JMS 連線
        connection.start();
        //4. 建立JMS 會話,不需要開啟事務,提交方式為自動提交
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5. 建立JMS 訊息佇列
        Destination destination = session.createQueue("p2pMsgQueue1");
        
        //6. 建立JMS 訊息消費者
        MessageConsumer messageConsumer = session.createConsumer(destination);
        //7. 為消費者設定監聽器
        messageConsumer.setMessageListener(new JMSTextMsgListener("消費 者一"));
        
        System.out.println("消費者一開始監聽....");
        //不能關閉連線,關閉之後就不能接受到訊息了
    }
    
}




【3. 消費者二】

package org.zgf.learn.learn.jms.activemq.p2p;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * JMS 消費者二
 */
public class JMSCustumer2 {
	
	//設定預設的使用者名稱、密碼、連線地址
	private static final String USE = ActiveMQConnection.DEFAULT_USER;
	private static final String PASSWORD= ActiveMQConnection.DEFAULT_PASSWORD;
	private static final String BOOKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;
	
	public static void main(String[] args) throws Exception{
		//1. 建立JMS 連線工程
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USE, PASSWORD,BOOKERURL);
		//2. 建立JMS 連線
		Connection connection = connectionFactory.createConnection();
		//3. 啟動JMS 連線
		connection.start();
		//4. 建立JMS 會話,不需要開啟事務,提交方式為自動提交
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//5. 建立JMS 訊息佇列
		Destination destination = session.createQueue("p2pMsgQueue1");
		
		//6. 建立JMS 訊息消費者
		MessageConsumer messageConsumer = session.createConsumer(destination);
		//7. 為消費者設定監聽器
		messageConsumer.setMessageListener(new JMSTextMsgListener("消費者二"));
		
		System.out.println("消費者二開始監聽....");
		//不能關閉連線,關閉之後就不能接受到訊息了
	}

}


【4. 訊息監聽器】

package org.zgf.learn.learn.jms.activemq.p2p;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
 * JMS 文字訊息監聽器
 *
 */
public class JMSTextMsgListener implements MessageListener{
	
	//消費者名稱
	private final String customerName ;
		
	public JMSTextMsgListener(String customerName) {
		this.customerName = customerName;
	}
	
	@Override
	public void onMessage(Message message) {
		//1. 強制轉換訊息
		TextMessage textMessage = (TextMessage) message;
		
		//2. 獲取接收到的訊息內容
		try {
			String msgContent = textMessage.getText();
			System.out.println("【" + this.customerName + "】接受到的訊息內容為:" + msgContent);
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}


3. 測試用例

      0. 測試之前Activemq 中訊息佇列為空

     

      1. 生產者生產併發送兩條訊息, 此時在web瀏覽器中可以看到:

       未被消費的訊息:2 , 當前消費者個數:0, 訊息入隊個數:2, 訊息出隊:0

     

      2. 啟動消費者一, 由於目前訊息佇列中存在兩條未消費的訊息,所以消費者一會立即消費這兩條訊息。

       未被消費的訊息:0 , 當前消費者個數:1, 訊息入隊個數:2, 訊息出隊:2

      

       消費者一控制檯列印結果:

       

      3. 啟動消費者二 , 由於目前訊息佇列中沒有被訊息的訊息,所以消費者二不消費任何訊息, 此時有兩個消費者在監聽。

       未被消費的訊息:0 , 當前消費者個數:1, 訊息入隊個數:2, 訊息出隊:2

     

       消費者二控制檯列印結果:

     

      4. 生產者每次傳送一條訊息,會發現,消費者一和消費者二進行輪流消費訊息,消費順序按啟動監聽的順序進行。

4. 總結

      1. 在點對點模型中,未被消費的訊息會儲存在activeMQ 佇列中

      2. 當一個生產者擁有多個消費者時,多個消費者將按照監聽順序輪流消費訊息

      3. 切記一條訊息只能被一個消費者消費。