1. 程式人生 > >RabbitMQ學習——生產者與消費者入門例子

RabbitMQ學習——生產者與消費者入門例子

生產者

package com.learn.rabbitmqapi.message;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static final String MQ_HOST =
"192.168.222.101"; public static final String MQ_VHOST = "/"; public static final int MQ_PORT = 5672; public static void main(String[] args) throws IOException, TimeoutException { //1. 建立一個ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.
setHost(MQ_HOST);//配置host connectionFactory.setPort(MQ_PORT);//配置port connectionFactory.setVirtualHost(MQ_VHOST);//配置vHost //2. 通過連線工廠建立連線 Connection connection = connectionFactory.newConnection(); //3. 通過connection建立一個Channel Channel channel = connection.createChannel
(); //4. 通過Channel傳送資料 for (int i = 0; i < 10; i++) { String message = "Hello" + i; //exchange為"",則通過routingKey取尋找佇列 channel.basicPublish("","testQueue",null,message.getBytes()); } //5. 關閉連線 channel.close(); connection.close(); } }

消費者

package com.learn.rabbitmqapi.message;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Minimal RabbitMQ consumer example: declares {@link #QUEUE_NAME}, subscribes
 * to it, prints a fixed number of received messages and then shuts down.
 */
public class Consumer {
    public static final String MQ_HOST = "192.168.222.101";
    public static final String MQ_VHOST = "/";
    public static final int MQ_PORT = 5672;

    public static final String QUEUE_NAME = "testQueue";

    /** Number of messages to read before the consumer stops. */
    private static final int MAX_MESSAGES = 8;

    /**
     * Connects to the broker, consumes {@code MAX_MESSAGES} messages from
     * {@link #QUEUE_NAME}, printing each one, then closes the channel and
     * connection.
     *
     * @param args ignored
     * @throws Exception if connecting, declaring, consuming or closing fails,
     *                   or if the thread is interrupted while waiting
     */
    public static void main(String[] args) throws Exception {
        // 1. Create and configure a ConnectionFactory.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(MQ_HOST);
        connectionFactory.setPort(MQ_PORT);
        connectionFactory.setVirtualHost(MQ_VHOST);

        // 2. Open a connection through the factory.
        Connection connection = connectionFactory.newConnection();
        try {
            // 3. Create a channel on that connection.
            Channel channel = connection.createChannel();

            // 4. Declare (create if absent) the queue:
            //    durable=true, exclusive=false, autoDelete=false, no extra arguments.
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            // 5. Create the consumer.
            // NOTE(review): QueueingConsumer is deprecated in later amqp-client
            // releases; consider DefaultConsumer when upgrading the client.
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

            // 6. Subscribe the consumer to the queue with autoAck=true.
            // NOTE(review): with auto-ack, deliveries beyond the MAX_MESSAGES read
            // below may be lost when this program exits — confirm that is acceptable.
            channel.basicConsume(QUEUE_NAME, true, queueingConsumer);

            // 7. Read messages; nextDelivery() blocks until a message arrives.
            for (int received = 0; received < MAX_MESSAGES; received++) {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();

                // Explicit charset so decoding does not depend on the platform default.
                String message = new String(
                        delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("收到:" + message);
            }

            channel.close();
        } finally {
            // Always release the connection, even when consuming fails.
            connection.close();
        }
    }
}

先啟動消費者(消費者程式碼會新建一個佇列),再啟動生產者;否則生產者傳送到尚未存在的佇列的訊息會被丟棄。