1. 程式人生 > >ActiveMQ 訊息中介軟體

ActiveMQ 訊息中介軟體

什麼是ActiveMQ

ActiveMQ是Apache出品,能力強勁的開源訊息匯流排。ActiveMQ 是一個完全支援JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位。

JMS介紹

JMS的全稱是Java Message Service,即Java訊息服務。用於在兩個應用程式之間,或分散式系統中傳送訊息,進行非同步通訊。

它主要用於在生產者和消費者之間進行訊息傳遞,生產者負責產生訊息,而消費者負責接收訊息。把它應用到實際的業務需求中的話我們可以在特定的時候利用生產者生成一訊息,並進行傳送,對應的消費者在接收到對應的訊息後去完成對應的業務邏輯。

對於訊息的傳遞有兩種型別:

一種是點對點的,即一個生產者和一個消費者一一對應;

另一種是釋出/訂閱模式,即一個生產者產生訊息並進行傳送後,可以由多個消費者進行接收。

JMS定義了五種不同的訊息正文格式,以及呼叫的訊息型別,允許你傳送並接收以一些不同形式的資料,提供現有訊息格式的一些級別的相容性。

· StreamMessage -- Java原始值的資料流

· MapMessage--一套名稱-值對

· TextMessage--一個字串物件

· ObjectMessage--一個序列化的 Java物件

· BytesMessage--一個位元組的資料流

JMS應用程式介面

ConnectionFactory 介面(連線工廠)

使用者用來建立到JMS提供者的連線的被管物件。JMS客戶通過可移植的介面訪問連線,這樣當下層的實現改變時,程式碼不需要進行修改。管理員在JNDI名字空間中配置連線工廠,這樣,JMS客戶才能夠查詢到它們。根據訊息型別的不同,使用者將使用佇列連線工廠,或者主題連線工廠。

Connection 介面(連線)

連線代表了應用程式和訊息伺服器之間的通訊鏈路。在獲得了連線工廠後,就可以建立一個與JMS提供者的連線。根據不同的連線型別,連線允許使用者建立會話,以傳送和接收佇列和主題到目標。

Destination 介面(目標)

目標是一個包裝了訊息目標識別符號的被管物件,訊息目標是指訊息釋出和接收的地點,或者是佇列,或者是主題。JMS管理員建立這些物件,然後使用者通過JNDI發現它們。和連線工廠一樣,管理員可以建立兩種型別的目標,點對點模型的佇列,以及釋出者/訂閱者模型的主題。

MessageConsumer 介面(訊息消費者)

由會話建立的物件,用於接收發送到目標的訊息。消費者可以同步地(阻塞模式),或非同步(非阻塞)接收佇列和主題型別的訊息。

MessageProducer 介面(訊息生產者)

由會話建立的物件,用於傳送訊息到目標。使用者可以建立某個目標的傳送者,也可以建立一個通用的傳送者,在傳送訊息時指定目標。

Message 介面(訊息)

是在消費者和生產者之間傳送的物件,也就是說從一個應用程式創送到另一個應用程式。一個訊息有三個主要部分:

訊息頭(必須):包含用於識別和為訊息尋找路由的操作設定。

一組訊息屬性(可選):包含額外的屬性,支援其他提供者和使用者的相容。可以建立定製的欄位和過濾器(訊息選擇器)。

一個訊息體(可選):允許使用者建立五種型別的訊息(文字訊息,對映訊息,位元組訊息,流訊息和物件訊息)。

訊息介面非常靈活,並提供了許多方式來定製訊息的內容。

Session 介面(會話)

表示一個單執行緒的上下文,用於傳送和接收訊息。由於會話是單執行緒的,所以訊息是連續的,就是說訊息是按照發送的順序一個一個接收的。會話的好處是它支援事務。如果使用者選擇了事務支援,會話上下文將儲存一組訊息,直到事務被提交才傳送這些訊息。在提交事務之前,使用者可以使用回滾操作取消這些訊息。一個會話允許使用者建立訊息生產者來發送訊息,建立訊息消費者來接收訊息。

ActiveMQ 安裝

http://activemq.apache.org/download.html

ActiveMQ 依賴JDK版本

MQ版本號    Build-Jdk    依賴JDK

apache-activemq-5.0.0    1.5.0_12    1.5+

...

apache-activemq-5.4.0    1.5.0_19    1.5+

apache-activemq-5.5.0    1.6.0_23    1.6+

...

apache-activemq-5.9.0    1.6.0_51    1.6+

apache-activemq-5.10.0    1.7.0_12-ea    1.7+

...

apache-activemq-5.14.0    1.7.0_80    1.7+

apache-activemq-5.15.0    1.8.0_112    1.8+

下載解壓,啟動activemq.bat

http://127.0.0.1:8161

ActiveMQ 預設使用者名稱和密碼

使用者名稱:admin

密碼:admin

可以在/conf/users.properties中尋找。

預設登入地址:http://localhost:8161/admin/

Producer

第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。

第二步:使用ConnectionFactory物件建立一個Connection物件。

第三步:開啟連線,呼叫Connection物件的start方法。

第四步:使用Connection物件建立一個Session物件。

第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個Queue物件。

第六步:使用Session物件建立一個Producer物件。

第七步:建立一個Message物件,建立一個TextMessage物件。

第八步:使用Producer物件傳送訊息。

第九步:關閉資源。

注:8161是後臺管理系統,61616是給java用的tcp埠

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;


/**
 * 
 第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。

第二步:使用ConnectionFactory物件建立一個Connection物件。

第三步:開啟連線,呼叫Connection物件的start方法。

第四步:使用Connection物件建立一個Session物件。

第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個Queue物件。

第六步:使用Session物件建立一個Producer物件。

第七步:建立一個Message物件,建立一個TextMessage物件。

第八步:使用Producer物件傳送訊息。

第九步:關閉資源。


 *
 */

public class ActiveMQProduder {

	public static void main(String[] args) {
		//第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); 
		
        try {
        	//第二步:使用ConnectionFactory物件建立一個Connection物件。
			Connection connection = connectionFactory.createConnection();
			
			//第三步:開啟連線,呼叫Connection物件的start方法。
			connection.start();
			
			//第四步:使用Connection物件建立一個Session物件。
			//第一個引數:是否開啟事務。true:開啟事務,第二個引數忽略。
			//第二個引數:當第一個引數為false時,才有意義。訊息的應答模式。1、自動應答2、手動應答。一般是自動應答。
			Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
			
			// 第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個Queue物件。
			//引數:佇列的名稱。
			// Destination :訊息的目的地訊息傳送給誰.  
	        Destination destination = session.createQueue("test-queue");
	        //Destination 是Queue父類
	        //Queue queue = session.createQueue("test-queue");
	        
			// 第六步:使用Session物件建立一個Producer物件。
			MessageProducer producer = session.createProducer(destination);
			//建立方式等價的
			//session.createProducer(queue);
			// 第七步:建立一個Message物件,建立一個TextMessage物件。
			/*TextMessage message = new ActiveMQTextMessage();
			message.setText("hello activeMq,this is my first test.");*/
			TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");
			
			// 第八步:使用Producer物件傳送訊息。
			producer.send(textMessage);
			System.out.println("producer 建立的訊息:"+textMessage.getText());
			// 第九步:關閉資源。
			producer.close();
			session.close();
			connection.close();
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}







下面看下生產一條訊息控制檯的變化

Queues

Name ↑ Number Of Pending Messages Number Of Consumers Messages Enqueued Messages Dequeued Views Operations
test-queue 1 0 1 0 Browse Active Consumers  

Number Of Pending Messages 生產了沒有消費的message 為1

Message Enqueued 進入訊息佇列的message 為1

Consumer

消費者有兩種消費方法:

1、同步消費。通過呼叫消費者的receive方法從目的地中顯式提取訊息。receive方法可以一直阻塞到訊息到達。

2、非同步消費。客戶可以為消費者註冊一個訊息監聽器,以定義在訊息到達時所採取的動作。
實現MessageListener介面,在MessageListener()方法中實現訊息的處理邏輯。

import java.io.IOException;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;


/**
 * 
 * 消費者有兩種消費方法::

1、同步消費。通過呼叫消費者的receive方法從目的地中顯式提取訊息。receive方法可以一直阻塞到訊息到達。

2、非同步消費。客戶可以為消費者註冊一個訊息監聽器,以定義在訊息到達時所採取的動作。
實現MessageListener介面,在MessageListener()方法中實現訊息的處理邏輯。

 *
 */
public class ActiveMQConsumer{

	public static void main(String[] args) {
		//第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); 
		
        try {
        	//第二步:使用ConnectionFactory物件建立一個Connection物件。
			Connection connection = connectionFactory.createConnection();
			
			//第三步:開啟連線,呼叫Connection物件的start方法。
			connection.start();
			
			//第四步:使用Connection物件建立一個Session物件。
			//第一個引數:是否開啟事務。true:開啟事務,第二個引數忽略。
			//第二個引數:當第一個引數為false時,才有意義。訊息的應答模式。1、自動應答2、手動應答。一般是自動應答。
			Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
			
			// 第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個Queue物件。
			//引數:佇列的名稱。
			// Destination :訊息的目的地訊息傳送給誰.  
	        Destination destination = session.createQueue("test-queue");
	        //第五步:建立一個消費者
	        MessageConsumer consumer = session.createConsumer(destination);
	        /**
	        while(true) {
	        	//第六步:同步接收訊息    設定接收訊息的時間100s
	        	Message message = consumer.receive(100000);
	        	if(message != null){
	        		System.out.println("consumer接收的訊息:"+message);
	        	}else{
	        		break;
	        	}
	        }*/
	        //第七步: 設定訊息監聽,非同步接收訊息
	        consumer.setMessageListener(new MessageListener() {
				
				@Override
				public void onMessage(Message message) {
					TextMessage textMessage=(TextMessage) message;
					try {
						System.out.println("非同步接收consumer訊息:"+textMessage.getText());
					} catch (JMSException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
			});
	        
	        //方便看到訊息
			System.in.read();
			
	        // 第八步:關閉資源。
	        consumer.close();
 			session.close();
 			connection.close();
        }catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}catch(IOException e1){
			e1.printStackTrace();
		}
	}
	
	
}



Queues

Name ↑ Number Of Pending Messages Number Of Consumers Messages Enqueued Messages Dequeued Views Operations
test-queue 0 1 1 1 Browse Active Consumers  

消費一條訊息的變化

Number Of Pending Messages 生產了沒有消費的message 為0

Number Of Pending Messages 消費的訊息的message 為1

Message Enqueued 進入訊息佇列的message 為1

Message Dequeued 進入訊息佇列的message 為1

 

**Topic **

 

佇列(Queue)和主題(Topic)是JMS支援的兩種訊息傳遞模型:


        1、點對點(point-to-point,簡稱PTP)Queue訊息傳遞模型:
        通過該訊息傳遞模型,一個應用程式(即訊息生產者)可以向另外一個應用程式(即訊息消費者)傳送訊息。在此傳遞模型中,訊息目的地型別是佇列(即Destination介面實現類例項由Session介面實現類例項通過呼叫其createQueue方法並傳入佇列名稱而建立)。訊息首先被傳送至訊息伺服器端特定的佇列中,然後從此對列中將訊息傳送至對此佇列進行監聽的某個消費者。同一個佇列可以關聯多個訊息生產者和訊息消費者,但一條訊息僅能傳遞給一個訊息消費者。如果多個訊息消費者正在監聽佇列上的訊息,,JMS訊息伺服器將根據“先來者優先”的原則確定由哪個訊息消費者接收下一條訊息。如果沒有訊息消費者在監聽佇列,訊息將保留在佇列中,直至訊息消費者連線到佇列為止。這種訊息傳遞模型是傳統意義上的懶模型或輪詢模型。在此模型中,訊息不是自動推動給訊息消費者的,而是要由訊息消費者從佇列中請求獲得。 


        2、釋出/訂閱(publish/subscribe,簡稱pub/sub)Topic訊息傳遞模型:

通過該訊息傳遞模型,應用程式能夠將一條訊息傳送給多個訊息消費者。在此傳送模型中,訊息目的地型別是主題(即Destination介面實現類例項由Session介面實現類例項通過呼叫其createTopic方法並傳入主題名稱而建立)。訊息首先由訊息生產者釋出至訊息伺服器中特定的主題中,然後由訊息伺服器將訊息傳送至所有已訂閱此主題的消費者。主題目標也支援長期訂閱。長期訂閱表示消費者已註冊了主題目標,但在訊息到達目標時該消費者可以處於非活動狀態。當消費者再次處於活動狀態時,將會接收該訊息。如果消費者均沒有註冊某個主題目標,該主題只保留註冊了長期訂閱的非活動消費者的訊息。與PTP訊息傳遞模型不同,pub/sub訊息傳遞模型允許多個主題訂閱者接收同一條訊息。JMS一直保留訊息,直至所有主題訂閱者都接收到訊息為止。pub/sub訊息傳遞模型基本上是一個推模型。在該模型中,訊息會自動廣播,訊息消費者無須通過主動請求或輪詢主題的方法來獲得新的訊息。

 具體區別對比如下:

 

型別

Topic

Queue

概要

Publish Subscribe messaging 釋出訂閱訊息

Point-to-Point 點對點

有無狀態

topic資料預設不落地,是無狀態的。

Queue資料預設會在mq伺服器上以檔案形式儲存,比如Active MQ一般儲存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB儲存。

完整性保障

並不保證publisher釋出的每條資料,Subscriber都能接受到。

Queue保證每條資料都能被receiver接收。

訊息是否會丟失

一般來說publisher釋出訊息到某一個topic時,只有正在監聽該topic地址的sub能夠接收到訊息;如果沒有sub在監聽,該topic就丟失了。

Sender傳送訊息到目標Queue,receiver可以非同步接收這個Queue上的訊息。Queue上的訊息如果暫時沒有receiver來取,也不會丟失。

訊息釋出接收策略

一對多的訊息釋出接收策略,監聽同一個topic地址的多個sub都能收到publisher傳送的訊息。Sub接收完通知mq伺服器

一對一的訊息釋出接收策略,一個sender傳送的訊息,只能有一個receiver接收。receiver接收完後,通知mq伺服器已接收,mq伺服器對queue裡的訊息採取刪除或其他操作。

       

** TopicConsumer**

import java.io.IOException;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;


public class TopicConsumer {
	public static void main(String[] args) {
		//第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); 
		
        try {
        	//第二步:使用ConnectionFactory物件建立一個Connection物件。
			Connection connection = connectionFactory.createConnection();
			
			//第三步:開啟連線,呼叫Connection物件的start方法。
			connection.start();
			
			//第四步:使用Connection物件建立一個Session物件。
			//第一個引數:是否開啟事務。true:開啟事務,第二個引數忽略。
			//第二個引數:當第一個引數為false時,才有意義。訊息的應答模式。1、自動應答2、手動應答。一般是自動應答。
			Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
			
			// 第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個Queue物件。
			//引數:話題的名稱。
			Topic topic = session.createTopic("test-topic");
	        //第五步:建立一個消費者
	        MessageConsumer consumer = session.createConsumer(topic);
	        System.out.println("consumer訊息者3");
	        //第六步: 設定訊息監聽,非同步接收訊息
	        consumer.setMessageListener(new MessageListener() {
				
				@Override
				public void onMessage(Message message) {
					TextMessage textMessage=(TextMessage) message;
					try {
						System.out.println("非同步接收consumer訊息:"+textMessage.getText());
					} catch (JMSException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
			});
	        
	        //方便看到訊息
			System.in.read();
			
	        // 第七步:關閉資源。
	        consumer.close();
 			session.close();
 			connection.close();
        }catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}catch(IOException e1){
			e1.printStackTrace();
		}
	}
}



Topics

Name ↑ Number Of Consumers Messages Enqueued Messages Dequeued Operations
test-topic 3 0 0 Send To Active Subscribers

 

TopicProduder

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;


public class TopicProduder {

	public static void main(String[] args) {
		//第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); 
		
        try {
        	//第二步:使用ConnectionFactory物件建立一個Connection物件。
			Connection connection = connectionFactory.createConnection();
			
			//第三步:開啟連線,呼叫Connection物件的start方法。
			connection.start();
			
			//第四步:使用Connection物件建立一個Session物件。
			//第一個引數:是否開啟事務。true:開啟事務,第二個引數忽略。
			//第二個引數:當第一個引數為false時,才有意義。訊息的應答模式。1、自動應答2、手動應答。一般是自動應答。
			Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
			
			// 第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個Queue物件。
			//引數:話題的名稱。
			Topic topic = session.createTopic("test-topic");
	        
			// 第六步:使用Session物件建立一個Producer物件。
			MessageProducer producer = session.createProducer(topic);
			
			// 第七步:建立一個Message物件,建立一個TextMessage物件。
			/*TextMessage message = new ActiveMQTextMessage();
			message.setText("hello activeMq,this is my first test.");*/
			TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test.");
			
			// 第八步:使用Producer物件傳送訊息。
			producer.send(textMessage);
			System.out.println("producer 建立的訊息:"+textMessage.getText());
			// 第九步:關閉資源。
			producer.close();
			session.close();
			connection.close();
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}



注:由於釋出訂閱關係,訊息不會落地,要先啟動多個消費者監聽訂閱的訊息,生產者(訊息釋出者)傳送訊息後監聽訂閱訊息的消費者都會接到訊息

看下控制檯的情況

Topics

Name ↑ Number Of Consumers Messages Enqueued Messages Dequeued Operations
test-topic 3 1 3 Send To Active Subscribers

Active Non-Durable Topic Subscribers

Destination Selector Pending Queue Size Dispatched Queue Size Dispatched Counter Enqueue Counter Dequeue Counter Operations      
test-topic   0 0 1 1 1        
test-topic   0 0 1 1 1        
test-topic   0 0 1 1 1        

 這樣的特性就可以應用分散式中應用解構,非同步訊息佇列,流量削鋒比較重要的中介軟體

特別介紹幾種代表的應用場景

應用解耦

場景說明:使用者下單後,訂單系統需要通知庫存系統。傳統的做法是,訂單系統呼叫庫存系統的介面

假如庫存系統無法訪問,則訂單減庫存將失敗,從而導致訂單失敗

 訂單系統與庫存系統耦合

  • 訂單系統:使用者下單後,訂單系統完成持久化處理,將訊息寫入訊息佇列,返回使用者訂單下單成功。
  • 庫存系統:訂閱下單的訊息,採用拉/推的方式,獲取下單資訊,庫存系統根據下單資訊,進行庫存操作。
  • 假如:在下單時庫存系統不能正常使用。也不影響正常下單,因為下單後,訂單系統寫入訊息佇列就不再關心其他的後續操作了。實現訂單系統與庫存系統的應用解耦。

 

流量削鋒

流量削鋒也是訊息佇列中的常用場景,一般在秒殺或團搶活動中使用廣泛。

應用場景:秒殺活動,一般會因為流量過大,導致流量暴增,應用掛掉。為解決這個問題,一般需要在應用前端加入訊息佇列。

  1. 可以控制活動的人數;
  2. 可以緩解短時間內高流量壓垮應用;
  3. 使用者的請求,伺服器接收後,首先寫入訊息佇列。假如訊息佇列長度超過最大數量,則直接拋棄使用者請求或跳轉到錯誤頁面;
  4. 秒殺業務根據訊息佇列中的請求資訊,再做後續處理

 

再來簡單看看Linux上部署ActiveMQ

安裝activemq

1、gz檔案拷貝到/usr/local/src目錄

2、解壓啟動

tar -zxvf apache-activemq-5.14.0-bin.tar.gz

cd apache-activemq-5.14.0

cd bin

./activemq start

netstat -anp|grep 61616

3.開通防火牆8161(web管理頁面埠)、61616(activemq服務監控埠) 兩個埠

/sbin/iptables -I INPUT -p tcp --dport 8161 -j ACCEPT&&/etc/init.d/iptables save&&service iptables restart&&/etc/init.d/iptables status

/sbin/iptables -I INPUT -p tcp --dport 61616 -j ACCEPT&&/etc/init.d/iptables save&&service iptables restart&&/etc/init.d/iptables status

 

常用命令

  • activemq-admin stop
  • activemq-admin list
  • activemq-admin query
  • activemq-admin bstat
  • activemq-admin browse

activemq官方命令列手冊:http://activemq.apache.org/activemq-command-line-tools-reference.html