1. 程式人生 > >ActiveMQ -- 中介軟體之運用(一)

ActiveMQ -- 中介軟體之運用(一)

                                           ActiveMQ -- 中介軟體

                                                            

一、ActiveMQ講解

1.1、概念

概念:官方解釋:Active是屬於apache下的產品,是目前最流行、能力強勁的訊息匯流排,完全是遵循JMS規範。

1.2、在現實中應用

activeMQ是分散式中注重要的中介軟體之一:主要解決應用解耦、非同步訊息、流量削鋒、訊息通訊場景。

例如:唯品會上我們下訂單時候,總“提示使用者:您的訂單正在處理”,其實這裡是將使用者下的訂單新增到訊息佇列中介軟體中,等待消費者去消費(庫存),消費者(庫存)通過推拉的形式的獲取訊息佇列中的人物。

1.3、分析使用場景(借鑑原作者案例)

這一章節我就借鑑一片部落格圖片(未找到原作者),舉的案例非常容易理解,我在其基礎上進行補充。

1.3.1、非同步資訊處理

非同步:顧名思義就是一個呼叫無需等待結果返回就讓其繼續操作,

場景說明:使用者註冊後,需要發註冊郵件和註冊簡訊。傳統的做法有兩種1.序列的方式;2.並行方式。

(1)序列方式:將註冊資訊寫入資料庫成功後,傳送註冊郵件,再發送註冊簡訊。以上三個任務全部完成後,返回給客戶端。

(2)並行方式:將註冊資訊寫入資料庫成功後,傳送註冊郵件的同時,傳送註冊簡訊。以上三個任務完成後,返回給客戶端。與序列的差別是,並行的方式可以提高處理的時間。

假設三個業務節點每個使用50毫秒鐘,不考慮網路等其他開銷,則序列方式的時間是150毫秒,並行的時間可能是100毫秒。

因為CPU在單位時間內處理的請求數是一定的,假設CPU1秒內吞吐量是100次。

則序列方式1秒內CPU可處理的請求量是7次(1000/150)。並行方式處理的請求量是10次(1000/100)。

小結:如以上案例描述,傳統的方式系統的效能(併發量,吞吐量,響應時間)會有瓶頸。如何解決這個問題呢?

引入訊息佇列,將不是必須的業務邏輯,非同步處理。改造後的架構如下:

按照以上約定,使用者的響應時間相當於是註冊資訊寫入資料庫的時間,也就是50毫秒。

註冊郵件,傳送簡訊寫入訊息佇列後,直接返回,因此寫入訊息佇列的速度很快,基本可以忽略,

因此使用者的響應時間可能是50毫秒。所以基於此架構改變後,系統的吞吐量提高到每秒20 QPS。比序列提高了3倍,比並行提高了兩倍。


1.3.2、非同步資訊處理

場景說明:使用者下單後,訂單系統需要通知庫存系統。傳統的做法是,訂單系統呼叫庫存系統的介面。如下圖:

傳統模式的缺點:

1)  假如庫存系統無法訪問,則訂單減庫存將失敗,從而導致訂單失敗;

2)  訂單系統與庫存系統耦合;

如何解決以上問題呢?引入應用訊息佇列後的方案,如下圖:

1:訂單系統:使用者下單後,訂單系統完成持久化處理,將訊息寫入訊息佇列,返回使用者訂單下單成功,請等待物流配送。
2:庫存系統:訂閱下單的訊息,採用拉/推的方式,獲取下單資訊,庫存系統根據下單資訊,進行庫存操作。
3:假如:在下單時庫存系統不能正常使用。也不影響正常下單,
因為下單後,訂單系統寫入訊息佇列就不再關心其他的後續操作了。實現訂單系統與庫存系統的應用解耦。
 

1.3.3、流量削鋒

流量削鋒也是訊息佇列中的常用場景,一般在秒殺或團搶活動中使用廣泛。

應用場景:秒殺活動,一般會因為流量過大,導致流量暴增,應用容易掛掉。為解決這個問題,一般需要在應用前端加入訊息佇列。

  1. 可以控制活動的人數.
  2. 可以緩解短時間內高流量壓垮應用;

  1. 使用者的請求,伺服器接收後,首先寫入訊息佇列。假如訊息佇列長度超過最大數量,則直接拋棄使用者請求或跳轉到錯誤頁面;
  2. 秒殺業務根據訊息佇列中的請求資訊,再做後續處理。

1.3.3、訊息通訊

訊息通訊是指,訊息佇列一般都內建了高效的通訊機制,因此也可以用在純的訊息通訊。比如實現點對點訊息佇列,或者聊天室等。

點對點通訊:

客戶端A和客戶端B使用同一佇列,進行訊息通訊。

聊天室通訊:

客戶端A,客戶端B,客戶端N訂閱同一主題,進行訊息釋出和接收。實現類似聊天室效果。

以上實際是訊息佇列的兩種訊息模式,點對點或釋出訂閱模式。

1.4、activemq有自己的特點和優勢:

(1)activemq可以很好的執行在任何JVM上,而不只是整合到JBoss的應用伺服器中;

(2)activemq支援大量的跨語言客戶端;

(3)activemq支援許多不同的協議,如Ajax,REST,Stomp,OpenWire,XMPP

(4)activemq支援許多高階功能,例如MessageGroups,ExclusiveConsumer,CompositeDestinations

(5)AdvisoryMessage

(6)activemq支援可靠連線並且具有可配置的自動重連線

(7)activemq對Spring有很好的支援

(8)activemq支援跨網路的分散式目的地

(9)activemq是速度非常快;一般要比jbossmq快10倍
 

二、ActiveMQ安裝

2.1、下載ActiveMQ安裝包

作為一個具有獨立思考的程式老農,下載對應的版本都是從官網下載正宗的版本,官網路徑Active官網《下載對應的版本》

進到首頁如下:

點選進去之後,可以選擇下載最新的版本,如果想下載歷史版本

 

點選下載歷史版本如下:

下載版本時候需要注意,下載對應的windown或者linux,根據自己的需要進行下載

2.2、ActiveMQ啟動

為了簡單且快速瞭解ActiveMQ,我直接下載window版本來進行演示,對應的Linux安裝自己網上查詢安裝教程,其實很簡單

我是下載時候安裝如下:

啟動步驟:bin-->win64(對應自己的系統)-->activemq.bat(啟動)

啟動成功如下圖:

當啟動顯示"access to all MBeans is allowed"則表示ActiveMQ已經啟動成功;接下來就可以直接訪問了;

成功頁面如下:

2.3、首次登入遇到問題

       首次登入的使用者可能會有疑問,說埠號和登入密碼是什麼?這裡我就來幫大家解答這問題,作為一個web開發者,相信大家都使用多web容器tomcat,tomcat在con.xml中都有配置預設的埠號,那當然ActiveMQ那必須有,因為ActiveMQ和tomcat的父母都是apache,不扯哪些沒用的了,直接進入主題:

如果你是直接下載好的ActiveMQ:

預設埠號是:8161

登入賬號和密碼是:賬號admins ;密碼:admin

下面我們來看下ActiveMQ的配置檔案就明白了,其實ActiveMQ中介軟體裡面整合了簡潔版的jetty容器,路徑:conf -- >jetty.xml

登入密碼檢視或修改路徑:conf-->groups.properties

到此為此ActiveMQ則安裝完了,寫的不好地方望網友多多指點,相互學習。

三、MQ程式測試

3.1、程式碼結構

這裡編寫簡單的測試demo進行測試,需要的jar路徑如下:

3.2、程式碼分析

3.2.1、生產者

package com.debug.test;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 生產者
 * @author debug
 * @Date 2018-1-18
 */
public class Sender {

	public static void sender() throws JMSException {
		// 1、建立ConnecttionFactory工廠連線物件,需要填入使用者名稱、密碼、以及連線地址,均使用預設就行
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_PASSWORD,
				ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616");
		//2、通過工廠連線物件進行建立連線connection,並且開啟連線(預設時關閉的)
		Connection connection = connectionFactory.createConnection();
		connection.start();
		
		//3、通過連線建立session會話(上下文環境物件),用於接受資訊、引數配置:1為是否啟用事務、引數配置2為簽收模式、一般我們設定為自動簽收。
		Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
		Destination destination = session.createQueue("queue01");
		
		//4、通過session獲物件,指的時一個客戶端用來指定生產消費目標合消費資訊來源的物件,
		
		//5、我們需要通過sesion物件建立訊息的發動合接受物件(生產者合消費者)MessageProducer/MessageConsumer
		MessageProducer messageProducer = session.createProducer(destination);
		
		//6、我們可以使用messageProducer的setDeiveryMode方法為其設定持久化性合非持久化(DeiveryMode)
		messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
		
		//7、最後傳送資訊(使用JMS規範)
		for (int i = 0; i <5; i++) {
			TextMessage textMessage = session.createTextMessage("我是debug");
			messageProducer.send(textMessage);
			System.out.println("生產者正在生產......."+ i);
		}
		System.out.println("生產者生產訊息完成.......");
		
		//緊密判斷
		if(connection != null){
			connection.close();
		}
		
		
	}

	public static void main(String[] args) throws JMSException {
		sender();
	}

}

執行之後:

生產完成之後,我們來看下ActiveMQ控制後臺:

3.2.2、消費者

package com.debug.test;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消費者
 * @author debug
 * @Date 2018-1-18
 */
public class Receiver {
	public static void sender() throws JMSException {
		// 1、建立ConnecttionFactory工廠連線物件,需要填入使用者名稱、密碼、以及連線地址,均使用預設就行
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_PASSWORD,
				ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616");
		//2、通過工廠連線物件進行建立連線connection,並且開啟連線(預設時關閉的)
		Connection connection = connectionFactory.createConnection();
		connection.start();
		
		//3、通過連線建立session會話(上下文環境物件),用於接受資訊、引數配置:1為是否啟用事務、引數配置2為簽收模式、一般我們設定為自動簽收。
		Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
		Destination destination = session.createQueue("queue01");
		
		//4、通過session獲物件,指的時一個客戶端用來指定生產消費目標合消費資訊來源的物件,
		
		//5、我們需要通過sesion物件建立訊息的發動合接受物件(生產者合消費者)MessageProducer/MessageConsumer
		MessageConsumer messageConsumer = session.createConsumer(destination);
		
		while(true){
			TextMessage msg = (TextMessage) messageConsumer.receive();
			if(msg == null){
				break;
			}
			System.out.println("消費者接受到的內容......." + msg.getText());
		}
		
		//緊密判斷
		if(connection != null){
			connection.close();
		}
		
		
	}

	public static void main(String[] args) throws JMSException {
		sender();
	}


}

執行之後:

此時ActiveMQ後臺觀察下

好了;目前來說demo測試已經完成了,後續有需要補充的地兒再接著完善......