1. 程式人生 > >RabbitMQ for Java【入門教程 1】

RabbitMQ for Java【入門教程 1】

       RabbitMQ是訊息代理。從本質上說,它接受來自生產者的資訊,並將它們傳遞給消費者。在兩者之間,它可以根據你給它的路由,緩衝規則進行傳遞訊息。        如果你的工作中需要用到RabbitMQ,那麼我建議你先在電腦上安裝好RabbitMQ伺服器,然後開啟eclipse,跟這我的教程一步步的學習RabbitMQ,這樣你會對RabbitMQ有一個全面的認識,而且能打好一個很好的基礎。

準備工作:安裝erlang,RabbitMQ等工具 配置相應的環境變數  這裡就不多說!

一:helloWorld的實現

 P消費者將訊息推送到queue佇列中,佇列將訊息推送給消費者或者快取到本地快取(取決於消費者的狀態)  C消費者向queue取相關的資訊或者queue推送給消費者

注意:

        生產者,消費者和佇列(RabbitMQ)不必部署在同一臺機器上。實際在生產環境的大多數應用中,他們都是分開部署的

程式碼實現:

首先加入依賴包

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.3</version>
</dependency>

接下來建立連線rabbitMq伺服器的工具類

package wxtest.rabbitMq;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConnectionUtil {


    public static Connection getConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // 設定MabbitMQ, 主機ip或者主機名
        factory.setHost("localhost");
        // 建立一個連線
        Connection connection = factory.newConnection();
        return connection;
    }

}

生產者:

package wxtest.rabbitMq;

import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 訊息傳送方
 **/
public class rabbitMqSend {
    
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {

        // 建立一個通道
        Channel channel = ConnectionUtil.getConnection().createChannel();
        // 指定一個佇列(給佇列明命名 那麼接受者也應該是這個對列名)
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 傳送訊息
        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        // 關閉頻道和連線
        channel.close();
        ConnectionUtil.getConnection().close();
    }
}

消費者:

package wxtest.rabbitMq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 訊息接受方
 **/

public class rabbitMqRev {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {

        // 建立一個通道
        Channel channel = ConnectionUtil.getConnection().createChannel();
        // 指定一個佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        Consumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

    }

}

輸出結果如下

 [x] Received 'Hello World!'  [x] Received 'Hello World!'  [x] Received 'Hello World!'