1. 程式人生 > >訊息中介軟體 與 JMS的原生寫法

訊息中介軟體 與 JMS的原生寫法

第一部分: 點對點Queue佇列模式

queque模式特點:訊息持久化,只要消費者上線就可以消費

原生的生產者步驟總結:

1 new一個ActiveMQConnectionFactory工廠跟安裝有訊息中介軟體的連線上 

2 通過連線物件獲取session   

3 通過session封裝目的地 

4 通過session封裝訊息 

5 關閉資源持久化

 

原生消費者步驟總結:

前面的步驟一模一樣,只是他拿到session後,用session1 封裝目的地  ,

2 通過session獲取監控物件MessageListener拿到message

 

 

第一步:導包

匯入訊息中介軟體的包

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
    <version>5.13.4</version>
</dependency>

第二步:直接擼程式碼

生產者productor

package queuejms;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class QueueProduct {
    public static void main(String[] args) throws JMSException {
        //1 建立工廠
        ConnectionFactory factory=new ActiveMQConnectionFactory("tcp://192.168.5.111:61616");
        //2 獲取連線物件
        Connection connection = factory.createConnection();
        //3 開啟連線
         connection.start();
        //4 根據連線物件獲取session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5 根據ssession建立佇列物件,封裝目的地名稱
        Queue queue = session.createQueue("queue-product-demo");
        //6 根據session建立生產者
        MessageProducer producer = session.createProducer(queue);
        //7 建立併發送訊息
        TextMessage textMessage = session.createTextMessage("我的第一個jms,訊息中介軟體");
        producer.send(textMessage);
        //關閉資源
        session.close();
        connection.close();
    }
}

 

消費者consumer

package queuejms;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;

public class QueueConsumer {
    public static void main(String[] args) throws JMSException {
        //1 建立訊息中介軟體工廠activeMQ
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.5.111:61616");
        //2 根據工廠獲取連線
        Connection connection = factory.createConnection();
        //3 開啟連線
        connection.start();
        //4 根據連線建立session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5 建立佇列物件
        Queue queue = session.createQueue("queue-product-demo");
        //6 建立消費者
        MessageConsumer consumer = session.createConsumer(queue);
        //7 監聽訊息
       consumer.setMessageListener(new MessageListener() {
           public void onMessage(Message message) {
               try {
                   TextMessage textMessage= (TextMessage) message;
                   System.out.println("接收到了訊息:"+textMessage.getText());
               } catch (Exception e) {
                   e.printStackTrace();
               }
           }
       });
        try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
        //關閉資源
        session.close();
        connection.close();
    }
}

------------------------------------------------------------------------------------------------------------------------------------------------------------

第二部分: 點對點Topic釋出和訂閱模式

導包後直接擼程式碼

生產者:productor

package topicjms;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicProduct {
    public static void main(String[] args) throws JMSException {
        //建立工廠
        ConnectionFactory factory=new ActiveMQConnectionFactory("tcp://192.168.5.111:61616");
        //獲取連線物件
        Connection connection = factory.createConnection();
        //開啟連線
         connection.start();
        //根據連線物件獲取session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //根據ssession建立佇列物件,封裝目的地名稱
        Topic topic = session.createTopic("topic-product-demo");
        //根據session建立生產者
        MessageProducer producer = session.createProducer(topic);
        //建立併發送訊息
        TextMessage textMessage = session.createTextMessage("我的第一個jms,訊息中介軟體,使用topic方式傳送");
        producer.send(textMessage);
        //關閉資源
        session.close();
        connection.close();
    }
}



消費者consumer
package topicjms;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;

public class TopicConsumer {
    public static void main(String[] args) throws JMSException {
        //建立訊息中介軟體工廠activeMQ
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.5.111:61616");
        //根據工廠獲取連線
        Connection connection = factory.createConnection();
        //開啟連線
        connection.start();
        //根據連線建立session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //建立佇列物件
        Topic topic = session.createTopic("topic-product-demo");
        //建立消費者
        MessageConsumer consumer = session.createConsumer(topic);
        //監聽訊息
       consumer.setMessageListener(new MessageListener() {
           public void onMessage(Message message) {
               try {
                   TextMessage textMessage= (TextMessage) message;
                   System.out.println("接收到了訊息:"+textMessage.getText());
               } catch (Exception e) {
                   e.printStackTrace();
               }
           }
       });
        try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
        //關閉資源
        session.close();
        connection.close();
    }
}