1. 程式人生 > Python封裝連線資料庫

Python封裝連線資料庫

1. 資料庫配置

#db.conf
#配置資料庫
[database]
dbhost=127.0.0.1
dbport=3366
dbname=test
dbuser=test
dbpassword=test
dbcharset=utf8

2. 讀取配置

#encoding:utf-8
#name:mod_config.py

import ConfigParser
import os

#獲取config配置檔案
def getConfig(section, key):
    """Return the value of ``key`` in ``section`` of the ``db.conf`` file.

    ``db.conf`` is looked up in the directory that contains this module
    (symlinks resolved), not in the current working directory.

    Args:
        section: Name of the INI section, e.g. ``"database"``.
        key: Option name inside that section, e.g. ``"dbname"``.

    Returns:
        The option value as a string.

    Raises:
        ConfigParser.NoSectionError: The section does not exist. This is also
            what happens when ``db.conf`` itself is missing, because
            ``ConfigParser.read`` silently skips files it cannot open.
        ConfigParser.NoOptionError: The section exists but has no such key.

    Example:
        getConfig("database", "dbname")
        getConfig("database", "dbport")
    """
    config = ConfigParser.ConfigParser()
    # os.path.split(os.path.realpath(__file__))[0] is the directory of this
    # module file. Build the path with os.path.join so the separator is
    # correct on both Windows and Linux (a hard-coded '\\' only works on
    # Windows).
    module_dir = os.path.split(os.path.realpath(__file__))[0]
    path = os.path.join(module_dir, 'db.conf')
    config.read(path)
    return config.get(section, key)

讀取

import mod_config
# Look up the configured database name in the [database] section of db.conf.
dbname = mod_config.getConfig("database", "dbname")

3. log

# -*- coding: utf-8 -*-
"""
例程:
from somelib import logger

# 預設log存放目錄,需要在程式入口呼叫才能生效,可省略
logger.log_dir = "./app"
# log檔名字首,需要在程式入口呼叫才能生效,可省略
logger.log_name = "test_log"

conf = logger.Logger()
conf.debug('debug')
conf.warn('tr-warn')
conf.info('ds-info')
conf.error('ss-error')

"""

import os
import time
import threading
import datetime
import logging
import logging.handlers

try:
    import codecs
except ImportError:
    codecs = None

# Default directory and file-name prefix for the log files. Both are read
# when the Logger singleton is first created, so override them before the
# first Logger() / info() / warn() / debug() / error() call.
log_dir = "log"
log_name = "applog"
# Serialises creation of the Logger singleton across threads.
_logger_init_lock = threading.Lock()


class MyHandler(logging.handlers.TimedRotatingFileHandler):
    """Midnight-rotating file handler that writes to a date-stamped file.

    The active file is ``<log_dir>/<file_name_prefix>.<YYYYMMDD>.log``. At
    rollover the handler does not rename anything; it simply closes the
    current file and opens the file for the new date.
    """

    def __init__(self, log_dir, file_name_prefix):
        """Create ``log_dir`` if needed and open today's log file.

        Args:
            log_dir: Directory that holds the log files.
            file_name_prefix: File name part placed before the date stamp.
        """
        self.log_dir = log_dir
        self.file_name_prefix = file_name_prefix
        self._mkdirs()
        self.baseFilename = self._dated_filename()
        logging.handlers.TimedRotatingFileHandler.__init__(
            self, self.baseFilename, when='midnight', interval=1,
            backupCount=0, encoding=None)

    def _dated_filename(self):
        """Return the log file path stamped with the current local date."""
        return "%s.%s.log" % (
            os.path.join(self.log_dir, self.file_name_prefix),
            time.strftime("%Y%m%d"))

    def doRollover(self):
        """Switch output to the file for the current date."""
        # The stream may be unset (e.g. after close()), so guard the close.
        if self.stream:
            self.stream.close()
            self.stream = None
        # The base class stores an absolute path in baseFilename; keep doing
        # so here so a later chdir() cannot move the log files.
        self.baseFilename = os.path.abspath(self._dated_filename())
        # FileHandler._open() honours self.mode and self.encoding.
        self.stream = self._open()
        # Schedule the next rollover strictly in the future, even when more
        # than one interval has elapsed since the previous rollover.
        now = int(time.time())
        next_rollover = self.computeRollover(now)
        while next_rollover <= now:
            next_rollover += self.interval
        self.rolloverAt = next_rollover

    def _mkdirs(self):
        """Create the log directory; report (but do not raise) on failure."""
        if os.path.isdir(self.log_dir):
            return
        try:
            os.makedirs(self.log_dir)
        except OSError as e:
            # Another thread/process may have created it in the meantime.
            if not os.path.isdir(self.log_dir):
                print(str(e))


class Logger(object):
    """Process-wide singleton wrapping the ``"_sys"`` logger.

    Every ``Logger()`` call returns the same instance. The instance is
    configured once, from the module-level ``log_dir`` and ``log_name``.
    """

    __instance = None

    def __new__(classtype, *args, **kwargs):
        # "with" guarantees the lock is released even when init() raises;
        # otherwise every later Logger() call would block forever.
        with _logger_init_lock:
            if type(classtype.__instance) is not classtype:
                # object.__new__ must not receive the extra arguments.
                instance = object.__new__(classtype)
                instance.init()
                # Publish only a fully initialised instance.
                classtype.__instance = instance
            return classtype.__instance

    def init(self):
        """Read the module-level settings and set up the handlers."""
        global log_dir, log_name
        self.log_dir = log_dir
        self.log_name = log_name
        # Per-level switches checked by debug()/info()/warn()/error().
        self.is_debug = True
        self.is_info = True
        self.is_warn = True
        self.is_error = True
        self.logger_formatter = "[%(asctime)-15s,%(levelname)s] %(message)s"
        self.file_formatter = "[%(asctime)-15s,%(levelname)s] %(message)s"
        self._initLogger()

    def _initLogger(self):
        """Attach one file handler per log file to the ``"_sys"`` logger."""
        logging.basicConfig(format=self.logger_formatter)
        self.logger = logging.getLogger("_sys")
        self.logger.setLevel(logging.DEBUG)
        # Each handler accepts its own level and everything above it:
        #   <log_name>.info  -> INFO, WARNING, ERROR
        #   <log_name>.error -> ERROR
        #   <log_name>.debug -> DEBUG and above (i.e. every record)
        for suffix, level in (("info", logging.INFO),
                              ("error", logging.ERROR),
                              ("debug", logging.DEBUG)):
            filehandler = MyHandler(self.log_dir,
                                    "%s.%s" % (self.log_name, suffix))
            filehandler.suffix = "%Y%m%d.log"
            filehandler.setLevel(level)
            filehandler.setFormatter(logging.Formatter(self.file_formatter))
            self.logger.addHandler(filehandler)

    def getLogger(self):
        """Return the underlying ``logging.Logger`` object."""
        return self.logger

    def debug(self, msg):
        """Log ``msg`` at DEBUG level unless ``is_debug`` is off."""
        if self.is_debug:
            self.logger.debug(msg)

    def info(self, msg):
        """Log ``msg`` at INFO level unless ``is_info`` is off."""
        if self.is_info:
            self.logger.info(msg)

    def warn(self, msg):
        """Log ``msg`` at WARNING level unless ``is_warn`` is off."""
        if self.is_warn:
            # logging.Logger.warn is a deprecated alias of warning().
            self.logger.warning(msg)

    def error(self, msg):
        """Log ``msg`` at ERROR level unless ``is_error`` is off."""
        if self.is_error:
            self.logger.error(msg)


def info(msg):
    """Log ``msg`` at INFO level through the shared Logger."""
    Logger().info(msg)


def warn(msg):
    """Log ``msg`` at WARNING level through the shared Logger."""
    Logger().warn(msg)


def debug(msg):
    """Log ``msg`` at DEBUG level through the shared Logger."""
    Logger().debug(msg)


def error(msg):
    """Log ``msg`` at ERROR level through the shared Logger."""
    Logger().error(msg)

4. 連線資料庫

# encoding:utf-8
# name:mod_db.py
'''
使用方法:1.在主程式中先例項化DB Mysql資料庫操作類。
      2.使用方法:db=database()  db.fetch_all("sql")
'''
import MySQLdb
import MySQLdb.cursors
import mod_config
import mod_logger

# Name of the db.conf section that holds the connection settings.
DB = "database"
# LOGPATH is optional: db.conf may not define a [path] section / logpath
# option, and a missing entry must not prevent this module from importing.
# Exception is caught broadly because the ConfigParser error classes are not
# imported in this module.
try:
    LOGPATH = mod_config.getConfig('path', 'logpath') + 'database.log'
except Exception:
    LOGPATH = 'database.log'
# Connection settings, all read as strings from the [database] section.
DBNAME = mod_config.getConfig(DB, 'dbname')
DBHOST = mod_config.getConfig(DB, 'dbhost')
DBUSER = mod_config.getConfig(DB, 'dbuser')
DBPWD = mod_config.getConfig(DB, 'dbpassword')
DBCHARSET = mod_config.getConfig(DB, 'dbcharset')
DBPORT = mod_config.getConfig(DB, "dbport")
# The logger module itself is used as the logger (it exposes warn/error/...).
logger = mod_logger


# Database access class
class database(object):
    """Small wrapper around one MySQLdb connection and one cursor.

    Connection settings come from the module-level constants (DBNAME, DBHOST,
    DBUSER, DBPWD, DBCHARSET, DBPORT); the database name and host can be
    overridden per instance. Errors are logged rather than raised, so callers
    must check the return values.

    Usage:
        db = database()
        rows = db.fetch_all("select * from t where id = %s", (1,))
        db.close()
    """

    def __init__(self, dbname=None, dbhost=None):
        """Connect to MySQL.

        Args:
            dbname: Database name; defaults to DBNAME when None.
            dbhost: Server host; defaults to DBHOST when None.
        """
        self._logger = logger
        self._dbname = DBNAME if dbname is None else dbname
        self._dbhost = DBHOST if dbhost is None else dbhost

        self._dbuser = DBUSER
        self._dbpassword = DBPWD
        self._dbcharset = DBCHARSET
        self._dbport = int(DBPORT)
        # Always defined, so close() and friends are safe after a failed
        # connection attempt.
        self._cursor = None
        self._conn = self.connectMySQL()

        if self._conn:
            self._cursor = self._conn.cursor()

    def connectMySQL(self):
        """Open the connection.

        Returns:
            The MySQLdb connection (using DictCursor as its cursor class), or
            False when connecting failed; the failure is logged.
        """
        try:
            return MySQLdb.connect(host=self._dbhost,
                                   user=self._dbuser,
                                   passwd=self._dbpassword,
                                   db=self._dbname,
                                   port=self._dbport,
                                   cursorclass=MySQLdb.cursors.DictCursor,
                                   charset=self._dbcharset,
                                   )
        except Exception as data:
            self._logger.error("connect database failed, %s" % data)
            return False

    def fetch_all(self, sql, params=None):
        """Run a query and return every row.

        Args:
            sql: SQL text; may contain ``%s`` placeholders.
            params: Optional values for the placeholders. Prefer this over
                building the SQL string by hand, so the driver does the
                escaping.

        Returns:
            The result of ``cursor.fetchall()``; ``''`` when there is no
            connection; False when the query raised (the error is logged).
        """
        if not self._conn:
            return ''
        try:
            self._cursor.execute(sql, params)
            return self._cursor.fetchall()
        except Exception as data:
            self._logger.warn("query database exception, %s" % data)
            return False

    def update(self, sql, params=None):
        """Run a modifying statement and commit it.

        Args:
            sql: SQL text; may contain ``%s`` placeholders.
            params: Optional values for the placeholders.

        Returns:
            True when the statement was executed and committed; False when
            there is no connection or the statement failed (the error is
            logged and the transaction is rolled back).
        """
        if not self._conn:
            return False
        try:
            self._cursor.execute(sql, params)
            self._conn.commit()
            return True
        except Exception as data:
            self._logger.warn("update database exception, %s" % data)
            self._rollback_quietly()
            return False

    def _rollback_quietly(self):
        """Discard the failed transaction without raising."""
        try:
            self._conn.rollback()
        except Exception as data:
            self._logger.warn("rollback database exception, %s" % data)

    def close(self):
        """Close the cursor and the connection; safe to call repeatedly."""
        if not self._conn:
            return
        try:
            try:
                if self._cursor is not None:
                    self._cursor.close()
            finally:
                # Close the connection even when closing the cursor failed.
                self._conn.close()
        except Exception as data:
            self._logger.warn("close database exception, %s,%s,%s" % (data, type(self._cursor), type(self._conn)))
        finally:
            # Mark the instance as disconnected so later calls are no-ops.
            self._cursor = None
            self._conn = False