1. 程式人生 > >RabbitMQ入門-發布訂閱模式

RabbitMQ入門-發布訂閱模式

OS 臨時 cli src get 參數 asi receive 連接

兔子的Publish/Subscribe是這樣的:

技術分享圖片

有個生產者PX代表交換機,交換機綁定隊列,消費者從隊列中取得消息。每次有消息,先發到交換機中,然後由交換機負責發送到它已知的隊列中。

生產者代碼:

package com.example.demo;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/** * * 4種交換類型 * There are a few exchange types available: direct, topic, headers and fanout. * 扇出交換:將收到的消息廣播到它所知道的所有隊列裏 */ public class PSSend { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory
= new ConnectionFactory(); // 連接工廠 factory.setHost("localhost"); Connection connection = factory.newConnection(); // 獲取連接 Channel channel = connection.createChannel(); // 當我們發送時,需要一個路由密鑰,但是對於扇出交換,他的值將被忽略 // 第一個參數為交換的名字,第二個為交換的類型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String msg
= "發布訂閱"; channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); System.out.println("PS-Send:" + msg); channel.close(); connection.close(); } }

消費者:

package com.example.demo;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 
 *
 * There are a few exchange types available: direct, topic, headers and fanout.
 * 扇出交換:將收到的消息廣播到它所知道的所有隊列裏
 */
public class PSReceive {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();    // 連接工廠
        factory.setHost("localhost");
        Connection connection = factory.newConnection();        // 獲取連接
        Channel channel = connection.createChannel();

        // 當我們發送時,需要一個路由密鑰,但是對於扇出交換,他的值將被忽略
        // 第一個參數為交換的名字,第二個為交換的類型
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 當聲明隊列,不加任何參數,產生的將是一個臨時隊列,getQueue返回的是隊列名稱
        String queueA = channel.queueDeclare().getQueue();
        //String queueB = channel.queueDeclare().getQueue();
        System.out.println("臨時隊列:" + queueA);

        // 下面綁定交換與隊列
        channel.queueBind(queueA, EXCHANGE_NAME, "");
        //channel.queueBind(queueB, EXCHANGE_NAME, "");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String recv = new String(body, "UTF-8");
                System.out.println("PS-Receive:" + recv);
            }
        };

        channel.basicConsume(queueA, true, consumer);
        //channel.basicConsume(queueB, true, consumer);


    }

}

啟動消費者和生產者,控制臺打印

技術分享圖片

技術分享圖片

RabbitMQ入門-發布訂閱模式