1. 程式人生 > >RabbitMQ 實戰教程(一)

RabbitMQ 實戰教程(一)

MQ是消費-生產者模型的一個典型的代表,一端往訊息佇列中不斷寫入訊息,而另一端則可以讀取或者訂閱佇列中的訊息。RabbitMQ是資訊傳輸的中間者。本質上,他從生產者接收訊息,轉發這些訊息給消費者。換句話說,RabbitMQ能夠按根據你指定的規則進行訊息轉發、緩衝、和持久化。

在專案中,將一些無需即時返回且耗時的操作提取出來,進行了非同步處理,而這種非同步處理的方式大大的節省了伺服器的請求響應時間,從而提高了系統的吞吐量。

常見術語

producer(生產者),consumer(消費者),broker(RabbitMQ服務)並不需要部署在同一臺機器上,實際上在大多數實際的應用中,也不會部署在同一臺機器上。

生產者(Producer)

一個傳送訊息的程式是一個producer(生產者)。
一般用下圖表示生產者:

佇列(Queue)

佇列類似郵箱。依存於RabbitMQ內部。雖然訊息通過RabbitMQ在你的應用中傳遞,但是它們只能儲存在佇列中。佇列不受任何限制,可以儲存任何數量的訊息—本質上是一個無限制的快取。不同的生產者可以通過同一個佇列傳送訊息,此外,不同的消費者也可以從同一個佇列上接收訊息。
一般用下圖表示佇列:

消費者(Consumer)

消費者屬於等待接收訊息的程式。
一般使用下圖表示消費者:

案例實戰

準備工作

使用Maven進行管理

  1. <dependency>
  2. <groupId>
    com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>3.6.3</version>
  5. </dependency>

傳送端

傳送端,連線到RabbitMQ(此時服務需要啟動),傳送一條資料,然後退出。

  1. package com.lianggzone.rabbitmq.demo.helloworld;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. import com
    .rabbitmq.client.Channel;
  5. import com.rabbitmq.client.Connection;
  6. import com.rabbitmq.client.ConnectionFactory;
  7. publicclassSend{
  8. privatefinalstaticString QUEUE_NAME ="hello";
  9. publicstaticvoid main(String[] args)throwsIOException,TimeoutException{
  10. // 建立連線
  11. ConnectionFactory factory =newConnectionFactory();
  12. // 設定MabbitMQ, 主機ip或者主機名
  13. factory.setHost("localhost");
  14. // 建立一個連線
  15. Connection connection = factory.newConnection();
  16. // 建立一個通道
  17. Channel channel = connection.createChannel();
  18. // 指定一個佇列
  19. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  20. // 傳送訊息
  21. String message ="Hello World!";
  22. channel.basicPublish("", QUEUE_NAME,null, message.getBytes());
  23. System.out.println(" [x] Sent '"+ message +"'");
  24. // 關閉頻道和連線
  25. channel.close();
  26. connection.close();
  27. }
  28. }

接受端

接受端,不斷等待伺服器推送訊息,然後在控制檯輸出。

  1. package com.lianggzone.rabbitmq.demo.helloworld;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. import com.rabbitmq.client.AMQP;
  5. import com.rabbitmq.client.Channel;
  6. import com.rabbitmq.client.Connection;
  7. import com.rabbitmq.client.ConnectionFactory;
  8. import com.rabbitmq.client.Consumer;
  9. import com.rabbitmq.client.DefaultConsumer;
  10. import com.rabbitmq.client.Envelope;
  11. publicclassRecv{
  12. privatefinalstaticString QUEUE_NAME ="hello";
  13. publicstaticvoid main(String[] args)throwsIOException,TimeoutException{
  14. // 建立連線
  15. ConnectionFactory factory =newConnectionFactory();
  16. // 設定MabbitMQ, 主機ip或者主機名
  17. factory.setHost("localhost");
  18. // 建立一個連線
  19. Connection connection = factory.newConnection();
  20. // 建立一個通道
  21. Channel channel = connection.createChannel();
  22. // 指定一個佇列
  23. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  24. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  25. // 建立佇列消費者
  26. Consumer consumer =newDefaultConsumer(channel){
  27. @Override
  28. publicvoid handleDelivery(String consumerTag,Envelope envelope, AMQP.BasicProperties properties,
  29. byte[] body)throwsIOException{
  30. String message =newString(body,"UTF-8");
  31. System.out.println(" [x] Received '"+ message +"'");
  32. }
  33. };
  34. channel.basicConsume(QUEUE_NAME,true, consumer);
  35. }
  36. }