1. 程式人生 > >RabbitMQ學習筆記(2)----RabbitMQ簡單佇列(Hello World)的使用

RabbitMQ學習筆記(2)----RabbitMQ簡單佇列(Hello World)的使用

1. 簡單佇列結構圖

  

2. 引入依賴

  pom.xml檔案

 <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.5.0</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId> <version>1.8.0-beta2</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.8.0-beta2</version>
</dependency>

3. 生產者生產訊息

  Producer如下:

package com.wangx.rabbitmq.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer { /** * 佇列名字 */ private static final String QUEUE_NAME = "MyQueue"; public static void main(String[] args) throws IOException, TimeoutException { //建立連線工廠 ConnectionFactory factory = new ConnectionFactory(); //設定伺服器主機 factory.setHost("localhost"); //設定使用者名稱 factory.setUsername("wangx"); //設定密碼 factory.setPassword("wangx"); //設定VirtualHost factory.setVirtualHost("/wangx"); Connection connection = null; Channel channel = null; try { //建立連線 connection = factory.newConnection(); //建立訊息通道 channel = connection.createChannel(); //宣告佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; //傳送訊息 for (int i = 0; i < 10; i++) { //傳送訊息 channel.basicPublish("", QUEUE_NAME, null, (message + i).getBytes()); System.out.println(" [x] Sent '" + message + i + "'"); } }catch (Exception e) { e.printStackTrace(); } finally { channel.close(); connection.close(); } } }

  執行傳送訊息,然後檢視控制檯:

  

  可以看到已經存在了剛剛的十條訊息了。

4. 消費者消費訊息

package com.wangx.rabbitmq.helloworld;

import com.rabbitmq.client.*;
import com.rabbitmq.client.impl.recovery.QueueRecoveryListener;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class MyConsumer {
    /**
     * 佇列名字
     */
    private static final String QUEUE_NAME = "MyQueue";
    public static void main(String[] args) throws IOException, TimeoutException {

        //建立連線工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設定伺服器主機
        factory.setHost("localhost");
        //設定使用者
        factory.setUsername("wangx");
        //設定密碼
        factory.setPassword("wangx");
        //設定VirtualHost
        factory.setVirtualHost("/wangx");
        Connection connection = null;
        Channel channel = null;
        try {

            //建立連線
            connection = factory.newConnection();
            //建立訊息通道
            channel = connection.createChannel();
            //宣告佇列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            Consumer consumer = new DefaultConsumer(channel){
                //重寫DefaultConsumer中handleDelivery方法,在方法中獲取訊息
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException{
                    String message = new String(body, "UTF-8");
                    System.out.println("收到訊息 '" + message + "'");
                }
            };
            //監聽訊息
            channel.basicConsume(QUEUE_NAME, true,consumer);
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
        }
    }
}

  啟動消費者,可以看到將會成功消費剛剛生產的十條訊息。

  控制檯列印如下:

收到訊息 'Hello World!0'
收到訊息 'Hello World!1'
收到訊息 'Hello World!2'
收到訊息 'Hello World!3'
收到訊息 'Hello World!4'
收到訊息 'Hello World!5'
收到訊息 'Hello World!6'
收到訊息 'Hello World!7'
收到訊息 'Hello World!8'
收到訊息 'Hello World!9'

  檢視rabbitmq:

  

 

  剛剛生產者生產的訊息已經不存在了,表示已經被消費了。