程式人生 > RabbitMQ的三種Exchange,資料持久化,非持久化例項

RabbitMQ的三種Exchange,資料持久化,非持久化例項

由於最近專案需要用到Rabbitmq的資料持久化技術,利用空閒時間,分別對Rabbitmq的三種常用的Exchange(direct、fanout、topic)寫了個測試例項,僅供初學者參考學習,還望各路大神勿吐槽。

開發之前需要先引入 amqp-client 依賴。建議使用 3.4.0 或更早的版本:我試過更高的版本,會出現超時(Timeout)相關的問題,具體原因尚未研究,還望研究過的大神留言告知,謝謝。

		<!-- RabbitMQ Java client; the examples in this article were written against version 3.4.0. -->
		<dependency>
			<groupId>com.rabbitmq</groupId>
			<artifactId>amqp-client</artifactId>
			<version>3.4.0</version>
		</dependency>

1.direct(釋出與訂閱)

1.1生產者(Direct)

public class Direct {
	private static final String EXCHANGE_NAME = "temp_direct";  
    private static final String[] TYPE = { "info", "warning", "error" };  
    
    public static void main(String[] argv) throws java.io.IOException  
    {  
        // 建立連線和頻道  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("192.168.32.129");  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
        // 宣告轉發器的型別  
        channel.exchangeDeclare(EXCHANGE_NAME, "direct",true);  
  
        //傳送6條訊息  
        for (int i = 0; i <TYPE.length; i++)  
        {  
            String rand = getRandom();  
            String message = rand + "_log :" + UUID.randomUUID().toString();  
            //持久化
            channel.queueDeclare(TYPE[i], true, false, false, null);
            //流量
            channel.basicQos(1);
            //將訊息佇列繫結到Exchange
            channel.queueBind(TYPE[i], EXCHANGE_NAME, TYPE[i]);
            // 釋出訊息至轉發器,指定routingkey  
            channel.basicPublish(EXCHANGE_NAME, TYPE[i], null, message  
                    .getBytes());  
            System.out.println("佇列" + TYPE[i] + "繫結成功!");  
        }  
  
        channel.close();  
        connection.close();  
    }  
  
    /** 
     * 隨機產生一種日誌型別 
     *  
     * @return 
     */  
    private static String getRandom()  
    {  
        Random random = new Random();  
        int ranVal = random.nextInt(3);  
        return TYPE[ranVal];  
    }  
}

1.2消費者
/**
 * Consumer for the direct-exchange example.
 *
 * <p>Declares the durable direct exchange {@code temp_direct} and one durable queue per
 * entry of {@link #TYPE}, then starts one consumer thread per queue. Every thread works
 * on its own channel and acknowledges each message manually.
 */
public class ReceiveDirect {
    private static final String EXCHANGE_NAME = "temp_direct";
    private final static String HOST = "192.168.32.129";
    private static final String[] TYPE = { "info", "warning", "error" };

    /**
     * Sets up the exchange, queues and bindings, then starts the consumer threads.
     *
     * @param argv unused
     * @throws java.io.IOException if the broker cannot be reached or the setup fails
     * @throws java.lang.InterruptedException kept for interface compatibility
     */
    public static void main(String[] argv) throws java.io.IOException,
            java.lang.InterruptedException
    {
        // Open the connection.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        final Connection connection = factory.newConnection();

        boolean declared = false;
        try {
            declareQueues(connection);
            declared = true;
        } finally {
            if (!declared) {
                // Setup failed: release the connection so its threads do not keep the
                // JVM alive.
                connection.close();
            }
        }

        for (final String queue : TYPE) {
            new Thread("consumer-" + queue) {
                @Override
                public void run() {
                    try {
                        // One channel per consumer thread: sharing a Channel between
                        // threads is discouraged by the RabbitMQ Java client guide.
                        Channel channel = connection.createChannel();
                        // Flow control: at most one unacknowledged message at a time.
                        channel.basicQos(1);
                        receive(channel, queue);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }.start();
        }
    }

    /**
     * Declares the durable exchange and queues and binds every queue with a routing key
     * equal to its name, using a short-lived channel.
     */
    private static void declareQueues(Connection connection) throws java.io.IOException {
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
        for (String type : TYPE) {
            // Durable queue.
            channel.queueDeclare(type, true, false, false, null);
            // Bind the queue to the exchange.
            channel.queueBind(type, EXCHANGE_NAME, type);
            System.out.println("佇列" + type + "繫結成功!");
        }
        channel.close();
    }

    /**
     * Consumes {@code queueName} forever, printing and acknowledging every message.
     *
     * @param channel   channel used for consuming and acknowledging
     * @param queueName queue to consume from
     * @throws Exception if consuming or acknowledging fails
     */
    private static void receive(Channel channel, String queueName) throws Exception {
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // autoAck = false: messages are acknowledged explicitly below.
        channel.basicConsume(queueName, false, consumer);
        while (true) {
            // Block until the broker pushes the next message.
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(queueName + " Received '" + message + "'");
            // Tell the broker the message has been handled.
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

2.fanout(廣播)

2.1生產者

/**
 * Producer for the fanout-exchange example.
 *
 * <p>Declares the exchange {@code fanout} and the durable queue {@code temp_fanout},
 * binds them, and publishes a single persistent message.
 *
 * <p>NOTE(review): the exchange is declared with the two-argument
 * {@code exchangeDeclare}, i.e. non-durable, so the exchange and its bindings are not
 * kept across a broker restart even though the queue is durable. Making it durable
 * requires the same change in the consumer and deleting the existing exchange first,
 * because re-declaring with different attributes is rejected by the broker.
 */
public class Fanout {
    private final static String HOST = "192.168.32.129";
    private final static String EXCHANGE_NAME = "fanout";
    private final static String QUEUE = "temp_fanout";
    private final static String ROUTKEY = "mq.fanout";
    private final static boolean DURABLE = true;

    /**
     * Connects to the broker and publishes one message to the fanout exchange.
     *
     * @param args unused
     * @throws IOException if the broker cannot be reached or an AMQP operation fails
     * @throws TimeoutException declared for compatibility with newer client versions
     */
    public static void main(String[] args) throws IOException, TimeoutException{
        // Open the connection and a channel.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            // Declare the exchange and its type.
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            // Durable queue; a fanout exchange ignores the binding's routing key.
            channel.queueDeclare(QUEUE, DURABLE, false, false, null);
            channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTKEY);

            String message = System.currentTimeMillis() + " : fanout something";
            // Publish a persistent message with an explicit charset.
            channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));

            System.out.println(" [x] Sent '" + message + "'");

            channel.close();
        } finally {
            // Always release the connection, even when an operation above failed.
            connection.close();
        }
    }
}
2.2消費者
/**
 * Consumer for the fanout-exchange example.
 *
 * <p>Declares the exchange {@code fanout} and the durable queue {@code temp_fanout},
 * binds them, and prints every message received, acknowledging each one manually.
 */
public class ReceiveFanout {
    private final static String HOST = "192.168.32.129";
    private final static String EXCHANGE_NAME = "fanout";
    private final static String QUEUE = "temp_fanout";
    private final static String ROUTKEY = "mq.fanout";
    private final static boolean DURABLE = true;

    /**
     * Connects to the broker and consumes {@code temp_fanout} until the process is
     * stopped.
     *
     * @param args unused
     * @throws java.io.IOException if the broker cannot be reached or an AMQP operation fails
     * @throws java.lang.InterruptedException if interrupted while waiting for a message
     * @throws TimeoutException declared for compatibility with newer client versions
     */
    public static void main(String[] args) throws java.io.IOException,
            java.lang.InterruptedException, TimeoutException{
        // Open the connection and a channel.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // Durable queue.
        channel.queueDeclare(QUEUE, DURABLE, false, false, null);
        // At most one unacknowledged message at a time; only effective because the
        // consumer below acknowledges manually.
        channel.basicQos(1);
        // Bind the queue to the exchange.
        channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTKEY);

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        // autoAck = false: with auto-ack a message is dropped by the broker as soon as it
        // is sent, so it would be lost if this process died before handling it.
        channel.basicConsume(QUEUE, false, consumer);

        while (true)
        {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
            // Tell the broker the message has been handled.
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

3.topic(主題)

3.1生產者

/**
 * Producer for the topic-exchange example.
 *
 * <p>Declares the durable topic exchange {@code topic_Exc} and the durable queue
 * {@code temp_topic} (the queue the consumer reads), binds them, and publishes one
 * persistent message carrying a random UUID.
 *
 * <p>NOTE(review): topic patterns are split into words on '.', and '*' only acts as a
 * wildcard when it is a whole word. {@code "*_topic"} is therefore a single literal word;
 * the message is routed only because the very same string is used as the routing key.
 * A real wildcard binding would look like {@code "*.topic"} with a routing key such as
 * {@code "mq.topic"}, changed in producer and consumer together.
 */
public class Topic {
    private final static String HOST = "192.168.32.129";
    private static final String EXCHANGE_NAME = "topic_Exc";
    // Must be the queue the consumer reads; a different name here leaves a second durable
    // queue on the broker that receives every message and is never drained.
    private static final String QUEUE = "temp_topic";
    private static final String ROUTKEY="*_topic";
    private static final boolean DURABLE = true;

    /**
     * Connects to the broker and publishes one message to the topic exchange.
     *
     * @param argv unused
     * @throws Exception if the broker cannot be reached or an AMQP operation fails
     */
    public static void main(String[] argv) throws Exception
    {
        // Open the connection and a channel.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            // Durable topic exchange.
            channel.exchangeDeclare(EXCHANGE_NAME, "topic", DURABLE);
            // Durable queue, bound to the exchange.
            channel.queueDeclare(QUEUE, DURABLE, false, false, null);
            channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTKEY);

            String msg = UUID.randomUUID().toString();

            // Persistent message with an explicit charset.
            channel.basicPublish(EXCHANGE_NAME, ROUTKEY, MessageProperties.PERSISTENT_TEXT_PLAIN,
                    msg.getBytes("UTF-8"));
            System.out.println(msg);

            channel.close();
        } finally {
            // Always release the connection, even when an operation above failed.
            connection.close();
        }
    }
}

3.2消費者
/**
 * Consumer for the topic-exchange example.
 *
 * <p>Declares the durable topic exchange {@code topic_Exc} and the durable queue
 * {@code temp_topic}, binds them, and prints every message together with its routing
 * key, acknowledging each one manually.
 *
 * <p>NOTE(review): topic patterns are split into words on '.', and '*' only acts as a
 * wildcard when it is a whole word, so {@code "*_topic"} matches only the identical
 * routing key. The value is kept so this consumer stays compatible with the producer.
 */
public class ReceiveTopicFortopic{
    private final static String HOST = "192.168.32.129";
    private static final String EXCHANGE_NAME = "topic_Exc";
    private static final String QUEUE = "temp_topic";
    private static final String ROUTKEY="*_topic";
    private static final boolean DURABLE = true;

    /**
     * Connects to the broker and consumes {@code temp_topic} until the process is stopped.
     *
     * @param argv unused
     * @throws Exception if the broker cannot be reached or an AMQP operation fails
     */
    public static void main(String[] argv) throws Exception
    {
        // Open the connection and a channel.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Durable topic exchange.
        channel.exchangeDeclare(EXCHANGE_NAME, "topic", DURABLE);
        // Durable queue.
        channel.queueDeclare(QUEUE, DURABLE, false, false, null);
        // At most one unacknowledged message at a time; only effective because the
        // consumer below acknowledges manually.
        channel.basicQos(1);
        // Bind the queue to the exchange.
        channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTKEY);

        System.out
                .println(" [*] Waiting for critical messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        // autoAck = false: with auto-ack a message is dropped by the broker as soon as it
        // is sent, so it would be lost if this process died before handling it.
        channel.basicConsume(QUEUE, false, consumer);

        while (true)
        {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody(), "UTF-8");
            String routingKey = delivery.getEnvelope().getRoutingKey();

            System.out.println(" [x] Received routingKey = " + routingKey
                    + ",msg = " + message + ".");
            // Tell the broker the message has been handled.
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}


上面就是這三種Exchange例項,完畢。