RabbitMQ系列之三 RabbitMQ幾種典型模式
本文詳細介紹簡單模式Simple、工作模式Work、釋出訂閱模式Publish/Subscribe、路由模式Routing、萬用字元模式Topics、遠端呼叫模式RPC(暫不對該佇列模式進行詳解)
模式1:簡單模式(Simple / HelloWorld 單生產單消費)
簡單的傳送與接收,沒有特別的處理。
RabbitMQ連線(公共的連線方法,其他模式共用此方法)
package com.sc.queuemode.connection; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class ConnectionUtil { public static Connection getConnection() throws IOException, TimeoutException { //定義連線池 ConnectionFactory connectionFactory = new ConnectionFactory(); //連線地址 connectionFactory.setHost("localhost"); //連線埠 connectionFactory.setPort(5672); //使用者名稱 connectionFactory.setUsername("guest"); //密碼 connectionFactory.setPassword("guest"); //通過連線工廠獲取連線 Connection connection = connectionFactory.newConnection(); //返回連線 return connection; } }
Producer:
package com.sc.queuemode.simple; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.sc.queuemode.connection.ConnectionUtil; public class Producter { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { //獲取連線 Connection connection = ConnectionUtil.getConnection(); //從連線中宣告通道 Channel channel = connection.createChannel(); //佇列申明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //訊息內容 String message = "simple queue hello world !"; //推送釋出訊息 //basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); //通道關閉 channel.close(); //連線關閉 connection.close(); } }
Consumer:
package com.sc.queuemode.simple; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.sc.queuemode.connection.ConnectionUtil; public class Consumer1 { private final static String QUEUE_NAME = "hello_queue"; public static void main(String[] args) throws IOException, TimeoutException{ //獲取連線 Connection connection = ConnectionUtil.getConnection(); //宣告通道 Channel channel = connection.createChannel(); //宣告佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //宣告消費者 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"UTF-8"); System.out.println("customer 消費訊息:"+message); } }; channel.basicConsume(QUEUE_NAME, true,consumer); } }
模式2:工作模式(Work單傳送多接收)
一個生產者端,多個消費者端。示例中為了保證訊息傳送的可靠性,不丟失訊息,使訊息持久化了。同時為了防止接收端在處理訊息時down掉,只有在訊息處理完成後才傳送訊息確認。
Producer:
package com.sc.queuemode.work;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sc.queuemode.connection.ConnectionUtil;
public class Producter {
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//獲取連線
Connection connection = ConnectionUtil.getConnection();
//宣告通道
Channel channel = connection.createChannel();
//佇列申明,durable:true訊息持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//訊息內容
String message = getMessage(args);
for(int i = 0; i < 20; i++) {
//釋出訊息
//basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
channel.basicPublish("", QUEUE_NAME, null, (i + " " +message).getBytes());
}
//關閉通道
channel.close();
//連線關閉
connection.close();
}
private static String getMessage(String[] strings) {
if(strings.length < 1) {
return "Hello World!";
}
return joinStrings(strings, "");
}
private static String joinStrings(String[] strings, String delimiter) {
int length = strings.length;
if(length == 0) return "";
StringBuilder words = new StringBuilder(strings[0]);
for(int i = 0; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}
Consumer:
消費者1
package com.sc.queuemode.work;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.sc.queuemode.connection.ConnectionUtil;
/*
* 消費者1
*/
public class Consumer1 {
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws IOException, TimeoutException{
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//同一時刻伺服器只發送1條訊息給消費者(能者多勞,消費訊息快的,會消費更多的訊息)
//保證在接收端一個訊息沒有處理完時不會接收另一個訊息,即消費者端傳送了ack後才會接收下一個訊息。
//在這種情況下生產者端會嘗試把訊息傳送給下一個空閒的消費者。
channel.basicQos(1);
//宣告佇列的消費者O
Consumer consumer1 = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
//channel.basicConsume(QUEUE_NAME, false, consumer1);
String message = new String(body, "UTF-8");
System.out.println("customer1 消費訊息:"+message);
//手動返回結果
channel.basicAck(envelope.getDeliveryTag(), false);
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//定義的消費者監聽佇列 autoAck:true自動返回結果,false手動返回
channel.basicConsume(QUEUE_NAME, false,consumer1);
}
}
消費者2
package com.sc.queuemode.work;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.sc.queuemode.connection.ConnectionUtil;
/*
* 消費者2
*/
public class Consumer2 {
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws IOException, TimeoutException{
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//同一時刻伺服器只發送1條訊息給消費者(能者多勞,消費訊息快的,會消費更多的訊息)
channel.basicQos(1);
//宣告佇列的消費者
Consumer consumer2 = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
//channel.basicConsume(QUEUE_NAME, false, consumer1);
String message = new String(body, "UTF-8");
System.out.println("customer2 消費訊息:"+message);
channel.basicAck(envelope.getDeliveryTag(), false);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//定義的消費者監聽佇列(第二個引數:true自動返回結果,false手動返回)
channel.basicConsume(QUEUE_NAME, false,consumer2);
}
}
模式3:釋出、訂閱模式(Publish/Subscribe)
使用場景:釋出、訂閱模式,生產者端傳送訊息,多個消費者同時接收所有的訊息。
Producer:
package com.sc.queuemode.publish_subscrible;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sc.queuemode.connection.ConnectionUtil;
public class Producter {
private final static String EXCHANGE_NAME = "publishSubscrible_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//宣告交換機Exchange型別為fanout
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = "publish/subscrible hello world";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println("釋出訂閱 生產者 釋出訊息:"+message);
channel.close();
connection.close();
}
}
生產者端釋出訊息到交換機,使用“fanout”方式傳送,即廣播訊息,不需要使用queue,傳送端不需要關心誰接收。
消費者1:
package com.sc.queuemode.publish_subscrible;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import com.sc.queuemode.connection.ConnectionUtil;
public class Customer1 {
//佇列1
private final static String QUEUE_NAME = "publishSubscrible_queue";
private final static String EXCHANGE_NAME = "publishSubscrible_exchange";
public static void main(String[] args) throws IOException, TimeoutException{
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//宣告交換機
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//宣告佇列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//繫結佇列到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
//同一時刻伺服器只發送1條訊息給消費者(能者多勞,消費訊息快的,會消費更多的訊息)
//保證在接收端一個訊息沒有處理完時不會接收另一個訊息,即消費者端傳送了ack後才會接收下一個訊息。
//在這種情況下生產者端會嘗試把訊息傳送給下一個空閒的消費者。
channel.basicQos(1);
//申明消費者
Consumer consumer1 = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("釋出訂閱 消費者1 消費訊息:"+message);
//手動返回結果
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, false, consumer1);
}
}
消費者2:
package com.sc.queuemode.publish_subscrible;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.sc.queuemode.connection.ConnectionUtil;
public class Customer2 {
//佇列2
private final static String QUEUE_NAME = "publishSubscrible_queue2";
private final static String EXCHANGE_NAME = "publishSubscrible_exchange";
public static void main(String[] args) throws IOException, TimeoutException{
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//宣告交換機
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//宣告佇列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//繫結佇列到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
//同一時刻伺服器只發送1條訊息給消費者(能者多勞,消費訊息快的,會消費更多的訊息)
//保證在接收端一個訊息沒有處理完時不會接收另一個訊息,即消費者端傳送了ack後才會接收下一個訊息。
//在這種情況下生產者端會嘗試把訊息傳送給下一個空閒的消費者。
channel.basicQos(1);
//申明消費者
Consumer consumer2 = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("釋出訂閱 消費者2 消費訊息:"+message);
//手動返回結果
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, false, consumer2);
}
}
消費者端:
1、宣告和生產者端一樣的交換機。
2、注意binding queue的時候,channel.queueBind()的第三個引數Routing key為空,即所有的訊息都接收。如果這個值不為空,在exchange type為“fanout”方式下該值被忽略!
3、訊息佇列和交換機繫結如下圖所示
模式4:路由模式(Routing)
生產者按routing key傳送訊息,不同的消費者端按不同的routing key接收訊息。
Producer:
package com.sc.queuemode.routing;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sc.queuemode.connection.ConnectionUtil;
public class Producter {
private final static String EXCHANGE_NAME = "routing_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//宣告交換機Exchange型別為direct
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//釋出訊息3種routingKey的訊息
String message = "hello info";
channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes());
System.out.println("路由模式釋出info訊息:"+message);
message = "hello warning";
channel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes());
System.out.println("路由模式釋出warning訊息:"+message);
message = "hello error";
channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes());
System.out.println("路由模式釋出error訊息:"+message);
}
}
消費者端和模式3(釋出訂閱模式)的區別:
1、exchange的type為direct
2、傳送訊息的時候加入了routing key
Consumer:
消費者1
package com.sc.queuemode.routing;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.sc.queuemode.connection.ConnectionUtil;
public class Customer1 {
private final static String QUEUE_NAME = "routing_queue1";
private final static String EXCHANGE_NAME = "routing_exchange";
public static void main(String[] args) throws IOException, TimeoutException{
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//宣告交換機
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//申明佇列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//佇列繫結交換機,指定路由routingKey
//結束路由routingKey為info和warning的訊息
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
//同一時刻伺服器只發送1條訊息給消費者(能者多勞,消費訊息快的,會消費更多的訊息)
//保證在接收端一個訊息沒有處理完時不會接收另一個訊息,即消費者端傳送了ack後才會接收下一個訊息。
//在這種情況下生產者端會嘗試把訊息傳送給下一個空閒的消費者。
channel.basicQos(1);
//宣告消費者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("路由模式 消費者1 消費訊息:"+message);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
消費者2
package com.sc.queuemode.routing;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.sc.queuemode.connection.ConnectionUtil;
public class Customer2 {
private final static String QUEUE_NAME = "routing_queue2";
private final static String EXCHANGE_NAME = "routing_exchange";
public static void main(String[] args) throws IOException, TimeoutException{
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//宣告交換機
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//申明佇列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//佇列繫結交換機,指定路由routingKey
//結束路由routingKey為error的訊息
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
//同一時刻伺服器只發送1條訊息給消費者(能者多勞,消費訊息快的,會消費更多的訊息)
//保證在接收端一個訊息沒有處理完時不會接收另一個訊息,即消費者端傳送了ack後才會接收下一個訊息。
//在這種情況下生產者端會嘗試把訊息傳送給下一個空閒的消費者。
channel.basicQos(1);
//宣告消費者
Consumer consumer2 = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("路由模式 消費者2 消費訊息:"+message);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, false, consumer2);
}
}
消費者端和模式3(釋出訂閱模式)的區別:
在繫結queue和exchange的時候使用了路由routing key,即從該exchange上只接收routing key指定的訊息。
佇列繫結如下圖所示
模式5:萬用字元模式(Topics ,按topic傳送接收)
生產者端不只按固定的routing key傳送訊息,而是按字串“匹配”傳送,消費者端同樣如此。
與之前的路由模式相比,它將資訊的傳輸型別的key更加細化,以“key1.key2.keyN…”的模式來指定資訊傳輸的key的大型別和大型別下面的小型別,讓消費者端可以更加精細的確認自己想要獲取的資訊型別。而在消費者端,不用精確的指定具體到哪一個大型別下的小型別的key,而是可以使用類似正則表示式(但與正則表示式規則完全不同)的萬用字元在指定一定範圍或符合某一個字串匹配規則的key,來獲取想要的資訊。“萬用字元交換機”(Topic Exchange)將路由鍵和某模式進行匹配。此時佇列需要繫結在一個模式上。符號“#”匹配一個或多個詞,符號“*”僅匹配一個詞。
Producer:
package com.sc.queuemode.topics;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sc.queuemode.connection.ConnectionUtil;
public class Producter {
private static final String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel =connection.createChannel();
//宣告交換機型別為topic
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String message = "釋出了一條中國新聞訊息";
channel.basicPublish(EXCHANGE_NAME, "china.news", null, message.getBytes());
message = "釋出了一條中國天氣訊息";
channel.basicPublish(EXCHANGE_NAME, "china.weather", null, message.getBytes());
message = "釋出了一條美國新聞訊息";
channel.basicPublish(EXCHANGE_NAME, "usa.news", null, message.getBytes());
message = "釋出了一條美國天氣訊息";
channel.basicPublish(EXCHANGE_NAME, "usa.weather", null, message.getBytes());
}
}
生產者和模式4(路由模式)的區別:
1、交換機exchange的type為topic
2、傳送訊息的routing key不是固定的單詞,而是匹配字串,如"china.#",*匹配一個單詞,#匹配0個或多個單詞。因此如“china.#”能夠匹配到“china.news.info”,但是“china.*
”只會匹配到“china.news”
Consumer:
消費者1
package com.sc.queuemode.topics;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.sc.queuemode.connection.ConnectionUtil;
public class Customer1 {
private static final String QUEUE_NAME = "topics_queue1";
private static final String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) throws IOException, TimeoutException{
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//交換機申明
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//佇列宣告
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//佇列繫結交換機並制定路由routingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "china.#");
//同一時刻伺服器只發送1條訊息給消費者(能者多勞,消費訊息快的,會消費更多的訊息)
//保證在接收端一個訊息沒有處理完時不會接收另一個訊息,即消費者端傳送了ack後才會接收下一個訊息。
//在這種情況下生產者端會嘗試把訊息傳送給下一個空閒的消費者。
channel.basicQos(1);
//申明消費者
Consumer consumer1 = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("萬用字元模式 消費者1消費:"+message);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, false, consumer1);
}
}
消費者2
package com.sc.queuemode.topics;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.sc.queuemode.connection.ConnectionUtil;
public class Customer2 {
private static final String QUEUE_NAME = "topics_queue2";
private static final String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) throws IOException, TimeoutException{
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//交換機申明
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//佇列宣告
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//佇列繫結交換機並制定路由routingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "usa.#");
//同一時刻伺服器只發送1條訊息給消費者(能者多勞,消費訊息快的,會消費更多的訊息)
//保證在接收端一個訊息沒有處理完時不會接收另一個訊息,即消費者端傳送了ack後才會接收下一個訊息。
//在這種情況下生產者端會嘗試把訊息傳送給下一個空閒的消費者。
channel.basicQos(1);
//申明消費者
Consumer consumer2 = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("萬用字元模式 消費者2消費:"+message);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, false, consumer2);
}
}
交換機和佇列繫結如下圖所示