RabbitMQ direct exchange: the producer cannot receive replies

I was studying RabbitMQ and ran a test that hit a problem. I am recording it here for future reference.

The following code is the producer:

import java.util.UUID;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        String replyToQueueName = channel.queueDeclare().getQueue();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(replyToQueueName, true, consumer);
        String severity = "error";
        String[] abc = {"a", ".", "b", "c", ".", ".", ".", ".", "d", ".", "g"};
        String message = getMessage(abc);
        for (int i = 0; i < 10; i++) {
            String correlationId = UUID.randomUUID().toString();
            BasicProperties bp = new BasicProperties.Builder().correlationId(correlationId).replyTo(replyToQueueName).build();
            channel.basicPublish(EXCHANGE_NAME, severity, bp, message.getBytes());
            QueueingConsumer.Delivery delivery = consumer.nextDelivery(10000);
            if(delivery != null && correlationId.equals(delivery.getProperties().getCorrelationId())){
                 String reply = new String(delivery.getBody(), "utf-8");
                 System.out.println("reply="+reply);
            }else{
                System.out.println("do not receive response");
            }
            Thread.sleep(1000*6);
        }
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
        channel.close();
        connection.close();
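    }

    // Assumed helper: the original post does not show getMessage,
    // so this version simply joins the parts with spaces.
    private static String getMessage(String[] strings) {
        return String.join(" ", strings);
    }
}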

The following code is the consumer:

package direct;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogsDirect {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();
        /*if (argv.length < 1) {
            System.err
                    .println("Usage: ReceiveLogsDirect [info] [warning] [error]");
            System.exit(1);
        }*/
        String[] aa={"error"};
        for (String severity : aa) {
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }
        System.out.println(" [*] Waiting for messages error. To exit press CTRL+C");
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();
            BasicProperties bp = new BasicProperties().builder().correlationId(delivery.getProperties().getCorrelationId()).build();
            //channel.basicPublish(EXCHANGE_NAME, "direct", bp, "this is response".getBytes());
            System.out.println("replyTo="+delivery.getProperties().getReplyTo());
            channel.basicPublish(EXCHANGE_NAME, delivery.getProperties().getReplyTo(), bp, "this is response".getBytes()); // (1)
            System.out.println(" [x] Received '" + routingKey + "':'" + message
                    + "'");
        }
    }
}

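At the line marked (1), I passed the exchange name as the first argument of basicPublish, but the producer never received a reply, and I could not work out why. When I changed that argument to "", the reply arrived. For a long time I did not understand the reason.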

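I then checked the documentation on the RabbitMQ website: if the first argument is "", the message goes to the default (nameless) exchange, and the queue it is delivered to is determined directly by the second argument, the routing key, which is matched against the queue name.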

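Only then did I remember that, on the producer side, the reply queue is a temporary queue that I never bound to any exchange myself. It is reachable only through the default exchange, which routes to every queue by its own name. My code, however, published the reply to a specific named exchange (direct_logs) with the reply queue's name as the routing key. No queue is bound to that exchange under that key, so the message could not be routed, and of course the producer never received the reply.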
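
The routing behaviour described above can be sketched with a small in-memory model. This is only a toy illustration, not the RabbitMQ client API: the class ToyBroker, its methods, and the queue names "reply-queue" and "worker-queue" are assumptions made up for this example. It models just two rules: a direct exchange delivers only to queues bound under a key equal to the routing key, and every declared queue is implicitly reachable through the default exchange "" under its own name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy in-memory model of direct-exchange routing. NOT the RabbitMQ client API.
final class ToyBroker {
    // exchange name -> (binding key -> names of bound queues)
    private final Map<String, Map<String, Set<String>>> bindings = new HashMap<>();
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new HashMap<>();

    ToyBroker() {
        // The default (nameless) exchange always exists.
        bindings.put("", new HashMap<>());
    }

    void exchangeDeclare(String exchange) {
        bindings.computeIfAbsent(exchange, k -> new HashMap<>());
    }

    // Declaring a queue makes it reachable via the default exchange under its own name.
    void queueDeclare(String queue) {
        queues.computeIfAbsent(queue, k -> new ArrayList<>());
        queueBind(queue, "", queue);
    }

    void queueBind(String queue, String exchange, String bindingKey) {
        Map<String, Set<String>> byKey = bindings.get(exchange);
        if (byKey == null || !queues.containsKey(queue)) {
            throw new IllegalArgumentException("unknown exchange or queue");
        }
        byKey.computeIfAbsent(bindingKey, k -> new HashSet<>()).add(queue);
    }

    // Returns how many queues received the message; 0 means it was unroutable.
    int basicPublish(String exchange, String routingKey, String body) {
        Map<String, Set<String>> byKey = bindings.get(exchange);
        if (byKey == null) {
            throw new IllegalArgumentException("no such exchange: " + exchange);
        }
        Set<String> targets = byKey.getOrDefault(routingKey, Collections.emptySet());
        for (String queue : targets) {
            queues.get(queue).add(body);
        }
        return targets.size();
    }

    List<String> messagesIn(String queue) {
        return queues.get(queue);
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.exchangeDeclare("direct_logs");
        broker.queueDeclare("reply-queue");   // stands in for the producer's temporary queue
        broker.queueDeclare("worker-queue");  // stands in for the consumer's queue
        broker.queueBind("worker-queue", "direct_logs", "error");

        // Request: "worker-queue" is bound to direct_logs under "error".
        System.out.println(broker.basicPublish("direct_logs", "error", "request"));     // prints 1
        // Reply through the named exchange: nothing is bound under "reply-queue".
        System.out.println(broker.basicPublish("direct_logs", "reply-queue", "reply")); // prints 0
        // Reply through the default exchange: routed by queue name.
        System.out.println(broker.basicPublish("", "reply-queue", "reply"));            // prints 1
    }
}
```

The second publish mirrors line (1) in the consumer, and the third mirrors the working version with "" as the first argument. In the real client the corresponding change is to pass "" instead of EXCHANGE_NAME to basicPublish when sending the reply.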