1. 程式人生 > >RabbitMQ指南之一:"Hello World!"

RabbitMQ指南之一:"Hello World!"

新建 指定 根據 依賴 只需要 工作 說明 配置 hello

  為什麽要使用MQ消息中間件?它解決了什麽問題?關於為什麽要使用消息中間件?消息中間件是如何做到同步變異步、流量削鋒、應用解耦的?網上已經有很多說明,我這裏就不再說明了,讀者可以參考(https://www.jianshu.com/p/2820561158c4)。我在接下來的RabbitMq系列博客裏會將官方的講解翻譯過來,同時加以自己的理解整理成博客,希望能和大家共同交流,一起進步。

  技術分享圖片

技術分享圖片

                  RabbitMq原理圖

1、RabbitMq簡介

  RabbitMq是一個消息中間件:它接收消息、轉發消息。你可以把它理解為一個郵局:當你向郵箱裏寄出一封信後,郵遞員們就能最終將信送到收信人手中。類似的,RabbitMq就好比是一個郵箱、郵局和郵遞員。RabbitMq和郵局最大的區別是:RabbitMq接收、轉發的都是二進制數據塊--消息,而不是紙質的數據文件。

  RabbitMq、消息相關術語如下:

  生產者:生產者只發送消息,發送消息的程序即為生產者:

  技術分享圖片

  消息隊列:消息隊列就相當於RabbitMq中的郵箱名稱。盡管消息在你的程序和RabbitMq中流動,但它只能存儲在消息隊列中。隊列本質上是一個大的消息緩存,它能存多少消息,取決於主機的內存和磁盤限制。多個生產者可以往同一個消息隊列中發送消息;多個消費者可以從同一個隊列中獲取數據。我們以下列圖形來表示一個消息隊列:

  技術分享圖片

  消費者:消費者是一個等待接收消息的程序:

  技術分享圖片

  註意:生產者、消費者和RabbitMq可以在不同的機器上;在很多的應用中,一個生產者同時也可能是消費者。

2、“Hello World!”

  在這小節裏,我們將寫一個消息生產者用來發送消息、一個消息消費者來消費消息(接收消息並打印出來)。

  在下面圖形中,“P”是我們的生產者,“C”是我們的消費者,中間的紅框是我們的消息隊列,保存了從生產者那裏接收到的準備轉發到消費方的消息。

  技術分享圖片

  

  Java客戶端類庫說明:

  RabbitMq使用多種協議,本指南使用AMQP 0-9-1協議,該協議是一個開源的、通用的消息協議。RabbitMq有多種語言的客戶端,這裏我們使用JAVA語言的客戶端做實驗。通過以下地址下載RabbitMq客戶端jar包和依賴包:

  amqp-client-5.5.1.jar

  slf4j-api-1.7.25.jar

  slf4j-simple-1.7.25.jar

  把這三個jar包拷貝到你的工作目錄,包括後面教程要新建的java文件。

2.1 發送消息

  生產者連接RabbitMq,發送一條簡單的消息”Hello World!“後就退出。

  在Send.java類中,需要引入以下依賴包:

1 import com.rabbitmq.client.ConnectionFactory;
2 import com.rabbitmq.client.Connection;
3 import com.rabbitmq.client.Channel;

  給隊列起個名字:

1 public class Send {
2   private final static String QUEUE_NAME = "hello";
3   public static void main(String[] argv) throws Exception {
4       ...
5   }
6 }

  創建連接到服務器的連接Collection:

1 onnectionFactory factory = new ConnectionFactory();
2 factory.setHost("localhost");
3 try (Connection connection = factory.newConnection();
4      Channel channel = connection.createChannel()) {
5 
6 }

  這個連接即套接字連接,為我們處理協議版本協商和身份驗證等。這裏我們連接一個本地的RabbitMq:因此是localhost,如果你想要連接一個遠程機器上的RabbitMq,只需要把localhst改成那臺機器的計算機名或是IP地址。

  創建完連接之後,我們繼續創建一個信道:Channel。我們需要使用try-with-resource表達式,因為Connection和Channel都實現了JAVA接口Closeable,屬於資源,需要關閉,這樣我們就不需要顯示地在我們的代碼中進行關閉了。(關於信道,請參考文章最頂部的RabbitMq原理圖,是TCP裏面的虛擬鏈接,例如:電纜相當於一個TCP,信道就是裏面的一個獨立光纖,一條TCP上面創建多條信道是沒有問題的;TCP一旦打開就分創建AMQP信道;無論是發布消息、接收消息、訂閱隊列,這些動作都是通過信道完成的)。

  為了發送消息,我們還必須要定義一個需要發送到的消息隊列,這些都要使用try-with-resource表達式:

1 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
2 String message = "Hello World!";
3 channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
4 System.out.println(" [x] Sent ‘" + message + "‘");

  定義一個消息隊列是冪等的:只有該隊列不存在的時候才能被創建,消息是二進制數組,因此你可以根據需要指定編碼。

  完成的Send.java如下:

 1 import com.rabbitmq.client.Channel;
 2 import com.rabbitmq.client.Connection;
 3 import com.rabbitmq.client.ConnectionFactory;
 4 
 5 public class Send {
 6 
 7     private final static String QUEUE_NAME = "hello";
 8 
 9     public static void main(String[] argv) throws Exception {
10         ConnectionFactory factory = new ConnectionFactory();
11         factory.setHost("localhost");
12         try (Connection connection = factory.newConnection();
13              Channel channel = connection.createChannel()) {
14             channel.queueDeclare(QUEUE_NAME, false, false, false, null);
15             String message = "Hello World!";
16             channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
17             System.out.println(" [x] Sent ‘" + message + "‘");
18         }
19     }
20 }

2.2 接收消息

  消費者監聽RabbitMq中的消息,因此與生產者發送一條消息就退出不同,消費者要保持運行狀態來接收消息並打印出來。

  Recv.java同樣需要導入以下依賴包:

1 import com.rabbitmq.client.Channel;
2 import com.rabbitmq.client.Connection;
3 import com.rabbitmq.client.ConnectionFactory;
4 import com.rabbitmq.client.DeliverCallback;

  與生產者相同,我們需要創建Connetcion和Channel、定義隊列(需要監聽並接收消息的隊列):

 1 public class Recv {
 2 
 3   private final static String QUEUE_NAME = "hello";
 4 
 5   public static void main(String[] argv) throws Exception {
 6     ConnectionFactory factory = new ConnectionFactory();
 7     factory.setHost("localhost");
 8     Connection connection = factory.newConnection();
 9     Channel channel = connection.createChannel();
10 
11     channel.queueDeclare(QUEUE_NAME, false, false, false, null);
12     System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
13 
14   }
15 }

  註意我們也在這裏聲明隊列,因為我們可能在生產者之前啟動消費者,我們想要確保在我們嘗試消費消息的時候隊列就已經存在了。

  這裏我們為什麽不使用try-with-resource表達式自動關閉channl和connection?通過這樣,我們就可以使我們的程序一直保持運行狀態,如果把這些關了,程序也就停止了。這就尷尬了,因為我們需要保持消費者一直處於異步監聽消息過來的狀態。

  RabbitMq會將隊列中的消息異步地推送過來,我們需要提供一個回調函數來緩存消息直到我們需要用到這些消息:

1 DeliverCallback deliverCallback = (consumerTag, delivery) -> {
2     String message = new String(delivery.getBody(), "UTF-8");
3     System.out.println(" [x] Received ‘" + message + "‘");
4 };
5 channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

  Rec.java完整代碼:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Recv {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received ‘" + message + "‘");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

3、測試

  在官方手冊中,測試部分他們是將客戶端jar和依賴jar添加到classpath路徑,然後在cmd終端來運行的,我覺得麻煩,因此,我這裏放到IDEA中來運行,效果是一樣的。

  技術分享圖片

  第一步:首先運行Send.java:

  輸出結果:

[x] Sent ‘Hello World!‘

  查看RabbitMq控制臺:

  技術分享圖片

技術分享圖片

  說明消息已經發送成功。

  第二步:啟動消費者Recv.java:

  輸出結果:

[x] Received ‘Hello World!‘

  說明消息已經消費成功了,此時再查看控制臺:

技術分享圖片

技術分享圖片

  消息依然存在在隊列中,但是區別是,在第一張圖中Ready由1變成了0,Unacknowledged由0變成了1;第二張圖中Ready也由1變成0,Unacked由0變成了1。為什麽會這樣?按道理,消息消費了之後就應該刪除掉,否則可能造成重復消費。關於這方面知識,將會在後面的章節中再介紹(Ack機制)。

4、用SpringBoot實現

  上面雖然實現了功能,但在實際工作中,我們更多的可能是使用SpringBoot、SpringCloud等成熟的框架來實現。本小節就通過SpringBoot來實現以上功能。

  工程目錄如下:

技術分享圖片

  Provider和Consumer的配置文件相同,IP請替換成你自己的:

1 #RabbitMq
2 spring.rabbitmq.host=192.168.xx.xx  
3 spring.rabbitmq.username=rabbitmq
4 spring.rabbitmq.password=123456
5 
6 hello_world.queue=hello

  為方便讓系統啟動時就往隊列發送消息,所以寫了一個SenderRunner類:

 1 @Component
 2 public class SenderRunner implements ApplicationRunner {
 3 
 4     @Autowired
 5     private Send send;
 6 
 7     @Override
 8     public void run(ApplicationArguments args) throws Exception {
 9         send.doSender("Hello RabbitMq");
10     }
11 }

  Send.java

 1 @Component
 2 public class Send {
 3 
 4     @Value("${hello_world.queue}")
 5     private String queueName;
 6 
 7     @Autowired
 8     private AmqpTemplate amqpTemplate;
 9 
10     public void doSender(String msg) {
11 
12         amqpTemplate.convertAndSend(queueName,msg);
13         System.out.println("發送消息:" + msg);
14     }
15 }

  啟動類:

1 @SpringBootApplication
2 public class ProviderApplication {
3     public static void main(String[] args) {
4         SpringApplication.run(ProviderApplication.class, args);
5     }
6 }

  Recv.java

@Component
public class Recv {

    @RabbitListener(queues = "${hello_world.queue}")
    public void receive(String msg) {
        System.out.println("接收到消息:" + msg);
    }
}

  啟動Provider:

技術分享圖片

  查看控制臺:

技術分享圖片

  啟動Consumer:

 技術分享圖片

  可見,SpringBoot為我們做了很多封裝,隱藏了很多底層的細節,使用起來簡單多了。

 

  PS:關於SpringBoot的實現涉及到很多的配置,我將在系統的最後專門用一章來講解SpringBoot的實現

RabbitMQ指南之一:"Hello World!"