1. 程式人生 > >Java訊息服務JMS詳解

Java訊息服務JMS詳解

JMS:

     Java訊息服務(Java Message Service)

JMS是用於訪問企業訊息系統的開發商中立的API。企業訊息系統可以協助應用軟體通過網路進行訊息互動。

 

JMS的程式設計過程很簡單,概括為:應用程式A傳送一條訊息到訊息伺服器的某個目得地(Destination),然後訊息伺服器把訊息轉發給應用程式B。因為應用程式A和應用程式B沒有直接的程式碼關連,所以兩者實現瞭解偶。

 

 

訊息驅動bean(message-driven bean)

   它是專門用於非同步處理java訊息的元件.具有處理大量併發訊息的能力.

 

何時使用JMS:

在某些情況下,由於SessionBean方法的執行時間比較長,這就需要非同步地呼叫該方法,否則客戶端就需要等待比較長的時間。要實現非同步呼叫, 就需要使用訊息驅動Bean。

訊息驅動Bean的基本原理是客戶端向訊息伺服器傳送一條訊息後,訊息伺服器會將該訊息儲存在訊息佇列中。在這時消 息伺服器中的某個消費者(讀取並處理訊息的物件)會讀取該訊息,並進行處理。傳送訊息的客戶端被稱為訊息生產者。

 

JMS中的訊息

訊息傳遞系統的中心就是訊息。一條 Message 由三個部分組成: 

頭(header),屬性(property)和主體(body)。

 

訊息有下面幾種型別,他們都是派生自 Message 介面。

StreamMessage:一種主體中包含 Java 基元值流的訊息。其填充和讀取均按順序進行。

MapMessage:一種主體中包含一組名-值對的訊息。沒有定義條目順序。

TextMessage:一種主體中包含 Java 字串的訊息(例如,XML 訊息)。

ObjectMessage:一種主體中包含序列化 Java 物件的訊息。

BytesMessage:一種主體中包含連續位元組流的訊息

 JMS訊息詳解

 

訊息的傳遞模型

JMS 支援兩種訊息傳遞模型:點對點(point-to-point,簡稱 PTP)和釋出/訂閱(publish/subscribe,簡稱 pub/sub)。

 

這兩種訊息傳遞模型非常相似,但有以下區別:

PTP 訊息傳遞模型規定了一條訊息只能傳遞給一個接收方。 採用javax.jms.Queue 表示。

Pub/sub 訊息傳遞模型允許一條訊息傳遞給多個接收方。採用javax.jms.Topic表示

 

這兩種模型都通過擴充套件公用基類來實現。例如:javax.jms.Queue 和javax.jms.Topic 都擴充套件自javax.jms.Destination 類。

 

 

配置目標地址

開始JMS程式設計前,我們需要先配置訊息到達的目標地址(Destination),因為只有目標地址存在了,我們才能傳送訊息到這個地址。由於每個應用伺服器關於目標地址的配置方式都有所不同,下面以jboss為例,配置一個queue型別的目標地址。

Xml程式碼  
<?xml version="1.0" encoding="UTF-8"?>  
<server>    
    <mbean code="org.jboss.mq.server.jmx.Queue"  
         name="jboss.mq.destination:service=Queue,name=foshanshop">  
        <attribute name="JNDIName">queue/foshanshop</attribute>     
        <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends>  
    </mbean>  
</server>  
 

 

(專案中用的這個:

Xml程式碼  
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>  
<server>  
    <mbean xmbean-dd="xmdesc/Queue-xmbean.xml" name="jboss.messaging.destination:service=Queue,name=InstanceQueue" code="org.jboss.jms.server.destination.QueueService">  
        <attribute name="JNDIName">/queues/InstanceQueue</attribute>  
        <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>  
        <depends>jboss.messaging:service=PostOffice</depends>  
    </mbean>  
</server>  
 

放在server\default\deploy\queues\InstanceQueue-service.xml 中)

 模版可以在E:\jboss-5.1.0.GA-jdk6\jboss-5.1.0.GA\docs\examples\jms中的example-destinations-service.xml中找到。

 

Jboss使用一個XML檔案配置佇列地址,檔案的取名格式應遵守*-service.xml

<attribute name="JNDIName">屬性指定了該目標地址的全域性JNDI名稱。如果你不指定JNDIName屬性,jboss會為你生成一個預設的全域性JNDI,其名稱由“queue”+“/”+目標地址名稱組成。另外在任何佇列或主題被部署之前,應用伺服器必須先部署Destination Manager Mbean,所以我們通過<depends>節點宣告這一依賴。

 

 

在java類中傳送訊息

一般傳送訊息有以下步驟:

(1) 得到一個JNDI初始化上下文(Context)

InitialContext ctx = new InitialContext();

(2) 根據上下文查詢一個連線工廠 QueueConnectionFactory 。該連線工廠是由JMS提供的,不需我們自己建立,每個廠商都為它綁定了一個全域性JNDI,我們通過它的全域性JNDI便可獲取它;

QueueConnectionFactory factory = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");

(3) 從連線工廠得到一個連線 QueueConnection

conn = factory.createQueueConnection();

(4) 通過連線來建立一個會話(Session); 

session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);

這句程式碼意思是:建立不需要事務的並且能自動確認訊息已接收的會話。

(5) 查詢目標地址:

Destination destination = (Destination ) ctx.lookup("queue/foshanshop"); //上面配置的那個目標地址

(6) 根據會話以及目標地址來建立訊息生產者MessageProducer (QueueSender和TopicPublisher都擴充套件自MessageProducer介面)

 

例子對應程式碼:

Java程式碼  
MessageProducer producer = session.createProducer(destination);  
TextMessage msg = session.createTextMessage("您好,這是我的第一個訊息驅動Bean");  
producer.send(msg);  
 

專案用:

 

JMS工廠和佇列JNDI配在配置檔案jmsqueue.properties裡,內容如下:

Xml程式碼  
connectionFactoryName=ConnectionFactory    
queueName=/queues/InstanceQueue   
 

Java程式碼  
/** 
 * 目的:讀取jmsqueue.properties中訊息佇列的資訊,初始化訊息佇列,提供傳送訊息的函式 
 * 
 */  
public class MsgQueueSender {  
      
    private static final Logger logger = LoggerFactory.getLogger(MsgQueueSender.class);  
  
    private static final MsgQueueSender ms = new MsgQueueSender();  
  
    private Properties info = new Properties();  
  
    /** 
     * jms 
     */  
    private QueueConnection conn;  
      
    private Queue que;  
  
    private MsgQueueSender() {  
        initJMSInfo();  
        initMsgQueue();  
    }  
  
    /** 
     * 初始化jndi佇列 
     *  
     */  
    private void initMsgQueue() {  
          
        InitialContext iniCtx;  
        try {  
            iniCtx = new InitialContext();  
            Object tmp = iniCtx.lookup(info.getProperty("connectionFactoryName", "ConnectionFactory"));  
            QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;  
            conn = qcf.createQueueConnection();  
            que = (Queue) iniCtx.lookup(info.getProperty("queueName", "/queues/InstanceQueue"));  
            conn.start();  
        } catch (Exception e) {  
            logger.error("[MsgQueueSender.initMsgQueue] \u65e0\u6cd5\u8fde\u63a5\u5230\u6d88\u606f\u670d\u52a1\u5668\uff0c\u8bf7\u68c0\u67e5jms\u914d\u7f6e\u548c\u670d\u52a1\u7aef" + e.getMessage(),e);  
        }  
  
    }  
  
    /** 
     * 讀取jms配置 
     */  
    private void initJMSInfo() {  
        InputStream is = this.getClass().getClassLoader().getResourceAsStream("jmsqueue.properties");  
        if (is != null) {  
            try {  
                info.load(is);  
            } catch (IOException e) {  
                logger.error("[MsgQueueSender.initJMSInfo] \u8bfb\u53d6jmsqueue.properties\u51fa\u9519\uff0c\u5c06\u4f7f\u7528\u9ed8\u8ba4\u914d\u7f6e " + e.getMessage(),e);  
            }  
        }  
  
    }  
  
    public static MsgQueueSender getInstance() {  
        return ms;  
    }  
      
    public void sendTextMsg(String msg) throws JMSException{  
        QueueSession session = conn.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE);  
        session.createSender(que).send(session.createTextMessage(msg));  
        session.close();  
    }  
      
    public void sendObjMsg(Serializable obj) throws JMSException{  
        QueueSession session = conn.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE);  
        session.createSender(que).send(session.createObjectMessage(obj));  
        session.close();  
    }  
  
}  
 

 

採用訊息驅動Bean (Message Driven Bean)接收訊息

 

訊息驅動Bean(MDB)是設計用來專門處理基於訊息請求的元件。它和無狀態Session Bean一樣也使用了例項池技術,容器可以使用一定數量的bean例項併發處理成百上千個JMS訊息。正因為MDB具有處理大量併發訊息的能力,所以非常適合應用在一些訊息閘道器產品。如果一個業務執行的時間很長,而執行結果無需實時向用戶反饋時,也很適合使用MDB。如訂單成功後給使用者傳送一封電子郵件或傳送一條簡訊等。

 

一個MDB通常要實現MessageListener介面,該介面定義了onMessage()方法。Bean通過它來處理收到的JMS訊息。

Java程式碼  
package javax.jms;  
public interface MessageListener {  
    public void onMessage(Message message);  
}  
 

當容器檢測到bean守候的目標地址有訊息到達時,容器呼叫onMessage()方法,將訊息作為引數傳入MDB。MDB在onMessage()中決定如何處理該訊息。你可以使用註釋指定MDB監聽哪一個目標地址(Destination)。當MDB部署時,容器將讀取其中的配置資訊。

Java程式碼  
@MessageDriven(activationConfig =  
{  
  @ActivationConfigProperty(propertyName="destinationType",  
    propertyValue="javax.jms.Queue"),  
  @ActivationConfigProperty(propertyName="destination",  
    propertyValue="queue/foshanshop"),  
  @ActivationConfigProperty(propertyName="acknowledgeMode",   
    propertyValue="Auto-acknowledge")  
})  
public class PrintBean implements MessageListener {  
  
    public void onMessage(Message msg) {  
      
    }  
}  
 

 

專案中用:

META-INF下:

ejb-jar.xml:

Xml程式碼  
<?xml version="1.0" encoding="UTF-8"?>  
<ejb-jar xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
    xmlns="http://java.sun.com/xml/ns/javaee" xmlns:ejb="http://java.sun.com/xml/ns/javaee/ejb-jar_3_0.xsd"  
    xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/ejb-jar_3_0.xsd"  
    version="3.0">  
    <display-name>msgreceiver</display-name>  
    <enterprise-beans>  
        <message-driven>  
            <display-name>instanceMDB</display-name>  
            <ejb-name>instanceMDB</ejb-name>  
            <ejb-class>com.project.soa.msgreceiver.InstanceReceiver</ejb-class>  
            <activation-config>  
                <activation-config-property>  
                    <activation-config-property-name>destinationType</activation-config-property-name>  
                    <activation-config-property-value>javax.jms.Queue</activation-config-property-value>  
                </activation-config-property>  
                <activation-config-property>  
                    <activation-config-property-name>destination</activation-config-property-name>  
                    <activation-config-property-value>/queues/InstanceQueue</activation-config-property-value>  
                </activation-config-property>  
            </activation-config>  
        </message-driven>  
    </enterprise-beans>  
</ejb-jar>  
 

persistence.xml:

Xml程式碼  
<?xml version="1.0" encoding="UTF-8"?>  
<persistence xmlns="http://java.sun.com/xml/ns/persistence"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
    xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_1_0.xsd"  
    version="1.0">  
    <persistence-unit name="msgreceiver-ds">  
        <!-- 資料來源:server\default\deploy\datasources\visesbdb-ds.xml 中jndi-name為datasources/visesbdb -->  
        <jta-data-source>java:/datasources/visesbdb</jta-data-source>  
          
        <jar-file>com.project.soa.bean-2.2.0.jar</jar-file>  
        <properties>  
            <property name="hibernate.hbm2ddl.auto" value="none" />  
            <property name="hibernate.show_sql" value="false" />  
            <property name="hibernate.format_sql" value="false" />  
        </properties>  
    </persistence-unit>  
</persistence>  
 

Java程式碼  
public class InstanceReceiver implements javax.jms.MessageListener {  
      
    private static final Logger logger = LoggerFactory.getLogger(InstanceReceiver.class);  
  
    @EJB(name="InstanceService")  
    private InstanceService is;  
      
    @Override  
    public void onMessage(Message msg) {  
        try {  
            is.processMsg(((ObjectMessage)msg).getObject());  
        } catch (JMSException e) {  
            logger.error("[InstanceReceiver.onMessage] " + e.getMessage());  
            e.printStackTrace();  
        }  
    }  
}  
  
  
@Stateless  
@Local ({InstanceService.class})   
public class InstanceServiceImpl implements InstanceService{  
      
    private static final Logger logger = LoggerFactory.getLogger(InstanceServiceImpl.class);  
      
    @PersistenceContext  
    private EntityManager em;  
          
    ...  
}  
 

JMS中訊息的 同步消費 和 非同步消費

 

同步消費 比如 

connection.start();

Message message=queueReceiver.receive();

 

同步消費 receive 就執行一次,並返回message物件。

同步消費中,訊息的接收者會一直等待下去,直到有訊息到達,或者超時。

 

非同步消費 比如

connection.start();

receiver.setMessageListener(new MyMessageListener());  //MyMessageListener實現了MessageListener介面

System.in.read();  //這句話是為了人為的阻塞程式不然 還沒接收到訊息 ,程式一下子就執行完了,關閉了。

 

非同步消費會註冊一個監聽器,當有訊息到達的時候,會回撥它的onMessage()方法,沒有次數限制
--------------------- 
作者:一杯甜酒 
來源:CSDN 
原文:https://blog.csdn.net/u012562943/article/details/49636779 
版權宣告:本文為博主原創文章,轉載請附上博文連結!