1. 程式人生 > >大資料ETL實踐探索(1)---- python 與oracle資料庫匯入匯出

大資料ETL實踐探索(1)---- python 與oracle資料庫匯入匯出

文章大綱

ETL 簡介

ETL,是英文 Extract-Transform-Load 的縮寫,用來描述將資料從來源端經過抽取(extract)、互動轉換(transform)、載入(load)至目的端的過程。

系列文章:

1.(大資料ETL實踐探索(1)
2.大資料ETL實踐探索(2)
3.windows下python3 使用cx_Oracle,xlrd外掛進行excel資料清洗錄入

工具的選擇

之前有過一些文章,對工作中使用到的工具和指令碼進行一些梳理


1. oracle資料泵 匯入匯出實戰

1.1 資料庫建立

本文主要使用最新版本的oracle 12c,如果建立oracle資料庫時候使用了資料庫容器(CDB)承載多個可插拔資料庫(PDB)的模式,那麼資料庫的使用者名稱需要用c##開頭,使用資料泵進行操作 的時候也有一些不同:

在CDB中,只能建立以c##或C##開頭的使用者,如果不加c##,則會提示錯誤“ORA-65096:公用使用者名稱或角色名無效”,只有在PDB資料庫中才能建立我們習慣性命名的使用者,oracle稱之為Local User,前者稱之為Common User。

建立的時候不要勾選:

https://www.cnblogs.com/xqzt/p/5034261.html
https://www.cnblogs.com/fanyongbin/p/5699482.html


1.2. installs Oracle

Download and install Oracle 12C,

Http://www.oracle.com/technetwork/database/enterprise-edition/downloads/index.html

Under windows10, use sqlplus to log in

you should first


set oracle_sid=orcl

Sqlplus /nolog

conn /as SYSDBA

Creating a user

Syntax [creating the user]:

create user username identified by password [that is the password];

E.g.

Create user [username] identified by [password];

Default tablespace [tablespacename]

Temporary tablespace temp;

Grant DBA to username;

.
由於全庫匯入的時候oracle_home和之前的資料庫發生了改變,所以原來資料庫的表空間需要提前建立。可以根據匯出日誌或者匯入日誌的報錯,檢視原來資料庫中到底有那些表空間。特別注意有關檢視和索引的表空間和使用者也需要提起建立好。當然如果你只要資料的話就不太影像了。基本上使用表空間就可以全部匯入。

Create table space :

E.g

Create tablespace xxx datafile'f:\xxx.dbf'size 200M AUTOEXTEND on;

1.3 export / import data from oracle

從oracle庫中匯出 資料可以使用oracle資料泵程式,全庫匯出例項如下:


Expdp username/password FULL=y DUMPFILE=dpump_dir1:full1%U.dmp, dpump_dir2:full2%U.dmp

FILESIZE=2G PARALLEL=3 LOGFILE=dpump_dir1:expfull.log JOB_NAME=job

以下命令的匯入並不是全庫匯入,如果需要全庫匯入的話,由於oracle_home 的改變,需要提前建立好使用者和表空間,以及索引的表空間,檢視的使用者等

command as follow:

Impdp username/[email protected] full=y directory=dir_yiliao dumpfile=full1%U.dmp remap_schema=username_old:username_new exclude=GRANT remap_tablespace='(t1:tempt1, t2:tempt2) '  tablespaces=tempt1,temp2

以下兩種匯入方式只能二選一:

  • full=y
  • tablespaces=tempt1,temp2

整體說明
https://www.2cto.com/database/201605/508212.html


2. 將資料庫表匯出成 CSV, 並批量上傳至 AWS

2.1 export all table to CSV

使用oracle函式 utl_file 進行快速匯入匯出(一分鐘300萬條的量級),這個比spool快多啦

CREATE OR REPLACE PROCEDURE SQL_TO_CSV
(
 P_QUERY IN VARCHAR2, -- PLSQL文
 P_DIR IN VARCHAR2, -- 匯出的檔案放置目錄
 P_FILENAME IN VARCHAR2 -- CSV名
 )
 IS
  L_OUTPUT UTL_FILE.FILE_TYPE;
  L_THECURSOR INTEGER DEFAULT DBMS_SQL.OPEN_CURSOR;
  L_COLUMNVALUE VARCHAR2(4000);
  L_STATUS INTEGER;
  L_COLCNT NUMBER := 0;
  L_SEPARATOR VARCHAR2(1);
  L_DESCTBL DBMS_SQL.DESC_TAB;
  P_MAX_LINESIZE NUMBER := 32000;
BEGIN
  --OPEN FILE
  L_OUTPUT := UTL_FILE.FOPEN(P_DIR, P_FILENAME, 'W', P_MAX_LINESIZE);
  --DEFINE DATE FORMAT
  EXECUTE IMMEDIATE 'ALTER SESSION SET NLS_DATE_FORMAT=''YYYY-MM-DD HH24:MI:SS''';
  --OPEN CURSOR
  DBMS_SQL.PARSE(L_THECURSOR, P_QUERY, DBMS_SQL.NATIVE);
  DBMS_SQL.DESCRIBE_COLUMNS(L_THECURSOR, L_COLCNT, L_DESCTBL);
  --DUMP TABLE COLUMN NAME
  FOR I IN 1 .. L_COLCNT LOOP
    UTL_FILE.PUT(L_OUTPUT,L_SEPARATOR || '"' || L_DESCTBL(I).COL_NAME || '"'); --輸出表欄位
    DBMS_SQL.DEFINE_COLUMN(L_THECURSOR, I, L_COLUMNVALUE, 4000);
    L_SEPARATOR := ',';
  END LOOP;
  UTL_FILE.NEW_LINE(L_OUTPUT); --輸出表欄位
  --EXECUTE THE QUERY STATEMENT
  L_STATUS := DBMS_SQL.EXECUTE(L_THECURSOR);
 
  --DUMP TABLE COLUMN VALUE
  WHILE (DBMS_SQL.FETCH_ROWS(L_THECURSOR) > 0) LOOP
    L_SEPARATOR := '';
    FOR I IN 1 .. L_COLCNT LOOP
      DBMS_SQL.COLUMN_VALUE(L_THECURSOR, I, L_COLUMNVALUE);
      UTL_FILE.PUT(L_OUTPUT,
                  L_SEPARATOR || '"' ||
                  TRIM(BOTH ' ' FROM REPLACE(L_COLUMNVALUE, '"', '""')) || '"');
      L_SEPARATOR := ',';
    END LOOP;
    UTL_FILE.NEW_LINE(L_OUTPUT);
  END LOOP;
  --CLOSE CURSOR
  DBMS_SQL.CLOSE_CURSOR(L_THECURSOR);
  --CLOSE FILE
  UTL_FILE.FCLOSE(L_OUTPUT);
EXCEPTION
  WHEN OTHERS THEN
    RAISE;
END;
 
/

建立資料庫目錄

sql>create or replace directory OUT_PATH as 'D:\';

執行以下sql

SELECT 'EXEC sql_to_csv(''select * from ' ||T.TABLE_NAME ||''',''OUT_PATH''' || ',''ODS_MDS.' || T.TABLE_NAME ||'.csv'');' FROM user_TABLES T where t.TABLE_NAME='表名'

得到以下的批量sql,匯出來,生成.sql指令碼,在命令列中執行即可.

EXEC sql_to_csv('select * from table1','OUT_PATH','table1.csv');
EXEC sql_to_csv('select * from table2','OUT_PATH','table2.csv');

For reference, the links are as follows

Https://blog.csdn.net/huangzhijie3918/article/details/72732816


4. oracle table-檢視 windows 批處理 匯出

4.1 使用win32 指令碼呼叫sqlplus 匯出檢視

輸入年月等資訊,拼接字串匯出表, 下面 的指令碼可以迴圈接受輸入

@echo off
:begin
::年份
set input_year=
set /p input_year=Please input year :
::月份
set input_month=
set /p input_month=Please input month :

::字串字首
set prefix=ex_vw_
::字串拼接

set "table_name=%prefix%%input_year%%input_month%"

echo Your input table_name:%table_name%
echo Your input time:%input_year%-%input_month%

::sqlplus 執行sql指令碼 後帶引數
sqlplus username/[email protected]/instanceNname @createtable.sql %table_name% %input_year%-%input_month%



rem pause>null

goto begin

以下sql指令碼為createtable.sql,接受兩個引數,寫做:&1 ,&2 …如果多個引數可以依次寫下去。


drop table &1;

create table &1 as select * from some_table_view where incur_date_from = to_date('&2-01','yyyy-mm-dd');

Insert into &1 select * from some_table_view where incur_date_from = to_date('&2-02','yyyy-mm-dd');

commit;
quit;

後來發現一個問題,比如上面的第2小節的儲存過程 SQL_TO_CSV,死活沒法成功執行,只好安裝cx_oracle ,用python 匯出了,程式碼如下。

4.2 使用python 執行檢視匯出

主要邏輯是,按照月份 ,執行檢視生成這個月每天的資料插入到表中,當一個月的資料執行完畢,將這個月份表匯出。
類似這種流程化的東西,python果然還是利器

# -*- coding:utf-8 -*-
"""@author:[email protected]:[email protected]:2018/5/211:19"""

import cx_Oracle
import calendar

########################連結資料庫相關######################################

def getConnOracle(username,password,ip,service_name):
    try:
        conn = cx_Oracle.connect(username+'/'+password+'@'+ip+'/'+service_name)  # 連線資料庫
        return conn
    except Exception:
        print(Exception)
#######################進行資料批量插入#######################


def insertOracle(conn,year,month,day):

    yearandmonth = year + month

    table_name ='ex_vw_'+ yearandmonth
    cursor = conn.cursor()
##建立表之前先刪除表
    try:
        str_drop_table = '''drop table ''' + table_name
        cursor.execute(str_drop_table)
    except cx_Oracle.DatabaseError as msg:
        print(msg)

    try:
#create table and insert
        str_create_table = '''create table ''' + table_name+ ''' as select * from EXPORT where date_from = to_date(' '''+ year + '-'+ month + '''-01','yyyy-mm-dd')'''
        print(str_create_table)
        cursor.execute(str_create_table)

        for i in range(2,day):
            if i < 10:
                str_incert =  '''Insert into ''' + table_name +''' select * from EXPORT where date_from = to_date(' '''+ year+'-'+month+'-'+str(0)+str(i)+'''','yyyy-mm-dd')'''
            else:
                str_incert = '''Insert into ''' + table_name + ''' select * from EXPORT where date_from = to_date(' '''+ year+'-'+month+'-'+ str(i)+'''','yyyy-mm-dd')'''
            print(str_incert)
            cursor.execute(str_incert)
            conn.commit()

        conn.commit()

#export big_table
        str_QUERY  = 'select * from ex_vw_'+ yearandmonth
        str_DIR = 'OUT_PATH'
        str_FILENAME  = 'EXPORT'+yearandmonth+'.csv'

        cursor.callproc('SQL_TO_CSV', [str_QUERY,str_DIR, str_FILENAME])

    except cx_Oracle.DatabaseError as msg:
        print(msg)


#匯出資料後drop table
    try:
        str_drop_table = '''drop table ''' + table_name
        print(str_drop_table)
        cursor.execute(str_drop_table)
    except cx_Oracle.DatabaseError as msg:
        print(msg)

    cursor.close()


def main():
    username = 'xxx'
    password = 'xxx'
    ip = '127.0.0.1'
    service_name = 'orcl'
    #獲取資料庫連結
    conn = getConnOracle(username,password,ip,service_name)

    monthlist = ['06','05','04','03','02','01']
    daylist = [30,31,30,31,28,31]
    for i in range(0,len(monthlist)):
        insertOracle(conn,'2018',monthlist[i],daylist[i]+1)




    conn.close()

if __name__ == '__main__':
    main()

參考

OS.ENVIRON詳解
python編碼

再次強烈推薦,精通oracle+python系列:官方文件

http://www.oracle.com/technetwork/cn/articles/dsl/mastering-oracle-python-1391323-zhs.html

離線版本下載連結:
http://download.csdn.net/detail/wangyaninglm/9815726