1. 程式人生 > >RabbitMQ四種Exchange型別之Headers(Java)

RabbitMQ四種Exchange型別之Headers(Java)

Headers 型別的Exchanges是不處理路由鍵的,而是根據傳送的訊息內容中的headers屬性進行匹配。在繫結Queue與Exchange時指定一組鍵值對;當訊息傳送到RabbitMQ時會取到該訊息的headers與Exchange繫結時指定的鍵值對進行匹配;如果完全匹配則訊息會路由到該佇列,否則不會路由到該佇列。headers屬性是一個鍵值對,可以是Hashtable,鍵值對的值可以是任何型別。而fanout,direct,topic 的路由鍵都需要要字串形式的。

匹配規則x-match有下列兩種型別:

  • x-match = all   :表示所有的鍵值對都匹配才能接受到訊息
  • x-match = any :表示只要有鍵值對匹配就能接受到訊息

不過headers比較少用到,下面是headers的官方說明文件:

A headers exchange is designed to for routing on multiple attributes that are more easily expressed as message headers than a routing key. Headers exchanges ignore the routing key attribute. Instead, the attributes used for routing are taken from the headers attribute. A message is considered matching if the value of the header equals the value specified upon binding.It is possible to bind a queue to a headers exchange using more than one header for matching. In this case, the broker needs one more piece of information from the application developer, namely, should it consider messages with any of the headers matching, or all of them? This is what the "x-match" binding argument is for. When the "x-match" argument is set to "any", just one matching header value is sufficient. Alternatively, setting "x-match" to "all" mandates that all the values must match.Headers exchanges can be looked upon as "direct exchanges on steroids". Because they route based on header values, they can be used as direct exchanges where the routing key does not have to be a string; it could be an integer or a hash (dictionary) for example.下面貼出headers的事例程式碼!!

Consumer:

package headers;

import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;


public class HeadersConsumer {

	private static final String EXCHANGE_NAME 	= "exchange_headers";
	private static final String QUEUE_NAME 		= "headers_test_queue";
	public static void main(String[] argv) throws IOException, TimeoutException  {
		new ExchangeHeaders();
	}

	static class ExchangeHeaders{
		public  ExchangeHeaders() throws IOException, TimeoutException {
			ConnectionFactory factory = new ConnectionFactory();
			//rabbitmq監聽IP
			factory.setHost("192.168.249.128");
			//rabbitmq監聽預設埠
			factory.setPort(5672);
			//設定訪問的使用者
			factory.setUsername("test");
			factory.setPassword("test");
			Connection connection = factory.newConnection();
			Channel channel = connection.createChannel();
			//宣告路由名字和型別
			channel.exchangeDeclare(EXCHANGE_NAME, "headers", false, true, null);
			//建立佇列
			channel.queueDeclare(QUEUE_NAME, false, false, true, null);
			
			//設定訊息頭鍵值對資訊
			Map<String, Object> headers = new Hashtable<String,Object>();
			//這裡x-match有兩種型別
			//all:表示所有的鍵值對都匹配才能接受到訊息
			//any:表示只要有鍵值對匹配就能接受到訊息
			headers.put("x-match", "any");
			headers.put("name", "jack");
			headers.put("age" , 31);
			
			//把佇列繫結到路由上並指定headers
			channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", headers);
			System.out.println(" Waiting for msg....");
			Consumer consumer = new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope,
						AMQP.BasicProperties properties, byte[] body) throws IOException {
					String message = new String(body, "UTF-8");
					
					System.out.println("Received msg is '" + message + "'");
				}
			};
			channel.basicConsume(QUEUE_NAME, true, consumer);
		}

	}

}
Producer:
package headers;
import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties.Builder;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


public class HeadersProducer {

	private static final String EXCHANGE_NAME = "exchange_headers";

	public static void main(String[] argv) throws Exception{
		new ExchangeHeaders("exchanges type headers test msg~");
	}
	
	static class ExchangeHeaders{
		public ExchangeHeaders(String message) throws IOException, TimeoutException{
			ConnectionFactory factory = new ConnectionFactory();
			//rabbitmq監聽IP
			factory.setHost("192.168.249.128");
			//rabbitmq監聽預設埠
			factory.setPort(5672);
			//設定訪問的使用者
			factory.setUsername("test");
			factory.setPassword("test");
			Connection connection = factory.newConnection();
			Channel channel = connection.createChannel();

			//宣告路由名字和型別
			channel.exchangeDeclare(EXCHANGE_NAME, "headers", false, true, null);
			
			//設定訊息頭鍵值對資訊
			Map<String, Object> headers = new Hashtable<String, Object>();
			headers.put("name", "jack");
			headers.put("age", 30);
			Builder builder = new Builder();
			builder.headers(headers);
			
			channel.basicPublish(EXCHANGE_NAME, "", builder.build(), message.getBytes());
			System.out.println("Sent msg is '" + message + "'");

			channel.close();
			connection.close();
			
		}
	}

}
執行結果:



注意如果將Consumer中的headers.put("x-match", "any");改成headers.put("x-match", "all"); 則不會接受到訊息!!

有問題可以掃下面向我提問哦!1


如果幫助到你了的話,請關注我的微信公眾號吧(hellos520

祝生活愉快!!