
RabbitMQ Client Parameter Performance Testing (1)

I recently rolled out RabbitMQ at work, replacing the RocketMQ from Alibaba we had been using (don't accuse me of bashing Alibaba's stuff; that thing really is full of pitfalls). I'm not going to walk through the RabbitMQ installation process here; the official site covers it, so click here.

Benchmarks online claim very high throughput for RabbitMQ, but no matter how I tested I could only reach about 15,000 QPS, and that was with the Go client and no processing logic in between. Worse, the native Java client only managed 11,000 QPS (fine, I admit I'm bashing Java too). That's mostly workable, but I believe there is still plenty of room for optimization, so I ran a series of tests. Code first:

package com.enniu.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Created by coffee on 15/10/28.
 */
public class Main {

  static final String exchangeName = "testblock";
  static final String routingKey = "testblock";
  static final String queueName = "testblock";

  private static int producterConnection_size = 1; // number of producer connections
  private static int consumerConnection_size = 1; // number of consumer connections
  private static final int consumer_size = 1; // consumers opened per consumer connection
  private static int qos = 1; // Qos (prefetch) setting
  private static long sleep_time = 0; // simulated per-message processing time
  private static boolean autoAck = true; // whether to auto-ack

  private static Logger logger = LoggerFactory.getLogger(Main.class);
  public static void main(String[] args) throws Exception {
    final AtomicLong count = new AtomicLong(10000000000L);
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername("test");
    factory.setPassword("test");
    factory.setVirtualHost("/test");
    factory.setHost("10.0.88.88");
    factory.setPort(5672);
    // start the monitoring thread
    Thread t = new Thread(new Runnable() {
      @Override
      public void run() {
        long c = count.get();
        while (c != 0){
          try{
            Thread.sleep(1000);
            long c1 = count.get();
            logger.debug("consumed per second: {} Qps", c - c1);
            c=c1;
          }catch (Exception e){
            // ignore and keep sampling
          }
        }
      }
    });
    t.start();
    // start the producers
    for (int i=0;i<producterConnection_size;i++){
      Connection conn1 = factory.newConnection();
      Thread t1 = producter(conn1, count.get());
      t1.start();
    }
    // start the consumers
    for (int i=0;i<consumerConnection_size;i++){
      Connection conn1 = factory.newConnection();
      Thread t2 = consumer(conn1, count);
      t2.start();
    }
  }

  public static Thread consumer(final Connection conn, final AtomicLong count) throws Exception {
    return new Thread(new Runnable() {
      @Override
      public void run() {
        logger.debug("start consumer");
        try {
          // released once the shared count hits zero (the original latch count of
          // 1000 could never reach zero, since countDown() is only called once)
          final CountDownLatch cdl = new CountDownLatch(1);
          for (int i = 0; i < consumer_size; i++) {
            final Channel channel = conn.createChannel();
            channel.basicQos(0, qos, false);
            Consumer consumer = new DefaultConsumer(channel) {
              @Override
              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                if (count.decrementAndGet() == 0) {
                  channel.basicCancel(consumerTag);
                  cdl.countDown();
                  try {
                    channel.close();
                  } catch (TimeoutException e) {
                    e.printStackTrace();
                  }
                }
                try {
                  Thread.sleep(sleep_time); // simulate business processing time
                } catch (InterruptedException e) {
                  // ignore
                }
                if (!autoAck) {
                  // multiple=true acks every delivery up to and including this tag
                  getChannel().basicAck(envelope.getDeliveryTag(), true);
                }
              }
            };
            String consumerTag = channel.basicConsume(queueName, autoAck, "testConsumer" + i, consumer);
            logger.debug("consumerTag is {}", consumerTag);
          }
          cdl.await();
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    });
  }


  public static Thread producter(final Connection conn, final long count) throws Exception {
    return new Thread(new Runnable() {
      @Override
      public void run() {
        logger.debug("start sending messages");
        try {
          Channel channel = conn.createChannel();
          channel.exchangeDeclare(exchangeName, "direct", true);
          channel.queueDeclare(queueName, true, false, false, null);
          channel.queueBind(queueName, exchangeName, routingKey);
          // deliveryMode 2 = persistent messages
          BasicProperties properties = new BasicProperties.Builder().deliveryMode(2).build();
          for (long i = 0; i < count; i++) {
            byte[] messageBodyBytes = ("{\"merchantsId\":13}").getBytes();
            channel.basicPublish(exchangeName, routingKey, properties, messageBodyBytes);
//            logger.debug("add message {}",i);
          }
          channel.close();
          conn.close();
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    });
  }
}

About my test environment: two VMs running RabbitMQ as a cluster, each with 2 CPU cores, 2 GB of RAM, and a 187 GB SATA disk.
The client is my MacBook Pro, connected to the test network through a Thunderbolt-to-gigabit adapter. However, I'm told one of the switches along the path is only 100 Mbit (sigh), so the numbers should be read against a 100 Mbit link.

Test 1:

Test content:

One producer, no consumers.

Configuration:

  private static int producterConnection_size = 1; // number of producer connections
  private static int consumerConnection_size = 0; // number of consumer connections
  private static final int consumer_size = 1; // consumers opened per consumer connection
  private static int qos = 1; // Qos (prefetch) setting
  private static long sleep_time = 0; // simulated per-message processing time
  private static boolean autoAck = true; // whether to auto-ack

Result:

See the chart; it mostly speaks for itself.

Test 2:

Test content:

Three producers, no consumers.

Configuration:

  private static int producterConnection_size = 3; // number of producer connections
  private static int consumerConnection_size = 0; // number of consumer connections
  private static final int consumer_size = 1; // consumers opened per consumer connection
  private static int qos = 1; // Qos (prefetch) setting
  private static long sleep_time = 0; // simulated per-message processing time
  private static boolean autoAck = true; // whether to auto-ack

Result:

I also noticed that the broker had triggered flow control, so it evidently considers my publish rate too fast.

So the number of producers has little effect on overall publish throughput, at least against a single machine. The publish rate also keeps fluctuating rather than holding within one range, which I suspect is the flow-control mechanism at work; it's what keeps the rate from exceeding 15k/s. I don't yet have any ideas about how to tune flow control, so pointers from the experts are welcome.
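One approach I haven't tried against this cluster yet, but which is often suggested for pacing a publisher instead of letting server-side flow control stall the connection, is publisher confirms. The sketch below reuses the exchange and routing key from the test code; the host, batch size of 1,000, and 5-second timeout are illustrative assumptions, not measured values.

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConfirmingPublisher {
  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("10.0.88.88"); // same broker as the test code above
    Connection conn = factory.newConnection();
    Channel channel = conn.createChannel();
    channel.confirmSelect(); // enable publisher confirms on this channel
    byte[] body = "{\"merchantsId\":13}".getBytes();
    for (int i = 1; i <= 10000; i++) {
      channel.basicPublish("testblock", "testblock", null, body);
      if (i % 1000 == 0) {
        // Block until the broker confirms the outstanding batch; this paces
        // the publisher to what the broker can absorb instead of letting
        // flow control throttle the whole connection unpredictably.
        channel.waitForConfirmsOrDie(5000);
      }
    }
    channel.close();
    conn.close();
  }
}
```

The batch size of 1,000 is a guess: smaller batches smooth the rate at the cost of more round trips, larger ones behave more like fire-and-forget.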

Test 3:

Test content:

No producers, one consumer, Qos 0. Auto-ack, return immediately on receipt, no sleep.

Configuration:

  private static int producterConnection_size = 0; // number of producer connections
  private static int consumerConnection_size = 1; // number of consumer connections
  private static final int consumer_size = 1; // consumers opened per consumer connection
  private static int qos = 0; // Qos (prefetch) setting
  private static long sleep_time = 0; // simulated per-message processing time
  private static boolean autoAck = true; // whether to auto-ack

Result:

Test 4:

Test content:

No producers, one consumer, Qos 1. Auto-ack, return immediately on receipt, no sleep.

Configuration:

  private static int producterConnection_size = 0; // number of producer connections
  private static int consumerConnection_size = 1; // number of consumer connections
  private static final int consumer_size = 1; // consumers opened per consumer connection
  private static int qos = 1; // Qos (prefetch) setting
  private static long sleep_time = 0; // simulated per-message processing time
  private static boolean autoAck = true; // whether to auto-ack

Result:

There seems to be no difference here: consumption sits around 10k/s either way, so a Qos of 1 versus no Qos setting doesn't affect consumption throughput. Let's raise the Qos and see what happens.

Test 5:

Test content:

No producers, one consumer, Qos 10. Auto-ack, return immediately on receipt, no simulated processing time.

Configuration:

  private static int producterConnection_size = 0; // number of producer connections
  private static int consumerConnection_size = 1; // number of consumer connections
  private static final int consumer_size = 1; // consumers opened per consumer connection
  private static int qos = 10; // Qos (prefetch) setting
  private static long sleep_time = 0; // simulated per-message processing time
  private static boolean autoAck = true; // whether to auto-ack

Result:

Raising the Qos made no difference to consumption throughput here either, which makes sense: Qos essentially controls the size of the consumer's buffer of unacknowledged messages.

==On a poor network, however, Qos=1 and Qos=10 behave very differently.==
For example, suppose a message takes 50 ms to travel from the broker to the consumer but only 5 ms to process. With Qos=1 the broker must wait until the current message is consumed before dispatching the next one, so each message costs roughly 50 × 2 + 5 = 105 ms end to end. With Qos=10 the broker can dispatch the next message before the current one finishes, so there is almost always a message buffered locally and the consumer rarely waits on the network; once the buffer holds 10 messages, their combined processing time (10 × 5 ms = 50 ms) exactly offsets the 50 ms transit time.
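The arithmetic above can be sketched as a quick back-of-the-envelope calculation (the 50 ms and 5 ms figures are the hypothetical values from the example, not measurements):

```java
public class PrefetchMath {
  public static void main(String[] args) {
    long transitMs = 50; // hypothetical one-way broker-to-consumer latency
    long processMs = 5;  // hypothetical per-message handling time

    // Qos=1: the broker dispatches the next message only after the current
    // one is consumed, so each message pays delivery + processing + ack travel.
    long perMessageQos1 = transitMs + processMs + transitMs;

    // Qos=10: with up to 10 unacked messages in flight, the next message is
    // already buffered when the current one finishes, so the steady-state
    // cost per message approaches the pure processing time.
    long perMessageQos10 = processMs;

    System.out.println("Qos=1:  " + perMessageQos1 + " ms/msg, ~" + (1000 / perMessageQos1) + " msg/s");
    System.out.println("Qos=10: " + perMessageQos10 + " ms/msg, ~" + (1000 / perMessageQos10) + " msg/s");
  }
}
```

Under these assumed numbers, Qos=1 caps the consumer at under 10 msg/s while Qos=10 lets it run at the processing-bound 200 msg/s: a 20x difference that exists only because of network latency.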

Now for today's last test: how much does publishing actually affect the consumer?

Test 6:

Test content:

One producer, one consumer, Qos 10. Auto-ack, return immediately on receipt, no simulated processing time.

Configuration:

  private static int producterConnection_size = 1; // number of producer connections
  private static int consumerConnection_size = 1; // number of consumer connections
  private static final int consumer_size = 1; // consumers opened per consumer connection
  private static int qos = 10; // Qos (prefetch) setting
  private static long sleep_time = 0; // simulated per-message processing time
  private static boolean autoAck = true; // whether to auto-ack

Result:

Flow control was of course triggered again, as the chart below shows:

You can google the flow-control mechanism yourself; I won't describe it here.

So publishing and consuming at the same time clearly do affect each other. How large that effect is, and which factors drive it, can't be settled by this one test. It's getting late, so I'll continue tomorrow.