1. 程式人生 > >基於ODPS的大資料應用源端感知

基於ODPS的大資料應用源端感知

大資料雲平臺如ODPS是離線計算平臺,而源端的Oracle資料庫,MySQL資料庫等都是實時變化著的,一旦源端表結構發生變化,而雲平臺又未及時獲知,對後續的應用業務,OGG,流計算等都會造成不小的麻煩,時間越長需要補做的資料就越多,甚至需要重做上雲。

本文用shell編寫指令碼來對比源端和雲平臺之間的表結構,從而做到及時的源端感知,後續再用ETL工具把這些變化資料上雲,應用調取等,建立良好的報警機制,如此便可及時get到源端變化,從而做出相應措施。

雙端表結構對比指令碼如下:

#!/bin/bash
################################################################################
# SCRIPT_NAME     : ty_compare_table_column.sh
#
# CREATE_TIME     : 2018/08/17
# AUTHOR          : Mochou_liqb
# DESCRIPTION     : compare source Oracle table columns with target ODPS table columns to get changing_tables
# PARAMETER       : see next
# EXAMPLE         : ./ty_compare_table_column.sh HX <odps_project>
# UPDATE_RECORD   : this is the first version and kill some bugs
#
# DATE      OPERATION       CZR         DESCRIPTION
# ________  _____________   ________    __________________________________
#
# 2018/08/17  UPDATE Mochou_liqb
################################################################################

# Both arguments are mandatory:
#   $1 - source system name (selects $basePath/<name>/conf)
#   $2 - target ODPS project name
if [ $# -lt 2 ]; then
	echo "請輸入兩個引數:一.源端系統名,二.目標端專案名" >&2
	exit 1
fi
baseConf=$1
baseProject=$2
echo "當前系統是 ${baseConf},對比目標端ODPS專案是 ${baseProject}"

# Directory layout. Every path below is derived from $basePath and the
# source system name given on the command line.
curdt="$(date +%Y%m%d)"
basePath="/u01/ZJSY/version/TY"
shellPath="$basePath/shell"
confBase="$basePath/$baseConf/conf"
# One file per table is written into these directories, named after the
# ODPS table, e.g. [T/V]_[PROJECT]_[OWNER]_[TABLE_NAME]:
#   conf - output of "desc" run through odpscmd
#   ora  - column list read from the source database
logPathConf="$shellPath/log/odpsTb/$baseConf/conf"
logPathOra="$shellPath/log/odpsTb/$baseConf/ora"
logPathReport="$shellPath/log/odpsTb/$baseConf/report/${curdt}"
logPathTmp="$shellPath/log/odpsTb/$baseConf/tmp"
odpsPath="/u01/ZJSY/ODPS/odpscmd_20"

# mkdir -p is a no-op for directories that already exist, so no test is
# needed; stop right away if a directory cannot be created.
mkdir -p -- "$logPathConf" "$logPathOra" "$logPathReport" "$logPathTmp" || exit 1

# Start from clean report/tmp files so a re-run on the same day does not
# append to the previous run's output.
rm -f -- \
	"$logPathReport/ydbbgdjmx_$curdt.csv" \
	"$logPathReport/dbxx_$curdt.csv" \
	"$logPathTmp/bgdjtmp_$curdt.txt"

# Connection settings come from the per-system datasource file. "reader" is
# given a default first, so the sourced file may override it.
reader="oraclereader"
datasourceConf="$confBase/ty_datasource.conf"
if [ ! -r "$datasourceConf" ]; then
	echo "cannot read datasource config: $datasourceConf" >&2
	exit 1
fi
source "$datasourceConf"

# sqlplus needs the Oracle client environment; the TY_* values and nls_lang
# are expected to be set by the datasource file sourced above.
if [[ "$reader" == "oraclereader" ]]; then
	export ORACLE_HOME="$TY_ORACLE_HOME"
	export LD_LIBRARY_PATH="$TY_LD_LIBRARY_PATH"
	export NLS_LANG="$nls_lang"
	export PATH="$ORACLE_HOME/bin:$LD_LIBRARY_PATH:$PATH"
fi

#######################################
# Run one SQL statement through sqlplus in silent mode, with headers,
# feedback and paging switched off, and print whatever sqlplus returns.
# Globals:   user, passwd, jdbc (read)
# Arguments: $1 - SQL text including its trailing semicolon
# Outputs:   raw sqlplus output on stdout
#######################################
# NOTE(review): in the published copy of this script the token between
# "$user/" and "@$jdbc" was destroyed by e-mail obfuscation. "$passwd" is an
# assumption - confirm the variable name that ty_datasource.conf defines.
_ora_query() {
	printf '%s\n' \
		'set heading off' \
		'set feedback off' \
		'set pagesize 0' \
		'set verify off' \
		'set echo off' \
		'set line 3000' \
		"$1" \
		'quit;' |
		sqlplus -S "${user}/${passwd}@${jdbc}"
}

#######################################
# Probe the database connection by asking it for today's date and comparing
# the answer with the local date. Up to 3 attempts, 5 seconds apart.
# Outputs:   success message on stdout, failure message on stderr
# Returns:   0 when an attempt answered with today's date, 1 otherwise
#######################################
function checkDBlink(){
	local probe_sql="select to_char(sysdate,'yyyy-mm-dd') today from dual;"
	local today attempt answer
	today=$(date +%Y-%m-%d)
	for ((attempt = 1; attempt <= 3; attempt++)); do
		answer=$(_ora_query "$probe_sql")
		# Compare with all whitespace removed so padding or blank lines in
		# the sqlplus output do not cause a false negative.
		if [[ "${answer//[[:space:]]/}" == "$today" ]]; then
			echo "資料庫連線連線成功,開始執行指令碼!"
			return 0
		fi
		if ((attempt < 3)); then
			sleep 5
		fi
	done
	echo "資料庫連線失敗,請檢查資料庫連線資訊!" >&2
	return 1
}

#######################################
# Print the column names found in a saved "desc" output file, upper-cased,
# one per line.
# Keeps lines mentioning one of the listed data types, takes the second
# whitespace-separated field, then drops the Partition marker and the
# rfq / ypt_* columns (presumably columns that exist only on the ODPS side -
# confirm).
# Arguments: $1 - path of the saved desc output
#######################################
_odps_columns() {
	grep -E 'bigint|string|boolean|double|datetime|decimal|Partition' "$1" |
		awk '{print $2}' |
		grep -vw -e 'Partition' -e 'rfq' -e 'ypt_jgsj' -e 'ypt_ysjczxl' \
			-e 'ypt_ysjczlx' -e 'ypt_ysjczsj' |
		tr 'a-z' 'A-Z'
}

# Print a fresh id: the output of uuid with the dashes removed.
_new_id() {
	uuid | awk -F '-' '{print $1$2$3$4$5}'
}

# Check the connection before doing any work: if sqlplus cannot connect, the
# source-side snapshots written below would not contain valid column lists.
checkDBlink || exit 1

# Load the table list once. Line format:
#   OWNER|TABLE_NAME|[T/V]_[PROJECT]_[OWNER]_[TABLE_NAME]|PK_NAME|PAR-YES/NO|NUM_ROWS
# Comment lines are skipped; parentheses and whitespace are removed and the
# text is upper-cased. Only the first three fields are used.
tableListConf="$confBase/ty_createJson_ql.conf"
table_specs=()
while IFS= read -r spec || [[ -n "$spec" ]]; do
	[[ -n "$spec" ]] || continue
	IFS='|' read -r tableUser tableName tableODPS _ <<<"$spec"
	if [[ -z "$tableUser" || -z "$tableName" || -z "$tableODPS" ]]; then
		echo "skipping malformed line in $tableListConf: $spec" >&2
		continue
	fi
	table_specs+=("$spec")
done < <(grep -v '^#' "$tableListConf" |
	sed -e 's/[()]//g' -e 's/[[:space:]]//g' |
	tr 'a-z' 'A-Z')

# Pass 1: snapshot both sides, one file per table named after the ODPS table.
for spec in "${table_specs[@]}"; do
	IFS='|' read -r tableUser tableName tableODPS _ <<<"$spec"
	tableInfoSQL="SELECT COLUMN_NAME FROM ALL_TAB_COLUMNS  WHERE OWNER = '$tableUser' AND TABLE_NAME = '$tableName' ORDER BY COLUMN_ID;"
	_ora_query "$tableInfoSQL" > "$logPathOra/$tableODPS"
	"$odpsPath/bin/odpscmd" -e "desc ${baseProject}.${tableODPS};" > "$logPathConf/$tableODPS"
done

# Pass 2: compare the two snapshots table by table and record differences.
table_num=0      # tables examined
table_num_bg=0   # tables whose column lists differ
RWBH=$(_new_id)  # id shared by every record written in this run
dbsj=$(date -d today +"%Y-%m-%d %T")
for spec in "${table_specs[@]}"; do
	IFS='|' read -r tableUser tableName tableODPS _ <<<"$spec"
	echo "======== tableODPS is : $tableODPS ============="

	# Column names on the ODPS side, in the order desc printed them.
	array_column_odps=()
	while read -r -a words || ((${#words[@]} > 0)); do
		array_column_odps+=("${words[@]}")
	done < <(_odps_columns "$logPathConf/$tableODPS")

	# Column names on the source side, in the order the query returned them.
	array_column_ora=()
	while read -r -a words || ((${#words[@]} > 0)); do
		array_column_ora+=("${words[@]}")
	done < "$logPathOra/$tableODPS"

	echo "ODPS表結構是${array_column_odps[*]}"
	echo "源端表結構是${array_column_ora[*]}"

	ora_count=${#array_column_ora[@]}
	odps_count=${#array_column_odps[@]}
	if ((ora_count >= odps_count)); then
		num_comp=$ora_count
	else
		num_comp=$odps_count
	fi

	# Walk both lists position by position up to the longer length; the
	# first mismatch decides the kind of change:
	#   same length        -> column renamed
	#   source has more    -> column added
	#   source has fewer   -> column removed
	change_kind=""
	if [[ -e "$logPathOra/$tableODPS" ]]; then
		for ((k = 0; k < num_comp; k++)); do
			if [[ "${array_column_ora[k]}" != "${array_column_odps[k]}" ]]; then
				if ((ora_count == odps_count)); then
					change_kind="列欄位改名"
				elif ((ora_count > odps_count)); then
					change_kind="列欄位新增"
				else
					change_kind="列欄位減少"
				fi
				break
			fi
		done
	fi

	if [[ -n "$change_kind" ]]; then
		UUID=$(_new_id)
		table_num_bg=$((table_num_bg + 1))
		# write into ydbbgdjmx
		echo "${UUID},${RWBH},預生產庫,${baseConf},${tableUser}.${tableName},${baseProject},$tableODPS,${change_kind},${dbsj},${array_column_odps[*]},${array_column_ora[*]}" >> "$logPathReport/ydbbgdjmx_$curdt.csv"
		echo "$tableODPS 表結構發生變化,具體情況已寫入${curdt}報告"
	fi

	echo ""
	echo "---------------下一個----------------"
	table_num=$((table_num + 1))
done

# write into dbxx: one summary row for this run
echo "${RWBH},預生產庫,${baseConf},${baseProject},${table_num},${table_num_bg},${dbsj}" >> "$logPathReport/dbxx_$curdt.csv"
exit 0