1. 程式人生 > >RabbitMQ的幾種典型使用場景-java語言開發

RabbitMQ的幾種典型使用場景-java語言開發

RabbitMQ主頁:https://www.rabbitmq.com/

AMQP

AMQP協議是一個高階抽象層訊息通訊協議,RabbitMQ是AMQP協議的實現。它主要包括以下元件:

1.Server(broker): 接受客戶端連線,實現AMQP訊息佇列和路由功能的程序。

2.Virtual Host:其實是一個虛擬概念,類似於許可權控制組,一個Virtual Host裡面可以有若干個Exchange和Queue,但是許可權控制的最小粒度是Virtual Host

3.Exchange:接受生產者傳送的訊息,並根據Binding規則將訊息路由給伺服器中的佇列。ExchangeType決定了Exchange路由訊息的行為,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三種,不同型別的Exchange路由的行為是不一樣的。

4.Message Queue:訊息佇列,用於儲存還未被消費者消費的訊息。

5.Message: 由Header和Body組成,Header是由生產者新增的各種屬性的集合,包括Message是否被持久化、由哪個Message Queue接受、優先順序是多少等。而Body是真正需要傳輸的APP資料。

6.Binding:Binding聯絡了Exchange與Message Queue。Exchange在與多個Message Queue發生Binding後會生成一張路由表,路由表中儲存著Message Queue所需訊息的限制條件即Binding Key。當Exchange收到Message時會解析其Header得到Routing Key,Exchange根據Routing Key與Exchange Type將Message路由到Message Queue。Binding Key由Consumer在Binding Exchange與Message Queue時指定,而Routing Key由Producer傳送Message時指定,兩者的匹配方式由Exchange Type決定。

7.Connection:連線,對於RabbitMQ而言,其實就是一個位於客戶端和Broker之間的TCP連線。

8.Channel:通道,僅僅建立了客戶端到Broker之間的連線後,客戶端還是不能傳送訊息的。需要為每一個Connection建立Channel,AMQP協議規定只有通過Channel才能執行AMQP的命令。一個Connection可以包含多個Channel。之所以需要Channel,是因為TCP連線的建立和釋放都是十分昂貴的,如果一個客戶端每一個執行緒都需要與Broker互動,如果每一個執行緒都建立一個TCP連線,暫且不考慮TCP連線是否浪費,就算作業系統也無法承受每秒建立如此多的TCP連線。RabbitMQ建議

客戶端執行緒之間不要共用Channel,至少要保證共用Channel的執行緒傳送訊息必須是序列的,但是建議儘量共用Connection

9.Command:AMQP的命令,客戶端通過Command完成與AMQP伺服器的互動來實現自身的邏輯。例如在RabbitMQ中,客戶端可以通過publish命令傳送訊息,txSelect開啟一個事務,txCommit提交一個事務。

在瞭解了AMQP模型以後,需要簡單介紹一下AMQP的協議棧,AMQP協議本身包括三層:

1.Module Layer,位於協議最高層,主要定義了一些供客戶端呼叫的命令,客戶端可以利用這些命令實現自己的業務邏輯,例如,客戶端可以通過queue.declare宣告一個佇列,利用consume命令獲取一個佇列中的訊息。

2.Session Layer,主要負責將客戶端的命令傳送給伺服器,在將伺服器端的應答返回給客戶端,主要為客戶端與伺服器之間通訊提供可靠性、同步機制和錯誤處理。

3.Transport Layer,主要傳輸二進位制資料流,提供幀的處理、通道複用、錯誤檢測和資料表示。

RabbitMQ使用場景

學習RabbitMQ的使用場景,來自官方教程:https://www.rabbitmq.com/getstarted.html

場景1:單傳送單接收

使用場景:簡單的傳送與接收,沒有特別的處理。

Producer:

複製程式碼
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class Producer{
    
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv) throws Exception {
                
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    String message = "Hello World!";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
    
    channel.close();
    connection.close();
  }
}
複製程式碼

Consumer:

複製程式碼
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Consumer{
    
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(QUEUE_NAME, true, consumer);
    
    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());
      System.out.println(" [x] Received '" + message + "'");
    }
  }
}
複製程式碼

場景2:單傳送多接收

使用場景:一個傳送端,多個接收端,如分散式的任務派發。為了保證訊息傳送的可靠性,不丟失訊息,使訊息持久化了。同時為了防止接收端在處理訊息時down掉,只有在訊息處理完成後才傳送ack訊息。

Producer:

複製程式碼
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class Producer{
  
  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    
    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    
    String message = getMessage(argv);
    
    channel.basicPublish( "", TASK_QUEUE_NAME, 
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
    
    channel.close();
    connection.close();
  }
    
  private static String getMessage(String[] strings){
    if (strings.length < 1)
      return "Hello World!";
    return joinStrings(strings, " ");
  }  
  
  private static String joinStrings(String[] strings, String delimiter) {
    int length = strings.length;
    if (length == 0) return "";
    StringBuilder words = new StringBuilder(strings[0]);
    for (int i = 1; i < length; i++) {
      words.append(delimiter).append(strings[i]);
    }
    return words.toString();
  }
}
複製程式碼

傳送端和場景1不同點:

1、使用“task_queue”聲明瞭另一個Queue,因為RabbitMQ不容許宣告2個相同名稱、配置不同的Queue

2、使"task_queue"的Queue的durable的屬性為true,即使訊息佇列durable

3、使用MessageProperties.PERSISTENT_TEXT_PLAIN使訊息durable

When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark both the queue and messages as durable.

Consumer:

複製程式碼
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
  
public class Consumer{

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    
    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
    channel.basicQos(1);
    
    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
    
    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());
      
      System.out.println(" [x] Received '" + message + "'");
      doWork(message);
      System.out.println(" [x] Done");

      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }         
  }
  
  private static void doWork(String task) throws InterruptedException {
    for (char ch: task.toCharArray()) {
      if (ch == '.') Thread.sleep(1000);
    }
  }
}
複製程式碼

接收端和場景1不同點:

1、使用“task_queue”宣告訊息佇列,並使訊息佇列durable

2、在使用channel.basicConsume接收訊息時使autoAck為false,即不自動會發ack,由channel.basicAck()在訊息處理完成後傳送訊息。

3、使用了channel.basicQos(1)保證在接收端一個訊息沒有處理完時不會接收另一個訊息,即接收端傳送了ack後才會接收下一個訊息。在這種情況下發送端會嘗試把訊息傳送給下一個not busy的接收端。

注意點:

1)It's a common mistake to miss the basicAck. It's an easy error, but the consequences are serious. Messages will be redelivered when your client quits (which may look like random redelivery), but RabbitMQ will eat more and more memory as it won't be able to release any unacked messages.

2)Note on message persistence

Marking messages as persistent doesn't fully guarantee that a message won't be lost. Although it tells RabbitMQ to save the message to disk, there is still a short time window when RabbitMQ has accepted a message and hasn't saved it yet. Also, RabbitMQ doesn't do fsync(2) for every message -- it may be just saved to cache and not really written to the disk. The persistence guarantees aren't strong, but it's more than enough for our simple task queue. If you need a stronger guarantee you can wrap the publishing code in atransaction.

3)Note about queue size

If all the workers are busy, your queue can fill up. You will want to keep an eye on that, and maybe add more workers, or have some other strategy.

場景3:Publish/Subscribe

使用場景:釋出、訂閱模式,傳送端傳送廣播訊息,多個接收端接收。

Producer:

複製程式碼
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class Producer{

  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

    String message = getMessage(argv);

    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
  }
  
  private static String getMessage(String[] strings){
    if (strings.length < 1)
            return "info: Hello World!";
    return joinStrings(strings, " ");
  }
  
  private static String joinStrings(String[] strings, String delimiter) {
    int length = strings.length;
    if (length == 0) return "";
    StringBuilder words = new StringBuilder(strings[0]);
    for (int i = 1; i < length; i++) {
        words.append(delimiter).append(strings[i]);
    }
    return words.toString();
  }
}
複製程式碼

傳送端:

傳送訊息到一個名為“logs”的exchange上,使用“fanout”方式傳送,即廣播訊息,不需要使用queue,傳送端不需要關心誰接收。

Consumer:

複製程式碼
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Consumer{

  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");
    
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, consumer);

    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());

      System.out.println(" [x] Received '" + message + "'");   
    }
  }
}
複製程式碼

接收端:

1、宣告名為“logs”的exchange的,方式為"fanout",和傳送端一樣。

2、channel.queueDeclare().getQueue();該語句得到一個隨機名稱的Queue,該queue的型別為non-durable、exclusive、auto-delete的,將該queue繫結到上面的exchange上接收訊息。

3、注意binding queue的時候,channel.queueBind()的第三個引數Routing key為空,即所有的訊息都接收。如果這個值不為空,在exchange type為“fanout”方式下該值被忽略!

場景4:Routing (按路線傳送接收)

使用場景:傳送端按routing key傳送訊息,不同的接收端按不同的routing key接收訊息。

Producer:

複製程式碼
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class Producer{

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");

    String severity = getSeverity(argv);
    String message = getMessage(argv);

    channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
    System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

    channel.close();
    connection.close();
  }
  
  private static String getSeverity(String[] strings){
    if (strings.length < 1)
            return "info";
    return strings[0];
  }

  private static String getMessage(String[] strings){ 
    if (strings.length < 2)
            return "Hello World!";
    return joinStrings(strings, " ", 1);
  }
  
  private static String joinStrings(String[] strings, String delimiter, int startIndex) {
    int length = strings.length;
    if (length == 0 ) return "";
    if (length < startIndex ) return "";
    StringBuilder words = new StringBuilder(strings[startIndex]);
    for (int i = startIndex + 1; i < length; i++) {
        words.append(delimiter).append(strings[i]);
    }
    return words.toString();
  }
}
複製程式碼

傳送端和場景3的區別:

1、exchange的type為direct

2、傳送訊息的時候加入了routing key

Consumer:

複製程式碼
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Consumer{

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    String queueName = channel.queueDeclare().getQueue();
    
    if (argv.length < 1){
      System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
      System.exit(1);
    }
    
    for(String severity : argv){    
      channel.queueBind(queueName, EXCHANGE_NAME, severity);
    }
    
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, consumer);

    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());
      String routingKey = delivery.getEnvelope().getRoutingKey();

      System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");   
    }
  }
}
複製程式碼

接收端和場景3的區別:

在繫結queue和exchange的時候使用了routing key,即從該exchange上只接收routing key指定的訊息。

場景5:Topics (按topic傳送接收)

使用場景:傳送端不只按固定的routing key傳送訊息,而是按字串“匹配”傳送,接收端同樣如此。

Producer:

複製程式碼
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class Producer{

  private static final String EXCHANGE_NAME = "topic_logs";

  public static void main(String[] argv) {
    Connection connection = null;
    Channel channel = null;
    try {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
  
      connection = factory.newConnection();
      channel = connection.createChannel();

      channel.exchangeDeclare(EXCHANGE_NAME, "topic");

      String routingKey = getRouting(argv);
      String message = getMessage(argv);

      channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
      System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

    }
    catch  (Exception e) {
      e.printStackTrace();
    }
    finally {
      if (connection != null) {
        try {
          connection.close();
        }
        catch (Exception ignore) {}
      }
    }
  }
  
  private static String getRouting(String[] strings){
    if (strings.length < 1)
            return "anonymous.info";
    return strings[0];
  }

  private static String getMessage(String[] strings){ 
    if (strings.length < 2)
            return "Hello World!";
    return joinStrings(strings