[Spring Messaging] RabbitMQ Installation and Basic Usage (Part 2)

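Preface:

Head down, writing. I am translating the official documentation first and running through the whole flow. The previous article, [Spring Messaging] RabbitMQ Installation and Basic Usage (Part 1), covered point-to-point messaging. One producer could already feed several consumers there, but each message was handled by only one of them, which is why it counts as point-to-point. This article covers publish/subscribe, i.e. one-to-many: a single message is handled by several consumers (called subscribers here) at the same time.

Main text:

I. Publish/subscribe model:

This introduces a new concept: the Exchange (the X in the figure above). (It is in fact a required part of the AMQP protocol; there is no need to understand it fully yet, and it comes up again later.) For now, think of it simply as a routing component. RabbitMQ defines four exchange types: direct, topic, headers, and fanout. (This article covers all of them except headers.)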

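1. fanout (broadcast):

As the figure above shows, the producer has no direct relationship with a queue and does not even need to know that one exists, so readers who followed the previous article can delete the code that refers to the queue name.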
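To make the idea concrete, here is a minimal in-memory sketch of fanout routing. It is only an illustration and does not use the RabbitMQ client: the class `FanoutSketch` and its methods `bindNewQueue` and `publish` are hypothetical names invented for this example. The point it shows is that the producer hands the message to the exchange alone, and the exchange copies it into every bound queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** In-memory illustration of fanout routing; hypothetical, not the RabbitMQ client API. */
public class FanoutSketch {
    // Every queue bound to this exchange; a fanout exchange does not look at routing keys.
    private final List<Queue<String>> boundQueues = new ArrayList<>();

    /** Creates a new queue, binds it to this exchange and returns it to the consumer. */
    public Queue<String> bindNewQueue() {
        Queue<String> queue = new ArrayDeque<>();
        boundQueues.add(queue);
        return queue;
    }

    /** The producer only talks to the exchange; each bound queue receives its own copy. */
    public void publish(String message) {
        for (Queue<String> queue : boundQueues) {
            queue.add(message);
        }
    }

    public static void main(String[] args) {
        FanoutSketch logs = new FanoutSketch();
        Queue<String> consumer1 = logs.bindNewQueue();
        Queue<String> consumer2 = logs.bindNewQueue();
        logs.publish("Hello World!");
        // Both consumers now hold a copy of the same message.
        System.out.println("consumer1: " + consumer1);
        System.out.println("consumer2: " + consumer2);
    }
}
```

In the real listings below, the same roles are played by `exchangeDeclare`, `queueBind` and `basicPublish`.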

1) Producer code

package com.haibo.future.web.rabbitmq.demo6;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitProduct {
    private final static String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        for (int i = 0; i < 20; i++) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            String message = "第"+i+"條"+"Hello World!";
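            // New in this example: declare the exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            // Only the exchange is specified when publishing; the second argument (the routing key) is left empty because a fanout exchange ignores it
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));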
            System.out.println(" [x] Sent '" + message + "'");
            channel.close();
            connection.close();
        }
    }
}

2) Consumer code

package com.haibo.future.web.rabbitmq.demo6;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitConsumer {
    private static final String EXCHANGE_NAME = "logs";


    public static void main(String[] argv)
            throws IOException,
            TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // Let a connection timeout propagate; swallowing it would leave connection null and fail on the next line
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // Declare the exchange as fanout (broadcast)
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // The consumer needs a queue name; the no-argument queueDeclare() has the broker create a non-durable, exclusive, auto-delete queue with a generated name, which is read back here
        String queueName = channel.queueDeclare().getQueue();
        // Bind the queue to the exchange; a fanout exchange ignores the binding key, so it is left empty
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        boolean autoAck = true;
        channel.basicConsume(queueName, autoAck, consumer);
    }
}

3) Test run

Start two consumers, then run the producer. Result: every message from the producer is delivered to both consumers. Logs omitted.

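2. direct:

Direct: a message is delivered only to queues whose binding key exactly matches the message's routing key. In the figure above the binding key is black.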
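The exact-match rule can be sketched in memory without a broker. This is an illustration only, not the RabbitMQ client: `DirectSketch`, `bindNewQueue` and `publish` are hypothetical names, and the keys "black" and "red" mirror the three consumers used in this section.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory illustration of direct routing; hypothetical, not the RabbitMQ client API. */
public class DirectSketch {
    // binding key -> all queues bound with exactly that key
    private final Map<String, List<List<String>>> bindings = new HashMap<>();

    /** Creates a new queue bound with the given binding key and returns it. */
    public List<String> bindNewQueue(String bindingKey) {
        List<String> queue = new ArrayList<>();
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
        return queue;
    }

    /** Delivers the message to every queue whose binding key equals the routing key. */
    public void publish(String routingKey, String message) {
        List<List<String>> matched =
                bindings.getOrDefault(routingKey, Collections.emptyList());
        for (List<String> queue : matched) {
            queue.add(message);
        }
    }

    public static void main(String[] args) {
        DirectSketch logs2 = new DirectSketch();
        List<String> consumer1 = logs2.bindNewQueue("black");
        List<String> consumer2 = logs2.bindNewQueue("red");
        List<String> consumer3 = logs2.bindNewQueue("black");
        logs2.publish("black", "Hello World!");
        // Only the queues bound with "black" receive the message.
        System.out.println("consumer1: " + consumer1);
        System.out.println("consumer2: " + consumer2);
        System.out.println("consumer3: " + consumer3);
    }
}
```

Note that two queues bound with the same key both receive a copy, so a direct exchange can still fan a message out to several consumers.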

1) Producer (not tied to any queue; publishes with the routing key "black")

package com.haibo.future.web.rabbitmq.demo7;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitProduct {
    private final static String EXCHANGE_NAME = "logs2";

    public static void main(String[] argv) throws Exception {
        for (int i = 0; i < 20; i++) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
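            String message = "第"+i+"條"+"Hello World!";
            // New in this example: a direct exchange, which routes on an exact key match
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            // Only the exchange is specified, never a queue name; with a direct exchange the second argument is the routing key, compared against each queue's binding key
            channel.basicPublish(EXCHANGE_NAME, "black", null, message.getBytes("UTF-8"));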
            System.out.println(" [x] Sent '" + message + "'");
            channel.close();
            connection.close();
        }
    }
}

2) Consumer 1 (owns a queue bound with the binding key "black")

package com.haibo.future.web.rabbitmq.demo7;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitConsumer {
    private static final String EXCHANGE_NAME = "logs2";

    public static void main(String[] argv)
            throws IOException,
            TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // Let a connection timeout propagate; swallowing it would leave connection null and fail on the next line
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // Declare the exchange as direct
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // The consumer needs a queue name; the no-argument queueDeclare() has the broker create a queue with a generated name, which is read back here
        String queueName = channel.queueDeclare().getQueue();
        // Bind the queue to the exchange with the binding key "black"
        channel.queueBind(queueName, EXCHANGE_NAME, "black");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        boolean autoAck = true;
        channel.basicConsume(queueName, autoAck, consumer);
    }
}

3) Consumer 2 (owns a queue bound with the binding key "red")

package com.haibo.future.web.rabbitmq.demo7;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitConsumer2 {
    private static final String EXCHANGE_NAME = "logs2";


    public static void main(String[] argv)
            throws IOException,
            TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // Let a connection timeout propagate; swallowing it would leave connection null and fail on the next line
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // Declare the exchange as direct
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // The consumer needs a queue name; the no-argument queueDeclare() has the broker create a queue with a generated name, which is read back here
        String queueName = channel.queueDeclare().getQueue();
        // Bind the queue to the exchange with the binding key "red"
        channel.queueBind(queueName, EXCHANGE_NAME, "red");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        boolean autoAck = true;
        channel.basicConsume(queueName, autoAck, consumer);
    }
}

4) Consumer 3 (owns a queue bound with the binding key "black")

package com.haibo.future.web.rabbitmq.demo7;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitConsumer3 {
    private static final String EXCHANGE_NAME = "logs2";


    public static void main(String[] argv)
            throws IOException,
            TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // Let a connection timeout propagate; swallowing it would leave connection null and fail on the next line
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // Declare the exchange as direct
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // The consumer needs a queue name; the no-argument queueDeclare() has the broker create a queue with a generated name, which is read back here
        String queueName = channel.queueDeclare().getQueue();
        // Bind the queue to the exchange with the binding key "black"
        channel.queueBind(queueName, EXCHANGE_NAME, "black");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        boolean autoAck = true;
        channel.basicConsume(queueName, autoAck, consumer);
    }
}

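5) Test run:

Start the three consumers, then start the producer. The logs show that consumer 1 and consumer 3 receive the producer's messages, while consumer 2 receives none. Logs omitted.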

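3. Topic:

A topic exchange compares the message's routing key with each queue's binding key, both written as dot-separated words. In a binding key, * stands for exactly one word and # stands for zero or more words. Straight to the code:

1) Consumer 1 (binding key: *.orange.rabbit)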

package com.haibo.future.web.rabbitmq.demo8;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitConsumer {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv)
            throws IOException,
            TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // Let a connection timeout propagate; swallowing it would leave connection null and fail on the next line
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // Declare the exchange as topic
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        // The consumer needs a queue name; the no-argument queueDeclare() has the broker create a queue with a generated name, which is read back here
        String queueName = channel.queueDeclare().getQueue();
        // Bind the queue to the exchange with the pattern "*.orange.rabbit"
        channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.rabbit");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        boolean autoAck = true;
        channel.basicConsume(queueName, autoAck, consumer);
    }
}

2) Consumer 2 (binding key: *.orange.bird)

package com.haibo.future.web.rabbitmq.demo8;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitConsumer2 {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv)
            throws IOException,
            TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // Let a connection timeout propagate; swallowing it would leave connection null and fail on the next line
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // Declare the exchange as topic
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        // The consumer needs a queue name; the no-argument queueDeclare() has the broker create a queue with a generated name, which is read back here
        String queueName = channel.queueDeclare().getQueue();
        // Bind the queue to the exchange with the pattern "*.orange.bird"
        channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.bird");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        boolean autoAck = true;
        channel.basicConsume(queueName, autoAck, consumer);
    }
}

3) Consumer 3 (binding key: lazy.#)

package com.haibo.future.web.rabbitmq.demo8;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitConsumer3 {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv)
            throws IOException,
            TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // Let a connection timeout propagate; swallowing it would leave connection null and fail on the next line
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // Declare the exchange as topic
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        // The consumer needs a queue name; the no-argument queueDeclare() has the broker create a queue with a generated name, which is read back here
        String queueName = channel.queueDeclare().getQueue();
        // Bind the queue to the exchange with the pattern "lazy.#"
        channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        boolean autoAck = true;
        channel.basicConsume(queueName, autoAck, consumer);
    }
}

4) Producer:

package com.haibo.future.web.rabbitmq.demo8;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitProduct {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        for (int i = 0; i < 10; i++) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            String message = null;
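            // P --> X: declare the exchange; this time it is a topic exchange, which pattern-matches the routing key
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            // X --> Q: only the exchange is specified, never a queue name; the second argument of basicPublish is the routing key, which the topic exchange matches against each queue's binding pattern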
            if (i == 5) {
                message = "第" + i + "條:" + "lazy.orange.rabbit" + i;
                channel.basicPublish(EXCHANGE_NAME, "lazy.orange.rabbit" + i, null, message.getBytes("UTF-8"));
            } else {
                if (i % 2 == 1) {
                    message = "第" + i + "條:" + i + ".orange." + "rabbit";
                    channel.basicPublish(EXCHANGE_NAME, i + ".orange." + "rabbit", null, message.getBytes("UTF-8"));
                } else {
                    message = "第" + i + "條:" + i + ".orange." + "bird";
                    channel.basicPublish(EXCHANGE_NAME, i + ".orange." + "bird", null, message.getBytes("UTF-8"));
                }
            }
            System.out.println(" [x] Sent '" + message + "'");
            channel.close();
            connection.close();
        }
    }
}

5) Test run:

Start consumers 1, 2 and 3, then run the producer. The results:

Producer:

[x] Sent '第0條:0.orange.bird'
 [x] Sent '第1條:1.orange.rabbit'
 [x] Sent '第2條:2.orange.bird'
 [x] Sent '第3條:3.orange.rabbit'
 [x] Sent '第4條:4.orange.bird'
 [x] Sent '第5條:lazy.orange.rabbit5'
 [x] Sent '第6條:6.orange.bird'
 [x] Sent '第7條:7.orange.rabbit'
 [x] Sent '第8條:8.orange.bird'
 [x] Sent '第9條:9.orange.rabbit'

Consumer 1 (binding key: *.orange.rabbit):

[*] Waiting for messages. To exit press CTRL+C
 [x] Received '第1條:1.orange.rabbit'
 [x] Received '第3條:3.orange.rabbit'
 [x] Received '第7條:7.orange.rabbit'
 [x] Received '第9條:9.orange.rabbit'

Consumer 2 (binding key: *.orange.bird):

[*] Waiting for messages. To exit press CTRL+C
 [x] Received '第0條:0.orange.bird'
 [x] Received '第2條:2.orange.bird'
 [x] Received '第4條:4.orange.bird'
 [x] Received '第6條:6.orange.bird'
 [x] Received '第8條:8.orange.bird'

Consumer 3 (binding key: lazy.#):

[*] Waiting for messages. To exit press CTRL+C
 [x] Received '第5條:lazy.orange.rabbit5'
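
The logs above are consistent with the topic matching rules: keys are split into words on ".", * matches exactly one word and # matches zero or more words. Message 5 is a useful case: its routing key lazy.orange.rabbit5 ends in the word rabbit5, not rabbit, so it misses *.orange.rabbit and reaches only lazy.#. The following is a minimal sketch of such a matcher, written for illustration; `TopicMatchSketch` and `matches` are hypothetical names and this is not how the broker is implemented.

```java
/** Illustrative topic-pattern matcher; hypothetical, not RabbitMQ's implementation. */
public class TopicMatchSketch {

    /** Returns true if the routing key satisfies the binding key pattern. */
    public static boolean matches(String bindingKey, String routingKey) {
        // Limit -1 keeps empty trailing words instead of silently dropping them.
        return matches(bindingKey.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            // Pattern exhausted: it is a match only if every word was consumed too.
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words: try every possible continuation point.
            for (int next = w; next <= words.length; next++) {
                if (matches(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            // Pattern still expects a word but none is left.
            return false;
        }
        // '*' accepts any single word; otherwise the words must be equal.
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            return matches(pattern, p + 1, words, w + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String[] bindingKeys = {"*.orange.rabbit", "*.orange.bird", "lazy.#"};
        String[] routingKeys = {"0.orange.bird", "1.orange.rabbit", "lazy.orange.rabbit5"};
        for (String routingKey : routingKeys) {
            for (String bindingKey : bindingKeys) {
                System.out.println(routingKey + " vs " + bindingKey + ": "
                        + matches(bindingKey, routingKey));
            }
        }
    }
}
```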