1. 程式人生 > >rabbitmq學習之路(三)訊息應答、持久化以及公平轉發

rabbitmq學習之路(三)訊息應答、持久化以及公平轉發

上兩篇博文簡單介紹了下rabbitmq的使用方式,接下來,筆者再給大家介紹下rabbitmq的基礎配置:設定訊息的應答、持久化以及公平轉發。
下面,筆者簡單的來解釋下這個三個配置:
1. 訊息應答:
預設情況下,只要有消費者,訊息進去佇列後,訊息就會被全部分配好到相應的消費者進行處理,對應的訊息也會在佇列中去除。如果某個消費者處理過程中突然掛了,那麼這些訊息就沒有被處理,所以我們可以設定訊息為應答模式,也就是在消費者處理完一條訊息後,就告訴mq此訊息已被處理完,那麼未作出應答的訊息就會被轉發到其它的消費者進行處理。
2. 訊息持久化:
如果我們不設定訊息持久化,那麼在伺服器重啟後,所有的佇列以及相應的資料都會丟失,所以設定持久化,相應的資料就會儲存在磁碟中,不會丟失。
3. 公平轉發
由於佇列中的訊息的分配機制,會導致某些消費者一直處於繁忙的狀態,而讓其他處理完的消費者處於等待狀態,就算再新增一個消費者也不會把訊息分配到此消費者,所以我們可以設定公平轉發,這樣可以保證多個消費者之間公平的處理訊息,同時還可以動態新增消費者加入工作。

好了,接下來,給大家看看相應的程式碼,說明已於註釋中:
生產者:

/**
 * Project Name:qyk_testJava
 * File Name:Producer.java
 * Package Name:com.qiyongkang.mq.rabbitMq.basic
 * Date:2017年3月6日下午5:27:59
 * Copyright (c) 2017, Thinkive(http://www.thinkive.com/) All Rights Reserved.
 *
*/

package com.qiyongkang.mq.rabbitMq.basic;

import
java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; /** * ClassName:Producer <br/> * Function: TODO ADD FUNCTION. <br/> * Reason: TODO ADD REASON. <br/> * Date: 2017年3月6日 下午5:27:59 <br/> * * @author
qiyongkang * @version * @since JDK 1.6 * @see */
public class Producer { // 佇列名稱 private final static String QUEUE_NAME = "qyk.basic"; public static void main(String[] args) throws IOException { // 建立連線和頻道 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 宣告佇列, 設定佇列持久化 boolean durable = true; channel.queueDeclare(QUEUE_NAME, durable, false, false, null); // 傳送10條訊息,依次在訊息後面附加1-10個點 for (int i = 0; i < 10; i++) { String dots = ""; for (int j = 0; j <= i; j++) { dots += "."; } String message = "helloworld" + dots + dots.length(); channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } // 關閉頻道和資源 channel.close(); connection.close(); } }

消費者:

/**
 * Project Name:qyk_testJava
 * File Name:Consumer.java
 * Package Name:com.qiyongkang.mq.rabbitMq.basic
 * Date:2017年3月6日下午5:27:51
 * Copyright (c) 2017, Thinkive(http://www.thinkive.com/) All Rights Reserved.
 *
*/

package com.qiyongkang.mq.rabbitMq.basic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * ClassName:Consumer <br/>
 * Function: TODO ADD FUNCTION. <br/>
 * Reason: TODO ADD REASON. <br/>
 * Date: 2017年3月6日 下午5:27:51 <br/>
 * 
 * @author qiyongkang
 * @version
 * @since JDK 1.6
 * @see
 */
public class Consumer {
    // 佇列名稱
    private final static String QUEUE_NAME = "qyk.basic";

    public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException {
        // 區分不同工作程序的輸出
        int hashCode = Thread.currentThread().hashCode();
        // 建立連線和頻道
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 宣告佇列、設定佇列持久化
        boolean durable = true;
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
        System.out.println(hashCode + " [*] Waiting for messages. To exit press CTRL+C");

        //設定最大服務轉發訊息數量, 公平轉發
        int prefetchCount = 1;
        channel.basicQos(prefetchCount);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 指定消費佇列,開啟應答機制, 注意false才是開啟手動應對
        boolean ack = false ; 
        channel.basicConsume(QUEUE_NAME, ack, consumer);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());

            System.out.println(hashCode + " [x] Received '" + message + "'");
            doWork(message);
            System.out.println(hashCode + " [x] Done");

            //另外需要在每次處理完成一個訊息後,手動傳送一次應答。
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }

    }

    /**
     * 每個點耗時1s
     * 
     * @param task
     * @throws InterruptedException
     */
    private static void doWork(String task) throws InterruptedException {
        for (char ch : task.toCharArray()) {
            if (ch == '.')
                Thread.sleep(1000);
        }
    }
}

這裡,我們把消費者執行多次,就可以模擬多個消費者了。
好了,rabbitmq的這三個配置就簡單的介紹到這兒了~