訊息中介軟體原理及JMS簡介(2)
2.3 訊息中介軟體的傳遞模式
訊息中介軟體一般有兩種傳遞模型:點對點模型(PTP)和釋出-訂閱模型(Pub/Sub)。
1. 點對點模型(PTP)
點對點模型用於訊息生產者和訊息消費者之間點到點的通訊。訊息生產者將訊息發動到由某個名字標識的特定消費者。這個名字實際上對應於訊息服務中的一個佇列(Queue),在訊息傳動給消費者之前它被儲存在這個佇列中。佇列可以是持久的,以保證在訊息服務出現故障時仍然能夠傳遞訊息。
2. 釋出-訂閱模型(Pub/Sub)
釋出-訂閱模型用稱為主題(topic)的內容分層結構代替了PTP模型中的惟一目的地,傳送應用程式釋出自己的訊息,指出訊息描述的是有關分層結構中的一個主題的資訊。希望接收這些訊息的應用程式訂閱了這個主題。訂閱包含子主題的分層結構中的主題的訂閱者可以接收該主題和其子主題發表的所有訊息。
多個應用程式可以就一個主題釋出和訂閱訊息,而應用程式對其他人仍然是匿名的。MOM 起著代理(broker)的作用,將一個主題已發表的訊息路由給該主題的所有訂閱者。
2.4 訊息中介軟體產品與JMS
從上個世紀90年代初,隨著不同廠商訊息中介軟體大量上市,訊息中介軟體技術得到了長足的發展。目前,IBM和BEA的中介軟體產品在銀行、證券、電信等高階行業,以及IT等行業中得到廣泛應用。IBM憑藉其在1999年推出的應用伺服器WebSphere,紮根金融、證券等行業,在超大型以及系統整合型應用方面優勢突出;BEA則是專門從事中介軟體開發的公司,它的應用伺服器WebLogic在美國市場佔有率超過60%,在國內電信及證券行業佔據主要地位;Sun、Oracle、Sybase和Borland等廠商也都有自己的應用伺服器;近年來,以金蝶、東方通等公司為代表的國產中介軟體產品也發展迅速。
由於沒有統一的規範和標準,基於訊息中介軟體的應用不可移植,不同的訊息中介軟體也不能互操作,這大大阻礙了訊息中介軟體的發展。Java Message Service(JMS, Java訊息服務)是SUN及其夥伴公司提出的旨在統一各種訊息中介軟體系統介面的規範。它定義了一套通用的介面和相關語義,提供了諸如持久、驗證和事務的訊息服務,它最主要的目的是允許Java應用程式訪問現有的訊息中介軟體。JMS規範沒有指定在訊息節點間所使用的通訊底層協議,來保證應用開發人員不用與其細節打交道,一個特定的JMS實現可能提供基於TCP/IP、HTTP、UDP或者其它的協議。
目前許多廠商採用並實現了JMS API,現在,JMS產品能夠為企業提供一套完整的訊息傳遞功能,下面是一些比較流行的JMS商業軟體和開源產品。
1.IBM MQSeries
IBM MQ系列產品提供的服務使得應用程式可以使用訊息佇列進行相互交流,通過一系列基於Java的API,提供了MQSeries在Java中應用開發的方法。它支援點到點和釋出/訂閱兩種訊息模式,在基本訊息服務的基礎上增加了結構化訊息類,通過工作單元提供資料整合等內容。
2.WebLogic
WebLogic是BEA公司實現的基於工業標準的J2EE應用伺服器,支援大多數企業級JavaAPI,它完全相容JMS規範,支援點到點和釋出/訂閱訊息模式,它具有以下一些特點:
1) 通過使用管理控制檯設定JMS配置資訊;
2) 支援訊息的多點廣播;
3) 支援持久訊息儲存的檔案和資料庫;
4) 支援XML訊息,動態建立持久佇列和主題。
3.SonicMQ
SonicMQ是Progress公司實現的JMS產品。除了提供基本的訊息驅動服務之外,SonicMQ也提供了很多額外的企業級應用開發工具包,它具有以下一些基本特徵:
1) 提供JMS規範的完全實現,支援點到點訊息模式和釋出/訂閱訊息模式;
2) 支援層次安全管理;
3) 確保訊息在Internet上的持久傳送;
4) 動態路由構架(DRA)使企業能夠通過單個訊息伺服器動態的交換訊息;
5) 支援訊息伺服器的叢集。
4.Active MQ
Active MQ是一個基於Apcache 2.0 licenced釋出,開放原始碼的JMS產品。其特點為:
1) 提供點到點訊息模式和釋出/訂閱訊息模式;
2) 支援JBoss、Geronimo等開源應用伺服器,支援Spring框架的訊息驅動;
3) 新增了一個P2P傳輸層,可以用於建立可靠的P2P JMS網路連線;
4) 擁有訊息持久化、事務、叢集支援等JMS基礎設施服務。
5.OpenJMS
OpenJMS是一個開源的JMS規範的實現,它包含以下幾個特徵:
1) 它支援點到點模型和釋出/訂閱模型;
2) 支援同步與非同步訊息傳送;
3) 視覺化管理介面,支援Applet;
4) 能夠與Jakarta Tomcat這樣的Servlet容器結合;
5) 支援RMI、TCP、HTTP與SSL協議。
三、訊息中介軟體應用之JMS
3.1 JMS簡介
Java Message Service 規範 1.1 聲稱:JMS 是一組介面和相關語義,它定義了 JMS 客戶如何訪問企業訊息產品的功能。
在 JMS 之前,每一家 MOM 廠商都用專有 API 為應用程式提供對其產品的訪問,通常可用於許多種語言,其中包括 Java 語言。JMS 通過 MOM 產品為 Java 程式提供了一個傳送和接收訊息的標準的、便利的方法。用 JMS 編寫的程式可以在任何實現 JMS 標準的 MOM 上執行。
JMS 可移植性的關鍵在於:JMS API 是由 Sun 作為一組介面而提供的。提供了 JMS 功能的產品是通過提供一個實現這些介面的提供者來做到這一點的。開發人員可以通過定義一組訊息和一組交換這些訊息的客戶機應用程式建立 JMS 應用程式。[4]
JMS1.0版本於1998年推出,最新的版本是2002釋出的JMS 1.1規範。JMS支援訊息中介軟體的兩種傳遞模式:點到點模式和釋出-訂閱模式。在JMS 1.1以前的版本中,每一種都有自己的特定於該模式的一組客戶機介面。JMS1.1版本提供了單一的一組介面,它允許客戶機可以在兩個模式中傳送和接收訊息。這些“模式無關的介面”保留了每一個模式的語義和行為,是實現 JMS 客戶機的最好選擇。
統一模式的好處是:
1) 使得用於客戶機的程式設計更簡單。
2) 佇列和主題的操作可以是同一事務的一部分。
3) 為JMS提供者提供了優化其實現的機會。
3.2 JMS體系結構
3.2.1 JMS介面描述
JMS 支援兩種訊息型別PTP 和Pub/Sub,分別稱作:PTP Domain 和Pub/Sub Domain,這兩種介面都繼承統一的JMS Parent 介面,JMS 主要介面如下所示:
JMS Parent
PTP Domain
Pub/Sub Domain
ConnectionFactory
QueueConnectionFactory
TopicConnectionFactory
Connection
QueueConnection
TopicConnection
Destination
Queue
Topic
Session
QueueSession
TopicSession
MessageProducer
QueueSender
TopicPublisher
MessageConsumer
QueueReceiver
TopicSubscriber
以下是對這些介面的簡單描述:
ConnectionFactory:連線工廠,JMS 用它建立連線
Connection:JMS 客戶端到JMS Provider 的連線
Destination:訊息的目的地
Session:一個傳送或接收訊息的執行緒
MessageProducer: 由Session 物件建立的用來發送訊息的物件
MessageConsumer: 由Session 物件建立的用來接收訊息的物件
3.2.2 JMS訊息模型
JMS 訊息由以下幾部分組成:訊息頭,屬性,訊息體。
l 訊息頭(header):JMS訊息頭包含了許多欄位,它們是訊息傳送後由JMS提供者或訊息傳送者產生,用來表示訊息、設定優先權和失效時間等等,並且為訊息確定路由。
l 屬性(property):由訊息傳送者產生,用來新增刪除訊息頭以外的附加資訊。
l 訊息體(body):由訊息傳送者產生,JMS中定義了5種訊息體:ByteMessage、MapMessage、ObjectMessage、StreamMessage和TextMessage。
3.3 JMS程式設計實踐
廣義上說,一個JMS應用是幾個JMS 客戶端交換訊息,開發JMS客戶端應用由以下幾步構成:
1) 用JNDI 得到ConnectionFactory物件;
2) 用JNDI 得到目標佇列或主題物件,即Destination物件;
3) 用ConnectionFactory建立Connection 物件;
4) 用Connection物件建立一個或多個JMS Session;
5) 用Session 和Destination 建立MessageProducer和MessageConsumer;
6) 通知Connection 開始傳遞訊息。
3.3.1 訊息生產者程式設計
訊息生產者程式如下:
package org.jms.test;
import java.io.*;
import javax.jms.*;
import javax.naming.*;
public class Sender {
public static void main(String[] args) {
new Sender().send();
}
public void send() {
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
try {
//Prompt for JNDI names
System.out.println("Enter ConnectionFactory name:");
String factoryName = reader.readLine();
System.out.println("Enter Destination name:");
String destinationName = reader.readLine();
//Look up administered objects
InitialContext initContext = new InitialContext();
ConnectionFactory factory =
(ConnectionFactory) initContext.lookup(factoryName);
Destination destination = (Destination) initContext.lookup(destinationName);
initContext.close();
//Create JMS objects
Connection connection = factory.createConnection();
Session session =
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer sender = session.createProducer(queue);
//Send messages
String messageText = null;
while (true) {
System.out.println("Enter message to send or 'quit':");
messageText = reader.readLine();
if ("quit".equals(messageText))
break;
TextMessage message = session.createTextMessage(messageText);
sender.send(message);
}
//Exit
System.out.println("Exiting...");
reader.close();
connection.close();
System.out.println("Goodbye!");
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
}
3.3.2 訊息消費者程式設計
訊息消費者程式如下:
package compute;
import java.io.*;
import javax.jms.*;
import javax.naming.*;
public class Receiver implements MessageListener {
private boolean stop = false;
public static void main(String[] args) {
new Receiver().receive();
}
public void receive() {
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
try {
//Prompt for JNDI names
System.out.println("Enter ConnectionFactory name:");
String factoryName = reader.readLine();
System.out.println("Enter Destination name:");
String destinationName = reader.readLine();
reader.close();
//Look up administered objects
InitialContext initContext = new InitialContext();
ConnectionFactory factory =
(ConnectionFactory) initContext.lookup(factoryName);
Destination destination = (Destination) initContext.lookup(destinationName);
initContext.close();
//Create JMS objects
Connection connection = factory.createConnection();
Session session =
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer receiver = session.createConsumer(queue);
receiver.setMessageListener(this);
connection.start();
//Wait for stop
while (!stop) {
Thread.sleep(1000);
}
//Exit
System.out.println("Exiting...");
connection.close();
System.out.println("Goodbye!");
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
public void onMessage(Message message) {
try {
String msgText = ((TextMessage) message).getText();
System.out.println(msgText);
if ("stop".equals(msgText))
stop = true;
} catch (JMSException e) {
e.printStackTrace();
stop = true;
}
}
}
以上程式都較為簡單,基本上為自解釋的。
四、訊息中介軟體總結
訊息中介軟體自從產生以來發展迅速,在分散式聯機事務處理環境中,它擔當通訊資源管理器(CRM)的角色,為分散式應用提供實時、高效、可靠的、跨越不同作業系統、不同網路的訊息傳遞服務,同時訊息中介軟體減少了開發跨平臺應用程式的複雜性。在要求可靠傳輸的系統中可以利用訊息中介軟體作為一個通訊平臺,嚮應用提供可靠傳輸功能來傳遞訊息和檔案。