RabbitMQ學習筆記(二)-----------------RabbitMQ生產消費訊息
阿新 • • 發佈:2018-11-23
專案地址:https://github.com/gongxianshengjiadexiaohuihui/RabbitMQ/tree/master/Hello_RabbitMQ
專案結構
需要的jar包
專案流程圖
x
首先是生產者的類,我們需要與RabbitServer建立連線,建立通道,並宣告一個佇列,然後放10條訊息進去
package com.ggp; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @ClassName Producer * @Description 生產者 * @Author Mr.G * @Date 2018/11/20 10:38 * @Version 1.0 */ public class Producer { /** * 設定佇列名 */ public final static String QUEUE_NAME = "rabbitMQ.test"; public Producer()throws IOException, TimeoutException { /** * 建立連線工廠 */ ConnectionFactory connectionFactory = new ConnectionFactory(); /** * 設定配置資訊,如果你裝完rabbitmq沒有進行配置,預設就是下面的配置 */ connectionFactory.setHost("localhost"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); /** * 建立一個新的連線 */ Connection connection = connectionFactory.newConnection(); /** * 建立一個通道 */ Channel channel = connection.createChannel(); /** * 引數,從左到右 * 佇列名 是否持久化 是否是私有佇列 是否自動刪除 map */ channel.queueDeclare(QUEUE_NAME,false,false,false,null); /** * */ for(int i = 0; i < 10; i++) { String message = "Hello rabbitmq !" + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println("Producer Send Message: " + message); } channel.close(); connection.close(); } }
然後建立消費者的類,同樣是建立連線,通道,然後監聽這個通道,如果有資訊的話,就取出處理,每次設定取出一條,處理時間設為1秒
package com.ggp; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @ClassName Customer * @Description TODO * @Author Mr.G * @Date 2018/11/20 10:57 * @Version 1.0 */ public class Customer { private final static String QUEUE_NAME = "rabbitMQ.test"; private String cumstomerName; public Customer(final String customerName) throws IOException, TimeoutException { this.cumstomerName = customerName; ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); System.out.println(customerName+" Waiting Received messages"); /** * 每次從佇列種獲取訊息的數量 */ channel.basicQos(1); /**DefaultConsumer類實現了Consumer介面,通過傳入一個通道,告訴服務我們需要那個通道的資訊,如果通道有資訊傳回,就會執行回撥函式handldDelivery * */ Consumer consumer = new DefaultConsumer(channel){ @Override /** * 訊息分發 */ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException{ String message = new String(body, "UTF-8"); System.out.println(customerName+" Received Message: "+message); System.out.println(customerName+" begin dealing"); try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } System.out.println(customerName+" dealing successfully!"); } }; /** * 自動回覆佇列應答 RabbitMQ的訊息確認機制 * 第二個引數為自動回覆 */ channel.basicConsume(QUEUE_NAME,true,consumer); } }
最後是我們測試類
package com.ggp.test; import com.ggp.Customer; import com.ggp.Producer; /** * @ClassName DemoTest * @Description TODO * @Author Mr.G * @Date 2018/11/20 15:08 * @Version 1.0 */ public class DemoTest { public static void main(String[] args) throws Exception{ Customer customer1 = new Customer("GGP"); Customer customer2 = new Customer("RQB"); new Producer(); } }
執行結果
參考資料:https://www.cnblogs.com/LipeiNet