基於ODPS的大資料應用源端感知
阿新 • • 發佈:2018-11-26
大資料雲平臺如ODPS是離線計算平臺,而源端的Oracle資料庫、MySQL資料庫等都是實時變化著的。一旦源端表結構發生變化,而雲平臺又未及時獲知,對後續的應用業務、OGG、流計算等都會造成不小的麻煩,時間越長需要補做的資料就越多,甚至需要重做上雲。
本文用shell編寫指令碼來對比源端和雲平臺之間的表結構,從而做到及時的源端感知,後續再用ETL工具把這些變化資料上雲,應用調取等,建立良好的報警機制,如此便可及時get到源端變化,從而做出相應措施。
雙端表結構對比指令碼如下:
#!/bin/bash
################################################################################
# SCRIPT_NAME   : ty_compare_table_column.sh
#
# CREATE_TIME   : 2018/08/17
# AUTHOR        : Mochou_liqb
# DESCRIPTION   : For every table listed in ty_createJson_ql.conf, fetch the
#                 column list from the source database (ALL_TAB_COLUMNS via
#                 sqlplus) and from the ODPS project (odpscmd "desc"), compare
#                 the two lists and write the differing tables to a CSV report.
# PARAMETER     : $1 - source system name (selects $basePath/<name>/conf)
#                 $2 - target ODPS project name
# EXAMPLE       : ./ty_compare_table_column.sh HX <odps_project>
# UPDATE_RECORD : this is the first version and kill some bugs
#
#   DATE        OPERATION   CZR          DESCRIPTION
#   __________  __________  ___________  ______________________________
#   2018/08/17  UPDATE      Mochou_liqb
################################################################################

#######################################
# Strip parentheses from a config field and upper-case it.
# Arguments: $1 - raw field text
# Outputs:   normalized field on stdout
#######################################
normalize_field() {
  local value=${1//[()]/}
  printf '%s\n' "$value" | tr 'a-z' 'A-Z'
}

#######################################
# Emit a UUID with the dashes between its five groups removed.
# Relies on the external "uuid" command being installed.
#######################################
new_uuid() {
  uuid | awk -F '-' '{print $1$2$3$4$5}'
}

#######################################
# Run one SQL statement through sqlplus with headers/feedback switched off.
# Globals:   db_connect (read)
# Arguments: $1 - SQL text, including the trailing ';'
# Outputs:   raw sqlplus output on stdout
#######################################
run_sql() {
  sqlplus -S "$db_connect" <<END
set heading off
set feedback off
set pagesize 0
set verify off
set echo off
set line 3000
$1
quit;
END
}

#######################################
# Print the non-comment lines of the table list.
# Line format: OWNER|TABLE_NAME|[T/V]_[PROJECT]_[OWNER]_[TABLE_NAME]|PK_NAME|PAR-YES/NO|NUM_ROWS
# Globals:   confBase (read)
#######################################
list_tables() {
  grep -v '^#' "$confBase/ty_createJson_ql.conf"
}

#######################################
# Split one table-list line into its first three '|' separated fields.
# Globals:   tableUser, tableName, tableODPS (written)
# Arguments: $1 - one line of the table list
#######################################
parse_table_line() {
  local raw_user raw_name raw_odps _rest
  IFS='|' read -r raw_user raw_name raw_odps _rest <<< "$1"
  tableUser=$(normalize_field "$raw_user")
  tableName=$(normalize_field "$raw_name")
  tableODPS=$(normalize_field "$raw_odps")
}

#######################################
# Extract the upper-cased column names from a saved "odpscmd desc" output.
# Keeps rows mentioning a known type (or "Partition"), takes the 2nd
# whitespace-separated token, then drops the partition header token and the
# columns rfq / ypt_* that are excluded from the comparison.
# Arguments: $1 - path of the saved desc output
# Outputs:   one column name per line
#######################################
odps_columns() {
  grep -E 'bigint|string|boolean|double|datetime|decimal|Partition' "$1" \
    | awk '{print $2}' \
    | grep -vw -e 'Partition' -e 'rfq' -e 'ypt_jgsj' -e 'ypt_ysjczxl' \
        -e 'ypt_ysjczlx' -e 'ypt_ysjczsj' \
    | tr 'a-z' 'A-Z'
}

#######################################
# Classify the difference between the two column arrays.
# More source columns -> added, fewer -> removed, same count but some
# position differs -> renamed, identical -> no output.
# Globals:   array_column_ora, array_column_odps (read)
# Outputs:   the change label on stdout, or nothing when the lists match
#######################################
classify_change() {
  local ora_n=${#array_column_ora[@]}
  local odps_n=${#array_column_odps[@]}
  local k
  if (( ora_n > odps_n )); then
    echo "列欄位新增"
  elif (( ora_n < odps_n )); then
    echo "列欄位減少"
  else
    for ((k = 0; k < ora_n; k++)); do
      if [[ "${array_column_ora[k]}" != "${array_column_odps[k]}" ]]; then
        echo "列欄位改名"
        return
      fi
    done
  fi
}

#######################################
# Probe the source database up to three times (5 s pause after each failed
# attempt) by selecting today's date and comparing it with the local date.
# NOTE(review): this is invoked at the very end of the script, after all
# queries have already run, and always exits 0 even when the probe fails.
# That ordering and exit status are kept as in the original; consider running
# the probe first and failing hard if callers do not rely on the exit code.
# Globals:   db_connect (read, via run_sql)
#######################################
function checkDBlink(){
  local sql="select to_char(sysdate,'yyyy-mm-dd') today from dual;"
  local attempt=0
  local connected=false
  local today result
  today=$(date +%Y-%m-%d)
  while (( attempt < 3 )); do
    result=$(run_sql "$sql")
    attempt=$((attempt + 1))
    if [[ "$result" == "$today" ]]; then
      connected=true
      echo "資料庫連線連線成功,開始執行指令碼!"
      break
    fi
    sleep 5
  done
  if [[ "$connected" == false ]]; then
    echo "資料庫連線失敗,請檢查資料庫連線資訊!"
  fi
  exit 0
}

if [ $# -lt 2 ]; then
  echo "請輸入兩個引數:一.源端系統名,二.目標端專案名"
  exit 1
fi

# positional parameters
baseConf=$1
baseProject=$2
echo "當前系統是 ${baseConf},對比目標端ODPS專案是 ${baseProject}"

# working paths
curdt="$(date +%Y%m%d)"
basePath="/u01/ZJSY/version/TY"
shellPath="$basePath/shell"
confBase="$basePath/$baseConf/conf"
# "desc" output of every ODPS table is saved one file per table,
# file name: [T/V]_[PROJECT]_[OWNER]_[TABLE_NAME]
logPathConf="$shellPath/log/odpsTb/$baseConf/conf"
logPathOra="$shellPath/log/odpsTb/$baseConf/ora"
logPathReport="$shellPath/log/odpsTb/$baseConf/report/${curdt}"
logPathTmp="$shellPath/log/odpsTb/$baseConf/tmp"
odpsPath="/u01/ZJSY/ODPS/odpscmd_20"

# create the log directories and drop any report left from an earlier run today
mkdir -p "$logPathConf" "$logPathOra" "$logPathReport" "$logPathTmp"
rm -f -- "$logPathReport/ydbbgdjmx_${curdt}.csv" \
  "$logPathReport/dbxx_${curdt}.csv" \
  "$logPathTmp/bgdjtmp_${curdt}.txt"

# Oracle client environment; the sourced file may override $reader
reader="oraclereader"
source "$confBase/ty_datasource.conf"
if [[ "$reader" == "oraclereader" ]]; then
  export ORACLE_HOME=$TY_ORACLE_HOME
  export LD_LIBRARY_PATH=$TY_LD_LIBRARY_PATH
  export NLS_LANG="$nls_lang"
  export PATH=$ORACLE_HOME/bin:$LD_LIBRARY_PATH:$PATH
fi

# NOTE(review): in the extracted source the password variable was destroyed by
# e-mail obfuscation ("$user/[email protected]$jdbc"). "passwd" is a
# placeholder; confirm the real variable name against ty_datasource.conf.
# $user and $jdbc are presumably defined by that file as well.
db_connect="${user}/${passwd}@${jdbc}"

# Step 1: dump source and target column lists, one file per table.
# The table list is read on fd 3 so sqlplus/odpscmd cannot consume it via
# stdin; each line is parsed whole instead of being word-split.
while IFS= read -r -u 3 line; do
  [[ -n "${line//[[:space:]]/}" ]] || continue
  parse_table_line "$line"

  loadsql="SELECT COLUMN_NAME FROM ALL_TAB_COLUMNS WHERE OWNER = '$tableUser' AND TABLE_NAME = '$tableName' ORDER BY COLUMN_ID;"
  result=$(run_sql "$loadsql")
  printf '%s\n' "$result" > "$logPathOra/$tableODPS"

  "$odpsPath/bin/odpscmd" -e "desc ${baseProject}.${tableODPS};" > "$logPathConf/$tableODPS"
done 3< <(list_tables)

# Step 2: compare the two dumps table by table and write the report.
table_num=0
table_num_bg=0
RWBH=$(new_uuid)
dbsj=$(date -d today +"%Y-%m-%d %T")

while IFS= read -r -u 3 line; do
  [[ -n "${line//[[:space:]]/}" ]] || continue
  parse_table_line "$line"

  echo "======== tableODPS is : $tableODPS ============="

  # target (ODPS) columns
  array_column_odps=()
  while IFS= read -r column; do
    if [[ -n "$column" ]]; then
      array_column_odps+=("$column")
    fi
  done < <(odps_columns "$logPathConf/$tableODPS")

  # source columns: every whitespace-separated word of the dump is one column
  array_column_ora=()
  while read -r -a words || (( ${#words[@]} > 0 )); do
    array_column_ora+=("${words[@]}")
  done < "$logPathOra/$tableODPS"

  echo "ODPS表結構是${array_column_odps[*]}"
  echo "源端表結構是${array_column_ora[*]}"

  # only compare when the source dump for this table actually exists
  if [[ -n "$tableODPS" && -e "$logPathOra/$tableODPS" ]]; then
    change=$(classify_change)
    if [[ -n "$change" ]]; then
      UUID=$(new_uuid)
      table_num_bg=$((table_num_bg + 1))
      # write into ydbbgdjmx
      echo "${UUID},${RWBH},預生產庫,${baseConf},${tableUser}.${tableName},${baseProject},$tableODPS,${change},${dbsj},${array_column_odps[*]},${array_column_ora[*]}" >> "$logPathReport/ydbbgdjmx_${curdt}.csv"
      echo "$tableODPS 表結構發生變化,具體情況已寫入${curdt}報告"
    fi
  fi

  echo ""
  echo "---------------下一個----------------"
  table_num=$((table_num + 1))

  # clear, start next
  unset array_column_odps
  unset array_column_ora
done 3< <(list_tables)

# write the run summary into dbxx
echo "${RWBH},預生產庫,${baseConf},${baseProject},${table_num},${table_num_bg},${dbsj}" >> "$logPathReport/dbxx_${curdt}.csv"

checkDBlink