1. 程式人生 > >RabbitMQ學習筆記(三)-----------------RabbitMQ不同的交換機進行路由

RabbitMQ學習筆記(三)-----------------RabbitMQ不同的交換機進行路由

專案地址:https://github.com/gongxianshengjiadexiaohuihui/RabbitMQ/tree/master/Exchange_RabbitMQ

專案結構:

流程圖

補充知識:

Connection是RabbitMQ的socket連結,封裝了socket協議相關部分邏輯

ConnectionFactory是製造Connection的工廠

Connection是建立一個TCP連線,需要經過三次握手,如果每次生產和消費訊息都要建立TCP連線十分耗費資源,因此我們用

Channel建立虛擬連線,這個虛擬連線是建立在上面TCP連線的基礎上,資料傳輸都通過Channel,不需要頻繁的建立關閉tcp連線,而且tcp的連線數是有限制的

 

交換機一共有4種規則,第四種並不常用,因此我們只敘述前三種

fanout:類似於釋出訂閱,交換機把生產者的所有資訊都發送到所有與它繫結的佇列中(忽略routingkey)

direct:交換機只發送資訊到符合條件 routingkey = bindingkey 的佇列中

topic:交換機發送資訊到routingkey符合bindingkey匹配規則的佇列中,bindingkey類似於正則表示式:routingkey以「.」分隔成多個單詞,bindingkey中的「*」匹配恰好一個單詞,「#」匹配零個或多個單詞

生產者程式碼

package main.java.com.ggp;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName Producer
 * @Description Demo publisher: connects to the local broker, declares an exchange of the
 *              requested type, publishes a fixed set of sample messages to it and disconnects.
 * @Author Mr.G
 * @Date 2018/11/21 9:16
 * @Version 1.0
 */
public class Producer {

    /** Charset name used to encode message bodies (the consumer side decodes with the same name). */
    private static final String CHARSET = "UTF-8";

    /**
     * Declares {@code exchangeName} with type {@code exchangeType} and publishes the sample
     * messages that belong to that exchange type. The channel and the connection are closed
     * before the constructor returns, whether publishing succeeded or failed.
     *
     * @param exchangeName name of the exchange to declare and publish to
     * @param exchangeType one of {@code "fanout"}, {@code "direct"} or {@code "topic"}
     * @throws IOException              if talking to the broker fails
     * @throws TimeoutException         if connecting or closing the channel times out
     * @throws IllegalArgumentException if {@code exchangeType} is not one of the three
     *                                  supported types (thrown after the exchange was declared)
     */
    public Producer(String exchangeName, String exchangeType) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        Connection connection = connectionFactory.newConnection();
        try {
            Channel channel = connection.createChannel();
            try {
                channel.exchangeDeclare(exchangeName, exchangeType);
                publishSamples(channel, exchangeName, exchangeType);
            } finally {
                // A broker-side error may already have closed the channel; closing it again
                // would throw and hide the original failure.
                if (channel.isOpen()) {
                    channel.close();
                }
            }
        } finally {
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }

    /**
     * Publishes the sample messages, choosing routing keys according to the exchange type.
     */
    private static void publishSamples(Channel channel, String exchangeName, String exchangeType)
            throws IOException {
        if ("fanout".equals(exchangeType)) {
            // Empty routing key: the consumer in this demo binds its queue with an empty key too.
            for (int i = 0; i < 5; i++) {
                String message = "Hello Rabbit " + i;
                channel.basicPublish(exchangeName, "", null, message.getBytes(CHARSET));
                System.out.println("Producer Send  Message: " + message);
            }
        } else if ("direct".equals(exchangeType)) {
            for (String routingKey : new String[]{"info", "warning", "error"}) {
                String message = "RoutingSendDirect Send the message level:" + routingKey;
                channel.basicPublish(exchangeName, routingKey, null, message.getBytes(CHARSET));
                System.out.println("RoutingSendDirect Send " + routingKey + ": " + message);
            }
        } else if ("topic".equals(exchangeType)) {
            for (String routingKey : new String[]{"a.b.c", "a.a.b.d", "ad.dd.b.c"}) {
                String message = "Topic Send the message which the routingkey is " + routingKey;
                channel.basicPublish(exchangeName, routingKey, null, message.getBytes(CHARSET));
                System.out.println("send message: " + message + " successfully!");
            }
        } else {
            throw new IllegalArgumentException("Unsupported the exchangeType!");
        }
    }
}

消費者程式碼

package main.java.com.ggp;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName Customer
 * @Description Demo consumer: connects to the local broker, declares the exchange, binds a
 *              server-named queue to it according to the exchange type and prints every
 *              message delivered to that queue.
 * @Author Mr.G
 * @Date 2018/11/21 10:12
 * @Version 1.0
 */
public class Customer {

    /**
     * Starts consuming from a freshly declared queue bound to {@code exchangeName}.
     * On success the connection is intentionally left open so that deliveries keep arriving;
     * if any setup step fails, the connection is closed before the exception propagates.
     *
     * @param customerName label printed in front of every log line of this consumer
     * @param exchangeName name of the exchange to declare and bind to
     * @param exchangeType one of {@code "fanout"}, {@code "direct"} or {@code "topic"}
     * @throws IOException              if talking to the broker fails
     * @throws TimeoutException         if connecting to the broker times out
     * @throws IllegalArgumentException if {@code exchangeType} is not one of the three
     *                                  supported types
     */
    public Customer(final String customerName, String exchangeName, final String exchangeType)
            throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        Connection connection = connectionFactory.newConnection();
        boolean consuming = false;
        try {
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(exchangeName, exchangeType);

            // Server-named queue; bound according to the exchange type.
            String queueName = channel.queueDeclare().getQueue();
            bindQueue(channel, queueName, exchangeName, exchangeType);
            System.out.println(customerName + " is waiting message that the type of it is " + exchangeType);

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(customerName + " has dealt the " + exchangeType + " message: " + message);
                }
            };
            // Second argument 'true' enables automatic acknowledgement.
            channel.basicConsume(queueName, true, consumer);
            consuming = true;
        } finally {
            // Setup failed: nobody holds a reference to the connection, so release it here.
            // Closing the connection also closes the channel opened on it.
            if (!consuming && connection.isOpen()) {
                connection.close();
            }
        }
    }

    /**
     * Binds {@code queueName} to the exchange with the binding keys this demo uses for the
     * given exchange type.
     */
    private static void bindQueue(Channel channel, String queueName, String exchangeName, String exchangeType)
            throws IOException {
        if ("fanout".equals(exchangeType)) {
            channel.queueBind(queueName, exchangeName, "");
        } else if ("direct".equals(exchangeType)) {
            for (String bindingKey : new String[]{"info", "warning"}) {
                channel.queueBind(queueName, exchangeName, bindingKey);
            }
        } else if ("topic".equals(exchangeType)) {
            channel.queueBind(queueName, exchangeName, "#.b.*");
        } else {
            throw new IllegalArgumentException("Unsupported the exchangeType!");
        }
    }
}

測試程式碼

package test.java.com.ggp.test;

import main.java.com.ggp.Customer;
import main.java.com.ggp.Producer;

/**
 * @ClassName FanoutTest
 * @Description Manual demo for the fanout exchange: starts two consumers on the exchange
 *              named "fanout", then runs the producer against the same exchange.
 * @Author Mr.G
 * @Date 2018/11/21 10:47
 * @Version 1.0
 */
public class FanoutTest {
    public static void main(String[] args) throws Exception {
        final String exchangeName = "fanout";
        final String exchangeType = "fanout";

        // Consumers are created first so their queues are bound before anything is published.
        for (String customerName : new String[]{"ggp", "rqb"}) {
            new Customer(customerName, exchangeName, exchangeType);
        }

        new Producer(exchangeName, exchangeType);
    }
}

測試結果

參考資料:https://www.cnblogs.com/LipeiNet