1. 程式人生 > >activemq訂閱模式以及訊息時長和確認機制

activemq訂閱模式以及訊息時長和確認機制

程式碼如下:
  1. package com.activemq;  
  2. import org.apache.activemq.ActiveMQConnectionFactory;  
  3. import javax.jms.*;   
  4. publicclass TopicPub {  
  5.      publicstaticvoid main(String[] args) throws JMSException {    
  6.             ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");    
  7.             Connection connection = factory.createConnection();    
  8.             connection.start();    
  9.             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);    
  10.             /** 
  11.             Session javax.jms.Connection.createSession(boolean transacted, int acknowledgeMode) throws JMSException
     
  12.             1.transacted事務,事務成功commit,才會將訊息傳送到mom中 
  13.             2.acknowledgeMode訊息確認機制 
  14.                  1)、帶事務的session 
  15.                     如果session帶有事務,並且事務成功提交,則訊息被自動簽收。如果事務回滾,則訊息會被再次傳送。 
  16.                  2)、不帶事務的session 
  17.                     不帶事務的session的簽收方式,取決於session的配置。 
  18.                     Activemq支援一下三種模式:
     
  19.                     Session.AUTO_ACKNOWLEDGE  訊息自動簽收 
  20.                     Session.CLIENT_ACKNOWLEDGE  客戶端呼叫acknowledge方法手動簽收 
  21.                     Session.DUPS_OK_ACKNOWLEDGE 不是必須簽收,訊息可能會重複傳送。在第二次重新傳送訊息的時候,訊息 
  22.                     頭的JmsDelivered會被置為true標示當前訊息已經傳送過一次,客戶端需要進行訊息的重複處理控制。 
  23.                  程式碼示例如下: 
  24.                  session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); 
  25.                  textMsg.acknowledge(); 
  26.             */
  27.             Topic topic = session.createTopic("wm5920.topic");    
  28.             MessageProducer producer = session.createProducer(topic);    
  29.             producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//設定非持久化  
  30. //          producer.setTimeToLive(5000);//5秒後過期,這個對點對點模式有效
  31.             TextMessage message = session.createTextMessage();    
  32.             message.setText("message_" + System.currentTimeMillis());    
  33.             producer.send(message);    
  34.             System.out.println("Sent message: " + message.getText());    
  35.             session.close();    
  36.             connection.stop();    
  37.             connection.close();    
  38.         }    
  39. }  

訂閱主題,注:如果在釋出主題前,沒有訂閱,是收不到訊息的,這跟點對點的佇列模式不同
  1. package com.activemq;  
  2. import org.apache.activemq.ActiveMQConnectionFactory;    
  3. import javax.jms.*;    
  4. publicclass TopicSubs{  
  5. publicstaticvoid main(String[] args) throws JMSException {    
  6.         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");    
  7.         Connection connection = factory.createConnection();    
  8.         connection.setClientID("wm5920");  
  9.         connection.start();    
  10.         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);    
  11.         Topic topic = session.createTopic("wm5920.topic");    
  12.         //持久訂閱方式,不會漏掉資訊
  13.         TopicSubscriber subs=session.createDurableSubscriber(topic, "wm5920");  
  14.         subs.setMessageListener(new MessageListener() {    
  15.             publicvoid onMessage(Message message) {    
  16.                 TextMessage tm = (TextMessage) message;    
  17.                 try {    
  18.                     System.out.println("Received message: " + tm.getText());    
  19.                 } catch (JMSException e) {    
  20.                     e.printStackTrace();    
  21.                 }    
  22.             }    
  23.         });  
  24.         //非持久訂閱方式
  25. //        MessageConsumer consumer = session.createConsumer(topic);  
  26. //        consumer.setMessageListener(new MessageListener() {  
  27. //            public void onMessage(Message message) {  
  28. //                TextMessage tm = (TextMessage) message;  
  29. //                try {  
  30. //                    System.out.println("Received message: " + tm.getText());  
  31. //                } catch (JMSException e) {  
  32. //                    e.printStackTrace();  
  33. //                }  
  34. //            }  
  35. //        }); 
  36. //        session.commit();
  37. //      session.close();  
  38. //      connection.stop();  
  39. //      connection.close();  
  40.     }    
  41. }