1. 程式人生 > >RabbitMQ學習筆記(二)-----------------RabbitMQ生產消費訊息

RabbitMQ學習筆記(二)-----------------RabbitMQ生產消費訊息

專案地址:https://github.com/gongxianshengjiadexiaohuihui/RabbitMQ/tree/master/Hello_RabbitMQ

專案結構

需要的jar包

專案流程圖

x

首先是生產者的類,我們需要與RabbitServer建立連線,建立通道,並宣告一個佇列,然後放10條訊息進去

package com.ggp;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName Producer
 * @Description 生產者
 * @Author Mr.G
 * @Date 2018/11/20 10:38
 * @Version 1.0
 */
public class Producer {
    /**
     * 設定佇列名
     */
    public final static String QUEUE_NAME = "rabbitMQ.test";

    public Producer()throws IOException, TimeoutException {

        /**
         * 建立連線工廠
         */
        ConnectionFactory connectionFactory = new ConnectionFactory();
        /**
         * 設定配置資訊,如果你裝完rabbitmq沒有進行配置,預設就是下面的配置
         */
        connectionFactory.setHost("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        /**
         * 建立一個新的連線
         */
        Connection connection = connectionFactory.newConnection();
        /**
         * 建立一個通道
         */
        Channel channel = connection.createChannel();
        /**
         * 引數,從左到右
         * 佇列名     是否持久化     是否是私有佇列  是否自動刪除    map
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        /**
         *
         */
        for(int i = 0; i < 10; i++) {
            String message = "Hello rabbitmq !" + i;

            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));

            System.out.println("Producer Send  Message: " + message);
        }

        channel.close();

        connection.close();
    }

}

然後建立消費者的類,同樣是建立連線,通道,然後監聽這個通道,如果有資訊的話,就取出處理,每次設定取出一條,處理時間設為1秒

package com.ggp;



import com.rabbitmq.client.*;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

/**
 * @ClassName Customer
 * @Description TODO
 * @Author Mr.G
 * @Date 2018/11/20 10:57
 * @Version 1.0
 */
public class Customer {

    private final static String QUEUE_NAME = "rabbitMQ.test";

    private String cumstomerName;

    public Customer(final String customerName) throws IOException, TimeoutException {

        this.cumstomerName = customerName;

        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        System.out.println(customerName+" Waiting Received messages");
        /**
         * 每次從佇列種獲取訊息的數量
         */
        channel.basicQos(1);
        /**DefaultConsumer類實現了Consumer介面,通過傳入一個通道,告訴服務我們需要那個通道的資訊,如果通道有資訊傳回,就會執行回撥函式handldDelivery
         *
         */
        Consumer consumer = new DefaultConsumer(channel){
          @Override
          /**
           * 訊息分發
           */
          public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException{
              String message = new String(body, "UTF-8");
              System.out.println(customerName+" Received Message: "+message);
              System.out.println(customerName+" begin dealing");
              try {
                  Thread.sleep(1000);
              } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
              }
              System.out.println(customerName+" dealing successfully!");
          }
        };

        /**
         * 自動回覆佇列應答    RabbitMQ的訊息確認機制
         * 第二個引數為自動回覆
         */
         channel.basicConsume(QUEUE_NAME,true,consumer);

    }

}

最後是我們測試類

package com.ggp.test;

import com.ggp.Customer;
import com.ggp.Producer;

/**
 * @ClassName DemoTest
 * @Description TODO
 * @Author Mr.G
 * @Date 2018/11/20 15:08
 * @Version 1.0
 */
public class DemoTest {
    public static void main(String[] args) throws Exception{

        Customer customer1 = new Customer("GGP");
        Customer customer2 = new Customer("RQB");
        new Producer();
    }
}

執行結果

參考資料:https://www.cnblogs.com/LipeiNet