1. 程式人生 > >【AMQ】 二:點對點模式Dome

【AMQ】 二:點對點模式Dome

AMQ通訊分為兩種,一種是點對點模式,另一種是釋出訂閱模式,本文主要介紹點對點模式和簡單實現。

什麼是點對點模式? 點對點模式是AMQ的一種通過佇列方式通訊的模式, 即生產者會把生產的訊息放在某個佇列中,消費者從佇列中取得訊息進行通訊的方式。

基本實現:

生產者:

package www.amp.com;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;


/**
 * Created by wangpengzhi1 on 2018/10/11.
 */
public class QueueProducer {
    /** 預設使用者名稱 */
    public static final String USERNAME = "admin";
    /** 預設密碼 */
    public static final String PASSWORD = "admin";
    /** 預設連線地址(格式如:tcp://IP:61616) */
    public static final String BROKER_URL = "tcp://192.168.198.138:61616";
    /** 佇列名稱 */
    public static final String QUEUE_NAME = "hello amq";

    // 連線工廠(在AMQ中由ActiveMQConnectionFactory實現)
    private ConnectionFactory connectionFactory;

    // 連線物件
    private Connection connection;

    // 會話物件
    private Session session;

    // 訊息目的地(對於點對點模型,是Queue物件;對於釋出訂閱模型,是Topic物件;它們都繼承或實現了該介面)
    private Destination destination;

    // 訊息傳送(生產)者
    private MessageProducer messageProducer;

    public static void main(String[] args) {
        QueueProducer producer = new QueueProducer();
        producer.doSend();
    }

    public void doSend() {
        try {
            /**
             * 1.建立連線工廠<br>
             * 建構函式有多個過載,預設連線本地MQ伺服器,也可以手動設定使用者名稱、密碼、連線地址資訊<br>
             * new ActiveMQConnectionFactory(userName, password, brokerURL)
             */
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKER_URL);

            /**
             * 2.建立連線
             */
            connection = connectionFactory.createConnection();

            /**
             * 3.啟動連線
             */
            connection.start();

            /**
             * 4.建立會話<br>
             * param1:是否支援事務,若為true,則會忽略第二個引數,預設為SESSION_TRANSACTED<br>
             * param2:確認訊息模式,若第一個引數為false時,該引數有以下幾種狀態<br>
             * -Session.AUTO_ACKNOWLEDGE:自動確認。客戶端傳送和接收訊息不需要做額外的工作,即使接收端發生異常,
             * 也會被當作正常傳送成功 <br>
             * -Session.CLIENT_ACKNOWLEDGE:客戶端確認。客戶端接收到訊息後,必須呼叫message.
             * acknowledge() 方法給予收到反饋,JMS伺服器才會把該訊息當做傳送成功,並刪除<br>
             * -Session.DUPS_OK_ACKNOWLEDGE:副本確認。一旦接收端應用程式的方法呼叫從處理訊息處返回,
             * 會話物件就會確認訊息的接收,而且允許重複確認。
             */
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

            /**
             * 5.建立(傳送)訊息目的地,即佇列,引數為佇列名稱
             */
            destination = session.createQueue(QUEUE_NAME);

            /**
             * 6.建立一個訊息生產者,並指定目的地
             */
            messageProducer = session.createProducer(destination);
            /**
             * 其他操作: 設定生產者的生產模式,預設為持久化<br>
             * 引數有以下兩種狀態:<br>
             * -DeliveryMode.NON_PERSISTENT:訊息不持久化,訊息被消費之後或者超時之後將從佇列中刪除
             * -DeliveryMode.PERSISTENT:訊息會持久化,即使接收端消費訊息之後仍然會儲存
             */
            messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            /**
             * 其他操作:設定訊息的存活時間(單位:毫秒)
             */
            messageProducer.setTimeToLive(60000);

            for (int i = 0; i < 5; i++) {
                /**
                 * 7.建立文字訊息<br>
                 * 此外,還有多種型別的訊息如物件,位元組……都可以通過session.createXXXMessage()方法建立
                 */
                TextMessage message = session.createTextMessage("send content:"
                        + i);

                /**
                 * 8. 傳送
                 */
                messageProducer.send(message);

            }
            System.out.println("訊息傳送完成!");
            /**
             * 如果有事務操作也可以提交事務
             */
            session.commit();

            /**
             * 9.關閉生產者物件(即使關閉了程式也在執行)
             */
            messageProducer.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    /**
                     * 10.關閉連線(將會關閉程式)
                     */
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

消費者端:

package www.amp.com;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class QueueConsumer {
    private ConnectionFactory connectionFactory;
    private Connection connection;
    private Session session;
    private Destination destination;
    // 注意這裡是訊息接收(消費)者
    private MessageConsumer messageConsumer;
    /** 預設使用者名稱 */
    public static final String USERNAME = "admin";
    /** 預設密碼 */
    public static final String PASSWORD = "admin";
    /** 預設連線地址(格式如:tcp://IP:61616) */
    public static final String BROKER_URL = "tcp://192.168.198.138:61616";

    public static void main(String[] args) {
        QueueConsumer consumer = new QueueConsumer();
        consumer.doReceive();
    }

    public void doReceive() {
        try {
            connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue(QueueProducer.QUEUE_NAME);

            /**
             * 注意:這裡要建立一個訊息消費,並指定目的地(即訊息源佇列)
             */
            messageConsumer = session.createConsumer(destination);

            // 方式一:監聽接收
            receiveByListener();

            // 方式二:阻塞接收
            // receiveByManual();
            /**
             * 注意:這裡不能再關閉物件了
             */
            // messageConsumer.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            /**
             * 注意:這裡不能再關閉Connection了
             */
            // connection.close();
        }
    }

    /**
     * 通過註冊監聽器的方式接收訊息,屬於被動監聽
     */
    private void receiveByListener() {
        try {
            messageConsumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    if (message instanceof TextMessage) {
                        try {
                            TextMessage msg = (TextMessage) message;
                            System.out.println("Received:“" + msg.getText()+ "”");
                            // 可以通過此方法反饋訊息已收到
                            msg.acknowledge();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 通過手動去接收訊息的方式,屬於主動獲取
     */
    private void receiveByManual() {
        while (true) {
            try {
                /**
                 * 通過receive()方法阻塞接收訊息,引數為超時時間(單位:毫秒)
                 */
                TextMessage message = (TextMessage) messageConsumer.receive(60000);
                if (message != null) {
                    System.out.println("Received:“" + message.getText() + "”");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }
}