Understanding the TPCx-BB (Big Data Benchmark) Source Code in One Article

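TPCx-BB is a big data benchmark. It models 30 use cases of a retailer and runs 30 queries to measure the performance, covering both hardware and software, of Hadoop-based big data systems. Some of the use cases rely on machine learning algorithms (clustering, linear regression, and so on). To interpret the performance of a system under test, you need a thorough understanding of the whole TPCx-BB test flow. This article walks through the source code of the TPCx-BB test kit in detail, in the hope that it helps readers understand TPCx-BB.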

Code structure

The main directory ($BENCH_MARK_HOME) contains the following subdirectories:

  • bin
  • conf
  • data-generator
  • engines
  • tools

bin contains several modules, the scripts used at run time: bigBench, cleanLogs, logEnvInformation, runBenchmark, zipLogs, and others.

conf contains two configuration files: bigBench.properties and userSettings.conf.

bigBench.properties mainly sets workload (the benchmarkPhases to execute) and power_test_0 (the SQL queries to run in the POWER_TEST phase).

Default workload:

workload=CLEAN_ALL,ENGINE_VALIDATION_DATA_GENERATION,ENGINE_VALIDATION_LOAD_TEST,ENGINE_VALIDATION_POWER_TEST,ENGINE_VALIDATION_RESULT_VALIDATION,CLEAN_DATA,DATA_GENERATION,BENCHMARK_START,LOAD_TEST,POWER_TEST,THROUGHPUT_TEST_1,BENCHMARK_STOP,VALIDATE_POWER_TEST,VALIDATE_THROUGHPUT_TEST_1

Default power_test_0: 1-30

userSettings.conf holds basic settings, including the Java environment, default settings for the benchmark (database, engine, map_tasks, scale_factor, ...), the Hadoop environment, HDFS config and paths, and Hadoop data generation options (DFS_REPLICATION, HADOOP_JVM_ENV, ...).

data-generator holds the scripts and configuration files related to data generation; details follow below.

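engines holds the four engines TPCx-BB supports: biginsights, hive, impala, and spark_sql. The default engine is hive. In fact, only the hive directory is non-empty; the other three are empty, presumably because they are not finished yet.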

tools contains two JAR files: HadoopClusterExec.jar and RunBigBench.jar. RunBigBench.jar is central to running the TPCx-BB test; most of the driver logic lives in that JAR.

Data generation

The programs and configuration for data generation all live in the data-generator directory, which contains pdgf.jar and three subdirectories: config, dicts, and extlib.

pdgf.jar is the Java program that generates the data. config holds two configuration files: bigbench-generation.xml and bigbench-schema.xml.

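bigbench-generation.xml mainly defines the raw data to generate (raw files, not database tables): which tables are included, each table's name and size, and the output directory, file suffix, delimiter, and character encoding of the table files.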

<schema name="default">
        <tables>    
        <!-- not refreshed tables -->   
            
            <!-- tables not used in benchmark, but some tables have references to them. not refreshed. Kept for legacy reasons  -->
            <table name="income_band"></table>
            <table name="reason"></table>
            <table name="ship_mode"></table>
            <table name="web_site"></table>     
            <!-- /tables not used in benchmark  -->
            
            <!-- Static tables (fixed small size, generated only on node 1, skipped on others, not generated during refresh) -->
            <table name="date_dim" static="true"></table>
            <table name="time_dim" static="true"></table>
            <table name="customer_demographics" static="true"></table>
            <table name="household_demographics" static="true"></table>
            <!-- /static tables -->
            
            <!-- "normal" tables. split over all nodes. not generated during refresh -->
            <table name="store"></table>
            <table name="warehouse"></table>
            <table name="promotion"></table>
            <table name="web_page"></table>
            <!-- /"normal" tables.-->
            
        <!-- /not refreshed tables -->  
            
            <!-- 
            refreshed tables. Generated on all nodes. 
            Refresh tables generate extra data during refresh (e.g. add new data to the existing tables)
            In "normal"-Phase  generate table rows:  [0,REFRESH_PERCENTAGE*Table.Size]; 
            In "refresh"-Phase generate table rows:  [REFRESH_PERCENTAGE*Table.Size+1, Table.Size] 
            .Has effect only if  ${REFRESH_SYSTEM_ENABLED}==1. 
            -->
            <table name="customer">
                <scheduler name="DefaultScheduler">
                    <partitioner
                        name="pdgf.core.dataGenerator.scheduler.TemplatePartitioner">
                        <prePartition><![CDATA[
                    if(${REFRESH_SYSTEM_ENABLED}>0){
                        int tableID = table.getTableID();
                        int timeID = 0;
                        long lastTableRow=table.getSize()-1;
                        long rowStart;
                        long rowStop;
                        boolean exclude=false;
                        long refreshRows=table.getSize()*(1.0-${REFRESH_PERCENTAGE});
                        if(${REFRESH_PHASE}>0){
                            //Refresh part
                            rowStart = lastTableRow - refreshRows +1;
                            rowStop  = lastTableRow;
                            if(refreshRows<=0){
                                exclude=true;
                            }
                            
                        }else{
                            //"normal" part
                            rowStart = 0;
                            rowStop = lastTableRow - refreshRows;
                        }
                        return new pdgf.core.dataGenerator.scheduler.Partition(tableID, timeID,rowStart,rowStop,exclude);
                    }else{
                        //DEFAULT
                        return getParentPartitioner().getDefaultPrePartition(project, table);               
                    }
                    
                    ]]></prePartition>
                    </partitioner>
                </scheduler>
            </table>
<output name="SplitFileOutputWrapper">
  <!-- DEFAULT output for all Tables, if no table specific output is specified-->
    <output name="CSVRowOutput">
      <fileTemplate><![CDATA[outputDir + table.getName() +(nodeCount!=1?"_"+pdgf.util.StaticHelper.zeroPaddedNumber(nodeNumber,nodeCount):"")+ fileEnding]]></fileTemplate>
      <outputDir>output/</outputDir>
      <fileEnding>.dat</fileEnding>
      <delimiter>|</delimiter>
      <charset>UTF-8</charset>
      <sortByRowID>true</sortByRowID>
    </output>

    <output name="StatisticsOutput" active="1">
      <size>${item_size}</size><!-- a counter per item .. initialize later-->

      <fileTemplate><![CDATA[outputDir + table.getName()+"_audit" +(nodeCount!=1?"_"+pdgf.util.StaticHelper.zeroPaddedNumber(nodeNumber,nodeCount):"")+ fileEnding]]></fileTemplate>
      <outputDir>output/</outputDir>
      <fileEnding>.csv</fileEnding>
      <delimiter>,</delimiter>
      <header><!--"" + pdgf.util.Constants.DEFAULT_LINESEPARATOR-->
      </header>
      <footer></footer>
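
To illustrate the prePartition logic of the customer table in the excerpt above, here is a minimal Java sketch that reproduces the row-range arithmetic outside of PDGF. The class RefreshPartitionSketch, its RowRange holder, and the sample values (a 1000-row table with REFRESH_PERCENTAGE at 0.75) are assumptions made for this example; they are not part of the TPCx-BB kit, and the cast to long is an assumption about how the template's result is truncated.

```java
/** Sketch of the row-range split computed by the prePartition template. */
final class RefreshPartitionSketch {

    /** Inclusive row range plus the template's "exclude" flag. */
    static final class RowRange {
        final long rowStart;
        final long rowStop;
        final boolean exclude;

        RowRange(long rowStart, long rowStop, boolean exclude) {
            this.rowStart = rowStart;
            this.rowStop = rowStop;
            this.exclude = exclude;
        }

        @Override
        public String toString() {
            return "[" + rowStart + ", " + rowStop + "] exclude=" + exclude;
        }
    }

    static RowRange partition(long tableSize, double refreshPercentage, boolean refreshPhase) {
        long lastTableRow = tableSize - 1;
        // Same formula as the template: rows held back for the refresh phase.
        long refreshRows = (long) (tableSize * (1.0 - refreshPercentage));
        if (refreshPhase) {
            // Refresh part: the last refreshRows rows; excluded when there are none.
            return new RowRange(lastTableRow - refreshRows + 1, lastTableRow, refreshRows <= 0);
        }
        // "Normal" part: every row before the refresh rows.
        return new RowRange(0, lastTableRow - refreshRows, false);
    }

    public static void main(String[] args) {
        System.out.println("normal : " + partition(1000, 0.75, false));
        System.out.println("refresh: " + partition(1000, 0.75, true));
    }
}
```

With these sample values the normal phase covers the first 75% of the rows and the refresh phase the remaining 25%, which matches the comment in the XML that the normal phase generates the REFRESH_PERCENTAGE share of the table.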

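bigbench-schema.xml sets many parameters. Some relate to table scale, such as the size of each table (number of records); most relate to table fields, such as start and end dates, gender ratio, marriage ratio, and the upper and lower bounds of metrics. It also defines exactly how each field is generated, along with its constraints. Examples appear in the excerpt that follows the scale factor discussion below.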

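The size of the generated data is determined by SCALE_FACTOR (-f). With -f 1 the generated data totals about 1 GB; with -f 100, about 100 GB. So how does SCALE_FACTOR (-f) control the size of the generated data so precisely?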

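The reason is that SCALE_FACTOR (-f) determines the number of records in each table. As the customer_size property below shows, the customer table has 100000.0d * ${SF_sqrt} records: with -f 1 that is 100000*sqrt(1) = 100,000 records, and with -f 100 it is 100000*sqrt(100) = 1,000,000 records.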

<property name="${customer_size}" type="long">100000.0d * ${SF_sqrt}</property>
<property name="${DIMENSION_TABLES_START_DAY}" type="datetime">2000-01-03 00:00:00</property>
<property name="${DIMENSION_TABLES_END_DAY}" type="datetime">2004-01-05 00:00:00</property>
<property name="${gender_likelihood}" type="double">0.5</property>
<property name="${married_likelihood}" type="double">0.3</property>
<property name="${WP_LINK_MIN}" type="double">2</property>
<property name="${WP_LINK_MAX}" type="double">25</property>
<field name="d_date" size="13" type="CHAR" primary="false">
   <gen_DateTime>
     <disableRng>true</disableRng>
     <useFixedStepSize>true</useFixedStepSize>
     <startDate>${date_dim_begin_date}</startDate>
     <endDate>${date_dim_end_date}</endDate>
     <outputFormat>yyyy-MM-dd</outputFormat>
    </gen_DateTime>
  </field>
<field name="t_time_id" size="16" type="CHAR" primary="false">
   <gen_ConvertNumberToString>
    <gen_Id/>
    <size>16.0</size>
    <characters>ABCDEFGHIJKLMNOPQRSTUVWXYZ</characters>
   </gen_ConvertNumberToString>
  </field>

<field name="cd_dep_employed_count" size="10" type="INTEGER" primary="false">
   <gen_Null probability="${NULL_CHANCE}">
   <gen_WeightedListItem filename="dicts/bigbench/ds-genProbabilities.txt" list="dependent_count" valueColumn="0" weightColumn="0" />
   </gen_Null>
  </field>
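
The scale factor arithmetic described above can be checked with a few lines of Java. This is only a sketch: the class and method names are made up for the example, ${SF_sqrt} is taken to be the square root of the scale factor as the text states, and the truncating cast to long is an assumption about how PDGF converts the result to the property's long type.

```java
/** Sketch: customer table row count as a function of the scale factor. */
final class ScaleFactorSketch {

    /** Base row count from the customer_size property in bigbench-schema.xml. */
    static final double CUSTOMER_BASE_ROWS = 100000.0d;

    static long customerRows(long scaleFactor) {
        double sfSqrt = Math.sqrt(scaleFactor); // plays the role of ${SF_sqrt}
        return (long) (CUSTOMER_BASE_ROWS * sfSqrt);
    }

    public static void main(String[] args) {
        for (long sf : new long[] {1, 100}) {
            System.out.println("-f " + sf + " -> " + customerRows(sf) + " customer rows");
        }
    }
}
```

Because this table grows with the square root of the scale factor rather than linearly, a hundredfold increase in -f only multiplies the customer row count by ten.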

dicts contains dictionary files such as city.dict, country.dict, male.dict, female.dict, state.dict, and mail_provider.dict; the field values of each record are presumably drawn from these dictionaries.

extlib holds the external JARs the program depends on: lucene-core-4.9.0.jar, commons-net-3.3.jar, xml-apis.jar, and log4j-1.2.15.jar.

Summary

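Using the configuration in bigbench-generation.xml and bigbench-schema.xml (table names, field names, record counts, and the generation rule for each field), pdgf.jar generates the raw data, drawing dictionary-based field values from the corresponding .dict files under dicts.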

A record from the customer table looks like this:

0 AAAAAAAAAAAAAAAA 1824793 3203 2555 28776 14690 Ms. Marisa Harrington N 17 4 1988 UNITED ARAB EMIRATES RRCyuY3XfE3a [email protected]   gdMmGdU9

If the TPCx-BB test is run with -f 1 (SCALE_FACTOR = 1), the generated raw data totals about 1 GB (977M + 8.6M):

hdfs dfs -du -h /user/root/benchmarks/bigbench/data
12.7 M   38.0 M   /user/root/benchmarks/bigbench/data/customer
5.1 M    15.4 M   /user/root/benchmarks/bigbench/data/customer_address
74.2 M   222.5 M  /user/root/benchmarks/bigbench/data/customer_demographics
14.7 M   44.0 M   /user/root/benchmarks/bigbench/data/date_dim
151.5 K  454.4 K  /user/root/benchmarks/bigbench/data/household_demographics
327      981      /user/root/benchmarks/bigbench/data/income_band
405.3 M  1.2 G    /user/root/benchmarks/bigbench/data/inventory
6.5 M    19.5 M   /user/root/benchmarks/bigbench/data/item
4.0 M    12.0 M   /user/root/benchmarks/bigbench/data/item_marketprices
53.7 M   161.2 M  /user/root/benchmarks/bigbench/data/product_reviews
45.3 K   135.9 K  /user/root/benchmarks/bigbench/data/promotion
3.0 K    9.1 K    /user/root/benchmarks/bigbench/data/reason
1.2 K    3.6 K    /user/root/benchmarks/bigbench/data/ship_mode
3.3 K    9.9 K    /user/root/benchmarks/bigbench/data/store
4.1 M    12.4 M   /user/root/benchmarks/bigbench/data/store_returns
88.5 M   265.4 M  /user/root/benchmarks/bigbench/data/store_sales
4.9 M    14.6 M   /user/root/benchmarks/bigbench/data/time_dim
584      1.7 K    /user/root/benchmarks/bigbench/data/warehouse
170.4 M  511.3 M  /user/root/benchmarks/bigbench/data/web_clickstreams
7.9 K    23.6 K   /user/root/benchmarks/bigbench/data/web_page
5.1 M    15.4 M   /user/root/benchmarks/bigbench/data/web_returns
127.6 M  382.8 M  /user/root/benchmarks/bigbench/data/web_sales
8.6 K    25.9 K   /user/root/benchmarks/bigbench/data/web_site

Execution flow

To run the TPCx-BB test, change to the TPCx-BB source directory, enter the bin directory, and run:

./bigBench runBenchmark -f 1 -m 8 -s 2 -j 5 

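Here -f, -m, -s, and -j are options that users can set according to the cluster's capacity and their own needs. Options that are not given fall back to defaults, which are defined in userSettings.conf under the conf directory: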

export BIG_BENCH_DEFAULT_DATABASE="bigbench"
export BIG_BENCH_DEFAULT_ENGINE="hive"
export BIG_BENCH_DEFAULT_MAP_TASKS="80"
export BIG_BENCH_DEFAULT_SCALE_FACTOR="1000"
export BIG_BENCH_DEFAULT_NUMBER_OF_PARALLEL_STREAMS="2"
export BIG_BENCH_DEFAULT_BENCHMARK_PHASE="run_query"

So by default MAP_TASKS is 80 (-m 80), SCALE_FACTOR is 1000 (-f 1000), and NUMBER_OF_PARALLEL_STREAMS is 2 (-s 2).

All available options and their meanings:

General options:
-d  database to use (default: $BIG_BENCH_DEFAULT_DATABASE -> bigbench)
-e  engine to use (default: $BIG_BENCH_DEFAULT_ENGINE -> hive)
-f  scale factor of the data set (default: $BIG_BENCH_DEFAULT_SCALE_FACTOR -> 1000)
-h  show help
-m  number of map tasks for data generation (default: $BIG_BENCH_DEFAULT_MAP_TASKS)
-s  number of parallel streams (default: $BIG_BENCH_DEFAULT_NUMBER_OF_PARALLEL_STREAMS -> 2)

Driver specific options:
-a  run in pretend mode
-b  print the output of the bash scripts called during the run to standard output
-i  phases to run (see $BIG_BENCH_CONF_DIR/bigBench.properties for details)
-j  queries to run (default: all 30 queries, 1-30)
-U  unlock expert mode

If -U is specified, expert mode is unlocked, and the following applies:

echo "EXPERT MODE ACTIVE"
echo "WARNING - INTERNAL USE ONLY:"
echo "Only set manually if you know what you are doing!"
echo "Ignoring them is probably the best solution" 
echo "Running individual modules:"
echo "Usage: `basename $0` module [options]"

-D  query part to debug; most queries have only a single part
-p  benchmark phase to run (default: $BIG_BENCH_DEFAULT_BENCHMARK_PHASE -> run_query)
-q  query to run (only one can be given)
-t  stream number to use when running that query
-v  SQL script for metastore population (default: ${USER_POPULATE_FILE:-"$BIG_BENCH_POPULATION_DIR/hiveCreateLoad.sql"})
-w  SQL script for metastore refresh (default: ${USER_REFRESH_FILE:-"$BIG_BENCH_REFRESH_DIR/hiveRefreshCreateLoad.sql"})
-y  file with additional user-defined query parameters (global: $BIG_BENCH_ENGINE_CONF_DIR/queryParameters.sql)
-z  file with additional user-defined engine settings (global: $BIG_BENCH_ENGINE_CONF_DIR/engineSettings.sql)

List of available modules:
    $BIG_BENCH_ENGINE_BIN_DIR

Back to the command that runs the TPCx-BB test:

./bigBench runBenchmark -f 1 -m 8 -s 2 -j 5 

bigBench

bigBench is the main script and runBenchmark is the module.

bigBench sets many environment variables (paths, engine, number of streams, and so on), because the Java program reads them later when RunBigBench.jar is invoked.

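Most of bigBench is groundwork: setting environment variables, parsing user options, granting file permissions, setting paths, and so on. Its last step calls the runModule() function of runBenchmark. The steps are: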

  1. Set the base paths

    export BIG_BENCH_VERSION="1.0"
    export BIG_BENCH_BIN_DIR="$BIG_BENCH_HOME/bin"
    export BIG_BENCH_CONF_DIR="$BIG_BENCH_HOME/conf"
    export BIG_BENCH_DATA_GENERATOR_DIR="$BIG_BENCH_HOME/data-generator"
    export BIG_BENCH_TOOLS_DIR="$BIG_BENCH_HOME/tools"
    export BIG_BENCH_LOGS_DIR="$BIG_BENCH_HOME/logs"
    
  2. Specify the paths of core-site.xml and hdfs-site.xml

    Data generation uses the Hadoop cluster and writes its output to HDFS.

    export BIG_BENCH_DATAGEN_CORE_SITE="$BIG_BENCH_HADOOP_CONF/core-site.xml"
    export BIG_BENCH_DATAGEN_HDFS_SITE="$BIG_BENCH_HADOOP_CONF/hdfs-site.xml"

  1. Make all executable files in the package executable (.sh/.jar/.py)

    find "$BIG_BENCH_HOME" -name '*.sh' -exec chmod 755 {} +
    find "$BIG_BENCH_HOME" -name '*.jar' -exec chmod 755 {} +
    find "$BIG_BENCH_HOME" -name '*.py' -exec chmod 755 {} +

  1. Set the path of userSettings.conf and source it

    USER_SETTINGS="$BIG_BENCH_CONF_DIR/userSettings.conf"
    if [ ! -f "$USER_SETTINGS" ]
    then
      echo "User settings file $USER_SETTINGS not found"
      exit 1
    else
      source "$USER_SETTINGS"
    fi
    
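  2. Parse the input arguments and options, and apply settings based on them

    The first argument must be the module name.

    If there are no arguments, or the first argument starts with "-", the user did not specify a module to run.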

    if [[ $# -eq 0 || "`echo "$1" | cut -c1`" = "-" ]]
    then
      export MODULE_NAME=""
      SHOW_HELP="1"
    else
      export MODULE_NAME="$1"
      shift
    fi
    export LIST_OF_USER_OPTIONS="$@"
    

Parse the options given by the user

Set environment variables according to the user's options

```bash
while getopts ":d:D:e:f:hm:p:q:s:t:Uv:w:y:z:abi:j:" OPT; do

case "$OPT" in
# script options
d)
#echo "-d was triggered, Parameter: $OPTARG" >&2
USER_DATABASE="$OPTARG"
;;
D)
#echo "-D was triggered, Parameter: $OPTARG" >&2
DEBUG_QUERY_PART="$OPTARG"
;;
e)
#echo "-e was triggered, Parameter: $OPTARG" >&2
USER_ENGINE="$OPTARG"
;;
f)
#echo "-f was triggered, Parameter: $OPTARG" >&2
USER_SCALE_FACTOR="$OPTARG"
;;
h)
#echo "-h was triggered, Parameter: $OPTARG" >&2
SHOW_HELP="1"
;;
m)
#echo "-m was triggered, Parameter: $OPTARG" >&2
USER_MAP_TASKS="$OPTARG"
;;
p)
#echo "-p was triggered, Parameter: $OPTARG" >&2
USER_BENCHMARK_PHASE="$OPTARG"
;;
q)
#echo "-q was triggered, Parameter: $OPTARG" >&2
QUERY_NUMBER="$OPTARG"
;;
s)
#echo "-s was triggered, Parameter: $OPTARG" >&2
USER_NUMBER_OF_PARALLEL_STREAMS="$OPTARG"
;;
t)
#echo "-t was triggered, Parameter: $OPTARG" >&2
USER_STREAM_NUMBER="$OPTARG"
;;
U)
#echo "-U was triggered, Parameter: $OPTARG" >&2
USER_EXPERT_MODE="1"
;;
v)
#echo "-v was triggered, Parameter: $OPTARG" >&2
USER_POPULATE_FILE="$OPTARG"
;;
w)
#echo "-w was triggered, Parameter: $OPTARG" >&2
USER_REFRESH_FILE="$OPTARG"
;;
y)
#echo "-y was triggered, Parameter: $OPTARG" >&2
USER_QUERY_PARAMS_FILE="$OPTARG"
;;
z)
#echo "-z was triggered, Parameter: $OPTARG" >&2
USER_ENGINE_SETTINGS_FILE="$OPTARG"
;;
# driver options
a)
#echo "-a was triggered, Parameter: $OPTARG" >&2
export USER_PRETEND_MODE="1"
;;
b)
#echo "-b was triggered, Parameter: $OPTARG" >&2
export USER_PRINT_STD_OUT="1"
;;
i)
#echo "-i was triggered, Parameter: $OPTARG" >&2
export USER_DRIVER_WORKLOAD="$OPTARG"
;;
j)
#echo "-j was triggered, Parameter: $OPTARG" >&2
export USER_DRIVER_QUERIES_TO_RUN="$OPTARG"
;;
\?)
echo "Invalid option: -$OPTARG" >&2
exit 1
;;
:)
echo "Option -$OPTARG requires an argument." >&2
exit 1
;;
esac
done
```

Set the global variables. If the user specified a value for an option, that value is used; otherwise the default applies.

```bash
export BIG_BENCH_EXPERT_MODE="${USER_EXPERT_MODE:-"0"}"

export SHOW_HELP="${SHOW_HELP:-"0"}"
export BIG_BENCH_DATABASE="${USER_DATABASE:-"$BIG_BENCH_DEFAULT_DATABASE"}"
export BIG_BENCH_ENGINE="${USER_ENGINE:-"$BIG_BENCH_DEFAULT_ENGINE"}"
export BIG_BENCH_MAP_TASKS="${USER_MAP_TASKS:-"$BIG_BENCH_DEFAULT_MAP_TASKS"}"
export BIG_BENCH_SCALE_FACTOR="${USER_SCALE_FACTOR:-"$BIG_BENCH_DEFAULT_SCALE_FACTOR"}"
export BIG_BENCH_NUMBER_OF_PARALLEL_STREAMS="${USER_NUMBER_OF_PARALLEL_STREAMS:-"$BIG_BENCH_DEFAULT_NUMBER_OF_PARALLEL_STREAMS"}"
export BIG_BENCH_BENCHMARK_PHASE="${USER_BENCHMARK_PHASE:-"$BIG_BENCH_DEFAULT_BENCHMARK_PHASE"}"
export BIG_BENCH_STREAM_NUMBER="${USER_STREAM_NUMBER:-"0"}"
export BIG_BENCH_ENGINE_DIR="$BIG_BENCH_HOME/engines/$BIG_BENCH_ENGINE"
export BIG_BENCH_ENGINE_CONF_DIR="$BIG_BENCH_ENGINE_DIR/conf"
```
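
The `${USER_X:-"$DEFAULT_X"}` expansions above fall back to the default when the user variable is unset or empty. For readers more at home in Java, the language of the driver, the same rule can be sketched as follows; DefaultsSketch and orDefault are hypothetical names used only for this illustration.

```java
/** Sketch of the bash ${value:-fallback} rule used for the BIG_BENCH_* variables. */
final class DefaultsSketch {

    /** Returns fallback when userValue is unset (null) or empty, otherwise userValue. */
    static String orDefault(String userValue, String fallback) {
        return (userValue == null || userValue.isEmpty()) ? fallback : userValue;
    }

    public static void main(String[] args) {
        // No -f given: the default scale factor from userSettings.conf applies.
        System.out.println("scale factor = " + orDefault(null, "1000"));
        // -f 1 given: the user's value wins.
        System.out.println("scale factor = " + orDefault("1", "1000"));
    }
}
```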

  1. Check whether the values of -m, -f, -s, and -t are numeric (a failed check only prints a message)

    if [ -n "`echo "$BIG_BENCH_MAP_TASKS" | sed -e 's/[0-9]*//g'`" ]
    then
      echo "$BIG_BENCH_MAP_TASKS is not a number"
    fi
    if [ -n "`echo "$BIG_BENCH_SCALE_FACTOR" | sed -e 's/[0-9]*//g'`" ]
    then
      echo "$BIG_BENCH_SCALE_FACTOR is not a number"
    fi
    if [ -n "`echo "$BIG_BENCH_NUMBER_OF_PARALLEL_STREAMS" | sed -e 's/[0-9]*//g'`" ]
    then
      echo "$BIG_BENCH_NUMBER_OF_PARALLEL_STREAMS is not a number"
    fi
    if [ -n "`echo "$BIG_BENCH_STREAM_NUMBER" | sed -e 's/[0-9]*//g'`" ]
    then
      echo "$BIG_BENCH_STREAM_NUMBER is not a number"
    fi
    
  2. Check that the engine exists

    if [ ! -d "$BIG_BENCH_ENGINE_DIR" ]
    then
      echo "Engine directory $BIG_BENCH_ENGINE_DIR not found. Aborting script..."
      exit 1
    fi
    if [ ! -d "$BIG_BENCH_ENGINE_CONF_DIR" ]
    then
      echo "Engine configuration directory $BIG_BENCH_ENGINE_CONF_DIR not found. Aborting script..."
      exit 1
    fi
    
  3. Set the path of engineSettings.conf and source it

    ENGINE_SETTINGS="$BIG_BENCH_ENGINE_CONF_DIR/engineSettings.conf"
    if [ ! -f "$ENGINE_SETTINGS" ]
    then
      echo "Engine settings file $ENGINE_SETTINGS not found"
      exit 1
    else
      source "$ENGINE_SETTINGS"
    fi
    
  4. Check that the module exists

    When a module name is given, the script first looks for it under $BIG_BENCH_ENGINE_BIN_DIR/ and, if it is found, runs source "$MODULE". If it is not there, the script looks under $BIG_BENCH_BIN_DIR/ and sources the module if it is found. Otherwise it prints Module $MODULE not found, aborting script. and exits.

    export MODULE="$BIG_BENCH_ENGINE_BIN_DIR/$MODULE_NAME"
    if [ -f "$MODULE" ]
    then
      source "$MODULE"
    else
      export MODULE="$BIG_BENCH_BIN_DIR/$MODULE_NAME"
      if [ -f "$MODULE" ]
      then
        source "$MODULE"
      else
        echo "Module $MODULE not found, aborting script."
        exit 1
      fi
    fi
    
  5. Check that the module defines the runModule(), helpModule(), and runEngineCmd() functions; the check for runModule() is shown here

    MODULE_RUN_METHOD="runModule"
    if ! declare -F "$MODULE_RUN_METHOD" > /dev/null 2>&1
    then
      echo "$MODULE_RUN_METHOD was not implemented, aborting script"
      exit 1
    fi
    
  6. Run the module

    If the module is runBenchmark, this executes

    runCmdWithErrorCheck "$MODULE_RUN_METHOD"

    which amounts to

    runCmdWithErrorCheck runModule

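As shown above, the bigBench script mainly does groundwork such as setting environment variables, granting permissions, and checking and parsing the input arguments, and finally calls the runModule() function of runBenchmark to continue.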

runBenchmark

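Next, the runBenchmark script.

runBenchmark defines two functions: helpModule () and runModule ().

helpModule () simply prints the help text.

runModule () is the function actually called when the runBenchmark module is executed. It does four things:

  1. Clean up previously generated logs
  2. Invoke RunBigBench.jar
  3. Run logEnvInformation
  4. Pack the log directory into a zip archive

The source code: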

runModule () {
  #check input parameters
  if [ "$BIG_BENCH_NUMBER_OF_PARALLEL_STREAMS" -le 0 ]
  then
    echo "The number of parallel streams -s must be greater than 0"
    return 1
  fi

  "${BIG_BENCH_BIN_DIR}/bigBench" cleanLogs -U $LIST_OF_USER_OPTIONS
  "$BIG_BENCH_JAVA" -jar "${BIG_BENCH_TOOLS_DIR}/RunBigBench.jar"
  "${BIG_BENCH_BIN_DIR}/bigBench" logEnvInformation -U $LIST_OF_USER_OPTIONS
  "${BIG_BENCH_BIN_DIR}/bigBench" zipLogs -U $LIST_OF_USER_OPTIONS
  return $?
}

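In other words, running the runBenchmark module in turn invokes the cleanLogs, logEnvInformation, and zipLogs modules as well as RunBigBench.jar. RunBigBench.jar holds the core code that drives the TPCx-BB test and is written in Java. The following sections analyze the RunBigBench.jar source.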

runModule()

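The runModule() method in RunBigBench.jar runs a given module. As we have seen, running a module means changing to the bin directory under the main directory and executing: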

./bigBench module_name arguments

Inside runModule(), cmdLine is used to build that command.

ArrayList cmdLine = new ArrayList();
cmdLine.add("bash");
cmdLine.add(this.runScript);
cmdLine.add(benchmarkPhase.getRunModule());
cmdLine.addAll(arguments);

where this.runScript is:

this.runScript = (String)env.get("BIG_BENCH_BIN_DIR") + "/bigBench";

benchmarkPhase.getRunModule() returns the module to run.

arguments holds the options given by the user.

At this point cmdLine is:

bash $BIG_BENCH_BIN_DIR/bigBench module_name arguments

So how is this bash command executed? By calling the runCmd() method.

boolean successful = this.runCmd(this.homeDir, benchmarkPhase.isPrintStdOut(), (String[])cmdLine.toArray(new String[0]));

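Next, the runCmd() method.

runCmd()

runCmd() uses ProcessBuilder to create an operating system process and runs the bash command above in that process.

ProcessBuilder can also set the working directory and the environment.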

ProcessBuilder pb = new ProcessBuilder(command);
pb.directory(new File(workingDirectory));
Process p = null;
// ... (code omitted)
p = pb.start();
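
Putting the two pieces together, the following self-contained sketch builds a command line in the same way as cmdLine and runs it with ProcessBuilder. It is not the code from RunBigBench.jar: the class ModuleRunnerSketch, the path /opt/bigbench/bin, and the choice to treat exit code 0 as success are assumptions for illustration, and the demo runs a harmless `sh -c` command instead of the real bigBench script.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Sketch: build "bash <binDir>/bigBench <module> <args>" and run a command. */
final class ModuleRunnerSketch {

    /** Assembles the command in the same order as cmdLine in runModule(). */
    static List<String> buildCommand(String binDir, String module, List<String> arguments) {
        List<String> cmdLine = new ArrayList<>();
        cmdLine.add("bash");
        cmdLine.add(binDir + "/bigBench");
        cmdLine.add(module);
        cmdLine.addAll(arguments);
        return cmdLine;
    }

    /** Runs the command in workingDirectory; returns true only for exit code 0. */
    static boolean runCmd(String workingDirectory, List<String> command) {
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.directory(new File(workingDirectory));
        pb.inheritIO(); // forward the child's stdout and stderr to this process
        try {
            Process p = pb.start();
            return p.waitFor() == 0;
        } catch (IOException e) {
            return false; // the command could not be started
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical install path, used only to show the resulting command line.
        List<String> cmd = buildCommand("/opt/bigbench/bin", "cleanLogs", List.of("-U"));
        System.out.println(String.join(" ", cmd));

        // Stand-in command so the sketch runs without a TPCx-BB installation.
        String tmp = System.getProperty("java.io.tmpdir");
        System.out.println("success: " + runCmd(tmp, List.of("sh", "-c", "exit 0")));
    }
}
```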

getQueryList()

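getQueryList() returns the list of queries to run. It reads the list from $BIG_BENCH_LOGS_DIR/bigBench.properties, whose content matches $BIG_BENCH_HOME/conf/bigBench.properties.

In bigBench.properties, power_test_0=1-30 specifies the queries to run in the power_test_0 phase and their order.

Queries can be given as a range such as 5-12 or as a single number such as 21, separated by commas.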

power_test_0=28-25,2-5,10,22,30

means the power_test_0 phase runs the queries in this order: 28,27,26,25,2,3,4,5,10,22,30

To run all 30 queries in order:

power_test_0=1-30
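
The range syntax can be tried out in isolation with a compact sketch. This is a simplified re-implementation of the range expansion only, without the caching and shuffling that getQueryList() also performs; the class QueryListSketch is hypothetical, and unlike the decompiled code it trims both ends of a range before parsing.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: expand a spec such as "28-25,2-5,10" into an ordered query list. */
final class QueryListSketch {

    static List<Integer> parse(String spec) {
        List<Integer> queries = new ArrayList<>();
        for (String part : spec.split(",")) {
            String[] range = part.trim().split("-");
            if (range.length == 1) {
                // Single query number, e.g. "21".
                queries.add(Integer.parseInt(range[0].trim()));
            } else if (range.length == 2) {
                int start = Integer.parseInt(range[0].trim());
                int end = Integer.parseInt(range[1].trim());
                int step = start <= end ? 1 : -1; // a descending range counts down
                for (int q = start; q != end + step; q += step) {
                    queries.add(q);
                }
            } else {
                throw new IllegalArgumentException(
                        "Query numbers must be in the form X or X-Y, comma separated.");
            }
        }
        return queries;
    }

    public static void main(String[] args) {
        System.out.println(parse("28-25,2-5,10,22,30"));
    }
}
```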

The source code that builds the query list:

    private List<Integer> getQueryList(BigBench.BenchmarkPhase benchmarkPhase, int streamNumber) {
        String SHUFFLED_NAME_PATTERN = "shuffledQueryList";
        BigBench.BenchmarkPhase queryOrderBasicPhase = BigBench.BenchmarkPhase.POWER_TEST;
        String propertyKey = benchmarkPhase.getQueryListProperty(streamNumber);
        boolean queryOrderCached = benchmarkPhase.isQueryOrderCached();
        if(queryOrderCached && this.queryListCache.containsKey(propertyKey)) {
            return new ArrayList((Collection)this.queryListCache.get(propertyKey));
        } else {
            Object queryList;
            String basicPhaseNamePattern;
            if(!this.properties.containsKey(propertyKey)) {
                if(benchmarkPhase.isQueryOrderRandom()) {
                    if(!this.queryListCache.containsKey("shuffledQueryList")) {
                        basicPhaseNamePattern = queryOrderBasicPhase.getQueryListProperty(0);
                        if(!this.properties.containsKey(basicPhaseNamePattern)) {
                            throw new IllegalArgumentException("Property " + basicPhaseNamePattern + " is not defined, but is the basis for shuffling the query list.");
                        }

                        this.queryListCache.put("shuffledQueryList", this.getQueryList(queryOrderBasicPhase, 0));
                    }

                    queryList = (List)this.queryListCache.get("shuffledQueryList");
                    this.shuffleList((List)queryList, this.rnd);
                } else {
                    queryList = this.getQueryList(queryOrderBasicPhase, 0);
                }
            } else {
                queryList = new ArrayList();
                String[] var11;
                int var10 = (var11 = this.properties.getProperty(propertyKey).split(",")).length;

                label65:
                for(int var9 = 0; var9 < var10; ++var9) {
                    basicPhaseNamePattern = var11[var9];
                    String[] queryRange = basicPhaseNamePattern.trim().split("-");
                    switch(queryRange.length) {
                    case 1:
                        ((List)queryList).add(Integer.valueOf(Integer.parseInt(queryRange[0].trim())));
                        break;
                    case 2:
                        int startQuery = Integer.parseInt(queryRange[0]);
                        int endQuery = Integer.parseInt(queryRange[1]);
                        int i;
                        if(startQuery > endQuery) {
                            i = startQuery;

                            while(true) {
                                if(i < endQuery) {
                                    continue label65;
                                }

                                ((List)queryList).add(Integer.valueOf(i));
                                --i;
                            }
                        } else {
                            i = startQuery;

                            while(true) {
                                if(i > endQuery) {
                                    continue label65;
                                }

                                ((List)queryList).add(Integer.valueOf(i));
                                ++i;
                            }
                        }
                    default:
                        throw new IllegalArgumentException("Query numbers must be in the form X or X-Y, comma separated.");
                    }
                }
            }

            if(queryOrderCached) {
                this.queryListCache.put(propertyKey, new ArrayList((Collection)queryList));
            }

            return new ArrayList((Collection)queryList);
        }
    }

parseEnvironment()

parseEnvironment() reads and parses the system environment variables.

Map env = System.getenv();
this.version = (String)env.get("BIG_BENCH_VERSION");
this.homeDir = (String)env.get("BIG_BENCH_HOME");
this.confDir = (String)env.get("BIG_BENCH_CONF_DIR");
this.runScript = (String)env.get("BIG_BENCH_BIN_DIR") + "/bigBench";
this.datagenDir = (String)env.get("BIG_BENCH_DATA_GENERATOR_DIR");
this.logDir = (String)env.get("BIG_BENCH_LOGS_DIR");
this.dataGenLogFile = (String)env.get("BIG_BENCH_DATAGEN_STAGE_LOG");
this.loadLogFile = (String)env.get("BIG_BENCH_LOADING_STAGE_LOG");
this.engine = (String)env.get("BIG_BENCH_ENGINE");
this.database = (String)env.get("BIG_BENCH_DATABASE");
this.mapTasks = (String)env.get("BIG_BENCH_MAP_TASKS");
this.numberOfParallelStreams = Integer.parseInt((String)env.get("BIG_BENCH_NUMBER_OF_PARALLEL_STREAMS"));
this.scaleFactor = Long.parseLong((String)env.get("BIG_BENCH_SCALE_FACTOR"));
this.stopAfterFailure = ((String)env.get("BIG_BENCH_STOP_AFTER_FAILURE")).equals("1");

It also automatically appends -U (unlock expert mode) to the user-specified arguments:

this.userArguments.add("-U");

If the user specified PRETEND_MODE, PRINT_STD_OUT, WORKLOAD, or QUERIES_TO_RUN, the user's value takes precedence; otherwise the default is used.

if(env.containsKey("USER_PRETEND_MODE")) {
    this.properties.setProperty("pretend_mode", (String)env.get("USER_PRETEND_MODE"));
}

if(env.containsKey("USER_PRINT_STD_OUT")) {
   this.properties.setProperty("show_command_stdout", (String)env.get("USER_PRINT_STD_OUT"));
}

if(env.containsKey("USER_DRIVER_WORKLOAD")) {
   this.properties.setProperty("workload", (String)env.get("USER_DRIVER_WORKLOAD"));
}

if(env.containsKey("USER_DRIVER_QUERIES_TO_RUN")) {
    this.properties.setProperty(BigBench.BenchmarkPhase.POWER_TEST.getQueryListProperty(0), (String)env.get("USER_DRIVER_QUERIES_TO_RUN"));
}

It then reads workload and fills benchmarkPhases. If workload does not contain BENCHMARK_START or BENCHMARK_STOP, they are automatically added at the first and last positions of benchmarkPhases, respectively.

this.benchmarkPhases = new ArrayList();
Iterator var7 = Arrays.asList(this.properties.getProperty("workload").split(",")).iterator();
    
while(var7.hasNext()) {
    String benchmarkPhase = (String)var7.next();
    this.benchmarkPhases.add(BigBench.BenchmarkPhase.valueOf(benchmarkPhase.trim()));
}
    
if(!this.benchmarkPhases.contains(BigBench.BenchmarkPhase.BENCHMARK_START)) {
    this.benchmarkPhases.add(0, BigBench.BenchmarkPhase.BENCHMARK_START);
}
    
if(!this.benchmarkPhases.contains(BigBench.BenchmarkPhase.BENCHMARK_STOP)) {
    this.benchmarkPhases.add(BigBench.BenchmarkPhase.BENCHMARK_STOP);
}
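
The effect of this logic is easiest to see on a small input. The sketch below is a self-contained approximation: WorkloadSketch and its Phase enum are hypothetical and list only a handful of the real phases, but the parsing and the automatic START/STOP insertion follow the snippet above.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: parse a workload string and add the START/STOP markers if missing. */
final class WorkloadSketch {

    /** Hypothetical subset of the phases; the real enum has many more constants. */
    enum Phase { BENCHMARK_START, BENCHMARK_STOP, CLEAN_ALL, DATA_GENERATION, LOAD_TEST, POWER_TEST }

    static List<Phase> parseWorkload(String workload) {
        List<Phase> phases = new ArrayList<>();
        for (String name : workload.split(",")) {
            // valueOf throws IllegalArgumentException for an unknown phase name.
            phases.add(Phase.valueOf(name.trim()));
        }
        if (!phases.contains(Phase.BENCHMARK_START)) {
            phases.add(0, Phase.BENCHMARK_START); // prepend
        }
        if (!phases.contains(Phase.BENCHMARK_STOP)) {
            phases.add(Phase.BENCHMARK_STOP); // append
        }
        return phases;
    }

    public static void main(String[] args) {
        System.out.println(parseWorkload("LOAD_TEST, POWER_TEST"));
    }
}
```

Note that a BENCHMARK_START that already appears somewhere in the workload is left where it is; only a missing marker is added at the start or end.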

run()

run() is the core method of RunBigBench.jar. All execution goes through run(), which calls methods such as runQueries(), runModule(), and generateData(). These in turn call runCmd() to create an operating system process, execute a bash command, and invoke the bash scripts.

run() uses a while loop to execute each benchmarkPhase in workload in turn. Different phases call different methods among runQueries(), runModule(), generateData(), and so on.

try {
long e = 0L;
this.log.finer("Benchmark phases: " + this.benchmarkPhases);
Iterator startCheckpoint = this.benchmarkPhases.iterator();

long throughputStart;
while(startCheckpoint.hasNext()) {
    BigBench.BenchmarkPhase children = (BigBench.BenchmarkPhase)startCheckpoint.next();
    if(children.isPhaseDone()) {
        this.log.info("The phase " + children.name() + " was already performed earlier. Skipping this phase");
    } else {
        try {
            switch($SWITCH_TABLE$io$bigdatabenchmark$v1$driver$BigBench$BenchmarkPhase()[children.ordinal()]) {
            case 1:
            case 20:
                throw new IllegalArgumentException("The value " + children.name() + " is only used internally.");
            case 2:
                this.log.info(children.getConsoleMessage());
                e = System.currentTimeMillis();
                break;
            case 3:
                if(!BigBench.BenchmarkPhase.BENCHMARK_START.isPhaseDone()) {
                    throw new IllegalArgumentException("Error: Cannot stop the benchmark before starting it");
                }

                throughputStart = System.currentTimeMillis();
                this.log.info(String.format("%-55s finished. Time: %25s", new Object[]{children.getConsoleMessage(), BigBench.Helper.formatTime(throughputStart - e)}));
                this.logTreeRoot.setCheckpoint(new BigBench.Checkpoint(BigBench.BenchmarkPhase.BENCHMARK, -1L, -1L, e, throughputStart, this.logTreeRoot.isSuccessful()));
                break;
            case 4:
            case 15:
            case 18:
            case 22:
            case 27:
            case 28:
            case 29:
                this.runModule(children, this.userArguments);
                break;
            case 5:
            case 10:
            case 11:
                this.runQueries(children, 1, validationArguments);
                break;
            case 6:
            case 9:
                this.runModule(children, validationArguments);
                break;
            case 7:
                this.generateData(children, false, validationArguments);
                break;
            case 8:
                this.generateData(children, true, validationArguments);
                break;
            case 12:
            case 19:
            case 24:
                this.runQueries(children, 1, this.userArguments);
                break;
            case 13:
            case 14:
            case 21:
            case 23:
            case 25:
            case 26:
                this.runQueries(children, this.numberOfParallelStreams, this.userArguments);
                break;
            case 16:
                this.generateData(children, false, this.userArguments);
                break;
            case 17:
                this.generateData(children, true, this.userArguments);
            }

            children.setPhaseDone(true);
        } catch (IOException var21) {
            this.log.info("==============\nBenchmark run terminated\nReason: An error occured while running a command in phase " + children + "\n==============");
            var21.printStackTrace();
            if(this.stopAfterFailure || children.mustSucceed()) {
                break;
            }
        }
    }
}

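The case labels 1-29 here are not queries 1-29; they refer to the 29 benchmarkPhase constants of the enum, numbered in declaration order (for example, case 2 is BENCHMARK_START and case 3 is BENCHMARK_STOP), as shown below: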

private static enum BenchmarkPhase {
BENCHMARK((String)null, "benchmark", false, false, false, false, "BigBench benchmark"),
BENCHMARK_START((String)null, "benchmark_start", false, false, false, false, "BigBench benchmark: Start"),
BENCHMARK_STOP((String)null, "benchmark_stop", false, false, false, false, "BigBench benchmark: Stop"),
CLEAN_ALL("cleanAll", "clean_all", false, false, false, false, "BigBench clean all"),
ENGINE_VALIDATION_CLEAN_POWER_TEST("cleanQuery", "engine_validation_power_test", false, false, false, false, "BigBench engine validation: Clean power test queries"),
ENGINE_VALIDATION_CLEAN_LOAD_TEST("cleanMetastore", "engine_validation_metastore", false, false, false, false, "BigBench engine validation: Clean metastore"),
ENGINE_VALIDATION_CLEAN_DATA("cleanData", "engine_validation_data", false, false, false, false, "BigBench engine validation: Clean data"),
ENGINE_VALIDATION_DATA_GENERATION("dataGen", "engine_validation_data", false, false, false, true, "BigBench engine validation: Data generation"),
ENGINE_VALIDATION_LOAD_TEST("populateMetastore", "engine_validation_metastore", false, false, false, true, "BigBench engine validation: Populate metastore"),
ENGINE_VALIDATION_POWER_TEST("runQuery", "engine_validation_power_test", false, false, false, false, "BigBench engine validation: Power test"),
ENGINE_VALIDATION_RESULT_VALIDATION("validateQuery", "engine_validation_power_test", false, false, true, false, "BigBench engine validation: Check all query results"),
CLEAN_POWER_TEST("cleanQuery", "power_test", false, false, false, false, "BigBench clean: Clean power test queries"),
CLEAN_THROUGHPUT_TEST_1("cleanQuery", "throughput_test_1", false, false, false, false, "BigBench clean: Clean first throughput test queries"),
CLEAN_THROUGHPUT_TEST_2("cleanQuery", "throughput_test_2", false, false, false, false, "BigBench clean: Clean second throughput test queries"),
CLEAN_LOAD_TEST("cleanMetastore", "metastore", false, false, false, false, "BigBench clean: Load test"),
CLEAN_DATA("cleanData", "data", false, false, false, false, "BigBench clean: Data"),
DATA_GENERATION("dataGen", "data", false, false, false, true, "BigBench preparation: Data generation"),
LOAD_TEST("populateMetastore", "metastore", false, false, false, true, "BigBench phase 1: Load test"),
POWER_TEST("runQuery", "power_test", false, true, false, false, "BigBench phase 2: Power test"),
THROUGHPUT_TEST((String)null, "throughput_test", false, false, false, false, "BigBench phase 3: Throughput test"),
THROUGHPUT_TEST_1("runQuery", "throughput_test_1", true, true, false, false, "BigBench phase 3: First throughput test run"),
THROUGHPUT_TEST_REFRESH("refreshMetastore", "throughput_test_refresh", false, false, false, false, "BigBench phase 3: Throughput test data refresh"),
THROUGHPUT_TEST_2("runQuery", "throughput_test_2", true, true, false, false, "BigBench phase 3: Second throughput test run"),
VALIDATE_POWER_TEST("validateQuery", "power_test", false, false, true, false, "BigBench validation: Power test results"),
VALIDATE_THROUGHPUT_TEST_1("validateQuery", "throughput_test_1", false, false, true, false, "BigBench validation: First throughput test results"),
VALIDATE_THROUGHPUT_TEST_2("validateQuery", "throughput_test_2", false, false, true, false, "BigBench validation: Second throughput test results"),
SHOW_TIMES("showTimes", "show_times", false, false, true, false, "BigBench: show query times"),
SHOW_ERRORS("showErrors", "show_errors", false, false, true, false, "BigBench: show query errors"),
SHOW_VALIDATION("showValidation", "show_validation", false, false, true, false, "BigBench: show query validation results");

private String runModule;
private String namePattern;
private boolean queryOrderRandom;
private boolean queryOrderCached;
private boolean printStdOut;
private boolean mustSucceed;
private String consoleMessage;
private boolean phaseDone;

private BenchmarkPhase(String runModule, String namePattern, boolean queryOrderRandom, boolean queryOrderCached, boolean printStdOut, boolean mustSucceed, String consoleMessage) {
    this.runModule = runModule;
    this.namePattern = namePattern;
    this.queryOrderRandom = queryOrderRandom;
    this.queryOrderCached = queryOrderCached;
    this.printStdOut = printStdOut;
    this.mustSucceed = mustSucceed;
    this.consoleMessage = consoleMessage;
    this.phaseDone = false;
}       

For example, 3 corresponds to BENCHMARK_STOP, 4 to CLEAN_ALL, 29 to SHOW_VALIDATION, and so on.
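The numbering can be checked with a small lookup. This is only a sketch: `PHASES` and `phase_name` are hypothetical names introduced here, and the 1-based numbering is inferred from the examples in the text (3 is BENCHMARK_STOP, 4 is CLEAN_ALL), not taken from RunBigBench.jar itself.

```shell
# Phase names in the order the enum declares them (copied from the listing above).
PHASES="BENCHMARK BENCHMARK_START BENCHMARK_STOP CLEAN_ALL \
ENGINE_VALIDATION_CLEAN_POWER_TEST ENGINE_VALIDATION_CLEAN_LOAD_TEST \
ENGINE_VALIDATION_CLEAN_DATA ENGINE_VALIDATION_DATA_GENERATION \
ENGINE_VALIDATION_LOAD_TEST ENGINE_VALIDATION_POWER_TEST \
ENGINE_VALIDATION_RESULT_VALIDATION CLEAN_POWER_TEST CLEAN_THROUGHPUT_TEST_1 \
CLEAN_THROUGHPUT_TEST_2 CLEAN_LOAD_TEST CLEAN_DATA DATA_GENERATION LOAD_TEST \
POWER_TEST THROUGHPUT_TEST THROUGHPUT_TEST_1 THROUGHPUT_TEST_REFRESH \
THROUGHPUT_TEST_2 VALIDATE_POWER_TEST VALIDATE_THROUGHPUT_TEST_1 \
VALIDATE_THROUGHPUT_TEST_2 SHOW_TIMES SHOW_ERRORS SHOW_VALIDATION"

# Hypothetical helper: print the phase name for a 1-based case label.
phase_name() {
    wanted=$1
    index=0
    for phase in $PHASES; do
        index=$((index + 1))
        if [ "$index" -eq "$wanted" ]; then
            echo "$phase"
            return 0
        fi
    done
    return 1   # label outside 1-29
}

phase_name 16   # CLEAN_DATA, matching "case 16" -> generateData(..., false, ...)
phase_name 17   # DATA_GENERATION, matching "case 17" -> generateData(..., true, ...)
```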

From this we can see:

benchmarkPhases such as CLEAN_ALL, CLEAN_LOAD_TEST, LOAD_TEST, THROUGHPUT_TEST_REFRESH, SHOW_TIMES, SHOW_ERRORS, and SHOW_VALIDATION call

this.runModule(children, this.userArguments);

The method is runModule and the argument is this.userArguments.

ENGINE_VALIDATION_CLEAN_POWER_TEST, ENGINE_VALIDATION_POWER_TEST, and ENGINE_VALIDATION_RESULT_VALIDATION call

this.runQueries(children, 1, validationArguments);

The method is runQueries, and the arguments are 1 (the number of streams) and validationArguments.

ENGINE_VALIDATION_CLEAN_LOAD_TEST and ENGINE_VALIDATION_LOAD_TEST call

this.runModule(children, validationArguments);

ENGINE_VALIDATION_CLEAN_DATA calls

this.generateData(children, false, validationArguments);

ENGINE_VALIDATION_DATA_GENERATION calls

this.generateData(children, true, validationArguments);

CLEAN_POWER_TEST, POWER_TEST, and VALIDATE_POWER_TEST call

this.runQueries(children, 1, this.userArguments);

CLEAN_THROUGHPUT_TEST_1, CLEAN_THROUGHPUT_TEST_2, THROUGHPUT_TEST_1, THROUGHPUT_TEST_2, VALIDATE_THROUGHPUT_TEST_1, and VALIDATE_THROUGHPUT_TEST_2 call

this.runQueries(children, this.numberOfParallelStreams, this.userArguments);

CLEAN_DATA calls

this.generateData(children, false, this.userArguments);

DATA_GENERATION calls

this.generateData(children, true, this.userArguments);

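Summarizing the method calls above:

  • The ENGINE_VALIDATION benchmarkPhases all use validationArguments; the rest use userArguments. (The only difference between the two is that SCALE_FACTOR is always 1 in validationArguments.)
  • The POWER_TEST phases all call runQueries(), because POWER_TEST is precisely the execution of the SQL queries. The THROUGHPUT_TEST_1 and THROUGHPUT_TEST_2 phases call it as well, with the number of parallel streams instead of 1.
  • The CLEAN_DATA and DATA_GENERATION phases all call generateData().
  • The LOAD_TEST and SHOW phases all call runModule().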
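The dispatch rules can be condensed into one lookup. The sketch below is a shell restatement of the switch for illustration only; `dispatch_for` is a hypothetical name, and phases whose case branch is not listed in this section (BENCHMARK, BENCHMARK_START, BENCHMARK_STOP, THROUGHPUT_TEST) are deliberately left unmapped.

```shell
# Hypothetical helper: print "<method> <leading arguments> <argument set>" for a phase.
dispatch_for() {
    case "$1" in
        CLEAN_ALL|CLEAN_LOAD_TEST|LOAD_TEST|THROUGHPUT_TEST_REFRESH|SHOW_TIMES|SHOW_ERRORS|SHOW_VALIDATION)
            echo "runModule userArguments" ;;
        ENGINE_VALIDATION_CLEAN_LOAD_TEST|ENGINE_VALIDATION_LOAD_TEST)
            echo "runModule validationArguments" ;;
        ENGINE_VALIDATION_CLEAN_POWER_TEST|ENGINE_VALIDATION_POWER_TEST|ENGINE_VALIDATION_RESULT_VALIDATION)
            echo "runQueries 1 validationArguments" ;;
        CLEAN_POWER_TEST|POWER_TEST|VALIDATE_POWER_TEST)
            echo "runQueries 1 userArguments" ;;
        CLEAN_THROUGHPUT_TEST_[12]|THROUGHPUT_TEST_[12]|VALIDATE_THROUGHPUT_TEST_[12])
            echo "runQueries numberOfParallelStreams userArguments" ;;
        ENGINE_VALIDATION_CLEAN_DATA)
            echo "generateData false validationArguments" ;;
        ENGINE_VALIDATION_DATA_GENERATION)
            echo "generateData true validationArguments" ;;
        CLEAN_DATA)
            echo "generateData false userArguments" ;;
        DATA_GENERATION)
            echo "generateData true userArguments" ;;
        *)
            return 1 ;;   # not covered by the cases discussed here
    esac
}

dispatch_for POWER_TEST          # runQueries 1 userArguments
dispatch_for THROUGHPUT_TEST_1   # runQueries numberOfParallelStreams userArguments
```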

Mapping between benchmarkPhase and module

Each benchmarkPhase maps to a module (the script that is executed) as follows:

CLEAN_ALL -> "cleanAll"
ENGINE_VALIDATION_CLEAN_POWER_TEST -> "cleanQuery"
ENGINE_VALIDATION_CLEAN_LOAD_TEST -> "cleanMetastore"
ENGINE_VALIDATION_CLEAN_DATA -> "cleanData"
ENGINE_VALIDATION_DATA_GENERATION -> "dataGen"
ENGINE_VALIDATION_LOAD_TEST -> "populateMetastore"
ENGINE_VALIDATION_POWER_TEST -> "runQuery"
ENGINE_VALIDATION_RESULT_VALIDATION -> "validateQuery"
CLEAN_POWER_TEST -> "cleanQuery"
CLEAN_THROUGHPUT_TEST_1 -> "cleanQuery"
CLEAN_THROUGHPUT_TEST_2 -> "cleanQuery"
CLEAN_LOAD_TEST -> "cleanMetastore"
CLEAN_DATA -> "cleanData"
DATA_GENERATION -> "dataGen"
LOAD_TEST -> "populateMetastore"
POWER_TEST -> "runQuery"
THROUGHPUT_TEST -> (String)null
THROUGHPUT_TEST_1 -> "runQuery"
THROUGHPUT_TEST_REFRESH -> "refreshMetastore"
THROUGHPUT_TEST_2 -> "runQuery"
VALIDATE_POWER_TEST -> "validateQuery"
VALIDATE_THROUGHPUT_TEST_1 -> "validateQuery"
VALIDATE_THROUGHPUT_TEST_2 -> "validateQuery"
SHOW_TIMES -> "showTimes"
SHOW_ERRORS -> "showErrors"
SHOW_VALIDATION -> "showValidation"

When a benchmarkPhase is executed, it invokes the corresponding module listed above (the scripts are located under $BENCH_MARK_HOME/engines/hive/bin):

cmdLine.add(benchmarkPhase.getRunModule());
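To see which scripts a given workload would touch, the table can be turned into a lookup and applied to a comma-separated workload string. This is a sketch under assumptions: `module_for_phase` and `print_modules` are hypothetical helpers, and the real resolution happens inside RunBigBench.jar via getRunModule().

```shell
# Hypothetical helper: print the module for a phase, or fail if it has none.
module_for_phase() {
    case "$1" in
        CLEAN_ALL) echo "cleanAll" ;;
        ENGINE_VALIDATION_CLEAN_POWER_TEST|CLEAN_POWER_TEST|CLEAN_THROUGHPUT_TEST_[12])
            echo "cleanQuery" ;;
        ENGINE_VALIDATION_CLEAN_LOAD_TEST|CLEAN_LOAD_TEST) echo "cleanMetastore" ;;
        ENGINE_VALIDATION_CLEAN_DATA|CLEAN_DATA) echo "cleanData" ;;
        ENGINE_VALIDATION_DATA_GENERATION|DATA_GENERATION) echo "dataGen" ;;
        ENGINE_VALIDATION_LOAD_TEST|LOAD_TEST) echo "populateMetastore" ;;
        ENGINE_VALIDATION_POWER_TEST|POWER_TEST|THROUGHPUT_TEST_[12]) echo "runQuery" ;;
        THROUGHPUT_TEST_REFRESH) echo "refreshMetastore" ;;
        ENGINE_VALIDATION_RESULT_VALIDATION|VALIDATE_POWER_TEST|VALIDATE_THROUGHPUT_TEST_[12])
            echo "validateQuery" ;;
        SHOW_TIMES) echo "showTimes" ;;
        SHOW_ERRORS) echo "showErrors" ;;
        SHOW_VALIDATION) echo "showValidation" ;;
        *) return 1 ;;   # BENCHMARK*, THROUGHPUT_TEST: module is null
    esac
}

# Hypothetical helper: list the module behind every phase of a workload string.
print_modules() {
    # Split the comma-separated workload into one phase per line.
    echo "$1" | tr ',' '\n' | while read -r phase; do
        if module=$(module_for_phase "$phase"); then
            echo "$phase -> $module"
        else
            echo "$phase -> (no module)"
        fi
    done
}

print_modules "CLEAN_DATA,DATA_GENERATION,BENCHMARK_START,LOAD_TEST,POWER_TEST"
```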

Program call flow

[Figure: bigBench.png]

The following describes what each module does.

module

cleanAll

1. DROP DATABASE
2. Delete the source data on HDFS
echo "dropping database (with all tables)"
runCmdWithErrorCheck runEngineCmd -e "DROP DATABASE IF EXISTS $BIG_BENCH_DATABASE CASCADE;"
echo "cleaning ${BIG_BENCH_HDFS_ABSOLUTE_HOME}"
hadoop fs -rm -r -f -skipTrash "${BIG_BENCH_HDFS_ABSOLUTE_HOME}"

cleanQuery

1. Drop the temporary tables created by the corresponding query
2. Drop the result table created by the corresponding query
runCmdWithErrorCheck runEngineCmd -e "DROP TABLE IF EXISTS $TEMP_TABLE1; DROP TABLE IF EXISTS $TEMP_TABLE2; DROP TABLE IF EXISTS $RESULT_TABLE;"
return $?

cleanMetastore

1. Call `dropTables.sql` to DROP the 23 tables one by one
echo "cleaning metastore tables"
runCmdWithErrorCheck runEngineCmd -f "$BIG_BENCH_CLEAN_METASTORE_FILE"
export BIG_BENCH_CLEAN_METASTORE_FILE="$BIG_BENCH_CLEAN_DIR/dropTables.sql"

dropTables.sql drops the 23 tables one by one. Its source is:

DROP TABLE IF EXISTS ${hiveconf:customerTableName};
DROP TABLE IF EXISTS ${hiveconf:customerAddressTableName};
DROP TABLE IF EXISTS ${hiveconf:customerDemographicsTableName};
DROP TABLE IF EXISTS ${hiveconf:dateTableName};
DROP TABLE IF EXISTS ${hiveconf:householdDemographicsTableName};
DROP TABLE IF EXISTS ${hiveconf:incomeTableName};
DROP TABLE IF EXISTS ${hiveconf:itemTableName};
DROP TABLE IF EXISTS ${hiveconf:promotionTableName};
DROP TABLE IF EXISTS ${hiveconf:reasonTableName};
DROP TABLE IF EXISTS ${hiveconf:shipModeTableName};
DROP TABLE IF EXISTS ${hiveconf:storeTableName};
DROP TABLE IF EXISTS ${hiveconf:timeTableName};
DROP TABLE IF EXISTS ${hiveconf:warehouseTableName};
DROP TABLE IF EXISTS ${hiveconf:webSiteTableName};
DROP TABLE IF EXISTS ${hiveconf:webPageTableName};
DROP TABLE IF EXISTS ${hiveconf:inventoryTableName};
DROP TABLE IF EXISTS ${hiveconf:storeSalesTableName};
DROP TABLE IF EXISTS ${hiveconf:storeReturnsTableName};
DROP TABLE IF EXISTS ${hiveconf:webSalesTableName};
DROP TABLE IF EXISTS ${hiveconf:webReturnsTableName};
DROP TABLE IF EXISTS ${hiveconf:marketPricesTableName};
DROP TABLE IF EXISTS ${hiveconf:clickstreamsTableName};
DROP TABLE IF EXISTS ${hiveconf:reviewsTableName};

cleanData

1. Delete the /user/root/benchmarks/bigbench/data directory on HDFS
2. Delete the /user/root/benchmarks/bigbench/data_refresh directory on HDFS
echo "cleaning ${BIG_BENCH_HDFS_ABSOLUTE_INIT_DATA_DIR}"
hadoop fs -rm -r -f -skipTrash "${BIG_BENCH_HDFS_ABSOLUTE_INIT_DATA_DIR}"
echo "cleaning ${BIG_BENCH_HDFS_ABSOLUTE_REFRESH_DATA_DIR}"
hadoop fs -rm -r -f -skipTrash "${BIG_BENCH_HDFS_ABSOLUTE_REFRESH_DATA_DIR}"

dataGen

1. Create the directory /user/root/benchmarks/bigbench/data and set its permissions
2. Create the directory /user/root/benchmarks/bigbench/data_refresh and set its permissions
3. Call HadoopClusterExec.jar and pdgf.jar to generate the base data under /user/root/benchmarks/bigbench/data
4. Call HadoopClusterExec.jar and pdgf.jar to generate the refresh data under /user/root/benchmarks/bigbench/data_refresh

Create the directory /user/root/benchmarks/bigbench/data and set its permissions

runCmdWithErrorCheck hadoop fs -mkdir -p "${BIG_BENCH_HDFS_ABSOLUTE_INIT_DATA_DIR}"
runCmdWithErrorCheck hadoop fs -chmod 777 "${BIG_BENCH_HDFS_ABSOLUTE_INIT_DATA_DIR}"

Create the directory /user/root/benchmarks/bigbench/data_refresh and set its permissions

runCmdWithErrorCheck hadoop fs -mkdir -p "${BIG_BENCH_HDFS_ABSOLUTE_REFRESH_DATA_DIR}"
runCmdWithErrorCheck hadoop fs -chmod 777 "${BIG_BENCH_HDFS_ABSOLUTE_REFRESH_DATA_DIR}"

Call HadoopClusterExec.jar and pdgf.jar to generate the base data

runCmdWithErrorCheck hadoop jar "${BIG_BENCH_TOOLS_DIR}/HadoopClusterExec.jar" -archives "${PDGF_ARCHIVE_PATH}" ${BIG_BENCH_DATAGEN_HADOOP_EXEC_DEBUG} -taskFailOnNonZeroReturnValue -execCWD "${PDGF_DISTRIBUTED_NODE_DIR}" ${HadoopClusterExecOptions} -exec ${BIG_BENCH_DATAGEN_HADOOP_JVM_ENV} -cp "${HADOOP_CP}:pdgf.jar" ${PDGF_CLUSTER_CONF} pdgf.Controller -nc HadoopClusterExec.tasks -nn HadoopClusterExec.taskNumber -ns -c -sp REFRESH_PHASE 0 -o "'${BIG_BENCH_HDFS_ABSOLUTE_INIT_DATA_DIR}/'+table.getName()+'/'" ${BIG_BENCH_DATAGEN_HADOOP_OPTIONS} -s ${BIG_BENCH_DATAGEN_TABLES} ${PDGF_OPTIONS} "$@" 2>&1 | tee -a "$BIG_BENCH_DATAGEN_STAGE_LOG" 2>&1

Call HadoopClusterExec.jar and pdgf.jar to generate the refresh data

runCmdWithErrorCheck hadoop jar "${BIG_BENCH_TOOLS_DIR}/HadoopClusterExec.jar" -archives "${PDGF_ARCHIVE_PATH}" ${BIG_BENCH_DATAGEN_HADOOP_EXEC_DEBUG} -taskFailOnNonZeroReturnValue -execCWD "${PDGF_DISTRIBUTED_NODE_DIR}" ${HadoopClusterExecOptions} -exec ${BIG_BENCH_DATAGEN_HADOOP_JVM_ENV} -cp "${HADOOP_CP}:pdgf.jar" ${PDGF_CLUSTER_CONF} pdgf.Controller -nc HadoopClusterExec.tasks -nn HadoopClusterExec.taskNumber -ns -c -sp REFRESH_PHASE 1 -o "'${BIG_BENCH_HDFS_ABSOLUTE_REFRESH_DATA_DIR}/'+table.getName()+'/'" ${BIG_BENCH_DATAGEN_HADOOP_OPTIONS} -s ${BIG_BENCH_DATAGEN_TABLES} ${PDGF_OPTIONS} "$@" 2>&1 | tee -a "$BIG_BENCH_DATAGEN_STAGE_LOG" 2>&1

populateMetastore

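This is the step that actually creates the database tables. It runs hiveCreateLoad.sql from the $BENCH_MARK_HOME/engines/hive/population/ directory, and that SQL file builds the tables.

  1. Read the raw .dat data under /user/root/benchmarks/bigbench/data and create an external temporary table in TEXTFILE format
  2. Create the final ORC-format table with select * from the temporary table
  3. Drop the external temporary table.

Read the raw .dat data under /user/root/benchmarks/bigbench/data and create an external temporary table in TEXTFILE format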

DROP TABLE IF EXISTS ${hiveconf:customerTableName}${hiveconf:temporaryTableSuffix};
CREATE EXTERNAL TABLE ${hiveconf:customerTableName}${hiveconf:temporaryTableSuffix}
  ( c_customer_sk             bigint              --not null
  , c_customer_id             string              --not null
  , c_current_cdemo_sk        bigint
  , c_current_hdemo_sk        bigint
  , c_current_addr_sk         bigint
  , c_first_shipto_date_sk    bigint
  , c_first_sales_date_sk     bigint
  , c_salutation              string
  , c_first_name              string
  , c_last_name               string
  , c_preferred_cust_flag     string
  , c_birth_day               int
  , c_birth_month             int
  , c_birth_year              int
  , c_birth_country           string
  , c_login                   string
  , c_email_address           string
  , c_last_review_date        string
  )
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '${hiveconf:fieldDelimiter}'
  STORED AS TEXTFILE LOCATION '${hiveconf:hdfsDataPath}/${hiveconf:customerTableName}'
;

Create the final ORC-format table with select * from the temporary table

DROP TABLE IF EXISTS ${hiveconf:customerTableName};
CREATE TABLE ${hiveconf:customerTableName}
STORED AS ${hiveconf:tableFormat}
AS
SELECT * FROM ${hiveconf:customerTableName}${hiveconf:temporaryTableSuffix}
;

Drop the external temporary table

DROP TABLE ${hiveconf:customerTableName}${hiveconf:temporaryTableSuffix};
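The same three steps repeat for every table, so the pattern can be written once and parameterized. The sketch below only prints the statements for inspection. `emit_load_sql` and all of its parameters are hypothetical, and the real hiveCreateLoad.sql spells each table out explicitly with hiveconf variables instead.

```shell
# Hypothetical helper: print the stage / convert / drop statements for one table.
# Arguments: table name, temporary-table suffix, column definitions,
#            HDFS data path, final storage format, field delimiter.
emit_load_sql() {
    table=$1
    suffix=$2
    columns=$3
    data_path=$4
    format=$5
    delimiter=$6
    cat <<EOF
DROP TABLE IF EXISTS ${table}${suffix};
CREATE EXTERNAL TABLE ${table}${suffix} (${columns})
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '${delimiter}'
  STORED AS TEXTFILE LOCATION '${data_path}/${table}';
DROP TABLE IF EXISTS ${table};
CREATE TABLE ${table} STORED AS ${format} AS
  SELECT * FROM ${table}${suffix};
DROP TABLE ${table}${suffix};
EOF
}

# Example values are placeholders, not the benchmark's actual settings.
emit_load_sql customer _tmp "c_customer_sk bigint, c_customer_id string" \
    /user/root/benchmarks/bigbench/data ORC '|'
```

Staging the raw text in an external table and then copying it with CREATE TABLE ... AS SELECT lets Hive do the format conversion, and dropping an external table removes only its metadata, so the generated .dat files stay on HDFS.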

runQuery

1. runQuery calls the `query_run_main_method()` function in each query's run.sh
2. `query_run_main_method()` calls `runEngineCmd` to execute the query script (qxx.sql)

runQuery calls the query_run_main_method() function in each query's run.sh

QUERY_MAIN_METHOD="query_run_main_method"
-----------------------------------------
"$QUERY_MAIN_METHOD" 2>&1 | tee -a "$LOG_FILE_NAME" 2>&1

query_run_main_method() calls runEngineCmd to execute the query script (qxx.sql)

query_run_main_method () {
    QUERY_SCRIPT="$QUERY_DIR/$QUERY_NAME.sql"
    if [ ! -r "$QUERY_SCRIPT" ]
    then
        echo "SQL file $QUERY_SCRIPT can not be read."
        exit 1
    fi

    runCmdWithErrorCheck runEngineCmd -f "$QUERY_SCRIPT"
    return $?
}

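In most cases query_run_main_method() only runs the corresponding query script. Queries such as q05 and q20, however, use machine learning algorithms: after running the query script they take the resulting table as input and call a jar that runs the machine learning algorithm (for example clustering or logistic regression) to produce the final result.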

runEngineCmd () {
  if addInitScriptsToParams
  then
    "$BINARY" "${BINARY_PARAMS[@]}" "${INIT_PARAMS[@]}" "$@"
  else
    return 1
  fi
}
--------------------------
BINARY="/usr/bin/hive"
BINARY_PARAMS+=(--hiveconf BENCHMARK_PHASE=$BIG_BENCH_BENCHMARK_PHASE --hiveconf STREAM_NUMBER=$BIG_BENCH_STREAM_NUMBER --hiveconf QUERY_NAME=$QUERY_NAME --hiveconf QUERY_DIR=$QUERY_DIR --hiveconf RESULT_TABLE=$RESULT_TABLE --hiveconf RESULT_DIR=$RESULT_DIR --hiveconf TEMP_TABLE=$TEMP_TABLE --hiveconf TEMP_DIR=$TEMP_DIR --hiveconf TABLE_PREFIX=$TABLE_PREFIX)
INIT_PARAMS=(-i "$BIG_BENCH_QUERY_PARAMS_FILE" -i "$BIG_BENCH_ENGINE_SETTINGS_FILE")
INIT_PARAMS+=(-i "$LOCAL_QUERY_ENGINE_SETTINGS_FILE")

if [ -n "$USER_QUERY_PARAMS_FILE" ]
then
if [ -r "$USER_QUERY_PARAMS_FILE" ]
then
  echo "User defined query parameter file found. Adding $USER_QUERY_PARAMS_FILE to hive init."
  INIT_PARAMS+=(-i "$USER_QUERY_PARAMS_FILE")
else
  echo "User query parameter file $USER_QUERY_PARAMS_FILE can not be read."
  return 1
fi
fi

if [ -n "$USER_ENGINE_SETTINGS_FILE" ]
then
if [ -r "$USER_ENGINE_SETTINGS_FILE" ]
then
  echo "User defined engine settings file found. Adding $USER_ENGINE_SETTINGS_FILE to hive init."
  INIT_PARAMS+=(-i "$USER_ENGINE_SETTINGS_FILE")
else
  echo "User hive settings file $USER_ENGINE_SETTINGS_FILE can not be read."
  return 1
fi
fi
return 0
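The two optional-file checks above follow the same rule: an unset variable is skipped silently, a readable file is added as a `-i` init script, and a configured but unreadable file aborts. A minimal sketch of that rule, using a hypothetical helper `optional_init_flag` that prints the flag instead of appending to INIT_PARAMS:

```shell
# Hypothetical helper: decide whether an optional init file contributes "-i <file>".
optional_init_flag() {
    file=$1
    if [ -z "$file" ]; then
        return 0                      # not configured: nothing to add, not an error
    fi
    if [ -r "$file" ]; then
        printf '%s\n' "-i $file"      # readable: pass it to the engine as an init script
        return 0
    fi
    echo "Settings file $file can not be read." >&2
    return 1                          # configured but unreadable: fail fast
}

# Usage: an empty variable is fine, a wrong path is an error.
optional_init_flag "" && echo "no user file configured"
optional_init_flag "/path/that/does/not/exist" 2>/dev/null || echo "would abort"
```

Failing on an unreadable file, instead of silently ignoring it, keeps a run from proceeding with settings the user believed were applied.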

validateQuery

1. Call the `query_run_validate_method()` function in each query's run.sh
2. `query_run_validate_method()` compares `$BENCH_MARK_HOME/engines/hive/queries/qxx/results/qxx-result` with `/user/root/benchmarks/bigbench/queryResults/qxx_hive_${BIG_BENCH_BENCHMARK_PHASE}_${BIG_BENCH_STREAM_NUMBER}_result` on HDFS. If the two are identical, validation passes; otherwise it fails.
if diff -q "$VALIDATION_RESULTS_FILENAME" <(hadoop fs -cat "$RESULT_DIR/*")
then
    echo "Validation of $VALIDATION_RESULTS_FILENAME passed: Query returned correct results"
else
    echo "Validation of $VALIDATION_RESULTS_FILENAME failed: Query returned incorrect results"
    VALIDATION_PASSED="0"
fi

When SF is 1 (-f 1), the comparison above is used. When SF is greater than 1, validation passes as long as the result on HDFS contains at least one row.

if [ `hadoop fs -cat "$RESULT_DIR/*" | head -n 10 | wc -l` -ge 1 ]
then
    echo "Validation passed: Query returned results"
else
    echo "Validation failed: Query did not return results"
    return 1
fi
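The two validation modes can be combined into one function. The sketch below works on local files so that it runs without a cluster; the real scripts read the actual result with `hadoop fs -cat`. `validate_result` and its arguments are hypothetical, and `cmp -s` stands in for the `diff -q` used above.

```shell
# Hypothetical helper: validate a query result.
# Arguments: reference result file, actual result file, scale factor.
validate_result() {
    expected=$1
    actual=$2
    scale_factor=$3
    if [ "$scale_factor" -eq 1 ]; then
        # SF 1: the output must match the shipped reference result exactly.
        cmp -s "$expected" "$actual"
    else
        # SF > 1: no reference exists, so only require at least one row.
        lines=$(head -n 10 "$actual" | wc -l)
        [ "$((lines))" -ge 1 ]
    fi
}

# Usage with throwaway files:
dir=$(mktemp -d)
printf 'row1\nrow2\n' > "$dir/reference"
printf 'row1\nrow2\n' > "$dir/actual"
validate_result "$dir/reference" "$dir/actual" 1 && echo "Validation passed"
rm -rf "$dir"
```

The check for larger scale factors is much weaker: it only shows that the query produced some output, not that the output is correct.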

refreshMetastore

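1. Call the `hiveRefreshCreateLoad.sql` script in the `$BENCH_MARK_HOME/engines/hive/refresh/` directory
2. `hiveRefreshCreateLoad.sql` loads each table's data from the `/user/root/benchmarks/bigbench/data_refresh/` directory on HDFS into an external temporary table
3. The data of each external temporary table is then inserted into the corresponding table in the Hive database

hiveRefreshCreateLoad.sql loads each table's data from the /user/root/benchmarks/bigbench/data_refresh/ directory on HDFS into an external temporary table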

DROP TABLE IF EXISTS ${hiveconf:customerTableName}${hiveconf:temporaryTableSuffix};
CREATE EXTERNAL TABLE ${hiveconf:customerTableName}${hiveconf:temporaryTableSuffix}
  ( c_customer_sk             bigint              --not null
  , c_customer_id             string              --not null
  , c_current_cdemo_sk        bigint
  , c_current_hdemo_sk        bigint
  , c_current_addr_sk         bigint
  , c_first_shipto_date_sk    bigint
  , c_first_sales_date_sk     bigint
  , c_salutation              string
  , c_first_name              string
  , c_last_name               string
  , c_preferred_cust_flag     string
  , c_birth_day               int
  , c_birth_month             int
  , c_birth_year              int
  , c_birth_country           string
  , c_login                   string
  , c_email_address           string
  , c_last_review_date        string
  )
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '${hiveconf:fieldDelimiter}'
  STORED AS TEXTFILE LOCATION '${hiveconf:hdfsDataPath}/${hiveconf:customerTableName}'
;
-----------------
set hdfsDataPath=${env:BIG_BENCH_HDFS_ABSOLUTE_REFRESH_DATA_DIR};

The data of each external temporary table is then inserted into the corresponding table in the Hive database

INSERT INTO TABLE ${hiveconf:customerTableName}
SELECT * FROM ${hiveconf:customerTableName}${hiveconf:temporaryTableSuffix}
;

Appendix

The 23 database tables

Reposted from Jianshu: https://www.jianshu.com/p/63cf1047b7e1