
Deploying and Benchmarking a Single-Node Spark + Kafka + Elasticsearch Environment

Version selection

Spark 1.5.2 + Kafka 0.9.0.1 + Elasticsearch 2.2.1

Installation and deployment

1. Installation scripts and files (password: 4m7l)

2. Using the scripts

  • vi /etc/hosts
    Add the entry 127.0.0.1 <hostname>
  • cd npminstall

    install.sh

#!/bin/sh

DIRNAME=`dirname "$0"`
localhome=`cd "$DIRNAME"; pwd`
upperhome=`cd "$DIRNAME/.."; pwd`

export username=root
export installpath=/data/mdware

export hadoopFileName=hadoop-2.4.1
export hadoopPackageName=hadoop-2.4.1.tar.gz
export sparkFileName=spark-1.5.2-bin-hadoop2.4
export sparkPackageName=spark-1.5.2-bin-hadoop2.4.tgz
export kafkaFileName=kafka_2.10-0.9.0.1
export kafkaPackageName=kafka_2.10-0.9.0.1.tgz
export elasticFileName=elasticsearch-2.2.1
export elasticPackageName=elasticsearch-2.2.1.tar.gz
export kibanaFileName=kibana-4.4.2-linux-x64
export kibanaPackageName=kibana-4.4.2-linux-x64.tar.gz
export installServices="hadoop spark kafka elastic kibana"

mkdir -p $installpath

# Unpack each component, copy in the prepared configuration files,
# and create a version-independent symlink under $installpath.
for com in $installServices ; do
    if [ $com"" == "hadoop" ] ; then
        cp $localhome/files/$hadoopPackageName $installpath
        cd $installpath && tar -zxf $hadoopPackageName
        \cp -r $localhome/conf/hadoop/* $installpath/$hadoopFileName/etc/hadoop/
        sh $installpath/$hadoopFileName/bin/hdfs namenode -format
        rm -rf $installpath/$hadoopPackageName
        ln -s $installpath/$hadoopFileName/ $installpath/hadoop
    fi
    if [ $com"" == "spark" ] ; then
        cp $localhome/files/$sparkPackageName $installpath
        cd $installpath && tar -zxf $sparkPackageName
        \cp -r $localhome/conf/spark-env.sh $installpath/$sparkFileName/conf/
        rm -rf $installpath/$sparkPackageName
        ln -s $installpath/$sparkFileName/ $installpath/spark
    fi
    if [ $com"" == "kafka" ] ; then
        cp $localhome/files/$kafkaPackageName $installpath
        cd $installpath && tar -zxf $kafkaPackageName
        \cp $localhome/conf/server.properties $installpath/$kafkaFileName/config/
        rm -rf $installpath/$kafkaPackageName
        ln -s $installpath/$kafkaFileName/ $installpath/kafka
    fi
    if [ $com"" == "elastic" ] ; then
        cp $localhome/files/$elasticPackageName $installpath
        cd $installpath && tar -zxf $elasticPackageName
        \cp $localhome/conf/elasticsearch.yml $installpath/$elasticFileName/config/
        rm -rf $installpath/$elasticPackageName
        ln -s $installpath/$elasticFileName/ $installpath/es
        $installpath/es/bin/plugin install mobz/elasticsearch-head/2.2.1
        $installpath/es/bin/plugin install lmenezes/elasticsearch-kopf/2.2.1
    fi
    if [ $com"" == "kibana" ] ; then
        cp $localhome/files/$kibanaPackageName $installpath
        cd $installpath && tar -zxf $kibanaPackageName
        rm -rf $installpath/$kibanaPackageName
        ln -s $installpath/$kibanaFileName/ $installpath/kibana
    fi
done

# Register manage.sh as the "npm" init service and enable it at boot.
chmod +x $localhome/manage.sh
cp $localhome/manage.sh /etc/init.d/npm
chkconfig npm on

chmod +x install.sh
./install.sh
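
After install.sh finishes, everything lives under /data/mdware with a version-independent symlink per component. A quick sanity check (the expected entries follow directly from the ln -s commands in the script above):

ls -l /data/mdware
# expected symlinks created by install.sh:
#   hadoop -> hadoop-2.4.1
#   spark  -> spark-1.5.2-bin-hadoop2.4
#   kafka  -> kafka_2.10-0.9.0.1
#   es     -> elasticsearch-2.2.1
#   kibana -> kibana-4.4.2-linux-x64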

3. Starting the services

service npm start

The npm service script (manage.sh, installed to /etc/init.d/npm)

#!/bin/bash
# chkconfig: 2345 20 81
# description: start and stop npm service
# processname: npm

. /etc/rc.d/init.d/functions
prog="npm"

DIRNAME=`dirname "$0"`
localhome=`cd "$DIRNAME"; pwd`

# Size the Elasticsearch heap to roughly a quarter of total memory (in GB).
memSize=`free -g | awk 'NR==2{print $2}'`
mem_size=`expr ${memSize} + 1`
heap_size=`expr ${mem_size} / 4`

export installpath=/data/mdware

start(){
        ulimit -n 655360
        sh $installpath/hadoop/sbin/hadoop-daemon.sh start namenode
        sh $installpath/hadoop/sbin/hadoop-daemon.sh start datanode
        $installpath/hadoop/bin/hdfs dfsadmin -safemode leave
        sh $installpath/spark/sbin/start-master.sh
        sh $installpath/spark/sbin/start-slave.sh spark://localhost:7077
        nohup $installpath/kafka/bin/zookeeper-server-start.sh $installpath/kafka/config/zookeeper.properties >> $installpath/kafka/zookeeper.log &
        sleep 60
        nohup $installpath/kafka/bin/kafka-server-start.sh $installpath/kafka/config/server.properties >> $installpath/kafka/kafka.log &
        export ES_HEAP_SIZE=${heap_size}g
        $installpath/es/bin/elasticsearch -Des.insecure.allow.root=true -d
}

stop(){
        sh $installpath/hadoop/sbin/hadoop-daemon.sh stop namenode
        sh $installpath/hadoop/sbin/hadoop-daemon.sh stop datanode

        sh $installpath/spark/sbin/stop-master.sh
        sh $installpath/spark/sbin/stop-slave.sh

        zookeeper_id=`ps -ef | grep -i zookeeper.properties | grep -v grep | awk '{print $2}'`

        if [[ -z $zookeeper_id ]];then
                echo "The task is not running ! "
        else
                kill ${zookeeper_id}
        fi

        kafka_id=`ps -ef | grep -i server.properties | grep -v grep | awk '{print $2}'`
        if [[ -z $kafka_id ]];then
                echo "The task is not running ! "
        else
                kill ${kafka_id}
        fi

        es_id=`ps -ef|grep -i elasticsearch  | grep -v "grep"|awk '{print $2}'`
        if [[ -z $es_id ]];then
                echo "The task is not running ! "
        else
                kill ${es_id}
        fi


        sleep 20

        zookeeper_id=`ps -ef | grep -i zookeeper.properties | grep -v grep | awk '{print $2}'`
        if [[ -z $zookeeper_id ]];then
                echo "The task is not running ! "
        else
                kill -9 ${zookeeper_id}
        fi

        kafka_id=`ps -ef | grep -i server.properties | grep -v grep | awk '{print $2}'`
        if [[ -z $kafka_id ]];then
                echo "The task is not running ! "
        else
                kill -9 ${kafka_id}
        fi

        es_id=`ps -ef|grep -i elasticsearch  | grep -v "grep"|awk '{print $2}'`
        if [[ -z $es_id ]];then
                echo "The task is not running ! "
        else
                kill -9 ${es_id}
        fi

}

case "$1" in
        start)
                start
    ;;
  stop)
        stop
        ;;
  *)
    echo $"Usage: $0 {start|stop}"
    exit 2
esac
exit $?
Note: the services are registered to start automatically at boot (via chkconfig).
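
To confirm the stack came up after service npm start, a few quick checks can be run. This is only a sketch: it assumes Elasticsearch listens on its default HTTP port 9200 and ZooKeeper on 2181, which depends on the elasticsearch.yml and zookeeper.properties shipped in the install package.

# JVMs launched by the init script
jps
# expect: NameNode, DataNode, Master, Worker, QuorumPeerMain, Kafka, Elasticsearch

# Elasticsearch cluster health (default port 9200 assumed)
curl http://localhost:9200/_cat/health?v

# Kafka topics (ZooKeeper default port 2181 assumed)
/data/mdware/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181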

Test code

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.log4j.Logger;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;

// Replays a data file into a Kafka topic in a loop to generate load
// (Config is a project-local helper that loads database.cnf).
public class KafkaDataProducer implements Runnable{
    private static Logger log = Logger.getLogger(KafkaDataProducer.class);

    private static Producer<String, String> producer;

    private String topic;

    private String path;

    public KafkaDataProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", Config.getConfig("database.cnf").getProperty("bootstrap.server"));
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    public KafkaDataProducer(String topic, String path) {
        this.path = path;
        this.topic = topic;

        Properties props = new Properties();
        props.put("bootstrap.servers", Config.getConfig("database.cnf").getProperty("bootstrap.server"));
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    public static void main(String[] args) throws Exception {
        KafkaDataProducer kafkaDataProducer1 = new KafkaDataProducer("test","datafile");
        new Thread(kafkaDataProducer1).start();

//        KafkaDataProducer kafkaDataProducer2 = new KafkaDataProducer("tcptest","tcp.file");
//        new Thread(kafkaDataProducer2).start();
//
//        KafkaDataProducer kafkaDataProducer3 = new KafkaDataProducer("httptest","http.file");
//        new Thread(kafkaDataProducer3).start();

    }

    @Override
    public void run() {
        try {
            while ( true ) {
                // Re-open the data file on every pass; try-with-resources closes the
                // reader before sleeping so it is not leaked across iterations.
                try (BufferedReader br = new BufferedReader(
                        new FileReader(Config.getConfig("database.cnf").getProperty(path)))) {
                    String line;
                    while ((line = br.readLine()) != null) {
                        if (!"".equals(line.trim())) {
                            producer.send(new ProducerRecord<>(topic, "", line));
                        }
                    }
                }
                Thread.sleep(Long.valueOf(Config.getConfig("database.cnf").getProperty("sleep.time")));
            }
        } catch (Exception e) {
            log.error("The read streaming error: ", e);
        }
    }
}
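
The producer above (and the Spark job that follows) reads its settings from database.cnf through the project's Config helper; the file itself is not shown in this post. A minimal sketch covering only the keys referenced in the code, with placeholder values for the single-node setup:

# database.cnf (sketch; values are examples, not the original file)
bootstrap.server=localhost:9092
group.id=perftest
datafile=/data/testdata/database.txt
sleep.time=60000
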
import com.google.common.collect.ImmutableMap;
import org.apache.commons.codec.binary.Base64;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;

// Consumes raw tab-separated records from Kafka, aggregates them per key, and writes
// the result to Elasticsearch (Config, Analysis, DBKey and DBData are project classes).
public class SSDPerformanceTest extends Analysis {
    public static final Logger LOG = LoggerFactory.getLogger(SSDPerformanceTest.class);

    protected static final Pattern TAB = Pattern.compile("\t");

    private static final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm", Locale.CHINA);

    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy.MM.dd", Locale.CHINA);

    public static void main(String[] args) throws IOException {

        String configfile = "database.cnf";

        Properties config = Config.getConfig(configfile);

        JavaPairReceiverInputDStream<String, byte[]> rawStream = setupRawStreamFromKafka(
                config, config.getProperty("group.id"));

        LOG.info("database config:" + config.toString());

        rawStream.foreachRDD(new Function<JavaPairRDD<String, byte[]>, Void>() {
            @Override
            public Void call(JavaPairRDD<String, byte[]> stringJavaPairRDD) throws Exception {
                JavaRDD<Map<String, ?>> es = stringJavaPairRDD.mapToPair(new PairFunction<Tuple2<String, byte[]>, DBKey, DBData>() {
                    public Tuple2<DBKey, DBData> call(Tuple2<String, byte[]> stringTuple2) throws Exception {
                        String[] database = TAB.split(new String(stringTuple2._2));

                        DBKey dbKey = new DBKey();
                        DBData dbData = new DBData();
                        String sqlString = new String(Base64.decodeBase64(database[10].trim()));
                        String storageSql;
                        if(sqlString.length() > 1000){
                            storageSql = sqlString.substring(0,1000);
                        }else{
                            storageSql = sqlString;
                        }

                        //DBKey
                        dbKey.setProbeName(database[0].trim());
                        dbKey.setCustomService(database[1].trim());
                        dbKey.setIpClient(database[2].trim());
                        dbKey.setIpServer(database[3].trim());
                        dbKey.setPortServer(database[5].trim());
                        dbKey.setTimeStart(format.format(new Date().getTime()));
                        dbKey.setOperateType(storageSql.split(" ")[0]);   //Select, Insert, Update, Drop, Procedure
                        dbKey.setDbType(database[8].trim());

                        dbKey.setResponseCode(database[9].trim());
                        dbKey.setUser(database[2].trim());
                        dbKey.setSqlString(storageSql);

                        if(!database[12].trim().equals("-")) {
                            dbData.setOperateTime(Double.parseDouble(database[12].trim()));
                        }else if(!database[7].trim().equals("-")){
                            dbData.setOperateTime(Double.parseDouble(database[7].trim()) - Double.parseDouble(database[6].trim()));
                        }else{
                            dbData.setOperateTime(0);
                        }

                        if(!database[13].trim().equals("-")) {
                            dbData.setReqTransTime(Double.parseDouble(database[13].trim()));
                        }else{
                            dbData.setReqTransTime(0);
                        }

                        if(!database[14].trim().equals("-")) {
                            dbData.setRespTransTime(Double.parseDouble(database[14].trim()));
                        }else{
                            dbData.setRespTransTime(0);
                        }

                        if(!database[15].trim().equals("-")) {
                            dbData.setRespPayload(Integer.parseInt(database[15].trim()));
                        }else{
                            dbData.setRespPayload(0);
                        }

                        dbData.setCount(1);

                        dbData.setSlowCount(1);

                        return new Tuple2<>(dbKey,dbData);

                    }
                }).filter(new Function<Tuple2<DBKey, DBData>, Boolean>() {
                    @Override
                    public Boolean call(Tuple2<DBKey, DBData> v1) throws Exception {
                        return v1 != null;
                    }
                }).reduceByKey(new Function2<DBData, DBData, DBData>() {
                    public DBData call(DBData v1, DBData v2) throws Exception {
                        DBData result = new DBData();
                        result.setOperateTime(v1.getOperateTime() + v2.getOperateTime());
                        result.setReqTransTime(v1.getReqTransTime() + v2.getReqTransTime());
                        result.setRespTransTime(v1.getRespTransTime() + v2.getRespTransTime());
                        result.setRespPayload(v1.getRespPayload() + v2.getRespPayload());
                        result.setCount(v1.getCount() + v2.getCount());
                        result.setSlowCount(v1.getSlowCount() + v2.getSlowCount());
                        return result;
                    }
                }).map(new Function<Tuple2<DBKey,DBData>, Map<String, ?>>() {
                    public Map<String, ?> call(Tuple2<DBKey, DBData> v1) throws Exception {
                        DBKey dbKey = v1._1;
                        DBData dbData = v1._2;
                        ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
                        builder.put("index_name", sdf.format(format.parse(dbKey.getTimeStart())));
                        builder.put("probeName",dbKey.getProbeName());
                        builder.put("customService",dbKey.getCustomService());
                        builder.put("ipClient",dbKey.getIpClient());
                        builder.put("ipServer",dbKey.getIpServer());
                        builder.put("portServer",dbKey.getPortServer());
                        builder.put("operateType",dbKey.getOperateType());
                        builder.put("timeStart",format.parse(dbKey.getTimeStart()));
                        builder.put("dbType",dbKey.getDbType());
                        builder.put("user",dbKey.getUser());
                        builder.put("responseCode",dbKey.getResponseCode());
                        builder.put("sqlString",dbKey.getSqlString());
                        builder.put("operateTime",dbData.getOperateTime());
                        builder.put("reqTransTime",dbData.getReqTransTime());
                        builder.put("respTransTime",dbData.getRespTransTime());
                        builder.put("respPayload",dbData.getRespPayload());
                        builder.put("count",dbData.getCount());
                        builder.put("slowCount",dbData.getSlowCount());
                        return builder.build();
                    }
                }).cache();

                if (es != null) {
                    JavaEsSpark.saveToEs(es, "ni-database-{index_name}/database", ImmutableMap.of
                            (ConfigurationOptions.ES_MAPPING_EXCLUDE, "index_name"));
                }
                return null;
            }
        });

        rawStream.context().start();
        rawStream.context().awaitTermination();

    }

}
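
The streaming job is submitted to the standalone master started by the init script. The exact command is not given in the post; a sketch, using a hypothetical ssd-performance-test.jar and the 2 GB / 2 core allocation mentioned for test environment 4 below:

/data/mdware/spark/bin/spark-submit \
    --master spark://localhost:7077 \
    --class SSDPerformanceTest \
    --executor-memory 2g \
    --total-executor-cores 2 \
    ssd-performance-test.jar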

Test environments

Test environment 1: virtual machine (8 GB RAM, 2 cores, no SSD)

  1. Data volume written per minute
  2. Events written per minute

Test environment 2: virtual machine (8 GB RAM, 2 cores, SSD)

  1. Data volume written per minute
  2. Events written per minute

Test environment 3: IBM server (126 GB RAM, 16 cores, no SSD)

  1. Data volume written per minute
  2. Events written per minute

Test environment 4: IBM server (126 GB RAM, 16 cores, ~160 GB SSD)

Task resource allocation: 2 GB memory, 2 cores

  1. Data volume written per minute
    (forgot to record this)
  2. Events written per minute
  3. Writing database data alone

  4. Writing database and TCP data together

    database