1. 程式人生 > >RabbitMQ之消費者Demo(隊列參數詳細說明)

RabbitMQ之消費者Demo(隊列參數詳細說明)

per don create pac col div icp rod 忽略

  1 package com.jiefupay;
  2 
  3 import java.io.IOException;
  4 import java.util.HashMap;
  5 import java.util.Map;  8 
  6 import org.apache.log4j.Logger;
  7 
  8 import com.jiefupay.dao.Dao;
  9 
 10 import com.rabbitmq.client.AMQP;
 11 import com.rabbitmq.client.Channel;
 12 import com.rabbitmq.client.Connection;
13 import com.rabbitmq.client.ConnectionFactory; 14 import com.rabbitmq.client.Consumer; 15 import com.rabbitmq.client.DefaultConsumer; 16 import com.rabbitmq.client.Envelope; 17 18 public class App{ 19 20 private static final Logger log = Logger.getLogger(App.class); 26 21 private
static final String EXCHANGE_NAME = "refreshDispatcherMemoryExchange"; 22 23 private static String QUEUE_NAME = "refreshDispatcherMemoryhfQueue"; 24 25 public static void main(String[] args) throws Exception { 26 27 ConnectionFactory factory = new ConnectionFactory();
28 factory.setHost("127.0.0.1"); 29 factory.setPort(5672); 30 factory.setUsername("yourusername"); 31 factory.setPassword("yourpassword"); 32 33 //0.創建連接和通道 34 Connection connection = factory.newConnection(); 35 Channel channel = connection.createChannel(); 36 37 //1.聲明一個死信交換機(扇形交換機) 38 channel.exchangeDeclare("refreshDispatcherDeadExchange", "fanout"); 39 40 //2.創建隊列的參數 41 Map<String, Object> queueArgs = new HashMap<String, Object>(); 42 queueArgs.put("x-dead-letter-exchange", "refreshDispatcherDeadExchange"); //死信隊列 43 queueArgs.put("x-message-ttl", 10000); // 消息超時:讓發布的message在隊列中可以存活多長時間,以毫秒為單位。 44 queueArgs.put("x-expires", 1000); // 隊列超時:當前的queue在指定的時間內,沒有消費者訂閱就會被刪除,以毫秒為單位。 45 queueArgs.put("x-max-length", 100); // 隊列最大長度:當超過了這個大小的時候,會刪除之前最早插入的消息為本次的留出空間。 46 queueArgs.put("x-queue-mode", "lazy"); //延遲加載:queue的信息盡可能的都保存在磁盤上,僅在有消費者訂閱的時候才會加載到RAM中。 47 48 //3.聲明隊列。-將隊列參數傳到隊列 (隊列名字,是否持久化,是否排外,是否自動清理,參數) 49 channel.queueDeclare(QUEUE_NAME, true, false, false, queueArgs); 50 51 //4.隊列綁定交換機。 綁定鍵的意義依賴於轉發器的類型,對於fanout類型,忽略此參數(第三個參數為binding key)。 52 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); 53 54 Consumer consumer = new DefaultConsumer(channel) { 55 @Override 56 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, 57 byte[] body) throws IOException { 58 // 捕獲消息內容 59 String message = new String(body, "UTF-8"); 60 61 try { 62 //消息處理(自己實現的方法) 63 messageHandler(message); 64 65 //消息確認 66 channel.basicAck(envelope.getDeliveryTag(), false); 67 68 }catch (Exception e) { 69 70 //出現異常,置為true,重新入隊。 71 channel.basicAck(envelope.getDeliveryTag(), true); 72 73 //出現異常,不重新入隊,而是reject入死信隊列。 74 //channel.basicReject(envelope.getDeliveryTag(), false); 75 76 } 77 } 78 }; 79 //第二個參數值為false代表關閉RabbitMQ的自動應答機制,改為手動應答。 80 channel.basicConsume(QUEUE_NAME, false, consumer); 81 } 82 83 public static void messageHandler(String message) { 84 switch (message) { 85 case "loadQDProductData": // 渠道信息 渠道產品 86 Dao.loadQDProductDataToSystem(); 87 break; 88 case "loadQDGroupData": //渠道組 89 Dao.loadQDGroupDataToSystem(); 90 break; 91 case "loadCustomerData": // 客戶信息 92 Dao.loadCustomerDataToSystem(); 93 break; 94 case "loadUserProductData": // 客戶產品 95 Dao.loadUserProductDataToSystem(); 96 break; 97 default: 98 break; 99 } 100 log.info( message + " Done" ); 101 102 } 103 }

RabbitMQ之消費者Demo(隊列參數詳細說明)