1. 程式人生 > >RabbitMQ四種Exchange型別之Fanout (Java)

RabbitMQ四種Exchange型別之Fanout (Java)

RabbitMQ有四種Exchange型別,分別是Direct 、Fanout 、Topic、Headers 

Exchange特點:

  • Fanout 不處理路由鍵。你只需要簡單的將佇列繫結到交換機上。一個傳送到該型別交換機的訊息都會被廣播到與該交換機繫結的所有佇列上。如下圖:

  • Topic    將路由鍵和某模式進行匹配。此時佇列需要繫結要一個模式上。符號“#”匹配一個或多個詞,符號“*”只能匹配一個詞。因此“logs.#”能夠匹配到“logs.error”、“logs.info.toc”,但是“logs.*” 只能匹配到“logs.error”,不能匹配到“logs.info.toc” 。如下圖:

  • Direct   
    處理路由鍵,需要將一個佇列繫結到交換機上,要求該訊息與一個特定的路由鍵完全匹配。這是一個完整的匹配。如果一個佇列繫結到該交換機上要求路由鍵為 “logs”,則只有路由鍵為“logs”的訊息才被轉發,不會轉發路由鍵為"logs.error",只會轉發路由鍵為"logs"。 如下圖:

  • Headers  不處理路由鍵,而是根據傳送的訊息內容中的headers屬性進行匹配。在繫結Queue與Exchange時指定一組鍵值對;當訊息傳送到RabbitMQ時會取到該訊息的headers與Exchange繫結時指定的鍵值對進行匹配;如果完全匹配則訊息會路由到該佇列,否則不會路由到該佇列。headers屬性是一個鍵值對,可以是Hashtable,鍵值對的值可以是任何型別。而fanout,direct,topic 的路由鍵都需要要字串形式的。不過headers比較少用到,下面是headers的官方說明文件:
A headers exchange is designed to for routing on multiple attributes that are more easily expressed as message headers than a routing key. Headers exchanges ignore the routing key attribute. Instead, the attributes used for routing are taken from the headers attribute. A message is considered matching if the value of the header equals the value specified upon binding.It is possible to bind a queue to a headers exchange using more than one header for matching. In this case, the broker needs one more piece of information from the application developer, namely, should it consider messages with any of the headers matching, or all of them? This is what the "x-match" binding argument is for. When the "x-match" argument is set to "any", just one matching header value is sufficient. Alternatively, setting "x-match" to "all" mandates that all the values must match.Headers exchanges can be looked upon as "direct exchanges on steroids". Because they route based on header values, they can be used as direct exchanges where the routing key does not have to be a string; it could be an integer or a hash (dictionary) for example.

有了上面的介紹,下面就直接上程式碼吧!!

Consumer:

package fanout;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;


public class FanoutConsumer {


private static final String EXCHANGE_NAME = "logs";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
//rabbitmq監聽IP
factory.setHost("192.168.249.128");
//rabbitmq監聽預設埠
factory.setPort(5672);
//設定訪問的使用者
factory.setUsername("test");
factory.setPassword("test");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//宣告路由名字和型別
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//獲取隨機佇列名稱
String queueName = channel.queueDeclare().getQueue();
//建立佇列
channel.queueDeclare(queueName, false, false, true, null);
//把佇列繫結到路由上
channel.queueBind(queueName, EXCHANGE_NAME, "");


System.out.println(" Waiting for msg....");


Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received msg is '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}

Producer:
package fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class FanoutProducer {
	private static final String EXCHANGE_NAME = "logs";

	public static void main(String[] argv) throws Exception{

		ConnectionFactory factory = new ConnectionFactory();
		//rabbitmq監聽IP
		factory.setHost("192.168.249.128");
		//rabbitmq監聽預設埠
		factory.setPort(5672);
		//設定訪問的使用者
		factory.setUsername("test");
		factory.setPassword("test");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();

		//宣告路由名字和型別
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		String message = makeMessage(argv);
		
		channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
		System.out.println("Sent msg is '" + message + "'");

		channel.close();
		connection.close();
	}

	private static String makeMessage(String[] strings){
		if (strings.length < 1){
			return "這是預設訊息!!";
		}else{
			StringBuffer buffer= new StringBuffer();
			for (int i = 0; i < strings.length; i++) {
				buffer.append(strings[i]);
			}
			return buffer.toString();
		}
	}

}

執行Consumer:


執行Producer

有問題可以掃碼向我提問哦好了 , 就到這裡~~    祝生活愉快!!!