1. 程式人生 > >rabbitmq7-topic模式

rabbitmq7-topic模式

一、簡介

如果你對rabbitmq6-direct路由模式比較熟悉的話,把這個簡介看完然後跳過,如果不熟的話,就把下面的程式碼看一看,有興趣的敲一敲就可以了。本來不想寫這一節的,但是為了體系的完整性,所以還是把這一節給補出來。

rabbitmq2-這可能是rabbitmq最全的概覽中,我們提到過topic和direct的區別,沒有什麼大的區別,就是binding key和routing key的匹配方式可以通過萬用字元的方式,也就是說路由模式是topic模式的一個特例。如果這一節程式碼你copy後發現有問題,請檢視上一節的內容,其中有一些注意事項。talk is cheap ,show me the code,不多比比,下面把程式碼呈上來。

二、程式碼:

2.1 producer
public class Producer {
    public static final String EXCHANGE_NAME = "topic_exchange";
    public static final String ROUTING_KEY1 = "topic.key1";
    public static final String ROUTING_KEY2 = "topic.key2";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conn = ConnUtils.getConn();
        Channel channel = conn.createChannel();
        /**
         * 注意下面我們持久化的是交換機,已經不是持久化的隊列了,交換機是不存放訊息的,只是一個訊息的搬運工
         */
String exchangeType = BuiltinExchangeType.TOPIC.getType(); boolean durable = true; boolean autoDelete = false; boolean internal = false; Map<String,Object> arguments = null; // 宣告一個交換機 channel.exchangeDeclare(EXCHANGE_NAME, exchangeType,durable,autoDelete,internal,arguments); channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY1,null
,"CEUIXCXI routing key1".getBytes()); channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY2,null,"CEUIXCXI routing key2".getBytes()); } }

上面的程式碼對比起來和上一節的沒有明顯的不同

2.2 consumer程式碼:
public class Consume001 {

    public static final String EXCHANGE_NAME = "topic_exchange";
    public static final String QUEUE_NAME1 = "topic_queue";
    public static final String ROUTING_KEY1 = "topic.*";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conn = ConnUtils.getConn();
        final Channel channel = conn.createChannel();
        channel.queueDeclare(QUEUE_NAME1,true,false,false,null);
        channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,ROUTING_KEY1);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body,"utf-8"));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(QUEUE_NAME1,false,consumer);
    }
}
2.3、執行結果

image.png
image.png

ps:這個系列裡面還有一個rpc,但是我也不打算寫了,畢竟專門的人幹專門的事兒,框架也是一樣的。同時學習的時候我們也是採用2和8的原則的,學20%的東西幹80%的事兒,有時間的情況下我們再去慢慢的研究。