1. 程式人生 > >訊息佇列MQ實踐----實現Queue(佇列訊息)和Topic(主題訊息)兩種模式

訊息佇列MQ實踐----實現Queue(佇列訊息)和Topic(主題訊息)兩種模式

之前有篇檔案介紹了生產消費者模式(http://blog.csdn.net/canot/article/details/51541920 ),當時是通過BlockingQueue阻塞佇列來實現,以及在Redis中使用pub/sub模式(http://blog.csdn.net/canot/article/details/51938955)。而實際專案中往往是通過JMS使用訊息佇列來實現這兩種模式的。 JMS(Java Messaging Service)是Java平臺上有關面向訊息中介軟體的技術規範,它便於訊息系統中的Java應用程式進行訊息交換,並且通過提供標準的產生、傳送、接收訊息的介面簡化企業應用的開發。  JMS類似與JDBC,sun提供介面,由各個廠商(provider)來進行具體的實現。市面上眾多成熟的JMS規範實現的框架Kafk,RabbitMQ,ActiveMQ,ZeroMQ,RocketMQ等。 JMS的佇列訊息(Queue)傳遞過程如下圖: 這裡寫圖片描述 對於Queue模式,一個釋出者釋出訊息,下面的接收者按佇列順序接收,比如釋出了10個訊息,兩個接收者A,B那就是A,B總共會收到10條訊息,不重複。 JMS的主題訊息傳遞過程如下圖: 這裡寫圖片描述 對於Topic模式,一個釋出者釋出訊息,有兩個接收者A,B來訂閱,那麼釋出了10條訊息,A,B各收到10條訊息。 我們從ActiveMQ來實踐:(安裝部署省掉) Queue模式實踐: 訊息生產者: public class Sender {       public static void main(String[] args) throws JMSException, InterruptedException {           // ConnectionFactory :連線工廠,JMS 用它建立連線           //61616是ActiveMQ預設埠         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(                                                     ActiveMQConnection.DEFAULT_USER,                                                     ActiveMQConnection.DEFAULT_PASSWORD,                                                     );           // Connection :JMS 客戶端到JMS Provider 的連線           Connection connection =  connectionFactory.createConnection();           connection.start();           // Session: 一個傳送或接收訊息的執行緒           Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);           // Destination :訊息的目的地;訊息傳送給誰.           Destination destination =  session.createQueue();           // MessageProducer:訊息傳送者           MessageProducer producer =  session.createProducer(destination);           // 設定不持久化,可以更改           producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);           for(int i=0;i<10;i++){               //建立文字訊息               TextMessage message = session.createTextMessage(+i);               Thread.sleep(1000);               //傳送訊息               producer.send(message);           }           session.commit();           session.close();           connection.close();       }   }   訊息接收者     // ConnectionFactory :連線工廠,JMS 用它建立連線      private static ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,              ActiveMQConnection.DEFAULT_PASSWORD, );      public static void main(String[] args) throws JMSException {           // Connection :JMS 客戶端到JMS Provider 的連線           final Connection connection =  connectionFactory.createConnection();           connection.start();           // Session: 一個傳送或接收訊息的執行緒           final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);           // Destination :訊息的目的地;訊息送誰那獲取.           Destination destination =  session.createQueue();           // 消費者,訊息接收者           MessageConsumer consumer1 =  session.createConsumer(destination);           consumer1.setMessageListener(new MessageListener() {                   @Override                   public void onMessage(Message msg) {                       try {                           TextMessage message = (TextMessage)msg ;                           System.out.println(+message.getText());                           session.commit();                       } catch (JMSException e) {                                         e.printStackTrace();                       }                   }               });   }   執行之後控制檯不會退出一直監聽訊息庫,對於訊息傳送者的十條資訊,控制輸出: consumerOne收到訊息: hello.I am producer, this is a test message0  consumerOne收到訊息: hello.I am producer, this is a test message1  consumerOne收到訊息: hello.I am producer, this is a test message2  consumerOne收到訊息: hello.I am producer, this is a test message3  consumerOne收到訊息: hello.I am producer, this is a test message4  consumerOne收到訊息: hello.I am producer, this is a test message5  consumerOne收到訊息: hello.I am producer, this is a test message6  consumerOne收到訊息: hello.I am producer, this is a test message7  consumerOne收到訊息: hello.I am producer, this is a test message8  consumerOne收到訊息: hello.I am producer, this is a test message9  如果此時另外一個執行緒也存在消費者監聽該Queue,則兩者交換輸出,共輸出10條 Topic模式實現 訊息釋出者 public static void main(String[] args) throws JMSException, InterruptedException {         // ConnectionFactory :連線工廠,JMS 用它建立連線           //61616是ActiveMQ預設埠         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(                                                     ActiveMQConnection.DEFAULT_USER,                                                     ActiveMQConnection.DEFAULT_PASSWORD,                                                     );           // Connection :JMS 客戶端到JMS Provider 的連線           Connection connection =  connectionFactory.createConnection();           connection.start();           // Session: 一個傳送或接收訊息的執行緒           Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);           // Destination :訊息的目的地;訊息傳送給誰.           //Destination destination =  session.createQueue("my-queue");           Destination destination = session.createTopic(); //建立topic   myTopic         // MessageProducer:訊息傳送者           MessageProducer producer =  session.createProducer(destination);           // 設定不持久化,可以更改           producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);           for(int i=0;i<10;i++){               //建立文字訊息               TextMessage message = session.createTextMessage(+i);               //傳送訊息               producer.send(message);           }           session.commit();           session.close();           connection.close();     }   訊息訂閱者 private static ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,             ActiveMQConnection.DEFAULT_PASSWORD, );     public void run() {         // Connection :JMS 客戶端到JMS Provider 的連線         try {             final Connection connection = connectionFactory.createConnection();             connection.start();             // Session: 一個傳送或接收訊息的執行緒             final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);             // Destination :訊息的目的地;訊息送誰那獲取.             // Destination destination = session.createQueue("my-queue");             Destination destination = session.createTopic(); // 建立topic                                                                                 // myTopic             // 消費者,訊息接收者             MessageConsumer consumer1 = session.createConsumer(destination);             consumer1.setMessageListener(new MessageListener() {                 public void onMessage(Message msg) {                     try {                         TextMessage message = (TextMessage) msg;                         System.out.println( + message.getText());                         session.commit();                     } catch (JMSException e) {                         e.printStackTrace();                     }                 }             });             // 再來一個消費者,訊息接收者             MessageConsumer consumer2 = session.createConsumer(destination);             consumer2.setMessageListener(new MessageListener() {                 public void onMessage(Message msg) {                     try {                         TextMessage message = (TextMessage) msg;                         System.out.println( + message.getText());                         session.commit();                     } catch (JMSException e) {                         e.printStackTrace();                     }                 }             });         } catch (Exception e) {         }     } 最後訊息會重複輸出:  consumerOne收到訊息: hello.I am producer, this is a test message0  consumerTwo收到訊息: hello.I am producer, this is a test message0  consumerOne收到訊息: hello.I am producer, this is a test message1  consumerTwo收到訊息: hello.I am producer, this is a test message1  consumerOne收到訊息: hello.I am producer, this is a test message2  consumerTwo收到訊息: hello.I am producer, this is a test message2  consumerOne收到訊息: hello.I am producer, this is a test message3  consumerTwo收到訊息: hello.I am producer, this is a test message3  consumerOne收到訊息: hello.I am producer, this is a test message4  consumerTwo收到訊息: hello.I am producer, this is a test message4  consumerOne收到訊息: hello.I am producer, this is a test message5  consumerTwo收到訊息: hello.I am producer, this is a test message5  consumerOne收到訊息: hello.I am producer, this is a test message6  consumerTwo收到訊息: hello.I am producer, this is a test message6  consumerOne收到訊息: hello.I am producer, this is a test message7  consumerTwo收到訊息: hello.I am producer, this is a test message7  consumerOne收到訊息: hello.I am producer, this is a test message8  consumerTwo收到訊息: hello.I am producer, this is a test message8  consumerOne收到訊息: hello.I am producer, this is a test message9 我們簡單總結一下使用MQ的過程: 1.建立與MQ的連結 2.建立訊息的目的地或者來原地即Destination 3.傳送訊息或者制定對應的MessageListener 上述就是關於MQ兩種訊息模型的簡單應用,至於具體的細節。如在消費者監聽訊息時有哪些Listener型別,生產者傳送訊息時有哪些Message型別。生成Session時引數1表示是否開啟事務,至於事務的處理,訊息的持久化等等。後面慢慢介紹。