1. 程式人生 > >訊息中介軟體rabbitmq(2)

訊息中介軟體rabbitmq(2)

介紹

主要演示生產者和消費如何傳遞訊息的

生產者

public static void main(String[] args) throws IOException, TimeoutException {
        // TODO Auto-generated method stub
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword(
"123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //queueDeclare第一個引數表示佇列名稱、第二個引數為是否持久化(true表示是,佇列將在伺服器重啟時生存)、第三個引數為是否是獨佔佇列(建立者可以使用的私有佇列,斷開後自動刪除)、第四個引數為當所有消費者客戶端連線斷開時是否自動刪除佇列、第五個引數為佇列的其他引數
//basicPublish第一個引數為交換機名稱、第二個引數為佇列對映的路由key、第三個引數為訊息的其他屬性、第四個引數為傳送資訊的主體 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello RabbitMQ"; //傳送訊息到佇列中 channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println(
"Producer Send +'" + message + "'"); //關閉通道和連線 channel.close(); connection.close(); }

消費者

public static void main(String[] args) throws IOException, TimeoutException {
        // TODO Auto-generated method stub
         // 建立連線工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設定RabbitMQ地址
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("123456");
        //建立一個新的連線
        Connection connection = factory.newConnection();
        //建立一個通道
        Channel channel = connection.createChannel();
        //宣告要關注的佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("Customer Waiting Received messages");
        //DefaultConsumer類實現了Consumer介面,通過傳入一個頻道,
        // 告訴伺服器我們需要那個頻道的訊息,如果頻道中有訊息,就會執行回撥函式handleDelivery
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Customer Received '" + message + "'");
                //throw new RuntimeException();
            }
        };
        //自動回覆佇列應答 -- RabbitMQ中的訊息確認機制  ack機制,ack = false時,如果程式異常會重新消費
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

注:在消費者最後設定channel.basicConsume時,第二個引數代表是否需要自動回覆,即如果是true,不論消費成功還是失敗,即使丟擲異常,該條訊息也會從訊息佇列中刪除,所以對於訊息準確性要求教高的業務場景,可以設定為false,只有消費成功後,自己返回才算消費成功。