程式人生 > 通過Load table命令將資料檔案載入到Sybase IQ資料庫裡面的Python指令碼

通過Load table命令將資料檔案載入到Sybase IQ資料庫裡面的Python指令碼

-- Load-configuration table read by SybaseIQ_LoadData.py: one row per
-- data-file feed (ftp_id) describing where its files live, how they are
-- named, and which table they are loaded into.
-- NOTE(review): this creates poc_app.sys_ftp_cfg, but the script's
-- _getLoadInfo queries jiang.sys_ftp_cfg -- confirm the intended schema.
CREATE TABLE poc_app.sys_ftp_cfg
(
    ftp_id              varchar(100) NOT NULL,          --call-record (話單) file tag; the key the script looks up via its ftp_id argument
    ftp_cycle_id        varchar(1) NOT NULL,            --file cycle; the script handles H/D/W/M (case-insensitive)
    ftp_stage_filepath  varchar(255) NOT NULL,          --directory holding the processed (staged) files
    ftp_stage_filereg   varchar(100) NOT NULL,          --processed file-name pattern with a date placeholder, e.g. jiang_test_[YYYYMMDD].dat
    stage_schema        varchar(100) NOT NULL,          --schema of the target table
    table_name          varchar(100) NOT NULL,          --target table name
    delimiter_type_id varchar(10) NOT NULL              --field delimiter used in LOAD TABLE ... DELIMITED BY
);
 
-- Sample configuration row: feed 'jiang_test_d' with cycle 'D', files named
-- jiang_test_[YYYYMMDD].dat under /home/sybase/day, loaded into
-- poc_app.jiang_test with '|' as the field delimiter.
insert into poc_app.sys_ftp_cfg
values('jiang_test_d','D','/home/sybase/day','jiang_test_[YYYYMMDD].dat','poc_app','jiang_test','|');

#!/usr/bin/python
#-*- encoding: utf-8 -*-
####################################################################################
# name:     SybaseIQ_LoadData.py
# describe: 通過Load table命令將資料檔案載入到Sybase IQ資料庫裡面
#           (loads a data file into a Sybase IQ table with the LOAD TABLE command)
# usage:    python SybaseIQ_LoadData.py SybaseDSN username password ftp_id cur_static_time
####################################################################################
# NOTE: the coding declaration above must stay on line 1 or 2 (PEP 263).
# With a blank line between it and the shebang, Python 2 ignores it and
# rejects the non-ASCII text in this file with
# "SyntaxError: Non-ASCII character ... but no encoding declared".
import os
import pyodbc
import string
import sys
from subprocess import Popen,PIPE
import ConfigParser

# Python 2-only workaround: make implicit str/unicode conversions use UTF-8
# instead of ASCII. site.py removes sys.setdefaultencoding at startup, which
# is why sys is reloaded first.
reload(sys)
sys.setdefaultencoding('utf8')

'''
將資料檔案載入到Sybase IQ資料庫裡面
'''
class SybaseIQLoad:
    """Load a data file into a Sybase IQ table with ``LOAD TABLE``.

    The load parameters for a feed are read from a configuration table keyed
    by ``ftp_id``: file directory, file-name pattern, cycle, target schema,
    target table and delimiter. The generated ``LOAD TABLE`` statement is
    executed through the ``dbisql`` command-line client.
    """

    # Set to 1 to print the generated LOAD TABLE statement before running it.
    debug = 0

    # Configuration table read by _getLoadInfo.
    # NOTE(review): the accompanying setup SQL creates poc_app.sys_ftp_cfg,
    # while this reads jiang.sys_ftp_cfg -- confirm the intended schema
    # (override this attribute on the class or an instance if needed).
    cfg_table = 'jiang.sys_ftp_cfg'

    # Lower-cased cycle id -> (placeholder in the file-name pattern,
    # number of leading characters of cur_static_time that replace it).
    _CYCLE_PATTERNS = {
        'h': ('[YYYYMMDDHH]', 10),
        'd': ('[YYYYMMDD]', 8),
        'w': ('[YYYY_WW]', 7),
        'm': ('[YYYYMM]', 6),
    }

    def __init__(self, dbinfo):
        """Open an autocommit ODBC connection and a cursor on it.

        :param dbinfo: sequence ``(dsn, uid, pwd)``.
        """
        self.DSN = dbinfo[0]
        self.UID = dbinfo[1]
        self.PWD = dbinfo[2]
        odbcinfo = 'DSN=%s;UID=%s;PWD=%s' % (dbinfo[0], dbinfo[1], dbinfo[2])
        self.cnxn = pyodbc.connect(odbcinfo, autocommit=True, ansi=True)
        self.cursor = self.cnxn.cursor()

    def __del__(self):
        """Close the cursor and the connection if they were created."""
        # getattr: if __init__ failed (e.g. connect raised), these attributes
        # were never set and a plain self.cursor would raise AttributeError.
        cursor = getattr(self, 'cursor', None)
        if cursor is not None:
            cursor.close()
        cnxn = getattr(self, 'cnxn', None)
        if cnxn is not None:
            cnxn.close()

    def _printinfo(self, msg):
        """Print *msg* followed by a blank line."""
        print("%s" % (msg,))
        print("\n")

    def _GetStageName(self, ftp_stage_filereg, ftp_cycle_id, cur_static_time):
        """Return the concrete file name for one period.

        The cycle's placeholder in *ftp_stage_filereg* is replaced by the
        matching prefix of *cur_static_time* (e.g. cycle ``'d'`` replaces
        ``[YYYYMMDD]`` with its first 8 characters).

        :raises ValueError: if *ftp_cycle_id* is not one of h/d/w/m
            (any case).
        """
        try:
            token, width = self._CYCLE_PATTERNS[ftp_cycle_id.lower()]
        except KeyError:
            raise ValueError('unsupported ftp_cycle_id %r (expected h/d/w/m)'
                             % (ftp_cycle_id,))
        return ftp_stage_filereg.replace(token, cur_static_time[:width])

    def _getLoadInfo(self, ftp_id):
        """Return the configuration row for *ftp_id*, or None if absent.

        Row columns, in order: ftp_cycle_id, ftp_stage_filepath,
        ftp_stage_filereg, stage_schema, delimiter_type_id, table_name.
        """
        # ftp_id comes from the command line, so it is bound as a parameter
        # rather than formatted into the SQL text.
        sql = ('select ftp_cycle_id, ftp_stage_filepath, ftp_stage_filereg,'
               ' stage_schema, delimiter_type_id, table_name'
               ' from %s where ftp_id = ?' % self.cfg_table)
        self.cursor.execute(sql, ftp_id)
        return self.cursor.fetchone()

    def _getTableColumns(self, table_name):
        """Return the column names of *table_name*, in column_id order."""
        # LOAD TABLE maps file fields to the listed columns positionally, so
        # the order must be deterministic.
        # NOTE(review): this matches on table name only; a same-named table
        # in another schema would contribute extra columns.
        sql = ('select a.column_name'
               ' from syscolumn a'
               ' join systable b on a.table_id = b.table_id'
               ' where b.table_name = ?'
               ' order by a.column_id')
        self.cursor.execute(sql, table_name)
        return [row[0] for row in self.cursor.fetchall()]

    def _getSybIQServInfo(self, dsn=None, unixodbc_file="/etc/unixODBC/odbc.ini"):
        """Return ``[server_ip, port]`` for *dsn* from the unixODBC odbc.ini.

        :param dsn: odbc.ini section to read. Defaults to the DSN this object
            was connected with, or ``'SybaseIQDSN'`` if none is recorded.
        :param unixodbc_file: absolute path of the odbc.ini file.
        :raises IOError: if *unixodbc_file* cannot be read.
        """
        try:
            import ConfigParser as configparser  # Python 2
        except ImportError:
            import configparser  # Python 3
        if dsn is None:
            dsn = getattr(self, 'DSN', 'SybaseIQDSN')
        config = configparser.ConfigParser()
        if not config.read(unixodbc_file):
            raise IOError('cannot read ODBC configuration file %s' % unixodbc_file)
        return [config.get(dsn, "Server"), config.get(dsn, "Port")]

    def _sqlLiteral(self, value):
        """Quote *value* as a SQL string literal (embedded quotes doubled)."""
        return "'%s'" % value.replace("'", "''")

    def _buildLoadSql(self, stage_schema, table_name, columns, filename, delimiter):
        """Return the LOAD TABLE ... ; COMMIT; script for one data file."""
        return '''
                    load table %s.%s
                    (
                        %s
                    )
                    USING FILE %s
                    FORMAT ASCII
                    ESCAPES OFF
                    QUOTES OFF
                    NOTIFY 1000000
                    DELIMITED BY %s
                    WITH CHECKPOINT ON;
                    COMMIT;
                  ''' % (stage_schema, table_name, ','.join(columns),
                         self._sqlLiteral(filename), self._sqlLiteral(delimiter))

    def _runDbisql(self, sql, host, port):
        """Run *sql* with dbisql in batch mode and return its exit status."""
        # An argument list (no shell) keeps quotes, '$' and backticks in the
        # SQL or the password from being interpreted by /bin/sh.
        args = ['dbisql', '-c', 'uid=%s;pwd=%s' % (self.UID, self.PWD),
                '-Host', host, '-port', port, '-nogui', sql]
        return Popen(args).wait()

    def loaddata(self, ftp_id, cur_static_time):
        """Load the data file configured for *ftp_id* for one period.

        :param ftp_id: key of the configuration row in ``cfg_table``.
        :param cur_static_time: timestamp string whose leading characters
            fill the date placeholder of the configured file-name pattern.
        :returns: 0 on success, 1 on failure (suitable for ``sys.exit``).
        :raises ValueError: if the configured cycle id is unsupported.
        """
        # Load configuration for this feed.
        row = self._getLoadInfo(ftp_id)
        if row is None:
            print("Return value %s,Error %s"
                  % (1, "no configuration found for ftp_id '%s'" % (ftp_id,)))
            return 1

        ftp_cycle_id = row[0]
        ftp_stage_filepath = row[1]
        ftp_stage_filereg = row[2]
        stage_schema = row[3]
        delimiter_type_id = row[4]
        table_name = row[5]

        # File name and absolute path for the requested period.
        ftp_stage_filename = self._GetStageName(ftp_stage_filereg, ftp_cycle_id, cur_static_time)
        ftp_stage_absolute_filename = os.path.join(ftp_stage_filepath, ftp_stage_filename)

        # Host IP and port of the Sybase IQ server, needed by dbisql.
        sybaseiq_ipport = self._getSybIQServInfo()

        # Explicit column list for LOAD TABLE.
        columns = self._getTableColumns(table_name)
        if not columns:
            print("Return value %s,Error %s"
                  % (1, "no columns found for table '%s'" % (table_name,)))
            return 1

        loadsql = self._buildLoadSql(stage_schema, table_name, columns,
                                     ftp_stage_absolute_filename, delimiter_type_id)

        try:
            print("*************Begin to execute load table command...*************\n")
            if self.debug == 1:
                self._printinfo(loadsql.strip())
            status = self._runDbisql(loadsql, sybaseiq_ipport[0], sybaseiq_ipport[1])
            print("\n*************End to execute load table command...*************")
        except Exception as err:
            print("Return value %s,Error %s" % (1, err))
            return 1

        # A non-zero dbisql exit status means the load did not succeed.
        if status != 0:
            print("Return value %s,Error %s"
                  % (1, "dbisql exited with status %s" % (status,)))
            return 1
        print("**************************Successful**************************")
        return 0
# Main
def main():
    """Command-line entry point.

    Expects five positional arguments: ODBC DSN, user name, password,
    ftp_id and cur_static_time. With fewer, prints a usage line and exits
    with status 1; otherwise returns the status from SybaseIQLoad.loaddata.
    """
    argv = sys.argv
    if len(argv) < 6:
        print('usage: python SybaseIQ_LoadData.py SybaseDSN username password ftp_id cur_static_time\n')
        sys.exit(1)

    # Connection info in the order SybaseIQLoad expects: DSN, user, password.
    dbinfo = list(argv[1:4])
    ftp_id, cur_static_time = argv[4], argv[5]

    loader = SybaseIQLoad(dbinfo)
    return loader.loaddata(ftp_id, cur_static_time)


if __name__ == '__main__':
    sys.exit(main())