1. 程式人生 > >RabbitMQ學習之旅(一)

RabbitMQ學習之旅(一)

RabbitMQ學習總結(一)

RabbitMQ簡介

RabbitMQ是一個訊息代理,其接收並轉發訊息。類似於現實生活中的郵局:你把信件投入郵箱的過程,相當於往佇列中新增資訊,因為所有郵箱中的信件最終都會彙集到郵局中;當郵遞員把你的新建傳送給收件人的時候,相當於訊息的轉發。

RabbitMQ中的常見術語

  • 生產者(Provider):生產者負責生產訊息,並將其傳送到訊息佇列中
  • 佇列(Queue):訊息代理(Proxy)角色,從生產者那裡接收訊息,並將其轉發到消費者進行消費。佇列主要受限於主機的記憶體和磁碟的大小,其本質是一個訊息緩衝區
  • 消費者(Consumer):消費者從佇列中接收訊息,並對訊息進行處理
  • 交換機(Exchange):交換機位於生產者(Provider)和佇列(Queue)之間,相當於路由(Routing),伺服器會根據路由鍵(RoutingKey)將訊息經由交換機路由到對應的佇列(Queue)上去
  • 通道(Channel):通道是生產者與消費者和佇列進行通訊的渠道,其是建立在TCP連線上的虛擬連線;RabbitMQ在一條TCP連線上建立成千上百條通道來達到多執行緒處理,這個TCP被多個執行緒共享,每個執行緒對應一個通道,每個通道對應唯一的ID,保障通道私有性

RabbitMQ簡單使用

STEP1:準備`Maven`依賴

<dependency
>

      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>RELEASE</version>
</dependency>

STEP2:生產者

public class Provider
 
{
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 建立連線
        ConnectionFactory factory = new ConnectionFactory();
        // 設定 RabbitMQ 的主機名
        factory.setHost("ip");
        factory.setPort(5672);
        factory.setUsername("username");
        factory.setPassword("password");
        // 建立一個連線
        Connection connection = factory.newConnection();
        // 建立一個通道
        Channel channel = connection.createChannel();    
        // 指定一個佇列
        // queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        // 引數1 queue :佇列名
        // 引數2 durable :是否持久化
        // 引數3 exclusive :僅建立者可以使用的私有佇列,斷開後自動刪除
        // 引數4 autoDelete : 當所有消費客戶端連線斷開後,是否自動刪除佇列
        // 引數5 arguments
        channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);
        // 傳送訊息
        String message = "Hello World!";
        // basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
        // 引數1 exchange :交換器
        // 引數2 routingKey : 路由鍵
        // 引數3 props : 訊息的其他引數
        // 引數4 body : 訊息體
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        // 關閉頻道和連線  
        channel.close();
        connection.close();
    }
}

注意:宣告佇列是冪等的,代表著只有在佇列不存在的時候才會被建立,多次宣告並不會重複建立。

STEP3:消費者

public class Recv {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 建立連線
        ConnectionFactory factory = new ConnectionFactory();
        // 設定 RabbitMQ 的主機名
        factory.setHost("ip");
        factory.setPort(5672);
        factory.setUsername("username");
        factory.setPassword("password");
        // 建立一個連線
        Connection connection = factory.newConnection();
        // 建立一個通道
        Channel channel = connection.createChannel();
        // 指定一個佇列
        // queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        // 引數1 queue :佇列名
        // 引數2 durable :是否持久化
        // 引數3 exclusive :僅建立者可以使用的私有佇列,斷開後自動刪除
        // 引數4 autoDelete : 當所有消費客戶端連線斷開後,是否自動刪除佇列
        // 引數5 arguments
        channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // 建立佇列消費者,此處是一個回撥函式,當消費者接收到來自訊息佇列的訊息時,會呼叫該介面中被重寫的方法
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
 throws IOException 
{
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        // basicConsume(String queue, boolean autoAck, Consumer callback)
        // 引數1 queue :佇列名
        // 引數2 autoAck : 是否自動ACK;為true的時候表示自動回覆,為false的時候表示手動回覆;自動回覆的含義是,當消費
        // 者接收到訊息的時候,就自動認定該訊息已經被消費;手動回覆的含義是,不進行自動回覆,只有接收到方法傳送到的確認信
        // 息,才確認該訊息已經被消費
        // 引數3 callback : 消費者物件的一個介面,用來配置回撥
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}