1. 程式人生 > >RocketMQ叢集壓測實戰一:擴充套件JMeter

RocketMQ叢集壓測實戰一:擴充套件JMeter

測試工具:最新版JMeter 3.1

RocketMQ:1m單點或2m-2s-async叢集      

   版本apache-rocketmq 4.0.0-incubating(參考 點選開啟連結

JDK:Java7+

純粹生產者TPS壓測

public class Producter extends AbstractJavaSamplerClient{

    public SampleResult runTest(JavaSamplerContext javaSamplerContext) {

        SampleResult sr = new SampleResult();
        sr.setSampleLabel("RocketMQ測試");
        try {
            sr.sampleStart();
            DefaultMQProducer producer = new DefaultMQProducer(MixUtil.createGroupName("ProducerGroupName_"));
            producer.setNamesrvAddr(MixUtil.NAME_SRV_ADDR);
            producer.setInstanceName("Producer");
            producer.setVipChannelEnabled(false);

            producer.start();// once
            int times = 1000;
            for(int i=0;i<times;++i){
                try {
                    {
                        Message msg = new Message("WangXiaoRuiTopic", "TagA", "WangXiaoRui", ("王小瑞,花名誓嘉,阿里巴巴中介軟體訊息團隊負責人,具有豐富的高可用,高可靠分散式系統構建經驗,主導了阿里巴巴多次雙十一訊息引擎的改進優化專案,擁有多項分散式領域的專利。Apache RocketMQ聯合創始人").getBytes());
                        SendResult sendResult = producer.send(msg);
                        if(sendResult ==null || sendResult.getSendStatus() != SendStatus.SEND_OK){
                            System.err.println(sendResult);
                        }
                    }

                    {
                        Message msg = new Message("FengJiaTopic", "TagB", "FengJia", ("馮嘉,花名鼬神,阿里巴巴中介軟體架構師,具有豐富的分散式軟體架構、高併發網站設計、效能調優經驗,擁有多項分散式領域的專利。開源愛好者,專注分散式、大資料領域,關注Hbase/Hadoop/Spark/Flink等大資料技術棧。目前負責阿里訊息中介軟體生態輸出、雲上商業化,Apache RocketMQ聯合創始人。聯絡方式: 
[email protected]
").getBytes()); SendResult sendResult = producer.send(msg); if(sendResult ==null || sendResult.getSendStatus() != SendStatus.SEND_OK){ System.err.println(sendResult); } } { Message msg = new Message("RocketMQHostTopic", "TagC", "RocketMQHist", ("第三代,以拉模式為主,兼有推模式的高效能、低延遲訊息引擎RocketMQ,在二代功能特性的基礎上,為電商金融領域添加了可靠重試、基於檔案儲存的分散式事務等特性,並做了大量優化。從2012年開始,經歷了歷次雙11核心交易鏈路檢驗。目前已經捐贈給Apache基金會。時至今日,RocketMQ很好的服務了阿里集團大大小小上千個應用,在雙11當天,更有不可思議的萬億級訊息流轉,為集團大中臺的穩定發揮了舉足輕重的作用").getBytes()); SendResult sendResult = producer.send(msg); if(sendResult ==null || sendResult.getSendStatus() != SendStatus.SEND_OK){ System.err.println(sendResult); } } }catch (Exception e){ e.printStackTrace();; } } producer.shutdown(); sr.setResponseData("success","utf-8"); sr.setDataType(SampleResult.TEXT); sr.setSuccessful(true); }catch(Exception e){ sr.setSuccessful(false); sr.setResponseData(MixUtil.getStackTrace(e),"utf-8"); e.printStackTrace(); } finally { sr.sampleEnd(); } return sr; } ...

沒有純粹的消費者TPS壓測例子。。生產和消費混合TPS壓測

package cn.treebear.rocketmq.tester;

import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient;
import org.apache.jmeter.protocol.java.sampler.JavaSamplerContext;
import org.apache.jmeter.samplers.SampleResult;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Created by zhujl on 2017/4/13.
 */
public class Consumer extends AbstractJavaSamplerClient {

    public SampleResult runTest(JavaSamplerContext javaSamplerContext) {

        SampleResult sr = new SampleResult();
        sr.setSampleLabel("RocketMQ消費者測試");
        try {
            sr.sampleStart();
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(MixUtil.createGroupName("ConsumerGroupname_"));
            consumer.setNamesrvAddr(MixUtil.NAME_SRV_ADDR);
            consumer.setInstanceName("Consumer");

            consumer.subscribe("WangXiaoRuiTopic", "TagA || TagC || TagD");

            consumer.subscribe("FengJiaTopic", "*");

            consumer.registerMessageListener(new OutMessageListener());


            consumer.start();
            System.out.println("Consumer Started.");

            TimeUnit.MILLISECONDS.sleep(10 * 1000);

            consumer.shutdown();

            sr.setResponseData("success","utf-8");
            sr.setDataType(SampleResult.TEXT);
            sr.setSuccessful(true);
        }catch (Exception e){
            sr.setSuccessful(false);
            sr.setResponseData(MixUtil.getStackTrace(e),"utf-8");
            e.printStackTrace();
        }finally {
            sr.sampleEnd();
        }

        return sr;
    }
}

package cn.treebear.rocketmq.tester;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.Writer;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.Enumeration;

/**
 * Created by zhujl on 2017/4/13.
 */
public class MixUtil {

    public final static String NAME_SRV_ADDR = "namesrv1:9876;namesrv2:9876";
    public static String getStackTrace(Throwable aThrowable) {
        final Writer result = new StringWriter();
        final PrintWriter printWriter = new PrintWriter(result);
        aThrowable.printStackTrace(printWriter);
        return result.toString();
    }

    public  static String getIpAddr(){
        String ip="";
        try {
            for (Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces(); en.hasMoreElements(); ) {
                NetworkInterface intf = en.nextElement();
                String name = intf.getName();
                if (!name.contains("docker") && !name.contains("lo")) {
                    for (Enumeration<InetAddress> enumIpAddr = intf.getInetAddresses(); enumIpAddr.hasMoreElements(); ) {
                        //獲得IP
                        InetAddress inetAddress = enumIpAddr.nextElement();
                        if (!inetAddress.isLoopbackAddress()) {
                            String ipaddress = inetAddress.getHostAddress().toString();
                            if (!ipaddress.contains("::") && !ipaddress.contains("0:0:") && !ipaddress.contains("fe80")) {

                                System.out.println(ipaddress);
                                if (!"127.0.0.1".equals(ip)) {
                                    ip = ipaddress;
                                }
                            }
                        }
                    }
                }
            }
        }catch (Exception e){
            e.printStackTrace();
        }
        return ip;
    }

    public static final int getProcessID() {
        RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();
        System.out.println(runtimeMXBean.getName());
        return Integer.valueOf(runtimeMXBean.getName().split("@")[0])
                .intValue();
    }

    public static String createGroupName(String groupPrefix){
        return groupPrefix +"_" + getIpAddr().replaceAll("\\.","_") + "_"+getProcessID()+"_"+Thread.currentThread().getId();
    }


    public static void main(String[] args){
        System.out.println(MixUtil.createGroupName("ProducerGroupName_"));
    }
}

package cn.treebear.rocketmq.tester;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * Created by zhujl on 2017/4/17.
 *
 */
public class OutMessageListener implements MessageListenerConcurrently {
    /**
     * 預設msgs裡只有一條訊息,可以通過設定consumeMessageBatchMaxSize引數來批量接收訊息
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        System.out.println(Thread.currentThread().getName() + " Receive new Message:" + msgs.size());
        MessageExt msg = msgs.get(0);
        if (msg.getTopic().equals("WangXiaoRuiTopic")) {
            if (msg.getTags() != null && msg.getTags().equals("TagA")) {
                System.out.println("do with TagA,msg:" + msg);
            } else if (msg.getTags() != null && msg.getTags().equals("TagC")) {
                System.out.println("do with TagC,msg:" + msg);
            } else if (msg.getTags() != null && msg.getTags().equals("TagC")) {
                System.out.println("do with TagC,msg:" + msg);
            }
        } else if (msg.getTopic().equals("TopicTest2")) {
            System.out.println(new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

將原始碼打為jar包與其它依賴的包一起拷貝到jmeter擴充套件目錄


新建執行計劃,新增java請求節點,選擇生產者或者消費者


相關推薦

RocketMQ叢集實戰:擴充套件JMeter

測試工具:最新版JMeter 3.1 RocketMQ:1m單點或2m-2s-async叢集          版本apache-rocketmq 4.0.0-incubating(參考 點選開啟連結) JDK:Java7+ 純粹生產者TPS壓測 public class

實戰的覆盤

## 前言 ​ 由於筆者在電商公司,算二三線的大廠了吧,最近跟京東拼的火熱。因為818大促在即,本人所負責的專案,在大促期間壓力會比較大,有必要對系統主要介面做一次壓測。下面覆盤了,我這次壓測從發現問題分析問題總結的全過程,希望能對你有所啟發。 ## 問題 ​ 壓測時發現系統的瓶頸在於cpu,那

有贊全鏈路實戰

作者: 張馳 | 15 Dec 2018 | 23 min (6253 words) 有贊全鏈路壓測實戰     前言 有贊致力於成為商家服務領域裡最被信任的引領者,因為被信任,所有我們更需要為商家保駕護航,保障系統的穩定性。有贊從去年開始通過全鏈路壓測

億級流量系列之Jmeter4.x分散式實戰學習資料

推薦課程學習地址  https://edu.csdn.net/course/play/7587/155054 億級流量系列之Jmeter4.x分散式壓測實戰  點選馬上進行學習 第一章節一壓力測試課程介紹 1、2018年億級流量壓測系列之Jmeter4.0課程介紹和

JMeter介面簡單使用(

一:JMeter壓測介面,測試介面的併發訪問是非常方便的.(Windows下測試一下)     1. JMeter官網:http://jmeter.apache.org/     2. JMeter的版本問題:使用Java開發的,安裝好JMeter

效能工具之JMeterWebSocket介面(

文章目錄 概述 什麼是WebSocket? WebSocket是如何工作的? JAVA WebSocket Springboot服務端實現 新建SpringBoot工程 配置WebSocket

RocketMQ實戰()之叢集環境的搭建

一:什麼是RocketMQ? RocketMQ作為一款分散式的訊息中介軟體(阿里的說法是不遵循任何規範的,所以不能完全用JMS的那一套東西來看它),經歷了Metaq1.x、Metaq2.x的發展和淘寶雙十一的洗禮,在功能和效能上遠超ActiveMQ。 二:為什麼要用RocketMQ? 分

rocketmq jmeter

最近要用阿里的rocketmq框架,這幾天用jmeter做了一個測試,在測試的過程中遇到了一些坑,在此總結一下,方便以後查閱,同時也希望能夠幫助到其他即將入坑的同學。 測試要求:寫一個請求接收api請求的引數作為訊息,非同步傳送到rocketmq中,併發測試框架的tps 準備工作:apa

實戰JmeterDubbo服務介面 | 併發程式設計網

一、前言 最近在做一些業務上雲的專案,其中遠端Rpc呼叫方式我們選擇了Dubbo,為便於收集壓測資訊,我們選擇了使用Jmeter來做壓測工具,本文就來簡單介紹如何使用Jmeter壓測Dubbo服務介面,以及需要注意的事情。 二、Jmeter使用 2.1 下載Jmeter imag

RabbitMQ叢集的架構搭建全過程及JMeter軟體安裝使用

什麼是RabbitMQ?MQ全稱為Message Queue, 訊息佇列(MQ)是一種應用程式對應用程式的通訊方法。應用程式通過讀寫出入佇列的訊息(針對應用程式的資料)來通訊,而無需專用連線來連結它們。訊息傳遞指的是程式之間通過在訊息中傳送資料進行通訊,而不是通過直接呼叫彼此

jmeter的坑

最近做了一個安全傳輸模組,因為怕對效能有較大影響,因此測試安排了針對性的壓測壓測的過程出現了一點小問題發現失敗率特別高,測試懷疑是服務端出了錯,但是我檢視日誌發現沒有報錯。後面我觀察TCP連結數排除了服

實戰jmeter入門介面效能

什麼是Jmeter? 是Apache組織開發的基於Java的壓力測試工具。   準備工作: 一、安裝配置好環境及壓測工具 Jmeter下載地址:http://mirrors.tuna.tsinghua.edu.cn/apache//jmeter/binaries/apache-jmeter-5.1.

文揭祕測試平臺中是如何將測試用例鍵轉化Jmeter指令碼

                   ​    ​接上篇,一鍵轉化將介面測試平臺測試用例轉化成Jmeter壓測指令碼思路,這裡我首先在java 上面做

Redis秒殺實戰-微信搶紅包-秒殺庫存,附案例原始碼(Jmeter

導讀   前二天我寫了一篇,Redis高階專案實戰(點我直達),SpringBoot整合Redis附原始碼(點我直達),今天我們來做一下Redis秒殺系統的設計。當然啦,Redis基礎知識還不過關的,先去加強下自身內功,然後在回來看這篇,Redis基礎知識(點我直達)。為啥寫這個微信搶紅包專案呢,公司0202

JMeter擴充套件Java請求實現WebRTC本地音視訊推流指令碼

WebRTC是Web Real-Time Communication縮寫,指網頁即時通訊,是一個支援Web瀏覽器進行實時語音或視訊對話的API,實現了基於網頁的視訊會議,比如聲網的Agora Web SDK就是基於WebRTC實現音視訊通訊的。與HTTP不同,WebRTC應用的主要壓力是碼流,JMeter沒有

接口工具--jmeter

cto linux下 一點 不一致 文件讀取 coo 並發 bin 文檔 jmeter     jmeter是apache公司基於java開發的一款開源壓力測試工具,是一個較輕量的測試工具。運行需要安裝jdk環境,jmeter為免安裝軟件,    解壓後直接運行j

【原】shell編寫一個簡單的jmeter自動化腳本

image tac vbo 用戶數 osx dot png das uvc 在公司做壓力測試也挺長時間了,每次測試前環境數據準備都需要話費較長時間,所以一直在考慮能不能將整個過程實現自動化進行,於是就抽空寫了一個自動化腳本,當然這個腳本目前功能十分簡陋,代碼也不完善,很有很

雙十臨近,怎樣讓買家流暢地秒殺? ——騰訊WeTest獨家開放電商產品服務

img 高峰 大促 做出 開始 認證 class display 購物車 WeTest 導讀 十一月臨近,一年一度的電商大戲“雙十一”又將隆重出場,目前各大商家已經開始各類優惠券的發放,各類大促的商品表單也已經提前流出,即將流入各個用戶的購物車中。 作為

RocketMQ性能分析(轉載)

2.3 rocket 點擊 loading 很好 分配 enabled 細節 毫秒 一 機器部署 1.1 機器組成 1臺nameserver 1臺broker 異步刷盤 2臺producer 2臺consumer 1.2 硬件配置 CPU 兩顆x86_64

命令運行Jmeter腳本

body inux bat 自動 linux 基本 問題 blog 結束時間 今天在針對單一接口壓測時出現了從未遇到的問題,設好並發量後用調度器控制腳本的開始和結束,但在腳本應該自動結束時間,腳本卻停不下來,手動stop報告就會有error率,卡了我很久很久不能解決,網絡上