1. 程式人生 > >activemq的幾種基本通訊方式總結

activemq的幾種基本通訊方式總結

簡介

     在前面一篇文章裡討論過幾種應用系統整合的方式,發現實際上面向訊息佇列的整合方案算是一個總體比較合理的選擇。這裡,我們先針對具體的一個訊息佇列Activemq的基本通訊方式進行探討。activemq是JMS訊息通訊規範的一個實現。總的來說,訊息規範裡面定義最常見的幾種訊息通訊模式主要有釋出-訂閱、點對點這兩種。另外,通過結合這些模式的具體應用,我們在處理某些應用場景的時候也衍生出來了一種請求應答的模式。下面,我們針對這幾種方式一一討論一下。

基礎流程

    在討論具體方式的時候,我們先看看使用activemq需要啟動服務的主要過程。

    按照JMS的規範,我們首先需要獲得一個JMS connection factory.,通過這個connection factory來建立connection.在這個基礎之上我們再建立session, destination, producer和consumer。因此主要的幾個步驟如下:

1. 獲得JMS connection factory. 通過我們提供特定環境的連線資訊來構造factory。

2. 利用factory構造JMS connection

3. 啟動connection

4. 通過connection建立JMS session.

5. 指定JMS destination.

6. 建立JMS producer或者建立JMS message並提供destination.

7. 建立JMS consumer或註冊JMS message listener.

8. 傳送和接收JMS message.

9. 關閉所有JMS資源,包括connection, session, producer, consumer等。

publish-subscribe

     釋出訂閱模式有點類似於我們日常生活中訂閱報紙。每年到年尾的時候,郵局就會發一本報紙集合讓我們來選擇訂閱哪一個。在這個表裡頭列了所有出版發行的報紙,那麼對於我們每一個訂閱者來說,我們可以選擇一份或者多份報紙。比如北京日報、瀟湘晨報等。那麼這些個我們訂閱的報紙,就相當於釋出訂閱模式裡的topic。有很多個人訂閱報紙,也有人可能和我訂閱了相同的報紙。那麼,在這裡,相當於我們在同一個topic裡註冊了。對於一份報紙發行方來說,它和所有的訂閱者就構成了一個1對多的關係。這種關係如下圖所示:

     現在,假定我們用前面討論的場景來寫一個簡單的示例。我們首先需要定義的是publisher.

publisher

     publisher是屬於釋出資訊的一方,它通過定義一個或者多個topic,然後給這些topic傳送訊息。

    publisher的建構函式如下:

Java程式碼  收藏程式碼
  1. public Publisher() throws JMSException {  
  2.         factory = new ActiveMQConnectionFactory(brokerURL);  
  3.         connection = factory.createConnection();  
  4.         try {  
  5.         connection.start();  
  6.         } catch (JMSException jmse) {  
  7.             connection.close();  
  8.             throw jmse;  
  9.         }  
  10.         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
  11.         producer = session.createProducer(null);  
  12.     }  

     我們按照前面說的流程定義了基本的connectionFactory, connection, session, producer。這裡程式碼就是主要實現初始化的效果。

    接著,我們需要定義一系列的topic讓所有的consumer來訂閱,設定topic的程式碼如下:

Java程式碼  收藏程式碼
  1. protected void setTopics(String[] stocks) throws JMSException {  
  2.     destinations = new Destination[stocks.length];  
  3.     for(int i = 0; i < stocks.length; i++) {  
  4.         destinations[i] = session.createTopic("STOCKS." + stocks[i]);  
  5.     }  
  6. }  

     這裡destinations是一個內部定義的成員變數Destination[]。這裡我們總共定義了的topic數取決於給定的引數stocks。

     在定義好topic之後我們要給這些指定的topic發訊息,具體實現的程式碼如下:

Java程式碼  收藏程式碼
  1. protected void sendMessage(String[] stocks) throws JMSException {  
  2.     for(int i = 0; i < stocks.length; i++) {  
  3.         Message message = createStockMessage(stocks[i], session);  
  4.         System.out.println("Sending: " + ((ActiveMQMapMessage)message).getContentMap() + " on destination: " + destinations[i]);  
  5.         producer.send(destinations[i], message);  
  6.     }  
  7. }  
  8. protected Message createStockMessage(String stock, Session session) throws JMSException {  
  9.     MapMessage message = session.createMapMessage();  
  10.     message.setString("stock", stock);  
  11.     message.setDouble("price"1.00);  
  12.     message.setDouble("offer"0.01);  
  13.     message.setBoolean("up"true);  
  14.     return message;  
  15. }  

     前面的程式碼很簡單,在sendMessage方法裡我們遍歷每個topic,然後給每個topic傳送定義的Message訊息。

    在定義好前面傳送訊息的基礎之後,我們呼叫他們的程式碼就很簡單了:

Java程式碼  收藏程式碼
  1. public static void main(String[] args) throws JMSException {  
  2.     if(args.length < 1)  
  3.         throw new IllegalArgumentException();  
  4.         // Create publisher       
  5.         Publisher publisher = new Publisher();  
  6.         // Set topics  
  7.     publisher.setTopics(args);  
  8.     for(int i = 0; i < 10; i++) {  
  9.         publisher.sendMessage(args);  
  10.         System.out.println("Publisher '" + i + " price messages");  
  11.         try {  
  12.             Thread.sleep(1000);  
  13.         } catch(InterruptedException e) {  
  14.             e.printStackTrace();  
  15.         }  
  16.     }  
  17.     // Close all resources  
  18.     publisher.close();  
  19. }  

     呼叫他們的程式碼就是我們遍歷所有topic,然後通過sendMessage傳送訊息。在傳送一個訊息之後先sleep1秒鐘。要注意的一個地方就是我們使用完資源之後必須要使用close方法將這些資源關閉釋放。close方法關閉資源的具體實現如下:

Java程式碼  收藏程式碼
  1. public void close() throws JMSException {  
  2.     if (connection != null) {  
  3.         connection.close();  
  4.      }  
  5. }  

consumer

    Consumer的程式碼也很類似,具體的步驟無非就是1.初始化資源。 2. 接收訊息。 3. 必要的時候關閉資源。

    初始化資源可以放到建構函式裡面:

Java程式碼  收藏程式碼
  1. public Consumer() throws JMSException {  
  2.         factory = new ActiveMQConnectionFactory(brokerURL);  
  3.         connection = factory.createConnection();  
  4.         connection.start();  
  5.         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
  6.     }  

     接收和處理訊息的方法有兩種,分為同步和非同步的,一般同步的方式我們是通過MessageConsumer.receive()方法來處理接收到的訊息。而非同步的方法則是通過註冊一個MessageListener的方法,使用MessageConsumer.setMessageListener()。這裡我們採用非同步的方式實現:

Java程式碼  收藏程式碼
  1. public static void main(String[] args) throws JMSException {  
  2.     Consumer consumer = new Consumer();  
  3.     for (String stock : args) {  
  4.     Destination destination = consumer.getSession().createTopic("STOCKS." + stock);  
  5.     MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination);  
  6.     messageConsumer.setMessageListener(new Listener());  
  7.     }  
  8. }  
  9. public Session getSession() {  
  10.     return session;  
  11. }  

     在前面的程式碼裡我們先找到同樣的topic,然後遍歷所有的topic去獲得訊息。對於訊息的處理我們專門通過Listener物件來負責。

    Listener物件的職責很簡單,主要就是處理接收到的訊息:

Java程式碼  收藏程式碼
  1. public class Listener implements MessageListener {  
  2.     public void onMessage(Message message) {  
  3.         try {  
  4.             MapMessage map = (MapMessage)message;  
  5.             String stock = map.getString("stock");  
  6.             double price = map.getDouble("price");  
  7.             double offer = map.getDouble("offer");  
  8.             boolean up = map.getBoolean("up");  
  9.             DecimalFormat df = new DecimalFormat( "#,###,###,##0.00" );  
  10.             System.out.println(stock + "\t" + df.format(price) + "\t" + df.format(offer) + "\t" + (up?"up":"down"));  
  11.         } catch (Exception e) {  
  12.             e.printStackTrace();  
  13.         }  
  14.     }  
  15. }  

    它實現了MessageListener介面,裡面的onMessage方法就是在接收到訊息之後會被呼叫的方法。

    現在,通過實現前面的publisher和consumer我們已經實現了pub-sub模式的一個例項。仔細回想它的步驟的話,主要就是要兩者設定一個共同的topic,有了這個topic之後他們可以實現一方發訊息另外一方接收。另外,為了連線到具體的message server,這裡是使用了連線tcp://localhost:16161作為定義ActiveMQConnectionFactory的路徑。在publisher端通過session建立producer,根據指定的引數建立destination,然後將訊息和destination作為producer.send()方法的引數發訊息。在consumer端也要建立類似的connection, session。通過session得到destination,再通過session.createConsumer(destination)來得到一個MessageConsumer物件。有了這個MessageConsumer我們就可以自行選擇是直接同步的receive訊息還是註冊listener了。

p2p

    p2p的過程則理解起來更加簡單。它好比是兩個人打電話,這兩個人是獨享這一條通訊鏈路的。一方傳送訊息,另外一方接收,就這麼簡單。在實際應用中因為有多個使用者對使用p2p的鏈路,它的通訊場景如下圖所示:

    我們再來看看一個p2p的示例:

    在p2p的場景裡,相互通訊的雙方是通過一個類似於佇列的方式來進行交流。和前面pub-sub的區別在於一個topic有一個傳送者和多個接收者,而在p2p裡一個queue只有一個傳送者和一個接收者。

傳送者

    和前面的示例非常相似,我們建構函式裡需要初始化的內容基本上差不多:

Java程式碼  收藏程式碼
  1. public Publisher() throws JMSException {  
  2.     factory = new ActiveMQConnectionFactory(brokerURL);  
  3.     connection = factory.createConnection();  
  4.     connection.start();  
  5.     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
  6.     producer = session.createProducer(null);  
  7. }  
     傳送訊息的方法如下: Java程式碼  收藏程式碼
  1. public void sendMessage() throws JMSException {  
  2.     for(int i = 0; i < jobs.length; i++)  
  3.     {  
  4.         String job = jobs[i];  
  5.         Destination destination = session.createQueue("JOBS." + job);  
  6.         Message message = session.createObjectMessage(i);  
  7.         System.out.println("Sending: id: " + ((ObjectMessage)message).getObject() + " on queue: " + destination);  
  8.         producer.send(destination, message);  
  9.     }  
  10. }  
     這裡我們定義了一個jobs的陣列,通過遍歷這個陣列來建立不同的job queue。這樣就相當於建立了多個點對點通訊的鏈路。

    訊息傳送者的啟動程式碼如下:

Java程式碼  收藏程式碼
  1. public static void main(String[] args) throws JMSException {  
  2.     Publisher publisher = new Publisher();  
  3.     for(int i = 0; i < 10; i++) {  
  4.         publisher.sendMessage();  
  5.         System.out.println("Published " + i + " job messages");  
  6.     try {  
  7.             Thread.sleep(1000);  
  8.         } catch (InterruptedException x) {  
  9.         e.printStackTrace();  
  10.         }  
  11.     }  
  12.     publisher.close();  
  13. }  
     我們在這裡傳送10條訊息,當然,在每個sendMessage的方法裡實際上是針對每個queue傳送了10條。

接收者

     接收者的程式碼很簡單,一個建構函式初始化所有的資源:

Java程式碼  收藏程式碼
  1. public Consumer() throws JMSException {  
  2.         factory = new ActiveMQConnectionFactory(brokerURL);  
  3.         connection = factory.createConnection();  
  4.         connection.start();  
  5.         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
  6.     }  
    還有一個就是註冊訊息處理的物件: Java程式碼  收藏程式碼
  1. public static void main(String[] args) throws JMSException {  
  2.         Consumer consumer = new Consumer();  
  3.         for (String job : consumer.jobs) {  
  4.             Destination destination = consumer.getSession().createQueue("JOBS." + job);  
  5.             MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination);  
  6.             messageConsumer.setMessageListener(new Listener(job));  
  7.         }  
  8.     }  
  9.     public Session getSession() {  
  10.         return session;  
  11.     }  
     具體註冊的物件處理方法和前面還是類似,實現MessageListener介面就可以了。