RabbitMQ學習筆記(三)-----------------RabbitMQ不同的交換機進行路由
阿新 • • 發佈:2018-11-23
專案地址:https://github.com/gongxianshengjiadexiaohuihui/RabbitMQ/tree/master/Exchange_RabbitMQ
專案結構:
流程圖
補充知識:
Connection是RabbitMQ的socket連結,封裝了socket協議相關部分邏輯
ConnectionFactory是製造Connection的工廠
Connection是建立一個TCP連線,需要經過三次握手,如果每次生產和消費訊息都要建立TCP連線十分耗費資源,因此我們用
Channel建立虛擬連線,這個虛擬連線是建立在上面TCP連線的基礎上,資料傳輸都通過Channel,不需要頻繁地建立、關閉TCP連線,而且TCP的連線數是有限制的
交換機一共有4種規則(fanout、direct、topic、headers),第四種(headers)並不常用,因此我們只敘述前三種
fanout:類似於訂閱釋出,交換機把生產者的所有資訊都發送到所有與它繫結的佇列中
direct:交換機只發送資訊到符合條件 routingkey = bindingkey 的佇列中
topic:交換機發送資訊到routingkey符合匹配規則bindingkey的佇列中;bindingkey是以「.」分隔單詞的模式,其中「*」匹配恰好一個單詞,「#」匹配零個或多個單詞(例如 #.b.* 可以匹配 a.b.c)
生產者程式碼
package main.java.com.ggp;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * Demo publisher that declares an exchange on the local RabbitMQ broker and
 * sends a fixed batch of sample messages whose routing keys depend on the
 * exchange type ({@code fanout}, {@code direct} or {@code topic}).
 *
 * <p>All work happens in the constructor; the connection and channel are
 * closed before it returns.
 *
 * <p>Created 2018/11/21 9:16.
 *
 * @author Mr.G
 * @version 1.0
 */
public class Producer {

    private static final String FANOUT = "fanout";
    private static final String DIRECT = "direct";
    private static final String TOPIC = "topic";

    /**
     * Connects to the broker on {@code localhost} as {@code guest}, declares
     * the exchange and publishes the sample messages for its type.
     *
     * @param exchangeName name of the exchange to declare and publish to
     * @param exchangeType one of {@code "fanout"}, {@code "direct"}, {@code "topic"}
     * @throws IOException              if a broker operation fails
     * @throws TimeoutException         if connecting or closing the channel times out
     * @throws IllegalArgumentException if {@code exchangeType} is not supported
     */
    public Producer(String exchangeName, String exchangeType) throws IOException, TimeoutException {
        // Reject unknown types before touching the broker, so no connection is
        // opened and no exchange is declared for a request that cannot succeed.
        if (!FANOUT.equals(exchangeType) && !DIRECT.equals(exchangeType) && !TOPIC.equals(exchangeType)) {
            throw new IllegalArgumentException("Unsupported the exchangeType!");
        }

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        Connection connection = connectionFactory.newConnection();
        try {
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(exchangeName, exchangeType);

            // Publish according to the routing rule of the exchange type.
            if (FANOUT.equals(exchangeType)) {
                publishFanout(channel, exchangeName);
            } else if (DIRECT.equals(exchangeType)) {
                publishDirect(channel, exchangeName);
            } else {
                publishTopic(channel, exchangeName);
            }
            channel.close();
        } catch (IOException | TimeoutException | RuntimeException e) {
            // On any failure release the connection (and its channels) without
            // letting a secondary close error hide the original exception.
            connection.abort();
            throw e;
        }
        connection.close();
    }

    /** Publishes five numbered messages with an empty routing key. */
    private static void publishFanout(Channel channel, String exchangeName) throws IOException {
        for (int i = 0; i < 5; i++) {
            String message = "Hello Rabbit " + i;
            channel.basicPublish(exchangeName, "", null, utf8(message));
            System.out.println("Producer Send Message: " + message);
        }
    }

    /** Publishes one message per log level, using the level as routing key. */
    private static void publishDirect(Channel channel, String exchangeName) throws IOException {
        String[] routingKeys = {"info", "warning", "error"};
        for (String routingKey : routingKeys) {
            String message = "RoutingSendDirect Send the message level:" + routingKey;
            channel.basicPublish(exchangeName, routingKey, null, utf8(message));
            System.out.println("RoutingSendDirect Send " + routingKey + ": " + message);
        }
    }

    /** Publishes one message per dotted routing key. */
    private static void publishTopic(Channel channel, String exchangeName) throws IOException {
        String[] routingKeys = {"a.b.c", "a.a.b.d", "ad.dd.b.c"};
        for (String routingKey : routingKeys) {
            String message = "Topic Send the message which the routingkey is " + routingKey;
            channel.basicPublish(exchangeName, routingKey, null, utf8(message));
            System.out.println("send message: " + message + " successfully!");
        }
    }

    /** Encodes a message body as UTF-8 instead of the platform default charset. */
    private static byte[] utf8(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }
}
消費者程式碼
package main.java.com.ggp;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * Demo consumer that declares an exchange on the local RabbitMQ broker, binds
 * a server-named queue to it with binding keys chosen by the exchange type,
 * and prints every message delivered to that queue.
 *
 * <p>All setup happens in the constructor. On success the connection is not
 * closed by this class (presumably so deliveries keep arriving after the
 * constructor returns).
 *
 * <p>Created 2018/11/21 10:12.
 *
 * @author Mr.G
 * @version 1.0
 */
public class Customer {

    /**
     * Connects to the broker on {@code localhost} as {@code guest}, binds a
     * new queue to the exchange and starts an auto-acknowledging consumer.
     *
     * @param customerName label printed with every log line of this consumer
     * @param exchangeName name of the exchange to declare and bind to
     * @param exchangeType one of {@code "fanout"}, {@code "direct"}, {@code "topic"}
     * @throws IOException              if a broker operation fails
     * @throws TimeoutException         if connecting to the broker times out
     * @throws IllegalArgumentException if {@code exchangeType} is not supported
     */
    public Customer(final String customerName, String exchangeName, final String exchangeType)
            throws IOException, TimeoutException {
        // Resolve the binding keys first: an unsupported type fails here,
        // before any connection, exchange or queue has been created.
        String[] bindingKeys = bindingKeysFor(exchangeType);

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        Connection connection = connectionFactory.newConnection();
        try {
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(exchangeName, exchangeType);

            // Server-named queue, bound once per binding key.
            String queueName = channel.queueDeclare().getQueue();
            for (String bindingKey : bindingKeys) {
                channel.queueBind(queueName, exchangeName, bindingKey);
            }
            System.out.println(customerName + " is waiting message that the type of it is " + exchangeType);

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, StandardCharsets.UTF_8);
                    System.out.println(customerName + " has dealt the " + exchangeType + " message: " + message);
                }
            };
            // Second argument true = auto-acknowledge deliveries.
            channel.basicConsume(queueName, true, consumer);
        } catch (IOException | RuntimeException e) {
            // Setup failed: do not leave a half-configured connection open.
            connection.abort();
            throw e;
        }
    }

    /**
     * Returns the binding keys this demo uses for the given exchange type.
     *
     * @throws IllegalArgumentException if the type is not fanout, direct or topic
     */
    private static String[] bindingKeysFor(String exchangeType) {
        if ("fanout".equals(exchangeType)) {
            return new String[]{""};
        }
        if ("direct".equals(exchangeType)) {
            return new String[]{"info", "warning"};
        }
        if ("topic".equals(exchangeType)) {
            return new String[]{"#.b.*"};
        }
        throw new IllegalArgumentException("Unsupported the exchangeType!");
    }
}
測試程式碼
package test.java.com.ggp.test;
import main.java.com.ggp.Customer;
import main.java.com.ggp.Producer;
/**
 * Manual demo for the fanout exchange: starts two consumers and then one
 * producer, all using an exchange named {@code "fanout"} of type
 * {@code "fanout"}. The consumers are created before the producer.
 *
 * <p>Created 2018/11/21 10:47.
 *
 * @author Mr.G
 * @version 1.0
 */
public class FanoutTest {
    public static void main(String[] args) throws Exception {
        final String exchange = "fanout";
        final String type = "fanout";

        // Consumers first, in the same order as before: "ggp" then "rqb".
        for (String customerName : new String[]{"ggp", "rqb"}) {
            new Customer(customerName, exchange, type);
        }

        new Producer(exchange, type);
    }
}
測試結果
參考資料:https://www.cnblogs.com/LipeiNet