1. 程式人生 > >三、RabbitMq學習筆記

三、RabbitMq學習筆記

RabbitMQ原生API三種交換模式

1. Hello World

在這裡沒有宣告交換機(exchange),也沒有宣告繫結(bind),RabbitMQ會使用預設的交換機(AMQP default)路由鍵就是佇列名稱

【生產者】

/**
 * 消費者
 * 
 * @author ITCloud
 */
public class Consumer {
	public static void main(String[] args) throws Exception {
		// 1.建立連線
		ConnectionFactory connectionFactory = new ConnectionFactory
(); connectionFactory.setHost("192.168.186.130"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); // 2.獲取連線,建立channel Connection connection = connectionFactory.newConnection(); Channel channel =
connection.createChannel(); // 3. 佇列宣告 String queueName = "hello.world"; /** * (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) * 引數說明: * queue:佇列名稱 * duable:佇列是否是持久化的,rabbitmq重啟之後,佇列依然存在 * exclusive:獨佔佇列,只對當前連線有效,一般都會設定成非獨佔佇列false * autoDelete:佇列是否自動刪除 * arguments:一些引數,後續介紹 */
channel.queueDeclare(queueName, true, false, false, null); // 4.建立一個簡單的消費者 com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) { //處理接收的訊息 /** * body:接收訊息體 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { System.out.println("消費端:" + new String(body)); } }; //設定非同步接收訊息,當消費端啟動後,將一直會監聽消費端 channel.basicConsume(queueName, true, consumer); } }

【消費者】

/**
 * 生產者
 * @author ITCloud
 */
public class Producer {
	public static void main(String[] args) throws Exception {
		//1.建立連線工廠類
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.186.130");
		connectionFactory.setPort(5672);
		connectionFactory.setUsername("admin");
		connectionFactory.setPassword("admin");
		connectionFactory.setVirtualHost("/");
		//2.獲取連線,建立channel
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		
		//3. 傳送訊息
		String msg = "Hello world RabbitMQ!!";
		/**
		 * 引數詳解
		 *String exchange, String routingKey, BasicProperties props, byte[] body
		 * 交換機名稱:exchange 必須,如果不指定,則使用rabbitmq提供預設的exchange:AMQP default
		 * 路由鍵:routingKey 當交換機是Fanout時候routingKey可以不需要,如果沒有明確指定,則路由到佇列
		 * 傳送訊息帶一些引數:props 非必需
		 * 要傳送的訊息:body
		 */
		channel.basicPublish("", "hello.world", null, msg.getBytes());
		
		//4.關閉相關連線
		channel.close();
		connection.close();
	
	}
}

2. direct交換模式

direct交換模式的特點:生產者和消費者通過routingKey來連線,消費端只有擁有相應的routingKey才能進行消費

【消費者】

/**
 * direct型別的交換機,特點:
 * 	生產者和消費者通過routingKey來連線,消費端只有擁有相應的routingKey才能進行消費
 * @author ITCloud
 *
 */
public class DirectConsumer {
	public static void main(String[] args) throws Exception{
		// 1.建立連線
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.186.130");
		connectionFactory.setPort(5672);
		connectionFactory.setUsername("admin");
		connectionFactory.setPassword("admin");
		connectionFactory.setVirtualHost("/");
		// 2.獲取連線,建立channel
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		
		//3.交換機和佇列宣告
		/**
		 * 引數說明:(String exchange, String type, boolean durable, boolean autoDelete,Map<String, Object> arguments)
		 * exchange: 交換機名稱
		 * type:交換機型別:direct topic fanout hearders(幾乎不用)
		 * durable: 是否是持久化的佇列,
		 * autoDelete:是否自動刪除
		 * arguments:一些引數,幾乎不用
		 */
		channel.exchangeDeclare("direct.exchange", "direct", true, false, null);
		channel.queueDeclare("direct.queue", true, false, false, null);
		//4.佇列繫結
		channel.queueBind("direct.queue", "direct.exchange", "direct.queue");
		
		Consumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				System.out.println("消費端:" + new String(body));
			}
		};
		//true表示自動接收
		channel.basicConsume("direct.queue", true, consumer);
	}
}

【生產者】

public class DirectProducer {
	public static void main(String[] args) throws Exception{
		// 1.建立連線
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.186.130");
		connectionFactory.setPort(5672);
		connectionFactory.setUsername("admin");
		connectionFactory.setPassword("admin");
		connectionFactory.setVirtualHost("/");
		// 2.獲取連線,建立channel
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		
		//3. 傳送訊息
		String msg = "Send msg by RabbitMQ!! direct";
		channel.basicPublish("direct.exchange", "direct.queue", null, msg.getBytes());
		
		channel.close();
		connection.close();
	}
}

3. fanout交換模式

fanout交換模式的特點:不需要routingKey,只要綁定了交換機即可

這種模式可以用於死信佇列中

【消費者】

/**
 * fanout型別的交換機,特點:
 * 	不需要routingKey,只要綁定了交換機即可
 * @author ITCloud
 *
 */
public class FanoutConsumer {
	public static void main(String[] args) throws Exception{
		// 1.建立連線
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.186.130");
		connectionFactory.setPort(5672);
		connectionFactory.setUsername("admin");
		connectionFactory.setPassword("admin");
		connectionFactory.setVirtualHost("/");
		// 2.獲取連線,建立channel
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		
		//3.交換機和佇列宣告
		channel.exchangeDeclare("fanout.exchange", "fanout", true, false, null);
		channel.queueDeclare("fanout.queue", true, false, false, null);
		//4.佇列繫結,這裡rountingKey=""; 但是不可以設定為null
		channel.queueBind("fanout.queue", "fanout.exchange", "");
		
		Consumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				System.out.println("消費端:" + new String(body));
			}
		};
		//true表示自動接收
		channel.basicConsume("fanout.queue", true, consumer);
	}
}

【生產者】

public class FanoutProducer {
	public static void main(String[] args) throws Exception{
		// 1.建立連線
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.186.130");
		connectionFactory.setPort(5672);
		connectionFactory.setUsername("admin");
		connectionFactory.setPassword("admin");
		connectionFactory.setVirtualHost("/");
		// 2.獲取連線,建立channel
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		
		//3. 傳送訊息
		String msg = "Send msg by RabbitMQ!! fanout";
		channel.basicPublish("fanout.exchange", "", null, msg.getBytes());
		
		channel.close();
		connection.close();
	}
}

4. topic交換模式

【消費者】

/**
 * topic型別的交換機,特點:
 * 	通過rountingKey進行模糊匹配:
 * 		1. * 匹配一個單詞
 * 		2. # 匹配多個單詞
 * 例如:A.* 只可以匹配A.aab;但是不可以匹配A.aa.bb
 * @author ITCloud
 *
 */
public class TopicConsumer {
	public static void main(String[] args) throws Exception{
		// 1.建立連線
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.186.130");
		connectionFactory.setPort(5672);
		connectionFactory.setUsername("admin");
		connectionFactory.setPassword("admin");
		connectionFactory.setVirtualHost("/");
		// 2.獲取連線,建立channel
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		
		//3.交換機和佇列宣告
		channel.exchangeDeclare("topic.exchange", "topic", true, false, null);
		channel.queueDeclare("topic.queue", true, false, false, null);
		//4.佇列繫結,這裡可以設定rountingKey=""; 但是不可以設定為null
		channel.queueBind("topic.queue", "topic.exchange", "topic.#");
		
		Consumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				System.out.println("消費端:" + new String(body));
			}
		};
		//true表示自動接收
		channel.basicConsume("topic.queue", true, consumer);
	}
}

【生產者】

public class TopicProducer {
	public static void main(String[] args) throws Exception{
		// 1.建立連線
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.186.130");
		connectionFactory.setPort(5672);
		connectionFactory.setUsername("admin");
		connectionFactory.setPassword("admin");
		connectionFactory.setVirtualHost("/");
		// 2.獲取連線,建立channel
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		
		//3. 傳送訊息
		String msg = "Send msg by RabbitMQ!! topic";
        //宣告rountingKey = "topic.hello.world" 匹配topic.#
		channel.basicPublish("topic.exchange", "topic.hello.world", null, msg.getBytes());
		
		channel.close();
		connection.close();
	}
}

5. Ack之重回佇列

【消費者】

/**
 *重回佇列:就是將訊息進行重新扔到佇列中,給消費者重新消費
 *	簡單的說:就是把沒有消費成功的佇列,重新返回給Broker
 * @author ITCloud
 */
public class AckConsumer {
	public static void main(String[] args) throws Exception {
		// 1.建立連線
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.186.130");
		connectionFactory.setPort(5672);
		connectionFactory.setUsername("admin");
		connectionFactory.setPassword("admin");
		connectionFactory.setVirtualHost("/");
		// 2.獲取連線,建立channel
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		
		// 3.交換機和 佇列宣告
		String queueName = "ack.queue";
		String exchanegName = "ack.exchange";
		channel.queueDeclare(queueName, true, false, false, null);
		channel.exchangeDeclare(exchanegName, "direct", true, false, null);
		
		channel.queueBind(queueName, exchanegName, "ack.queue");
		
		// 4.建立一個簡單的消費者
		com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				System.out.println("消費端:" + new String(body));
				Integer num = (Integer)properties.getHeaders().get("num");
				try {
					TimeUnit.SECONDS.sleep(1);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				if (num == 3) {
					//表示將該訊息重新扔到訊息的尾端,進行重新消費
					//此時會阻塞在這個地方 TODO
					channel.basicNack(envelope.getDeliveryTag(), false, true);
				} else {
					//訊息手動接收
					channel.basicAck(envelope.getDeliveryTag(), false);
				}
			}
		};
		
		channel.basicQos(0, 1, false);
		//這裡設定成非自動確認接收訊息
		channel.basicConsume(queueName, false, consumer);
	}
}

【生產者】

/**
 * 
 * @author ITCloud
 */
public class AckProducer {
	public static void main(String[] args) throws Exception {
		//1.建立連線工廠類
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.186.130");
		connectionFactory.setPort(5672);
		connectionFactory.setUsername("admin");
		connectionFactory.setPassword("admin");
		connectionFactory.setVirtualHost("/");
		//2.獲取連線,建立channel
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		
		//3. 傳送訊息
		for (int x = 0; x<5; x++) {
			Map<String, Object> headers = new HashMap<>();
			headers.put("num", x);
			BasicProperties properties = new BasicProperties.Builder()
					.deliveryMode(2) //持久化投遞模式
					.contentEncoding("utf-8")
					.headers(headers)
					.build();
			String msg = "Hello world RabbitMQ!!" + x;
			channel.basicPublish("ack.exchange", "ack.queue", true, properties, msg.getBytes());
		}
		
		//4.關閉相關連線
		channel.close();
		connection.close();
	}
}

6. 訊息確認機制

6.1 生產者訊息確認

這裡主要研究非同步comfirm,因為非同步comfirm效能比較高

【消費者】

/**
 * 消費者
 * @author ITCloud
 */
public