1. 程式人生 > >ActiveMQ 發送和就收消息

ActiveMQ 發送和就收消息

ace listen OS factor row conn ack 多個 tar

一、添加 jar 包

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.11.2</version>
</dependency>

二、消息傳遞的兩種形式

  1、點對點:發送的消息只能被一個消費者接收,第一個消費者接收後,消息沒了

  2、發布/訂閱:消息可以被多個消費者接收 。發完消息,如果沒有消費者接收,這消息會自動消失。也就是說,消費者服務必須是啟動的狀態。( topic 消息在 ActiveMQ 服務端默認不是持久化的,可以通過配置文件配置持久化 )

技術分享圖片

三、點對點發送消息

/**
 * 點到點形式發送消息
 * @throws Exception
 */
@Test
public void testQueueProducer() throws Exception{
    //1、創建一個連接工廠,需要指定服務的 ip 和端口
    String brokerURL = "tcp://192.168.25.129:61616";
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
    //2、使用工廠對象創建一個 Connection 對象
Connection connection = connectionFactory.createConnection(); //3、開啟連接,調用 Connection 對象的 start 方法 connection.start(); //4、創建一個 Session 對象。 //第一個參數:是否開啟事務(一般不開啟,如果開啟事務,第二個參數沒意義); //第二個參數:應答模式。自動應答或者手動應答,一般是自動應答 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用 Session 對象創建一個 Destination 對象。兩種形式 queue、topic。 Queue queue = session.createQueue("test-queue"); //6、使用 Session 對象創建一個 Producer 對象 MessageProducer producer = session.createProducer(queue); //7、創建一個 Message 對象,可以使用 TextMessage。下面兩種方式都可以 /*TextMessage textMessage = new ActiveMQTextMessage(); textMessage.setText("hello ActiveMQ");*/ TextMessage textMessage = session.createTextMessage("hello ActiveMQ"); //8、發布消息 producer.send(textMessage); //9、關閉資源 producer.close(); session.close(); connection.close(); }

技術分享圖片

四、點對點接收消息

/**
 * 點對點接收消息
 * @throws Exception
 */
@Test
public void testQueueConsumer() throws Exception{
    //1、創建一個 ConnectionFactory 對象連接 MQ 服務器
    String brokerURL = "tcp://192.168.25.129:61616";
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
    //2、創建一個連接對象
    Connection connection = connectionFactory.createConnection();
    //3、開啟連接
    connection.start();
    //4、使用 Connection 對象 創建一個 Session 對象
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //5、創建一個 Destination 對象。queue 對象
    Queue queue = session.createQueue("test-queue");
    //6、使用 Session 對象創建一個消費者
    MessageConsumer consumer = session.createConsumer(queue);
    //7、接收消息
    consumer.setMessageListener(new MessageListener() {
        
        @Override
        public void onMessage(Message message) {
            //8、打印結果
            TextMessage textMessage = (TextMessage) message;
            
            try {
                String text = textMessage.getText();
                System.out.println(text);
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            
        }
    });
    
    //9、等待接收消息。( 接收到消息後才網下面執行。關閉資源 )
    System.in.read();
    //10、關閉資源
    consumer.close();
    session.close();
    connection.close();
    
}

五、廣播發送消息

/**
 * 廣播發送消息
 * @throws Exception
 */
@Test
public void testTopicProducer() throws Exception{
    //1、創建一個連接工廠,需要指定服務的 ip 和端口
    String brokerURL = "tcp://192.168.25.129:61616";
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
    //2、使用工廠對象創建一個 Connection 對象
    Connection connection = connectionFactory.createConnection();
    //3、開啟連接,調用 Connection 對象的 start 方法
    connection.start();
    //4、創建一個 Session 對象。
        //第一個參數:是否開啟事務(一般不開啟,如果開啟事務,第二個參數沒意義);
        //第二個參數:應答模式。自動應答或者手動應答,一般是自動應答
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //5、使用 Session 對象創建一個 Destination 對象。兩種形式 queue、topic。
    Topic topic = session.createTopic("test-topic");
    //6、使用 Session 對象創建一個 Producer 對象
    MessageProducer producer = session.createProducer(topic);
    //7、創建一個 Message 對象,可以使用 TextMessage。下面兩種方式都可以
    /*TextMessage textMessage = new ActiveMQTextMessage(); 
    textMessage.setText("hello ActiveMQ");*/
    TextMessage textMessage = session.createTextMessage("hello ActiveMQ");
    //8、發布消息
    producer.send(textMessage);
    //9、關閉資源
    producer.close();
    session.close();
    connection.close();
}

技術分享圖片

六、廣播接收消息

/**
 * 廣播接收消息
 * @throws Exception
 */
@Test
public void testTopicConsumer() throws Exception{
    //1、創建一個 ConnectionFactory 對象連接 MQ 服務器
    String brokerURL = "tcp://192.168.25.129:61616";
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
    //2、創建一個連接對象
    Connection connection = connectionFactory.createConnection();
    //3、開啟連接
    connection.start();
    //4、使用 Connection 對象 創建一個 Session 對象
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //5、創建一個 Destination 對象。Topic 對象
    Topic topic = session.createTopic("test-topic");
    //6、使用 Session 對象創建一個消費者
    MessageConsumer consumer = session.createConsumer(topic);
    //7、接收消息
    consumer.setMessageListener(new MessageListener() {
        
        @Override
        public void onMessage(Message message) {
            //8、打印結果
            TextMessage textMessage = (TextMessage) message;
            
            try {
                String text = textMessage.getText();
                System.out.println(text);
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            
        }
    });
    System.out.println("topic消費者");
    //9、等待接收消息。( 接收到消息後才網下面執行。關閉資源 )
    System.in.read();
    //10、關閉資源
    consumer.close();
    session.close();
    connection.close();  
}

ActiveMQ 發送和就收消息