程式人生 > 快速將資料寫入 PG 庫：通過 Python 呼叫 COPY 方法

快速將資料寫入 PG 庫：通過 Python 呼叫 COPY 方法

1、在PG庫中建立表

-- Target table for the Python COPY loader, which writes to schema sip_sip,
-- table history_abroad2telecom_ip_data.
-- id is generated by bigserial when it is left out of the COPY column list;
-- the loaded columns are column1 / column2.
-- varchar(45) is wide enough for textual IPv4 and IPv6 addresses
-- (an IPv4-mapped IPv6 address can be up to 45 characters).
create table sip_sip.history_abroad2telecom_ip_data
(
id bigserial not null,
column1 varchar(45),
column2 varchar(45)
);

2、清洗資料，整理成如下格式：

每行兩個 IP 地址，以逗號分隔（IP地址,IP地址）。檔案第一行須為表頭（例如 column1,column2），因為 pandas.read_csv 預設把第一行當作欄位名稱。

3、將檔案上傳到 Linux 伺服器上

4、編寫 Python 程式碼

# coding=utf-8
import datetime
import io
import sys
from urllib.parse import quote_plus

import pandas as pd
from sqlalchemy import create_engine

class Copy_Data2PG:
    """Bulk-load a comma-separated file into PostgreSQL with ``COPY``.

    A SQLAlchemy engine (psycopg2 driver) is created once in ``__init__``
    and reused by :meth:`write_to_table`.
    """

    # Placeholder connection settings; replace the "xxxx" values before use
    # or pass an equivalently shaped ``config`` mapping to the constructor.
    DEFAULT_CONFIG = {
        "pg_config": {
            "host": "xxxx",
            "user": "xxxx",
            "password": "xxxx",
            "port": "xxxx",
            "database": "xxxx",
        }
    }

    # Input file read by main() when no path is given.
    DEFAULT_CSV_PATH = '/dist/MonitoringBlackIP/Monitor_history_abroad2telecom_ip_data/11.txt'

    def __init__(self, table_name, config=None):
        """Create a loader targeting *table_name*.

        Args:
            table_name: name of the target table (the schema is chosen in
                write_to_table).
            config: optional mapping shaped like DEFAULT_CONFIG; when omitted,
                DEFAULT_CONFIG is used.
        """
        self.table_name = table_name
        if config is None:
            config = self.DEFAULT_CONFIG
        pg_config = config["pg_config"]
        # Percent-encode the credentials so characters such as '@', ':' or '/'
        # cannot break the URL, and honour the configured port (5432 if unset).
        postgresql_url = 'postgresql+psycopg2://{}:{}@{}:{}/{}'.format(
            quote_plus(str(pg_config["user"])),
            quote_plus(str(pg_config["password"])),
            pg_config["host"],
            pg_config.get("port") or 5432,
            pg_config["database"],
        )
        # Pooled engine: pool_size=10, max_overflow=20.
        self.sip_black_data_engine = create_engine(
            postgresql_url, pool_size=10, max_overflow=20)

    @staticmethod
    def _quote_ident(name):
        """Return *name* as a double-quoted PostgreSQL identifier."""
        return '"' + str(name).replace('"', '""') + '"'

    @classmethod
    def _build_copy_sql(cls, table_name, columns, schema=None):
        """Build a ``COPY ... FROM STDIN`` statement for header-less CSV input.

        Raises:
            ValueError: if *columns* is empty (COPY needs at least one column).
        """
        columns = list(columns)
        if not columns:
            raise ValueError("cannot COPY a DataFrame with no columns")
        target = cls._quote_ident(table_name)
        if schema:
            target = cls._quote_ident(schema) + "." + target
        column_list = ", ".join(cls._quote_ident(column) for column in columns)
        return "COPY {}({}) FROM STDIN WITH (FORMAT csv, DELIMITER ',')".format(
            target, column_list)

    @staticmethod
    def _frame_to_csv_buffer(df):
        """Serialize *df* to an in-memory CSV (data rows only), rewound to 0."""
        buffer = io.StringIO()
        df.to_csv(buffer, sep=',', index=False, header=False)
        buffer.seek(0)
        return buffer

    def write_to_table(self, df, table_name, sip_black_data_engine,
                       if_exists='append', schema='sip_sip'):
        """Write *df* into ``schema.table_name`` using PostgreSQL ``COPY``.

        The table is first created from the DataFrame's columns if needed,
        following ``DataFrame.to_sql`` semantics for *if_exists* ('fail',
        'replace' or 'append'). The rows are then streamed through a single
        ``COPY ... FROM STDIN`` in CSV format.

        Args:
            df: rows to load; its column names must exist in the target table.
            table_name: target table name.
            sip_black_data_engine: SQLAlchemy engine for the target database.
            if_exists: what to do when the table already exists.
            schema: schema that holds the table.

        Raises:
            ValueError: if *df* has no columns, or if the table exists and
                *if_exists* is 'fail'.
        """
        db_engine = sip_black_data_engine
        copy_sql = self._build_copy_sql(table_name, df.columns, schema=schema)
        # Create (or replace) the table through pandas' public API using an
        # empty frame, so only the table definition is touched here.
        df.head(0).to_sql(table_name, db_engine, schema=schema,
                          if_exists=if_exists, index=False)
        # The buffer holds data rows only and the COPY has no HEADER option,
        # so every row of df is loaded (none is mistaken for a header).
        csv_buffer = self._frame_to_csv_buffer(df)
        with db_engine.connect() as connection:
            raw_connection = connection.connection  # underlying DBAPI connection
            with raw_connection.cursor() as cursor:
                cursor.copy_expert(copy_sql, csv_buffer)
            raw_connection.commit()

    def main(self, csv_path=None):
        """Read the comma-separated input file and bulk-load it into the table.

        Args:
            csv_path: input file; defaults to DEFAULT_CSV_PATH. Its first line
                is read as the header (pandas.read_csv default).
        """
        if csv_path is None:
            csv_path = self.DEFAULT_CSV_PATH
        result_dataframe = pd.read_csv(csv_path, delimiter=',')
        self.write_to_table(result_dataframe, self.table_name, self.sip_black_data_engine)

if __name__ == "__main__":
    # Script entry point: bulk-load the prepared file into the target table.
    loader = Copy_Data2PG(table_name="history_abroad2telecom_ip_data")
    loader.main()

執行程式碼:

 

 

之後查詢資料表，即可看到匯入的資料：

 

所有資料都很快匯入完成。

記得點個讚哦！