python連線各種資料庫的類方法
阿新 • • 發佈:2018-11-02
連線postgresql資料庫
import psycopg2 # 用來操作資料庫的類 class GPCommand(object): # 類的初始化 def __init__(self): self.hostname = '10.1.2.xx' self.username = 'xxx' self.password = 'xxx' self.database = 'xxx' def connectGp(self): try: #連結資料庫 #讀取配置利用connect連結資料庫 self.connect = psycopg2.connect( host=self.hostname, user=self.username, password=self.password, dbname=self.database ) #建立一個新的cursor self.cursor = self.connect.cursor() print("connect gp successful."+'\n' + '資料庫連線成功') return ('con_successful') except psycopg2.Error: error = 'Failed to setup Postgres environment.\n{0}'.format(sys.exc_info()) print('connect gp error.'+'\n' + '資料庫連線失敗') return 'con_error'+ error def insertTrans(self,trans_name,url_name): pass
連線mysql資料庫
import pymysql class MysqlDB: def __init__(self): self.hostname = '1x.1.2.xx' self.port = 3306 self.username = 'xx' self.password = 'xx' self.database = 'xxxx' def connectmysql(self): try: condb = pymysql.connect(host=self.hostname, port=self.port, user=self.username, passwd=self.password, db=self.database) print("連結成功") return condb except Exception as e: print("連線異常", e) # except Exception: # info = sys.exc_info() # print("連線異常", info[1]) def insertTable(self): # 獲取資料庫連線 condb = self.connectmysql() # 使用cursor() 方法建立一個遊標物件 cursor cursor = condb.cursor() try: # 執行sql語句 sql = "select * from cn_customer.bank" cursor.execute(sql) # 提交到資料庫執行 condb.commit() except Exception: # 方法一:捕獲所有異常 # 如果發生異常,則回滾 info = sys.exc_info() print("發生異常", info[1]) condb.rollback() finally: # 最終關閉資料庫連線 condb.close()
連線sqlserver
import pymssql class SqlserverDB: def __init__(self): self.hostname = 'xxx.1.2.xxxx' self.username = 'xxxxxx' self.password = 'xxxxx' self.database = 'xxxxx' def connectmysql(self): try: condb = pymssql.connect(host=self.hostname, user=self.username, password=self.password, database=self.database) return condb except Exception as e: print("連線異常", e) #獲取的table_columns名稱,長度等資訊 def load_columns(self,table_name): # 獲取資料庫連線 condb = self.connectmysql() # 使用cursor() 方法建立一個遊標物件 cursor cursor = condb.cursor() fields = [] sql = "SELECT c.COLUMN_NAME ,COALESCE(a.ispriamarykey,'N') AS ispriamarykey,case when c.IS_NULLABLE = 'NO' then 'not null' else '' end AS IS_NULLABLE,c.DATA_TYPE,c.CHARACTER_MAXIMUM_LENGTH\ FROM [INFORMATION_SCHEMA].[COLUMNS] c\ left join (SELECT name,'Y' AS ispriamarykey FROM syscolumns WHERE id=Object_Id('%s') and colid IN(SELECT keyno from sysindexkeys WHERE id=Object_Id('%s'))) a \ on c.COLUMN_NAME = a.name WHERE TABLE_NAME = '%s'" % (table_name,table_name,table_name) cursor.execute(sql) table_info = cursor.fetchall() for res in table_info: #print(res) desc = { 'column_name': res[0].lower(), 'table_name': table_name.lower(), 'type': self._convert_type(res[3]).lower(), 'length':res[4] , 'primary_key': res[1], 'null':res[2] } fields.append(desc) #print(fields) self.postgres_create(fields)