1. 程式人生 > >ActiveMQ訊息中介軟體入門學習

ActiveMQ訊息中介軟體入門學習




同步通訊:客戶端向伺服器端發出請求,並一直等待伺服器端的響應。直到獲取到伺服器端返回的響應資訊,客戶端才能繼續執行。

MQ全稱為Message Queue, 訊息佇列(MQ)是一種應用程式對應用程式的通訊方法。應用程式通過讀寫出入佇列的訊息(針對應用程式的資料)來通訊,而無需專用連線來連結它們。訊息傳遞指的是程式之間通過在訊息中傳送資料進行通訊,而不是通過直接呼叫彼此來通訊,直接呼叫通常是用於諸如遠端過程呼叫的技術。


MOM訊息中介軟體:訊息傳送端將訊息傳送給訊息伺服器,訊息伺服器將訊息存放在若干個佇列中,在合適的時候再將訊息轉發給訊息接收端。在這種模式下,傳送訊息和接收訊息是一個非同步的過程,訊息傳送端和訊息接收端不一定同時執行,訊息傳送端在傳送完訊息後也無需繼續等待訊息接收端的響應資訊,而可以繼續做其它事情。


Java提供JMS規範,即定義了Java訪問訊息中介軟體的介面,它並沒有提供介面的實現。目前,有很多的JMS Provider提供了這些介面的實現,包括Apache的ActiveMQ、阿里巴巴的RocketMQ、Pivotal的RabbitMQ。


JMS中相關術語:

Provider:訊息生產者   

Customer:訊息消費者

Publish/Subscribe:訊息的釋出訂閱模式

Queue:訊息對列

Topic:訊息主題

ConnectionFactory:連線工廠,用於建立連線

Connection:訊息生產者或消費者到JMS Provider的連線

Destination:訊息的目的地

Session:會話,一個傳送或接收訊息的執行緒


訊息格式定義:

StreamMessage:資料流

MapMessage:名稱-值對

TextMessage:字串物件

ObjectMessage:序列化的Java物件

BytesMessage:位元組資料流


在ActiveMQ的官網下載最新版本的軟體包


ActiveMQ的配置檔案、jetty容器的配置檔案


ActiveMQ軟體包中一個比較全的jar包


webapps包含可以部署到jetty中的admin管控臺程式


在bin目錄下通過bat檔案啟動ActiveMQ


檢視jetty.xml中的埠號



啟動完成後,使用瀏覽器訪問




訊息生產者Producer


  
  1. package com.cb;
  2. import javax.jms.Connection;
  3. import javax.jms.ConnectionFactory;
  4. import javax.jms.DeliveryMode;
  5. import javax.jms.Destination;
  6. import javax.jms.MessageProducer;
  7. import javax.jms.Session;
  8. import javax.jms.TextMessage;
  9. import org.apache.activemq.ActiveMQConnectionFactory;
  10. public class Producer {
  11. public static void main(String[] args) throws Exception{
  12. //1.建立ConnectionFactory物件
  13. ConnectionFactory connectionFactory= new ActiveMQConnectionFactory(
  14. ActiveMQConnectionFactory.DEFAULT_USER,
  15. ActiveMQConnectionFactory.DEFAULT_PASSWORD,
  16. "tcp://localhost:61616");
  17. //2.建立一個Connection並開啟
  18. Connection connection=connectionFactory.createConnection();
  19. connection.start();
  20. //3.建立Session會話,用來接收訊息,通過引數可以設定:是否啟用事務、訊息簽收模式
  21. Session session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
  22. //4.建立Destination物件。在點對點模式中,該物件被稱為Queue;在釋出訂閱模式中,該物件被稱為Topic
  23. Destination destination=session.createQueue( "queue1");
  24. //5.建立訊息的生產者
  25. MessageProducer messageProducer=session.createProducer(destination);
  26. //6.設定生產者的訊息持久化與非持久化特性
  27. messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
  28. //7.選擇需要的JMS訊息格式,建立併發送訊息,此處選擇的是TextMessage字串物件
  29. TextMessage textMessage=session.createTextMessage();
  30. textMessage.setText( "生產者"+ "activemq訊息測試");
  31. messageProducer.send(textMessage);
  32. //8.釋放Connection
  33. if( null!=connection){
  34. connection.close();
  35. }
  36. }
  37. }

執行上述程式碼,檢視控制檯


點選佇列名稱,檢視詳情


此時,生產者傳送訊息到訊息中介軟體ActiveMQ中了。


訊息消費者Consumer


  
  1. package com.cb;
  2. import javax.jms.Connection;
  3. import javax.jms.ConnectionFactory;
  4. import javax.jms.Destination;
  5. import javax.jms.MessageConsumer;
  6. import javax.jms.Session;
  7. import javax.jms.TextMessage;
  8. import org.apache.activemq.ActiveMQConnectionFactory;
  9. public class Consumer {
  10. public static void main(String[] args) throws Exception{
  11. //1.建立ConnectionFactory物件
  12. ConnectionFactory connectionFactory= new ActiveMQConnectionFactory(
  13. ActiveMQConnectionFactory.DEFAULT_USER,
  14. ActiveMQConnectionFactory.DEFAULT_PASSWORD,
  15. "tcp://localhost:61616");
  16. //2.建立一個Connection並開啟
  17. Connection connection=connectionFactory.createConnection();
  18. connection.start();
  19. //3.建立Session會話,用來接收訊息,通過引數可以設定:是否啟用事務、訊息簽收模式
  20. Session session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
  21. //4.建立Destination物件。在點對點模式中,該物件被稱為Queue;在釋出訂閱模式中,該物件被稱為Topic
  22. Destination destination=session.createQueue( "queue1");
  23. //5.建立訊息的消費者
  24. MessageConsumer messageConsumer=session.createConsumer(destination);
  25. //6.消費者從訊息中介軟體的Queue獲取訊息
  26. while( true){
  27. TextMessage textMessage=(TextMessage) messageConsumer.receive();
  28. if( null==textMessage){
  29. break;
  30. }
  31. System.out.println( "消費者接收到的內容:"+textMessage.getText());
  32. }
  33. //7.釋放Connection
  34. if( null!=connection){
  35. connection.close();
  36. }
  37. }
  38. }

執行上面消費訊息的程式碼,並檢視控制檯


此時,點選queue1檢視佇列中訊息,發現為空,因為訊息已經被消費了


在Eclipse的Console中結果如下圖:




在activemq.xml中可以設定安全驗證,只有驗證通過的使用者才可以向ActiveMQ訊息中介軟體中傳送、獲取訊息。



  
  1. <plugins>
  2. <simpleAuthenticationPlugin>
  3. <users>
  4. <authenticationUser username="cb" password="123456" groups="users,admins"/>
  5. </users>
  6. </simpleAuthenticationPlugin>
  7. </plugins>

同時修改Producer、Customer第1步中的使用者名稱和密碼