1. 程式人生 > >主題:JMS的兩種訊息模型(Point-to-Point(P2P)和Publish/Subscribe(Pub/Sub))應用舉例...

主題:JMS的兩種訊息模型(Point-to-Point(P2P)和Publish/Subscribe(Pub/Sub))應用舉例...

1、P2P模型
在P2P模型中,有下列概念:訊息佇列(Queue)、傳送者(Sender)、接收者(Receiver)。每個訊息都被髮送到一個特定的佇列,接收者從佇列中獲取訊息。佇列保留著訊息,直到它們被消費或超時。
 每個訊息只有一個消費者(Consumer)(即一旦被消費,訊息就不再在訊息佇列中)
 傳送者和接收者之間在時間上沒有依賴性,也就是說當傳送者傳送了訊息之後,不管接收者有沒有正在執行,它不會影響到訊息被髮送到佇列。
 接收者在成功接收訊息之後需向佇列應答成功
如果你希望傳送的每個訊息都應該被成功處理的話,那麼你需要P2P模型。
舉例:
//註冊訊息監聽器,當有訊息傳送過來的時候會呼叫onMessage方法(實現MessageListener 介面)
Java程式碼
複製程式碼
  1. import javax.ejb.ActivationConfigProperty;   
  2. import javax.ejb.MessageDriven;   
  3. import javax.jms.JMSException;   
  4. import javax.jms.Message;   
  5. import javax.jms.MessageListener;   
  6. import javax.jms.TextMessage;   
  7. @MessageDriven(activationConfig={   
  8. @ActivationConfigProperty(propertyName=
    "destinationType",propertyValue="javax.jms.Queue"),   
  9. @ActivationConfigProperty(propertyName="destination", propertyValue="queue/myqueue")   
  10.     }   
  11. )   
  12. publicclass QueueMessageBean implements MessageListener {   
  13. publicvoid onMessage(Message msg) {   
  14. //共有下面幾種訊息型別
  15. //1 Text
  16. //2 Map
  17. //3 Object
  18. //4 stream
  19. //5 byte
  20.         TextMessage txtMsg = (TextMessage)msg;   
  21.         String s = "";   
  22. try {   
  23.             s = txtMsg.getText();   
  24.         } catch (JMSException e) {   
  25.             e.printStackTrace();   
  26.         }   
  27.         System.out.println("QueueMessageBean接收到了訊息:" + s);   
  28.     }   
  29. }   
  30. //客戶端呼叫
  31. import javax.jms.Message;   
  32. import javax.jms.MessageProducer;   
  33. import javax.jms.Queue;   
  34. import javax.jms.QueueConnection;   
  35. import javax.jms.QueueConnectionFactory;   
  36. import javax.jms.QueueSession;   
  37. import javax.naming.InitialContext;   
  38. publicclass Test {   
  39. publicstaticvoid main(String[] args) throws Exception {   
  40.     InitialContext ctx = new InitialContext();   
  41. //獲得QueueConnectionFactory物件
  42.     QueueConnectionFactory factory = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");   
  43. //建立QueueConnection對像 
  44.     QueueConnection connection = factory.createQueueConnection();   
  45. //建立會話
  46. //arg1:與事物有關,true表示最後提交,false表示自動提交
  47. //arg2:表示訊息向中介軟體傳送確認通知,這裡採用的是自動通知的型別
  48.     QueueSession session = (QueueSession) connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);   
  49. //取得destination
  50.     Queue queue = (Queue) ctx.lookup("queue/myqueue");   
  51. //訊息生產者
  52.     MessageProducer sender = session.createProducer(queue);   
  53. //定義訊息
  54.     Message msg = session.createTextMessage("訊息來了");   
  55. //傳送訊息
  56.     sender.send(queue, msg);   
  57.     session.close();   
  58.     connection.close();   
  59.     }   
  60. }  
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

@MessageDriven(activationConfig={
			@ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Queue"),
			@ActivationConfigProperty(propertyName="destination", propertyValue="queue/myqueue")
	}
)
public class QueueMessageBean implements MessageListener {

	public void onMessage(Message msg) {
		//共有下面幾種訊息型別
		//1 Text
		//2 Map
		//3 Object
		//4 stream
		//5 byte
		TextMessage txtMsg = (TextMessage)msg;
		String s = "";
		try {
			s = txtMsg.getText();
		} catch (JMSException e) {
			e.printStackTrace();
		}
		System.out.println("QueueMessageBean接收到了訊息:" + s);
	}
}
//客戶端呼叫
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.naming.InitialContext;


public class Test {
	public static void main(String[] args) throws Exception {
	InitialContext ctx = new InitialContext();
	//獲得QueueConnectionFactory物件
	QueueConnectionFactory factory = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");
	//建立QueueConnection對像 
	QueueConnection connection = factory.createQueueConnection();
	//建立會話
	//arg1:與事物有關,true表示最後提交,false表示自動提交
	//arg2:表示訊息向中介軟體傳送確認通知,這裡採用的是自動通知的型別
	QueueSession session = (QueueSession) connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
	//取得destination
	Queue queue = (Queue) ctx.lookup("queue/myqueue");
	//訊息生產者
	MessageProducer sender = session.createProducer(queue);
	//定義訊息
	Message msg = session.createTextMessage("訊息來了");
	//傳送訊息
	sender.send(queue, msg);
	session.close();
	connection.close();
		
	}
}



2、Pub/Sub模式
在Pub/Sub模型中,有下列概念: 主題(Topic)、釋出者(Publisher)、訂閱者(Subscriber)。客戶端將訊息傳送到主題。多個釋出者將訊息傳送到Topic,系統將這些訊息傳遞給多個訂閱者。
 每個訊息可以有多個消費者
 釋出者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須建立一個訂閱之後,才能消費釋出者的訊息,而且,為了消費訊息,訂閱者必須保持執行的狀態。
當然,為了緩和這種嚴格的時間相關性,JMS允許訂閱者建立一個可持久化的訂閱。這樣,即使訂閱者沒有被啟用(執行),它也能接收到釋出者的訊息。
如果你希望傳送的訊息可以不被做任何處理、或者被一個消費者處理、或者可以被多個消費者處理的話,那麼可以採用Pub/Sub模型。

//註冊訊息監聽器,當有訊息傳送過來的時候會呼叫onMessage方法(實現MessageListener 介面)

Java程式碼 複製程式碼
  1. import javax.ejb.ActivationConfigProperty;   
  2. import javax.ejb.MessageDriven;   
  3. import javax.jms.JMSException;   
  4. import javax.jms.Message;   
  5. import javax.jms.MessageListener;   
  6. import javax.jms.TextMessage;   
  7. @MessageDriven(activationConfig={   
  8. @ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Topic"),   
  9. @ActivationConfigProperty(propertyName="destination", propertyValue="topic/myTopic")   
  10.     }   
  11. )   
  12. publicclass TopicMessageBean implements MessageListener {   
  13. publicvoid onMessage(Message msg) {   
  14. //共有下面幾種訊息型別
  15. //1 Text
  16. //2 Map
  17. //3 Object
  18. //4 stream
  19. //5 byte
  20.         TextMessage txtMsg = (TextMessage)msg;   
  21.         String s = "";   
  22. try {   
  23.             s = txtMsg.getText();   
  24.         } catch (JMSException e) {   
  25.             e.printStackTrace();   
  26.         }   
  27.         System.out.println("TopicMessageBean接收到了訊息:" + s);   
  28.     }   
  29. }   
  30. //客戶端測試
  31. import javax.jms.MessageProducer;   
  32. import javax.jms.Topic;   
  33. import javax.jms.TopicConnection;   
  34. import javax.jms.TopicConnectionFactory;   
  35. import javax.jms.TopicSession;   
  36. import javax.naming.InitialContext;   
  37. publicclass Test {   
  38. publicstaticvoid main(String[] args) throws Exception {   
  39.     InitialContext ctx = new InitialContext();   
  40. //獲得QueueConnectionFactory物件
  41.     TopicConnectionFactory factory = (TopicConnectionFactory) ctx.lookup("TopicConnectionFactory");   
  42. //建立QueueConnection對像 
  43.     TopicConnection connection = factory.createTopicConnection();   
  44. //建立會話
  45. //arg1:與事物有關,true表示最後提交,false表示自動提交
  46. //arg2:表示訊息向中介軟體傳送確認通知,這裡採用的是自動通知的型別
  47.     TopicSession session = (TopicSession) connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);   
  48. //取得destination
  49.     Topic queue = (Topic) ctx.lookup("topic/myTopic");   
  50. //訊息生產者
  51.     MessageProducer publisher = session.createProducer(queue);   
  52. //定義訊息
  53.     Message msg = session.createTextMessage("訊息來了");   
  54. //傳送訊息
  55.     publisher.send(queue, msg);   
  56.     session.close();   
  57.     connection.close();   
  58.     }   
  59. }  
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

@MessageDriven(activationConfig={
			@ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Topic"),
			@ActivationConfigProperty(propertyName="destination", propertyValue="topic/myTopic")
	}
)
public class TopicMessageBean implements MessageListener {

	public void onMessage(Message msg) {
		//共有下面幾種訊息型別
		//1 Text
		//2 Map
		//3 Object
		//4 stream
		//5 byte
		TextMessage txtMsg = (TextMessage)msg;
		String s = "";
		try {
			s = txtMsg.getText();
		} catch (JMSException e) {
			e.printStackTrace();
		}
		System.out.println("TopicMessageBean接收到了訊息:" + s);
	}
}

//客戶端測試
import javax.jms.MessageProducer;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.InitialContext;


public class Test {
	public static void main(String[] args) throws Exception {
	InitialContext ctx = new InitialContext();
	//獲得QueueConnectionFactory物件
	TopicConnectionFactory factory = (TopicConnectionFactory) ctx.lookup("TopicConnectionFactory");
	//建立QueueConnection對像 
	TopicConnection connection = factory.createTopicConnection();
	//建立會話
	//arg1:與事物有關,true表示最後提交,false表示自動提交
	//arg2:表示訊息向中介軟體傳送確認通知,這裡採用的是自動通知的型別
	TopicSession session = (TopicSession) connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
	//取得destination
	Topic queue = (Topic) ctx.lookup("topic/myTopic");
	//訊息生產者
	MessageProducer publisher = session.createProducer(queue);
	//定義訊息
	Message msg = session.createTextMessage("訊息來了");
	//傳送訊息
	publisher.send(queue, msg);
	session.close();
	connection.close();
		
	}
}



二種模型的實現結果:對於p2p模型的每個訊息只能有一個消費者  如果我們定義二個訊息接受者的Bean那麼只能有一端會接收到訊息。當你把部署在Jboss中的訊息接收Bean去掉以後,然後傳送訊息 此時訊息在佇列中,一旦你重新部署他會立刻就接收到剛剛傳送的訊息所以它沒有時間的依賴性, pub/sub模型可以有多個消費者 在這個模型中如果我們定義多個接收訊息的Bean當我們在客戶端傳送訊息的時候二個bean都會接收到訊息,所以他有多個消費者 但是如果你把Jboss部署中的訊息接收bean去掉之後,傳送訊息。然後在重新部署,那麼訊息也無法接收到,所以說他有時間的依賴性。

//程式碼中幾個概念的理解
Connection Factory
建立Connection物件的工廠,針對兩種不同的JMS訊息模型,分別有QueueConnectionFactory和TopicConnectionFactory兩種。可以通過JNDI來查詢ConnectionFactory物件。

Destination
Destination的意思是訊息生產者的訊息傳送目標或者說訊息消費者的訊息來源。對於訊息生產者來說,它的Destination是某個佇列(Queue)或某個主題(Topic);對於訊息消費者來說,它的Destination也是某個佇列或主題(即訊息來源)。

所以,Destination實際上就是兩種型別的物件:Queue、Topic。

可以通過JNDI來查詢Destination。

Connection:
Connection表示在客戶端和JMS系統之間建立的連結(對TCP/IP socket的包裝)。Connection可以產生一個或多個Session。跟ConnectionFactory一樣,Connection也有兩種型別:QueueConnection和TopicConnection。
Session:
Session是我們操作訊息的介面。可以通過session建立生產者、消費者、訊息等。Session提供了事務的功能。當我們需要使用session傳送/接收多個訊息時,可以將這些傳送/接收動作放到一個事務中。同樣,也分QueueSession和TopicSession。
訊息生產者:
訊息生產者由Session建立,並用於將訊息傳送到Destination。同樣,訊息生產者分兩種型別:QueueSender和TopicPublisher。可以呼叫訊息生產者的方法(send或publish方法)傳送訊息!
訊息消費者:
訊息消費者由Session建立,用於接收被髮送到Destination的訊息。兩種型別:QueueReceiver和TopicSubscriber。可分別通過session的createReceiver(Queue)或createSubscriber(Topic)來建立。當然,也可以通過session的createDurableSubscriber方法來建立持久化的訂閱者。
MessageListener:
訊息監聽器。如果註冊了訊息監聽器,一旦訊息到達,將自動呼叫監聽器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一種MessageListener。

MDB介紹:
對客戶端來說,message-driven bean就是非同步訊息的消費者。當訊息到達之後,由容器負責呼叫MDB。客戶端傳送訊息到destination,MDB作為一個MessageListener接收訊息。