MQ的訂閱模式
阿新 • • 發佈:2018-03-20
def mage 生產者 blog email ued don 場景 pre
一:介紹
1.模式
2.使用場景
一個生產者,多個消費者
每一個消費者都有自己的隊列
生產者沒有直接把消息發送給隊列,而是發送到了交換機
每一個隊列都要綁定到交換機
可以實現一個消息被多個消費者消費。
二:程序
1.生產者
1 package com.mq.PubSubFanout; 2 3 import com.mq.utils.ConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 7 public classFanoutSend { 8 private static final String EXCHANGE_NAME="text_exchange_fanout"; 9 public static void main(String[] args) throws Exception { 10 //獲取一個連接 11 Connection connection= ConnectionUtil.getConnection(); 12 //從連接中獲取一個通道 13 Channel channel=connection.createChannel();14 //創建交換機 15 channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); 16 17 //消息 18 String msg="hello pubsub"; 19 20 //發送 21 channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes()); 22 23 System.out.println("send msg:"+msg); 24 //關閉連接 25 channel.close();26 connection.close(); 27 } 28 }
2.消費者一
1 package com.mq.PubSubFanout; 2 3 import com.mq.utils.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 public class FanoutReceive1 { 9 private static final String EXCHANGE_NAME="text_exchange_fanout"; 10 private static final String QUENE_NAME="test_fanout_queue_email"; 11 public static void main(String[] args)throws Exception{ 12 //獲取一個連接 13 Connection connection = ConnectionUtil.getConnection(); 14 //創建通道 15 final Channel channel = connection.createChannel(); 16 //創建隊列聲明 17 channel.queueDeclare(QUENE_NAME,false,false,false,null); 18 19 //綁定交換機 20 channel.queueBind(QUENE_NAME,EXCHANGE_NAME,""); 21 22 //一次只能發送一個消息 23 channel.basicQos(1); 24 25 //創建消費者 26 DefaultConsumer consumer=new DefaultConsumer(channel){ 27 @Override 28 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 29 String msg=new String(body,"utf-8"); 30 System.out.println("[1]receive msg:"+msg); 31 try { 32 Thread.sleep(200); 33 } catch (InterruptedException e) { 34 e.printStackTrace(); 35 }finally { 36 System.out.println("done"); 37 //手動應答 38 channel.basicAck(envelope.getDeliveryTag(),false); 39 } 40 } 41 }; 42 //監聽隊列,不是自動應答 43 boolean autoAck=false; 44 channel.basicConsume(QUENE_NAME,autoAck,consumer); 45 } 46 }
3.消費者二
1 package com.mq.PubSubFanout; 2 3 import com.mq.utils.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 public class FanoutReceive2 { 9 private static final String EXCHANGE_NAME="text_exchange_fanout"; 10 private static final String QUENE_NAME="test_fanout_queue_ems"; 11 public static void main(String[] args)throws Exception{ 12 //獲取一個連接 13 Connection connection = ConnectionUtil.getConnection(); 14 //創建通道 15 final Channel channel = connection.createChannel(); 16 //創建隊列聲明 17 channel.queueDeclare(QUENE_NAME,false,false,false,null); 18 19 //綁定交換機 20 channel.queueBind(QUENE_NAME,EXCHANGE_NAME,""); 21 22 //一次只能發送一個消息 23 channel.basicQos(1); 24 25 //創建消費者 26 DefaultConsumer consumer=new DefaultConsumer(channel){ 27 @Override 28 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 29 String msg=new String(body,"utf-8"); 30 System.out.println("[2]receive msg:"+msg); 31 try { 32 Thread.sleep(200); 33 } catch (InterruptedException e) { 34 e.printStackTrace(); 35 }finally { 36 System.out.println("done"); 37 //手動應答 38 channel.basicAck(envelope.getDeliveryTag(),false); 39 } 40 } 41 }; 42 //監聽隊列,不是自動應答 43 boolean autoAck=false; 44 channel.basicConsume(QUENE_NAME,autoAck,consumer); 45 } 46 }
4.效果
send:
receive1:
receive2:
5.運行註意點
如果之間運行receive類,會發現報錯,因為沒有交換機。
所以,可以先運行send類,雖然交換機不能存儲發送的消息,但是可以創建交換機。
然後,就可以按照原來的方式。
先啟動兩個消費者進行監聽,然後啟動生產者。
現象:就是消費者都獲取到了生產者生產的消息。
MQ的訂閱模式