1. 程式人生 > >3.RabbitMQ幾種典型模式

3.RabbitMQ幾種典型模式

本文詳細介紹簡單模式Simple、工作模式Work、釋出訂閱模式Publish/Subscribe、路由模式Routing、萬用字元模式Topics、遠端呼叫模式RPC(暫不對該佇列模式進行詳解)

模式1:簡單模式(Simple / HelloWorld 單生產單消費)
簡單的傳送與接收,沒有特別的處理。
在這裡插入圖片描述
RabbitMQ連線(公共的連線方法,其他模式共用此方法)

package com.sc.queuemode.connection; 
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
	public static Connection getConnection() throws IOException, TimeoutException {
		//定義連線池
		ConnectionFactory connectionFactory = new ConnectionFactory();
		//連線地址
		connectionFactory.setHost("localhost");
		//連線埠
		connectionFactory.setPort(5672);
		//使用者名稱
		connectionFactory.setUsername("guest");
		//密碼
		connectionFactory.setPassword("guest");
		//通過連線工廠獲取連線
		Connection connection = connectionFactory.newConnection();
		//返回連線
		return connection;
	}
}

Producer:

package com.sc.queuemode.simple;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sc.queuemode.connection.ConnectionUtil;

public class Producter {
	private final static String QUEUE_NAME = "hello";
	public static void main(String[] args) throws IOException, TimeoutException {
		//獲取連線
		Connection connection = ConnectionUtil.getConnection();
		//從連線中宣告通道
		Channel channel = connection.createChannel();
		//佇列申明
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		//訊息內容
		String message = "simple queue hello world !";
		//推送釋出訊息
		//basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
		channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
		//通道關閉
		channel.close();
		//連線關閉
		connection.close();
	}
}

Consumer:

package com.sc.queuemode.simple;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.sc.queuemode.connection.ConnectionUtil;

public class Consumer1 {
	
	private final static String QUEUE_NAME = "hello_queue";
	
	public static void main(String[] args) throws IOException, TimeoutException{
		//獲取連線
		Connection connection = ConnectionUtil.getConnection();
		//宣告通道
		Channel channel = connection.createChannel();
		//宣告佇列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		//宣告消費者
		Consumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) 
					throws IOException {
				String message = new String(body,"UTF-8");
				System.out.println("customer 消費訊息:"+message);
			}
		};
		channel.basicConsume(QUEUE_NAME, true,consumer);
	}
}

模式2:工作模式(Work單傳送多接收)
一個生產者端,多個消費者端。示例中為了保證訊息傳送的可靠性,不丟失訊息,使訊息持久化了。同時為了防止接收端在處理訊息時down掉,只有在訊息處理完成後才傳送訊息確認。
在這裡插入圖片描述
Producer:

package com.sc.queuemode.work;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sc.queuemode.connection.ConnectionUtil;

public class Producter {
	
	private final static String QUEUE_NAME = "work_queue";

	public static void main(String[] args) throws IOException, TimeoutException {
		//獲取連線
		Connection connection = ConnectionUtil.getConnection();
		//宣告通道
		Channel channel = connection.createChannel();
		//佇列申明,durable:true訊息持久化
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		//訊息內容
		String message = getMessage(args);
		for(int i = 0; i < 20; i++) {
			//釋出訊息
			//basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
			channel.basicPublish("", QUEUE_NAME, null, (i + " " +message).getBytes());
		}
		//關閉通道
		channel.close();
		//連線關閉
		connection.close();
	}
	
	private static String getMessage(String[] strings) {
		if(strings.length < 1) {
			return "Hello World!";
		}
		return joinStrings(strings, "");
	}
	
	private static String joinStrings(String[] strings, String delimiter) {
		int length = strings.length;
		if(length == 0) return "";
		StringBuilder words = new StringBuilder(strings[0]);
		for(int i = 0; i < length; i++) {
			words.append(delimiter).append(strings[i]);
		}
		return words.toString();
	}
}

Consumer:
消費者1

package com.sc.queuemode.work;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.sc.queuemode.connection.ConnectionUtil;
/*
* 消費者1
*/
public class Consumer1 {
	
	private final static String QUEUE_NAME = "work_queue";
	public static void main(String[] args) throws IOException, TimeoutException{
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		
		//同一時刻伺服器只發送1條訊息給消費者(能者多勞,消費訊息快的,會消費更多的訊息)
		//保證在接收端一個訊息沒有處理完時不會接收另一個訊息,即消費者端傳送了ack後才會接收下一個訊息。
		//在這種情況下生產者端會嘗試把訊息傳送給下一個空閒的消費者。
		channel.basicQos(1);
		
		//宣告佇列的消費者O
		Consumer consumer1 = new DefaultConsumer(channel){
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				//channel.basicConsume(QUEUE_NAME, false, consumer1);
				String message = new String(body, "UTF-8");
				System.out.println("customer1 消費訊息:"+message);
				//手動返回結果
				channel.basicAck(envelope.getDeliveryTag(), false);
				try {
					Thread.sleep(50);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		};
		//定義的消費者監聽佇列 autoAck:true自動返回結果,false手動返回
		channel.basicConsume(QUEUE_NAME, false,consumer1);
	}
}

消費者2

package com.sc.queuemode.work;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.sc.queuemode.connection.ConnectionUtil;
    /*
    * 消費者2
    */
    public class Consumer2 {
    	private final static String QUEUE_NAME = "work_queue";
    
   	public static void main(String[] args) throws IOException, TimeoutException{
   		Connection connection = ConnectionUtil.getConnection();
   		Channel channel = connection.createChannel();
   		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
   		
   		//同一時刻伺服器只發送1條訊息給消費者(能者多勞,消費訊息快的,會消費更多的訊息)
   		channel.basicQos(1);
   		
   		//宣告佇列的消費者
   		Consumer consumer2 = new DefaultConsumer(channel){
   			@Override
   			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
   					throws IOException {
   				//channel.basicConsume(QUEUE_NAME, false, consumer1);
   				String message = new String(body, "UTF-8");
   				System.out.println("customer2 消費訊息:"+message);
   				channel.basicAck(envelope.getDeliveryTag(), false);
   				try {
   					Thread.sleep(100);
   				} catch (InterruptedException e) {
   					e.printStackTrace();
   				}
   			}
   		};
   		//定義的消費者監聽佇列(第二個引數:true自動返回結果,false手動返回)
   		channel.basicConsume(QUEUE_NAME, false,consumer2);
	}
}

模式3:釋出、訂閱模式(Publish/Subscribe)
使用場景:釋出、訂閱模式,生產者端傳送訊息,多個消費者同時接收所有的訊息。
在這裡插入圖片描述
Producer

package com.sc.queuemode.publish_subscrible;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sc.queuemode.connection.ConnectionUtil;

public class Producter {

	private final static String EXCHANGE_NAME = "publishSubscrible_exchange";
	
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		//宣告交換機Exchange型別為fanout
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		
		String message = "publish/subscrible hello world";
		channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
		System.out.println("釋出訂閱 生產者 釋出訊息:"+message);
		channel.close();
		connection.close();
	}
}

生產者端釋出訊息到交換機,使用“fanout”方式傳送,即廣播訊息,不需要使用queue,傳送端不需要關心誰接收。
消費者1:

package com.sc.queuemode.publish_subscrible;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import com.sc.queuemode.connection.ConnectionUtil;

public class Customer1 {
	
	//佇列1
	private final static String QUEUE_NAME = "publishSubscrible_queue";
	private final static String EXCHANGE_NAME = "publishSubscrible_exchange";

	public static void main(String[] args) throws IOException, TimeoutException{
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		//宣告交換機
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		//宣告佇列
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		//繫結佇列到交換機
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
		
		//同一時刻伺服器只發送1條訊息給消費者(能者多勞,消費訊息快的,會消費更多的訊息)
		//保證在接收端一個訊息沒有處理完時不會接收另一個訊息,即消費者端傳送了ack後才會接收下一個訊息。
		//在這種情況下生產者端會嘗試把訊息傳送給下一個空閒的消費者。
		channel.basicQos(1);
		
		//申明消費者
		Consumer consumer1 = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String message = new String(body, "UTF-8");
				System.out.println("釋出訂閱 消費者1 消費訊息:"+message);
				//手動返回結果
				channel.basicAck(envelope.getDeliveryTag(), false);
			}
		};
		channel.basicConsume(QUEUE_NAME, false, consumer1);
	}
}

消費者2:

package com.sc.queuemode.publish_subscrible;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.sc.queuemode.connection.ConnectionUtil;

public class Customer2 {

	//佇列2
	private final static String QUEUE_NAME = "publishSubscrible_queue2";
	private final static String EXCHANGE_NAME = "publishSubscrible_exchange";

	public static void main(String[] args) throws IOException, TimeoutException{
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		//宣告交換機
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		//宣告佇列
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		//繫結佇列到交換機
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
		
		//同一時刻伺服器只發送1條訊息給消費者(能者多勞,消費訊息快的,會消費更多的訊息)
		//保證在接收端一個訊息沒有處理完時不會接收另一個訊息,即消費者端傳送了ack後才會接收下一個訊息。
		//在這種情況下生產者端會嘗試把訊息傳送給下一個空閒的消費者。
		channel.basicQos(1);
		
		//申明消費者
		Consumer consumer2 = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String message = new String(body, "UTF-8");
				System.out.println("釋出訂閱 消費者2 消費訊息:"+message);
				//手動返回結果
				channel.basicAck(envelope.getDeliveryTag(), false);
			}
		};
		channel.basicConsume(QUEUE_NAME, false, consumer2);
	}

}

消費者端:
1、宣告和生產者端一樣的交換機。
2、注意binding queue的時候,channel.queueBind()的第三個引數Routing key為空,即所有的訊息都接收。如果這個值不為空,在exchange type為“fanout”方式下該值被忽略!
3、訊息佇列和交換機繫結如下圖所示
在這裡插入圖片描述

模式4:路由模式(Routing)
生產者按routing key傳送訊息,不同的消費者端按不同的routing key接收訊息。
在這裡插入圖片描述
Producer:

package com.sc.queuemode.routing;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sc.queuemode.connection.ConnectionUtil;

public class Producter {
	
	private final static String EXCHANGE_NAME = "routing_exchange";

	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		//宣告交換機Exchange型別為direct
		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
		
		//釋出訊息3種routingKey的訊息
		String message = "hello info";
		channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes());
		System.out.println("路由模式釋出info訊息:"+message);
		
		message = "hello warning";
		channel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes());
		System.out.println("路由模式釋出warning訊息:"+message);
		
		message = "hello error";
		channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes());
		System.out.println("路由模式釋出error訊息:"+message);
	}
}

消費者端和模式3(釋出訂閱模式)的區別:
1、exchange的type為direct
2、傳送訊息的時候加入了routing key
Consumer:
消費者1

package com.sc.queuemode.routing;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.sc.queuemode.connection.ConnectionUtil;

public class Customer1 {

	private final static String QUEUE_NAME = "routing_queue1";
	private final static String EXCHANGE_NAME = "routing_exchange";
	
	public static void main(String[] args) throws IOException, TimeoutException{
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		//宣告交換機
		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
		//申明佇列
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		
		//佇列繫結交換機,指定路由routingKey
		//結束路由routingKey為info和warning的訊息
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
		
		//同一時刻伺服器只發送1條訊息給消費者(能者多勞,消費訊息快的,會消費更多的訊息)
		//保證在接收端一個訊息沒有處理完時不會接收另一個訊息,即消費者端傳送了ack後才會接收下一個訊息。
		//在這種情況下生產者端會嘗試把訊息傳送給下一個空閒的消費者。
		channel.basicQos(1);
		
		//宣告消費者
		Consumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String message = new String(body, "UTF-8");
				System.out.println("路由模式 消費者1 消費訊息:"+message);
				channel.basicAck(envelope.getDeliveryTag(), false);
			}
		};
		channel.basicConsume(QUEUE_NAME, false, consumer);
	}
}

消費者2

package com.sc.queuemode.routing;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.sc.queuemode.connection.ConnectionUtil;

public class Customer2 {
	
	private final static String QUEUE_NAME = "routing_queue2";
	private final static String EXCHANGE_NAME = "routing_exchange";
	
	public static void main(String[] args) throws IOException, TimeoutException{
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		//宣告交換機
		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
		//申明佇列
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		
		//佇列繫結交換機,指定路由routingKey
		//結束路由routingKey為error的訊息
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
		
		//同一時刻伺服器只發送1條訊息給消費者(能者多勞,消費訊息快的,會消費更多的訊息)
		//保證在接收端一個訊息沒有處理完時不會接收另一個訊息,即消費者端傳送了ack後才會接收下一個訊息。
		//在這種情況下生產者端會嘗試把訊息傳送給下一個空閒的消費者。
		channel.basicQos(1);
		
		//宣告消費者
		Consumer consumer2 = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String message = new String(body, "UTF-8");
				System.out.println("路由模式 消費者2 消費訊息:"+message);
				channel.basicAck(envelope.getDeliveryTag(), false);
			}
		};
		channel.basicConsume(QUEUE_NAME, false, consumer2);
	}
}

消費者端和模式3(釋出訂閱模式)的區別:
在繫結queue和exchange的時候使用了路由routing key,即從該exchange上只接收routing key指定的訊息。
佇列繫結如下圖所示
在這裡插入圖片描述
模式5:萬用字元模式(Topics ,按topic傳送接收)
生產者端不只按固定的routing key傳送訊息,而是按字串“匹配”傳送,消費者端同樣如此。
與之前的路由模式相比,它將資訊的傳輸型別的key更加細化,以“key1.key2.keyN…”的模式來指定資訊傳輸的key的大型別和大型別下面的小型別,讓消費者端可以更加精細的確認自己想要獲取的資訊型別。而在消費者端,不用精確的指定具體到哪一個大型別下的小型別的key,而是可以使用類似正則表示式(但與正則表示式規則完全不同)的萬用字元在指定一定範圍或符合某一個字串匹配規則的key,來獲取想要的資訊。“萬用字元交換機”(Topic Exchange)將路由鍵和某模式進行匹配。此時佇列需要繫結在一個模式上。符號“#”匹配一個或多個詞,符號“*”僅匹配一個詞。
在這裡插入圖片描述
Producer:

package com.sc.queuemode.topics;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sc.queuemode.connection.ConnectionUtil;

public class Producter {
	
	private static final String EXCHANGE_NAME = "topic_exchange";
	
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection = ConnectionUtil.getConnection();
		Channel channel =connection.createChannel();
		//宣告交換機型別為topic
		channel.exchangeDeclare(EXCHANGE_NAME, "topic");
		
		String message = "釋出了一條中國新聞訊息";
		channel.basicPublish(EXCHANGE_NAME, "china.news", null, message.getBytes());
		
		message = "釋出了一條中國天氣訊息";
		channel.basicPublish(EXCHANGE_NAME, "china.weather", null, message.getBytes());
		
		message = "釋出了一條美國新聞訊息";
		channel.basicPublish(EXCHANGE_NAME, "usa.news", null, message.getBytes());
		
		message = "釋出了一條美國天氣訊息";
		channel.basicPublish(EXCHANGE_NAME, "usa.weather", null, message.getBytes());
	}
}

生產者和模式4(路由模式)的區別:
1、交換機exchange的type為topic
2、傳送訊息的routing key不是固定的單詞,而是匹配字串,如"china.#",*匹配一個單詞,#匹配0個或多個單詞。因此如“china.#”能夠匹配到“china.news.info”,但是“china.* ”只會匹配到“china.news”

Consumer:
消費者1

package com.sc.queuemode.topics;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.sc.queuemode.connection.ConnectionUtil;

public class Customer1 {

	private static final String QUEUE_NAME = "topics_queue1";
	private static final String EXCHANGE_NAME = "topic_exchange";
	
	public static void main(String[] args) throws IOException, TimeoutException{
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		//交換機申明
		channel.exchangeDeclare(EXCHANGE_NAME, "topic");
		//佇列宣告
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		//佇列繫結交換機並制定路由routingKey
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "china.#");
		//同一時刻伺服器只發送1條訊息給消費者(能者多勞,消費訊息快的,會消費更多的訊息)
		//保證在接收端一個訊息沒有處理完時不會接收另一個訊息,即消費者端傳送了ack後才會接收下一個訊息。
		//在這種情況下生產者端會嘗試把訊息傳送給下一個空閒的消費者。
		channel.basicQos(1);
		//申明消費者
		Consumer consumer1 = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String message = new String(body, "UTF-8");
				System.out.println("萬用字元模式 消費者1消費:"+message);
				channel.basicAck(envelope.getDeliveryTag(), false);
			}
		};
		channel.basicConsume(QUEUE_NAME, false, consumer1);
	}
}

消費者2

package com.sc.queuemode.topics;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.sc.queuemode.connection.ConnectionUtil;

public class Customer2 {
	
	private static final String QUEUE_NAME = "topics_queue2";
	private static final String EXCHANGE_NAME = "topic_exchange";
	
	public static void main(String[] args) throws IOException, TimeoutException{
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		//交換機申明
		channel.exchangeDeclare(EXCHANGE_NAME, "topic");
		//佇列宣告
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		//佇列繫結交換機並制定路由routingKey
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "usa.#");
		//同一時刻伺服器只發送1條訊息給消費者(能者多勞,消費訊息快的,會消費更多的訊息)
		//保證在接收端一個訊息沒有處理完時不會接收另一個訊息,即消費者端傳送了ack後才會接收下一個訊息。
		//在這種情況下生產者端會嘗試把訊息傳送給下一個空閒的消費者。
		channel.basicQos(1);
		//申明消費者
		Consumer consumer2 = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String message = new String(body, "UTF-8");
				System.out.println("萬用字元模式 消費者2消費:"+message);
				channel.basicAck(envelope.getDeliveryTag(), false);
			}
		};
		channel.basicConsume(QUEUE_NAME, false, consumer2);
	}
}

交換機和佇列繫結如下圖所示
在這裡插入圖片描述