ActiveMQ·基礎篇(一)
JMS 叫做 Java 訊息服務(Java Message Service),是 Java 平臺上有關面向 MOM 的技術規範,旨在通過提供標準的產生、傳送、接收和處理訊息的 API 簡化企業應用的開發,類似於 JDBC 和關係型資料庫通訊方式的抽象。
ActiveMQ 是一個 MOM,具體來說是一個實現了 JMS 規範的系統間遠端通訊的訊息代理。
MOM 就是面向訊息中介軟體(Message-oriented middleware),是用於以分散式應用或系統中的非同步、鬆耦合、可靠、可擴充套件和安全通訊的一類軟體。MOM 的總體思想是它作為訊息傳送器和訊息接收器之間的訊息中介,這種中介提供了一個全新水平的鬆耦合。
即相當於人與人通訊之間的郵局,人們信件通訊的時候不需要以兩人直接進行通訊 ,而是通過一個第三方中介寄存信件,一個取,一個存。
以上圖片顯示的是整個JMS的元件
- Provider:純 Java 語言編寫的 JMS 介面實現(比如 ActiveMQ 就是)
- Domains:訊息傳遞方式,包括點對點(P2P)、釋出/訂閱(Pub/Sub)兩種
- Connection factory:客戶端使用連線工廠來建立與 JMS provider 的連線
- Destination:訊息被定址、傳送以及接收的物件
ActiveMq的兩種主要通訊方式
1.P2P(點對點的方式)
點對點:訊息域使用 queue 作為 Destination,訊息可以被同步或非同步的傳送和接收,每個訊息只會給一個 Consumer 傳送一次。
Consumer 可以使用 MessageConsumer.receive() 同步地接收訊息,也可以通過註冊一個 MessageListener 實現非同步接收。多個 Consumer 可以註冊到同一個 queue 上,但一個訊息只能被一個 Consumer 所接收,然後由該 Consumer 來確認訊息。並且在這種情況下,Provider 對所有註冊的 Consumer 以輪詢的方式傳送訊息。
以下即為點對點的方式流程圖
1.sub/pub(釋出/訂閱,Publish/Subscribe)
訊息域使用 topic 作為 Destination,釋出者向 topic 傳送訊息,訂閱者註冊接收來自 topic 的訊息。傳送到 topic 的任何訊息都將自動傳遞給所有訂閱者。接收方式(同步和非同步)與 P2P 域相同。除非顯式指定,否則 topic 不會為訂閱者保留訊息。當然,這可以通過持久化(Durable)訂閱來實現訊息的儲存。這種情況下,當訂閱者與 Provider 斷開時,Provider 會為它儲存訊息。當持久化訂閱者重新連線時,將會受到所有的斷連期間未消費的訊息。如圖,每個釋出者釋出的資訊,每個訂閱者都會收到,而不是和點對點一樣一條資訊只能由一個接收者接受。
ActiveMq建立應用程式的流程
- 獲取連線工廠
- 使用連線工廠建立連線
- 啟動連線
- 從連線建立會話
- 獲取 Destination
- 建立 Producer,或
- 建立 Producer
- 建立 message
- 建立 Consumer,或傳送或接收message傳送或接收 message
- 建立 Consumer
- 註冊訊息監聽器(可選)
- 傳送或接收 message
- 關閉資源(connection, session, producer, consumer 等
以下是一個簡單的activeMq傳送P2P的簡單流程樣式。
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.jms.*;
/**
* @author pontus
* @date create in 11:25 2018/11/2
* @description
*/
@RestController
@Configuration
public class JmsTemplateConfiguration {
@Value("${spring.activemq.user}")
private String usrName;
@Value("${spring.activemq.password}")
private String password;
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
ConnectionFactory connectionFactory;
Connection connection;
Session session;
Destination destination;
MessageProducer producer;
MessageConsumer consumer;
Message message;
boolean useTransaction = false;
@RequestMapping("ds")
public void ss(){
try {
//使用ActiveMQ時:
connectionFactory = new ActiveMQConnectionFactory(usrName, password, brokerUrl);
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("TEST.QUEUE");
//生產者傳送訊息
producer = session.createProducer(destination);
message = session.createTextMessage("this is a test");
producer.send(message);
//消費者同步接收
consumer = session.createConsumer(destination);
message = (TextMessage) consumer.receive(1000);
System.out.println("Received message: " + message);
//消費者非同步接收
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message != null) {
System.out.println(message);
}
}
});
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
producer.close();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}