1. 程式人生 > >RabbitMQ 的路由模式 Topic模式

RabbitMQ 的路由模式 Topic模式

模型

生產者

 1 package cn.wh;
 2 
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5 
 6 import cn.util.RabbitMqConnectionUtil;
 7 
 8 import com.rabbitmq.client.Channel;
 9 import com.rabbitmq.client.Connection;
10 
11 public class Send {
12     
13     private
static final String EXCHANGE_NAME="test_exchange_direct"; 14 15 public static void main(String[] args) throws IOException, TimeoutException { 16 17 18 Connection connection = RabbitMqConnectionUtil.getConnection(); 19 20 Channel channel = connection.createChannel();
21 22 //exchange 23 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); 24 25 String msg="hello direct!"; 26 27 28 String routingKey="error"; 29 channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes()); 30 31
System.out.println("send "+msg); 32 33 channel.close(); 34 connection.close(); 35 } 36 }

消費者

 

 1 package cn.wh;
 2 
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5 
 6 
 7 import cn.util.RabbitMqConnectionUtil;
 8 import com.rabbitmq.client.Channel;
 9 import com.rabbitmq.client.Connection;
10 import com.rabbitmq.client.Consumer;
11 import com.rabbitmq.client.DefaultConsumer;
12 import com.rabbitmq.client.Envelope;
13 import com.rabbitmq.client.AMQP.BasicProperties;
14 
15 public class Recv1 {
16     private static final String EXCHANGE_NAME = "test_exchange_direct";
17     private static final String QUEUE_NAME = "test_queue_direct_1";
18 
19     public static void main(String[] args) throws IOException, TimeoutException {
20 
21         Connection connection = RabbitMqConnectionUtil.getConnection();
22         final Channel channel = connection.createChannel();
23 
24         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
25         
26     
27         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
28         
29         channel.basicQos(1);
30         
31         //定義一個消費者
32         Consumer consumer=new DefaultConsumer(channel){
33             //訊息到達 觸發這個方法
34             @Override
35             public void handleDelivery(String consumerTag, Envelope envelope,
36                     BasicProperties properties, byte[] body) throws IOException {
37              
38                 String msg=new String(body,"utf-8");
39                 System.out.println("[1] Recv msg:"+msg);
40                 
41                 try {
42                     Thread.sleep(2000);
43                 } catch (InterruptedException e) {
44                     e.printStackTrace();
45                 }finally{
46                     System.out.println("[1] done ");
47                     channel.basicAck(envelope.getDeliveryTag(), false);
48                 }
49             }
50         };
51         
52         boolean autoAck=false;//自動應答 false
53         channel.basicConsume(QUEUE_NAME,autoAck , consumer);
54     }
55         
56 }

消費者2

 1 package cn.wh;
 2 
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5 
 6 
 7 import cn.util.RabbitMqConnectionUtil;
 8 import com.rabbitmq.client.Channel;
 9 import com.rabbitmq.client.Connection;
10 import com.rabbitmq.client.Consumer;
11 import com.rabbitmq.client.DefaultConsumer;
12 import com.rabbitmq.client.Envelope;
13 import com.rabbitmq.client.AMQP.BasicProperties;
14 
15 public class Recv2 {
16     private static final String EXCHANGE_NAME = "test_exchange_direct";
17     private static final String QUEUE_NAME = "test_queue_direct_2";
18 
19     public static void main(String[] args) throws IOException, TimeoutException {
20 
21         Connection connection = RabbitMqConnectionUtil.getConnection();
22         final Channel channel = connection.createChannel();
23 
24         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
25         
26     
27         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
28         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
29         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
30         
31         channel.basicQos(1);
32         
33         //定義一個消費者
34         Consumer consumer=new DefaultConsumer(channel){
35             //訊息到達 觸發這個方法
36             @Override
37             public void handleDelivery(String consumerTag, Envelope envelope,
38                     BasicProperties properties, byte[] body) throws IOException {
39              
40                 String msg=new String(body,"utf-8");
41                 System.out.println("[2] Recv msg:"+msg);
42                 
43                 try {
44                     Thread.sleep(2000);
45                 } catch (InterruptedException e) {
46                     e.printStackTrace();
47                 }finally{
48                     System.out.println("[2] done ");
49                     channel.basicAck(envelope.getDeliveryTag(), false);
50                 }
51             }
52         };
53         
54         boolean autoAck=false;//自動應答 false
55         channel.basicConsume(QUEUE_NAME,autoAck , consumer);
56     }
57         
58 }

Topic模型

 1 public class Send {
 2  private final static String EXCHANGE_NAME = "test_exchange_topic";
 3  public static void main(String[] argv) throws Exception {
 4  // 獲取到連線以及mq通道
 5  Connection connection = ConnectionUtils.getConnection();
 6  Channel channel = connection.createChannel();
 7  // 宣告exchange
 8  channel.exchangeDeclare(EXCHANGE_NAME, "topic");
 9  // 訊息內容
10  String message = "id=1001";
11  channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes());
12  System.out.println(" [x] Sent '" + message + "'");
13  channel.close();
14  connection.close();
15  }
16 }

消費者

 1 public class Recv {
 2 private final static String QUEUE_NAME = "test_queue_topic_1";
 3 private final static String EXCHANGE_NAME = "test_exchange_topic";
 4 public static void main(String[] argv) throws Exception {
 5 // 獲取到連線以及mq通道
 6 Connection connection = ConnectionUtils.getConnection();
 7 final Channel channel = connection.createChannel();
 8 // 宣告佇列
 9 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
10 // 繫結佇列到交換機
11 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
12 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
13 // 同一時刻伺服器只會發一條訊息給消費者
14 channel.basicQos(1);
15 // 定義佇列的消費者
16 Consumer consumer = new DefaultConsumer(channel) {
17 // 訊息到達 觸發這個方法
18 @Override
19 public void handleDelivery(String consumerTag, Envelope envelope,
20 BasicProperties properties, byte[] body) throws IOException {
21 String msg = new String(body, "utf-8");
22 System.out.println("[2] Recv msg:" + msg);
23 try {
24 Thread.sleep(1000);
25 } catch (InterruptedException e) {
26 e.printStackTrace();
27 } finally {
28 System.out.println("[2] done ");
29 // 手動回執
30 channel.basicAck(envelope.getDeliveryTag(), false);
31 }
32 }
33 };
34 boolean autoAck = false;
35 channel.basicConsume(QUEUE_NAME, autoAck, consumer);
36 }
37 }

消費者2

 1 public class Recv {
 2 private final static String QUEUE_NAME = "test_queue_topic_1";
 3 private final static String EXCHANGE_NAME = "test_exchange_topic";
 4 public static void main(String[] argv) throws Exception {
 5 // 獲取到連線以及mq通道
 6 Connection connection = ConnectionUtils.getConnection();
 7 final Channel channel = connection.createChannel();
 8 // 宣告佇列
 9 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
10 // 繫結佇列到交換機
11 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
12 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
13 // 同一時刻伺服器只會發一條訊息給消費者
14 channel.basicQos(1);
15 // 定義佇列的消費者
16 Consumer consumer = new DefaultConsumer(channel) {
17 // 訊息到達 觸發這個方法
18 @Override
19 public void handleDelivery(String consumerTag, Envelope envelope,
20 BasicProperties properties, byte[] body) throws IOException {
21 String msg = new String(body, "utf-8");
22 System.out.println("[2] Recv msg:" + msg);
23 try {
24 Thread.sleep(1000);
25 } catch (InterruptedException e) {
26 e.printStackTrace();
27 } finally {
28 System.out.println("[2] done ");
29 // 手動回執
30 channel.basicAck(envelope.getDeliveryTag(), false);
31 }
32 }
33 };
34 boolean autoAck = false;
35 channel.basicConsume(QUEUE_NAME, autoAck, consumer);
36 }
37 }

Exchanges(轉發器|交換機)

轉發器一方面接受生產者的訊息,另一方面向佇列推送訊息

Nameless exchange(匿名轉發)

之前我們對轉發器一無所知,卻可以將訊息傳送到佇列,那是因為我們用了預設的轉發器,轉發器名為空字串""。之前我們釋出訊息的程式碼是:channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

Fanout Exchange

不處理路由鍵。你只需要將佇列繫結到交換機上。傳送訊息到交換機都會被轉發到與該交換機繫結的所有佇列

 

Direct Exchange

處理路由鍵。
需要將一個佇列繫結到交換機上,要求該訊息與一個特定的路由鍵完全匹配。這是一個完整的匹配。如果一個佇列繫結到該交換機上要求路由鍵 “dog”,則只有被標記為“dog”的訊息才被轉發,不會轉發 dog.puppy,也不會轉發dog.guard,只會轉發 dog。
                                                    

Topic Exchange

將路由鍵和某模式進行匹配。
此時佇列需要繫結到一個模式上。符號“#”匹配零個或多個詞,符號“*”恰好匹配一個詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*” 只會匹配到“audit.irs”。