1. 程式人生 > >RabbitMQ學習總結(3)——入門例項教程詳解

RabbitMQ學習總結(3)——入門例項教程詳解

一、起航

       本章節,柯南君將從幾個層面,用官網例子講解一下RabbitMQ的實操經典程式案例,讓大家重新回到經典“Hello world!”(The simplest thing that does something )時代,RabbitMQ 支援N多種客戶端(client),這裡無法一一講解,暫定java client,有時間的情況下,在彌補一下。

事先,先普及一下圖示(我們會在下面的事例中,會大量用到,所以先普及一下,便於識別,最終更好理解事例的含義)

1、圖示概念

① producting(生產者):在程式中 傳送訊息的一端,我們暫且稱之為 生產者,在這裡用“p”表示

 queue(佇列):佇列是一個郵箱的名字。它住在RabbitMQ。儘管訊息流經RabbitMQ和您的應用程式,他們只可以儲存在一個佇列中。佇列是不受任何限制,它可以儲存儘可能多的資訊(按你興趣來了),它本質上是一個無限緩衝區。許多生產商可以傳送訊息到一個佇列,許多消費者可以嘗試接收資料從一個佇列。

 consuming(消費者):消費者和生產者是對應的,較為相似的意思;在這裡,我用“C”表示

2、The Java client library

RabbitMQ 中AMQP這是一個開放的、通用的協議訊息。有許多客戶AMQP在許多不同的語言。我們將使用提供的Java客戶機RabbitMQ。

我們需要下載(Download) client library package,並要核實每個jar包,解壓到相應位置,如下圖所示:

第二步:選擇合適的下載,比如我下載了zip包,如圖所示:

第三步Unzip it(解壓它) 到你的working directory(工作目錄)中 and grab (並且獲得)你的jar包檔案

  • $ unzip rabbitmq-java-client-bin-*.zip
  • $ cp rabbitmq-java-client-bin-*/*.jar ./

二、程式案例

1) "Hello World" 

在這部分教程中我們將用Java寫兩個程式,傳送一個訊息的生產者,消費者接收資訊並打印出來。我們會掩蓋一些細節的Java API,集中在這個非常簡單的東西開始。這是一個“Hello World”的訊息。在下面的圖中,“P”是我們的生產和“C”是我們的消費者。中間的框是一個佇列,訊息緩衝區RabbitMQ保持代表消費者。
 ① sending (傳送)
首先 讓The sender(訊息傳送者)傳送訊息並且讓我們的receiver (訊息接收者)接收訊息,The sender(訊息傳送者)將會connect to(連線)RabbitMQ,傳送一個single message(單一的資訊),然後exit(退出)。
  • send.java 中,我們需要import一些class ,如下所示:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
  • set up(設定)類和queue的name
public class Send {

  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException {
      ...
  }
       }
  • then 我們create 一個connection (連線)到server(服務端)
    onnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
備註
  •  這個connection 是抽象的socket connection連結;
  •  負責協議版本(protocol version negotigation)和身份認證(authentication );
  • 我們在本地機器上連線到一個代理即 localhost ,如果我們想要連線到代理不同機器上我們簡單的指定其名稱或者IP地址即可;
  接下來,我們建立一個channel(通道),這個通道彙集了大多數的API服務! 為了傳送,我們必須先宣告一個為我們傳送queue(佇列),然後,往queue裡傳送一個message
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    String message = "Hello World!";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
訊息內容是一個位元組陣列,所以你可以編碼任何你喜歡的。最後,我們關閉通道和連線;
    channel.close();
    connection.close();
問題  如果 sending doesn‘t work! 我們將怎麼辦?why? 如果這是你第一次使用RabbitMQ並且你看不到“傳送的”訊息,那麼你可能抓耳撓腮沒有足夠的空閒磁碟空間(預設情況下它需要至少1 gb免費),因此拒絕接受訊息。檢查代理日誌檔案確認,如果有必要減少限制。配置檔案的文件將向您展示如何設定disk_free_limit。 接下來的是send.java所有原始碼: [java] view plaincopyprint?
  1. import com.rabbitmq.client.ConnectionFactory;  
  2. import com.rabbitmq.client.Connection;  
  3. import com.rabbitmq.client.Channel;  
  4. publicclass Send {  
  5.   privatefinalstatic String QUEUE_NAME = "hello";  
  6.   publicstaticvoid main(String[] argv) throws Exception {  
  7.     ConnectionFactory factory = new ConnectionFactory();  
  8.     factory.setHost("localhost");  
  9.     Connection connection = factory.newConnection();  
  10.     Channel channel = connection.createChannel();  
  11.     channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);  
  12.     String message = "Hello World!";  
  13.     channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  
  14.     System.out.println(" [x] Sent '" + message + "'");  
  15.     channel.close();  
  16.     connection.close();  
  17.   }  
  18. }  

   ② Receiving (接收)
  這就是我們的傳送者。我們的接收器是將訊息從RabbitMQ,所以不像傳送方釋出一個訊息,我們將保持執行監聽訊息並打印出來
  • The code (in Recv.java) has almost the same imports as Send:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
額外的QueueingConsumer是一個類,我們將使用緩衝訊息推到我們的伺服器。設定傳送者一樣,我們開啟一個連線和一個通道,並宣佈我們將使用的佇列。注意這與佇列,傳送釋出。
public class Recv {

  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException,
             java.lang.InterruptedException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    ...
    }
}
注意,我們在這裡宣告佇列。因為我們可能會在傳送方之前開始啟動接收方,我們要確保佇列存在之前我們嘗試使用訊息。我們要告訴伺服器提供我們從佇列的訊息。因為它將非同步訊息,我們提供一個回撥物件的形式,將緩衝的訊息,直到我們準備使用它們。 QueueingConsumer要做什麼呢?
  QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(QUEUE_NAME, true, consumer);

    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());
      System.out.println(" [x] Received '" + message + "'");
    }
QueueingConsumer.nextDelivery()塊,直到另一個來自伺服器的訊息交付。   下面是Recv.java 原始碼:
[java] view plaincopyprint?在CODE上檢視程式碼片派生到我的程式碼片
  1. import com.rabbitmq.client.ConnectionFactory;  
  2. import com.rabbitmq.client.Connection;  
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.QueueingConsumer;  
  5. publicclass Recv {  
  6.     privatefinalstatic String QUEUE_NAME = "hello";  
  7.     publicstaticvoid main(String[] argv) throws Exception {  
  8.     ConnectionFactory factory = new ConnectionFactory();  
  9.     factory.setHost("localhost");  
  10.     Connection connection = factory.newConnection();  
  11.     Channel channel = connection.createChannel();  
  12.     channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);  
  13.     System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
  14.     QueueingConsumer consumer = new QueueingConsumer(channel);  
  15.     channel.basicConsume(QUEUE_NAME, true, consumer);  
  16.     while (true) {  
  17.       QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
  18.       String message = new String(delivery.getBody());  
  19.       System.out.println(" [x] Received '" + message + "'");  
  20.     }  
  21.   }  
  22. }