RabbitMQ 的路由模式 Topic模式
模型
生產者
1 package cn.wh; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 import cn.util.RabbitMqConnectionUtil; 7 8 import com.rabbitmq.client.Channel; 9 import com.rabbitmq.client.Connection; 10 11 public class Send { 12 13private static final String EXCHANGE_NAME="test_exchange_direct"; 14 15public static void main(String[] args) throws IOException, TimeoutException { 16 17 18Connection connection = RabbitMqConnectionUtil.getConnection(); 19 20Channel channel = connection.createChannel(); 21 22//exchange 23channel.exchangeDeclare(EXCHANGE_NAME, "direct"); 24 25Stringmsg="hello direct!"; 26 27 28String routingKey="error"; 29channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes()); 30 31System.out.println("send "+msg); 32 33channel.close(); 34connection.close(); 35} 36 }
消費者
1 package cn.wh; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 7 import cn.util.RabbitMqConnectionUtil; 8 import com.rabbitmq.client.Channel; 9 import com.rabbitmq.client.Connection; 10 import com.rabbitmq.client.Consumer; 11 import com.rabbitmq.client.DefaultConsumer; 12 import com.rabbitmq.client.Envelope; 13 import com.rabbitmq.client.AMQP.BasicProperties; 14 15 public class Recv1 { 16private static final String EXCHANGE_NAME = "test_exchange_direct"; 17private static final String QUEUE_NAME = "test_queue_direct_1"; 18 19public static void main(String[] args) throws IOException, TimeoutException { 20 21Connection connection = RabbitMqConnectionUtil.getConnection(); 22final Channel channel = connection.createChannel(); 23 24channel.queueDeclare(QUEUE_NAME, false, false, false, null); 25 26 27channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error"); 28 29channel.basicQos(1); 30 31//定義一個消費者 32Consumer consumer=new DefaultConsumer(channel){ 33//訊息到達 觸發這個方法 34@Override 35public void handleDelivery(String consumerTag, Envelope envelope, 36BasicProperties properties, byte[] body) throws IOException { 37 38String msg=new String(body,"utf-8"); 39System.out.println("[1] Recv msg:"+msg); 40 41try { 42Thread.sleep(2000); 43} catch (InterruptedException e) { 44e.printStackTrace(); 45}finally{ 46System.out.println("[1] done "); 47channel.basicAck(envelope.getDeliveryTag(), false); 48} 49} 50}; 51 52boolean autoAck=false;//自動應答 false 53channel.basicConsume(QUEUE_NAME,autoAck , consumer); 54} 55 56 }
消費者2
1 package cn.wh; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 7 import cn.util.RabbitMqConnectionUtil; 8 import com.rabbitmq.client.Channel; 9 import com.rabbitmq.client.Connection; 10 import com.rabbitmq.client.Consumer; 11 import com.rabbitmq.client.DefaultConsumer; 12 import com.rabbitmq.client.Envelope; 13 import com.rabbitmq.client.AMQP.BasicProperties; 14 15 public class Recv2 { 16private static final String EXCHANGE_NAME = "test_exchange_direct"; 17private static final String QUEUE_NAME = "test_queue_direct_2"; 18 19public static void main(String[] args) throws IOException, TimeoutException { 20 21Connection connection = RabbitMqConnectionUtil.getConnection(); 22final Channel channel = connection.createChannel(); 23 24channel.queueDeclare(QUEUE_NAME, false, false, false, null); 25 26 27channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error"); 28channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info"); 29channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning"); 30 31channel.basicQos(1); 32 33//定義一個消費者 34Consumer consumer=new DefaultConsumer(channel){ 35//訊息到達 觸發這個方法 36@Override 37public void handleDelivery(String consumerTag, Envelope envelope, 38BasicProperties properties, byte[] body) throws IOException { 39 40String msg=new String(body,"utf-8"); 41System.out.println("[2] Recv msg:"+msg); 42 43try { 44Thread.sleep(2000); 45} catch (InterruptedException e) { 46e.printStackTrace(); 47}finally{ 48System.out.println("[2] done "); 49channel.basicAck(envelope.getDeliveryTag(), false); 50} 51} 52}; 53 54boolean autoAck=false;//自動應答 false 55channel.basicConsume(QUEUE_NAME,autoAck , consumer); 56} 57 58 }
Topic模型
1 public class Send { 2private final static String EXCHANGE_NAME = "test_exchange_topic"; 3public static void main(String[] argv) throws Exception { 4// 獲取到連線以及mq通道 5Connection connection = ConnectionUtils.getConnection(); 6Channel channel = connection.createChannel(); 7// 宣告exchange 8channel.exchangeDeclare(EXCHANGE_NAME, "topic"); 9// 訊息內容 10String message = "id=1001"; 11channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes()); 12System.out.println(" [x] Sent '" + message + "'"); 13channel.close(); 14connection.close(); 15} 16 }
消費者
1 public class Recv { 2 private final static String QUEUE_NAME = "test_queue_topic_1"; 3 private final static String EXCHANGE_NAME = "test_exchange_topic"; 4 public static void main(String[] argv) throws Exception { 5 // 獲取到連線以及mq通道 6 Connection connection = ConnectionUtils.getConnection(); 7 final Channel channel = connection.createChannel(); 8 // 宣告佇列 9 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 10 // 繫結佇列到交換機 11 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update"); 12 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete"); 13 // 同一時刻伺服器只會發一條訊息給消費者 14 channel.basicQos(1); 15 // 定義佇列的消費者 16 Consumer consumer = new DefaultConsumer(channel) { 17 // 訊息到達 觸發這個方法 18 @Override 19 public void handleDelivery(String consumerTag, Envelope envelope, 20 BasicProperties properties, byte[] body) throws IOException { 21 String msg = new String(body, "utf-8"); 22 System.out.println("[2] Recv msg:" + msg); 23 try { 24 Thread.sleep(1000); 25 } catch (InterruptedException e) { 26 e.printStackTrace(); 27 } finally { 28 System.out.println("[2] done "); 29 // 手動回執 30 channel.basicAck(envelope.getDeliveryTag(), false); 31 } 32 } 33 }; 34 boolean autoAck = false; 35 channel.basicConsume(QUEUE_NAME, autoAck, consumer); 36 } 37 }
消費者2
1 public class Recv { 2 private final static String QUEUE_NAME = "test_queue_topic_1"; 3 private final static String EXCHANGE_NAME = "test_exchange_topic"; 4 public static void main(String[] argv) throws Exception { 5 // 獲取到連線以及mq通道 6 Connection connection = ConnectionUtils.getConnection(); 7 final Channel channel = connection.createChannel(); 8 // 宣告佇列 9 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 10 // 繫結佇列到交換機 11 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update"); 12 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete"); 13 // 同一時刻伺服器只會發一條訊息給消費者 14 channel.basicQos(1); 15 // 定義佇列的消費者 16 Consumer consumer = new DefaultConsumer(channel) { 17 // 訊息到達 觸發這個方法 18 @Override 19 public void handleDelivery(String consumerTag, Envelope envelope, 20 BasicProperties properties, byte[] body) throws IOException { 21 String msg = new String(body, "utf-8"); 22 System.out.println("[2] Recv msg:" + msg); 23 try { 24 Thread.sleep(1000); 25 } catch (InterruptedException e) { 26 e.printStackTrace(); 27 } finally { 28 System.out.println("[2] done "); 29 // 手動回執 30 channel.basicAck(envelope.getDeliveryTag(), false); 31 } 32 } 33 }; 34 boolean autoAck = false; 35 channel.basicConsume(QUEUE_NAME, autoAck, consumer); 36 } 37 }
Exchanges(轉發器|交換機)
轉發器一方面它接受生產者的訊息,另一方面向佇列推送訊息
Nameless exchange(匿名轉發)
之前我們對轉換器一無所知,卻可以將訊息傳送到佇列,那是可能是我們用了預設的轉發器,轉發器名為空字串""。之前我們釋出訊息的程式碼
Fanout Exchange
不處理路由鍵。你只需要將佇列繫結到交換機上。傳送訊息到交換機都會被轉發到與該交換機繫結的所有佇列
Direct Exchange
處理路由鍵。
需要將一個佇列繫結到交換機上,要求該訊息與一個特定的路由鍵完全匹配。這是一個完整的匹配。如果一個佇列繫結到該交換機上要求路由鍵 “dog”,則只有被標記為“dog”的訊息才被轉發,不會轉發 dog.puppy,也不會轉發dog.guard,只會轉發 dog。

Topic Exchange
將路由鍵和某模式進行匹配。
此時佇列需要繫結要一個模式上。符號“#”匹配一個或多個詞,符號“*”匹配一個詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*” 只會匹配到“audit.irs”。