1. 程式人生 > >RabbitMQ實踐--安裝、JAVA客戶端操作

RabbitMQ實踐--安裝、JAVA客戶端操作

RabbitMQ是一種訊息中介軟體,用於處理來自客戶端的非同步訊息。服務端將要傳送的訊息放入到佇列池中。接收端可以根據RabbitMQ配置的轉發機制接收服務端發來的訊息。RabbitMQ依據指定的轉發規則進行訊息的轉發、緩衝和持久化操作,主要用在多伺服器間或單伺服器的子系統間進行通訊,是分散式系統標準的配置。

RabbitMQ服務端安裝

Rabbitmq基於erlang語言開發,所有需要安裝erlang虛擬機器,各平臺參考官網安裝即可。Mac、linux的安裝方法在頁面的相對後面一點,也很簡單。
連結地址:http://www.erlang.org/downloads
同理,參考rabbitMQ官網來安裝RabbitMQ:
連結地址:

http://www.rabbitmq.com/download.html

開啟管理外掛

使用Rabbit MQ 管理外掛,可以更好的視覺化方式檢視Rabbit MQ 伺服器例項的狀態
請在windows在RabbitMQ的安裝目錄 執行如下命令

# sbin\rabbitmq-plugins.bat enable rabbitmq_management
# net stop RabbitMQ && net start RabbitMQ

各平臺外掛管理命令:

開啟某個外掛:rabbitmq-plugins enable xxx
關閉某個外掛:rabbitmq-plugins
disable xxx

注意:重啟伺服器後生效
然後開啟連線http://localhost:15672,以guest/guest登入就可以看到伺服器當前的執行狀態

文件地址

java客戶端操作實踐

“Hello World”

本小節建立一個很簡單的佇列,一個生產者,一個消費者。
一個生產者,一個消費者
新增maven依賴,此處對slf4j的實現是簡單的slf4j-simple實現,在真正的生產環境中建議使用log4j、logback等。

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId
>
amqp-client</artifactId> <version>4.1.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.25</version> </dependency>

程式碼如下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 *
 * @author xuexiaolei
 * @version 2017年08月20日
 */
public class Send {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        //建立一個連線,並且從連線處獲取一個channel,為什麼用channel參考"RabbitMQ--整體綜述"
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //將訊息傳送到某個Queue上面去
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        //如同資料庫連線一樣,依次關閉連線
        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 *
 * @author xuexiaolei
 * @version 2017年08月20日
 */
public class Recv {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        //建立一個連線,並且從連線處獲取一個channel,為什麼用channel參考"RabbitMQ--整體綜述"
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //宣告接收某個佇列的訊息
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //建立一個預設消費者,並在handleDelivery中回撥處理訊息內容
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                            AMQP.BasicProperties properties, byte[] body)
                            throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        //channel繫結佇列、消費者,autoAck為true表示一旦收到訊息則自動回覆確認訊息
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

執行:
傳送端執行兩次
Send1
接收端會一直接收
Recv1

Work Queues

分發訊息佇列,多個消費者
多個消費者
程式碼如下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 *
 * @author xuexiaolei
 * @version 2017年08月20日
 */
public class Send {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        //建立一個連線,並且從連線處獲取一個channel,為什麼用channel參考"RabbitMQ--整體綜述"
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //將訊息傳送到某個Queue上面去
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for (int i = 0; i < 10; i++) {//改為傳送十次訊息
            String message = "Hello World " + (i+1);
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }
        //如同資料庫連線一樣,依次關閉連線
        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;

/**
 *
 * @author xuexiaolei
 * @version 2017年08月20日
 */
public class Recv1 {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        //建立一個連線,並且從連線處獲取一個channel,為什麼用channel參考"RabbitMQ--整體綜述"
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //宣告接收某個佇列的訊息
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //建立一個預設消費者,並在handleDelivery中回撥處理訊息內容
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                            AMQP.BasicProperties properties, byte[] body)
                            throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                //增加處理時間
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //channel繫結佇列、消費者,autoAck為true表示一旦收到訊息自動回覆確認訊息
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

此處Recv1與Recv2程式碼一致,不再贅述

執行
send傳送如下

[x] Sent ‘Hello World 1’
[x] Sent ‘Hello World 2’
[x] Sent ‘Hello World 3’
[x] Sent ‘Hello World 4’
[x] Sent ‘Hello World 5’
[x] Sent ‘Hello World 6’
[x] Sent ‘Hello World 7’
[x] Sent ‘Hello World 8’
[x] Sent ‘Hello World 9’
[x] Sent ‘Hello World 10’

Recv1接收如下

[*] Waiting for messages. To exit press CTRL+C
[x] Received ‘Hello World 1’
[x] Received ‘Hello World 3’
[x] Received ‘Hello World 5’
[x] Received ‘Hello World 7’
[x] Received ‘Hello World 9’

Recv2接收如下

[*] Waiting for messages. To exit press CTRL+C
[x] Received ‘Hello World 2’
[x] Received ‘Hello World 4’
[x] Received ‘Hello World 6’
[x] Received ‘Hello World 8’
[x] Received ‘Hello World 10’

探索1:將Rev1的Thread.sleep(50)修改為50,意思是Rev1的處理能力比Rev2的處理能力強20倍,會發生什麼事情呢?

結果還是和原來一樣,每個Recv處理5個間隔一個的訊息,為什麼這樣呢?預設情況下,RabbitMQ採用輪詢的方式傳送message,所以只能一個輪一個地傳送。這個在機器處理能力不均勻的場景是不合適的,當然RabbitMQ是提供了配置方法的。

探索2:在Recv2處理到收到第二條訊息的時候,我們強制kill掉Recv2會發生什麼事情呢?剩餘的訊息會不會轉發到Recv1上呢?

不會的,當前的模式下,RabbitMQ傳送完訊息後就把快取的訊息刪除了,不關心訊息是否真正的處理是否完成,所以如果宕機等會發生訊息丟失的情況。這個時候就需要訊息確認機制了,Recv真正的收到訊息,處理完訊息後,RabbitMQ才刪除訊息。
修改方法如下:
Recv增加訊息確認反饋機制

@Override
public void handleDelivery(String consumerTag, Envelope envelope,
                AMQP.BasicProperties properties, byte[] body)
                throws IOException {
    String message = new String(body, "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
    //增加處理時間
    try {
        Thread.sleep(50);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        System.out.println(" [x] Done");
        //此處增加訊息確認確認機制,envelope.getDeliveryTag()獲取訊息的唯一標識,false表示僅ack當前訊息
        channel.basicAck(envelope.getDeliveryTag(), false);
    }

//channel繫結佇列、消費者,autoAck為true表示一旦收到訊息自動回覆確認訊息
channel.basicConsume(QUEUE_NAME, false, consumer);

探索3:訊息確認機制可以大體上保證消費端不丟失訊息,那麼Broker怎麼保證呢?

開啟持久化即可,當然達不到100%,畢竟持久化也是需要少量的時間,但這個時間可能造成微量損失。
程式碼改動如下:

//統一修改佇列名稱
private final static String QUEUE_NAME = "task_queue";
//宣告佇列的時候說明屬性
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
//傳送訊息的時候說明儲存方式
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

為什麼要改佇列名字呢?因為RabbitMQ只會遵循首次建立佇列時候的屬性!後面宣告同一個名字的佇列的時候,如果沒有這個佇列則建立,有則忽略建立任務。當然,你也可以先刪除以前的佇列,然後再建立,就不用改佇列名了。

探索4:修正探索1的缺點

改成根據處理能力來分發訊息,程式碼修改如下:
Recv增加設定

int prefetchCount = 1;
channel.basicQos(prefetchCount);

最終整體程式碼如下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 *
 * @author xuexiaolei
 * @version 2017年08月20日
 */
public class Send {
    private final static String QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        //建立一個連線,並且從連線處獲取一個channel,為什麼用channel參考"RabbitMQ--整體綜述"
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //將訊息傳送到某個Queue上面去
        boolean durable = true;
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
        for (int i = 0; i < 10; i++) {//改為傳送十次訊息
            String message = "Hello World " + (i+1);
            channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }
        //如同資料庫連線一樣,依次關閉連線
        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 *
 * @author xuexiaolei
 * @version 2017年08月20日
 */
public class Recv1 {
    private final static String QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        //建立一個連線,並且從連線處獲取一個channel,為什麼用channel參考"RabbitMQ--整體綜述"
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        //宣告接收某個佇列的訊息
        boolean durable = true;
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        int prefetchCount = 1;
        channel.basicQos(prefetchCount);
        //建立一個預設消費者,並在handleDelivery中回撥處理訊息內容
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                            AMQP.BasicProperties properties, byte[] body)
                            throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                //增加處理時間
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(" [x] Done");
                    //此處增加訊息確認確認機制,envelope.getDeliveryTag()獲取訊息的唯一標識,false表示僅ack當前訊息
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        //channel繫結佇列、消費者,autoAck為true表示一旦收到訊息自動回覆確認訊息
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

Publish/Subscribe

fanout型別
建立一個日誌系統,一個Send,兩個Recv。
重點是增加Exchange,通過fanout型別將訊息廣播給所有的的Recv。

這裡我們建立臨時佇列,從RabbitMQ中獲取佇列名,然後在不需要使用的時候刪除它。
程式碼如下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.concurrent.TimeoutException;

/**
 *
 * @author xuexiaolei
 * @version 2017年08月20日
 */
public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws java.io.IOException, TimeoutException {
        //建立連線
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //宣告exchange,並用fanout型別
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        for (int i = 0; i < 10; i++) {//改為傳送十次訊息
            String message = "Hello World " + (i + 1);
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }
        //關閉連線
        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 *
 * @author xuexiaolei
 * @version 2017年08月20日
 */
public class ReceiveLogs {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        //簡歷連線
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //宣告Exchange型別
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        //獲取隨機佇列名稱
        String queueName = channel.queueDeclare().getQueue();
        //Exchange與queue繫結
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] " + queueName + "Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                            AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

執行兩次ReceiveLogs,再執行EmitLog,就會發現ReceiveLogs都接受到了相同的訊息

Routing

direct型別
重點是Exchange的direct型別。如果某些消費者只關注部分訊息怎麼辦?這個direct型別解決這類問題,也叫routing模式

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.*;
import java.util.concurrent.TimeoutException;

/**
 *
 * @author xuexiaolei
 * @version 2017年08月20日
 */
public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";
    private static final List<String> SEVERITIES = new ArrayList<>();
    static {
        SEVERITIES.add("info");
        SEVERITIES.add("error");
        SEVERITIES.add("warning");
    }

    public static void main(String[] argv) throws java.io.IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        for (int i = 0; i < 10; i++) {
            String severity = SEVERITIES.get(new Random().nextInt(3));//隨機產生一個routingKey
            String message = "some logs " + i;
            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
            System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
        }

        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 *
 * @author xuexiaolei
 * @version 2017年08月20日
 */
public class ReceiveLogsDirect1 {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();

        //繫結固定型別的routingKey
        channel.queueBind(queueName, EXCHANGE_NAME, "info");
        channel.queueBind(queueName, EXCHANGE_NAME, "error");
        channel.queueBind(queueName, EXCHANGE_NAME, "warning");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                            AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 *
 * @author xuexiaolei
 * @version 2017年08月20日
 */
public class ReceiveLogsDirect2 {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();

        //繫結固定型別的routingKey,只關心error
        channel.queueBind(queueName, EXCHANGE_NAME, "error");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                            AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

執行即可發現ReceiveLogsDirect2只收到了error的相關資訊,而ReceiveLogsDirect1接收到了所有訊息

Topics

topic型別
Exchange的topic型別,用萬用字元的方式來匹配相應的接收資訊
符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞
程式碼如下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeoutException;

/**
 *
 * @author xuexiaolei
 * @version 2017年08月20日
 */
public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "topic_logs";
    private static final List<String> SEVERITIES = new ArrayList<>();
    static {
        // 符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞
        SEVERITIES.add("*.*.rabbit");
        SEVERITIES.add("a.b.rabbit");
        SEVERITIES.add("c.rabbit");//丟失,因為不匹配
        SEVERITIES.add("lazy.#");
        SEVERITIES.add("lazy.a.b");
        SEVERITIES.add("lazy.c");
        SEVERITIES.add("*.orange.*");
        SEVERITIES.add("a.orange.b");
        SEVERITIES.add("c.orange");//丟失,因為不匹配
    }

    public static void main(String[] argv) throws java.io.IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        for (int i = 0; i < 30; i++) {
            String severity = SEVERITIES.get(new Random().nextInt(SEVERITIES.size()));//隨機產生一個routingKey
            String message = "some logs " + i;
            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
            System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
        }

        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 *
 * @author xuexiaolei
 * @version 2017年08月20日
 */
public class ReceiveLogsDirect1 {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();

        //繫結固定型別的topic
        channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
        channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                            AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 *
 * @author xuexiaolei
 * @version 2017年08月20日
 */
public class ReceiveLogsDirect2 {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();

        //繫結固定型別的topic
        channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                            AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

Remote procedure call (RPC)

RPC畢竟不是MQ擅長的事情,建議使用擅長的工具做擅長的事,所以此處不再贅述
RPC方法