1. 程式人生 > >python DBUtils 線程池 連接 Postgresql(多線程公用線程池,DB-API : psycopg2)

python DBUtils 線程池 連接 Postgresql(多線程公用線程池,DB-API : psycopg2)

work 風險 等待 put pro 連接數 exist eve self.

一、DBUtils

DBUtils 是一套允許線程化 Python 程序可以安全和有效的訪問數據庫的模塊,DBUtils提供兩種外部接口: PersistentDB :提供線程專用的數據庫連接,並自動管理連接。 PooledDB :提供線程間可共享的數據庫連接,並自動管理連接。

操作數據庫模板:

  1 import datetime
  2 import sys
  3 import os
  4 import configparser
  5 import logging
  6 import psycopg2
  7 
  8 from  DBUtils.PooledDB import
PooledDB 9 10 11 12 13 class DatabaseOperator(object): 14 ‘‘‘ 15 class for database operator 16 ‘‘‘ 17 18 19 def __init__(self, 20 database_config_path, database_config=None): 21 ‘‘‘ 22 Constructor 23 ‘‘‘ 24 self._database_config_path = database_config_path
25 26 # load database configuration 27 if not database_config : 28 self._database_config = self.parse_postgresql_config(database_config_path) 29 else: 30 self._database_config = database_config 31 self._pool = None 32 33
def database_config_empty(self): 34 if self._database_config: 35 return False 36 else: 37 return True 38 39 def parse_postgresql_config(self, database_config_path=None): 40 ‘‘‘解析pei數據庫配置文件 41 參數 42 --------- 43 arg1 : conf_file 44 數據庫配置文件路徑 45 返回值 46 -------- 47 dict 48 解析配置屬性dict--config 49 50 示例 51 -------- 52 53 ‘‘‘ 54 if database_config_path == None and self._database_config_path != None: 55 database_config_path = self._database_config_path 56 if not os.path.isfile(database_config_path): 57 sys.exit("ERROR: Could not find configuration file: {0}".format(database_config_path)) 58 parser = configparser.SafeConfigParser() 59 parser.read(database_config_path) 60 config = {} 61 config[database] = parser.get(UniMonDB, Database) 62 config[db_user] = parser.get(UniMonDB, UserName) 63 config[db_passwd] = parser.get(UniMonDB, Password) 64 config[db_port] = parser.getint(UniMonDB, Port) 65 config[db_host] = parser.get(UniMonDB, Servername) 66 self._database_config = config 67 68 return config 69 70 71 def get_pool_conn(self): 72 73 if not self._pool: 74 self.init_pgsql_pool() 75 return self._pool.connection() 76 77 def init_pgsql_pool(self): 78 ‘‘‘利用數據庫屬性連接數據庫 79 參數 80 --------- 81 arg1 : config 82 數據庫配置屬性 83 返回值 84 -------- 85 86 示例 87 -------- 88 89 ‘‘‘ 90 # 字典config是否為空 91 config = self.parse_postgresql_config() 92 POSTGREIP = config[db_host] 93 POSTGREPORT = config[db_port] 94 POSTGREDB = config[database] 95 POSTGREUSER = config[db_user] 96 POSTGREPASSWD = config[db_passwd] 97 try: 98 logging.info(Begin to create {0} postgresql pool on:{1}.\n.format(POSTGREIP, datetime.datetime.now())) 99 100 pool = PooledDB( 101 creator=psycopg2, # 使用鏈接數據庫的模塊mincached 102 maxconnections=6, # 連接池允許的最大連接數,0和None表示不限制連接數 103 mincached=1, # 初始化時,鏈接池中至少創建的空閑的鏈接,0表示不創建 104 maxcached=4, # 鏈接池中最多閑置的鏈接,0和None不限制 105 blocking=True, # 連接池中如果沒有可用連接後,是否阻塞等待。True,等待;False,不等待然後報錯 106 maxusage=None, # 一個鏈接最多被重復使用的次數,None表示無限制 107 setsession=[], # 開始會話前執行的命令列表。 108 host=POSTGREIP, 109 port=POSTGREPORT, 110 user=POSTGREUSER, 111 password=POSTGREPASSWD, 112 database=POSTGREDB) 113 self._pool = pool 114 logging.info(SUCCESS: create postgresql success.\n) 115 116 except Exception as e: 117 logging.error(ERROR: create postgresql pool failed:{0}\n) 118 self.close_db_cursor() 119 sys.exit(ERROR: create postgresql pool error caused by {0}.format(str(e))) 120 121 122 def pg_select_operator(self, sql): 123 ‘‘‘進行查詢操作,函數返回前關閉cursor,conn 124 參數 125 --------- 126 arg1 : sql查詢語句 127 返回值 128 -------- 129 list:result 130 類型為list的查詢結果:result 131 132 示例 133 -------- 134 135 ‘‘‘ 136 # 執行查詢 137 try: 138 conn = self.get_pool_conn() 139 cursor = conn.cursor() 140 cursor.execute(sql) 141 result = cursor.fetchall() 142 except Exception as e: 143 logging.error(ERROR: execute {0} causes error.format(sql)) 144 sys.exit(ERROR: load data from database error caused {0}.format(str(e))) 145 finally: 146 cursor.close() 147 conn.close() 148 return result 149 150 def test_pool_con(self): 151 sql = select * from tbl_devprofile 152 result = self.pg_select_operator(sql) 153 print(result) 154 155 def pg_insert_operator(self, sql): 156 157 result = False 158 try: 159 conn = self.get_pool_conn() 160 cursor = conn.cursor() 161 cursor.execute(sql) 162 result = True 163 except Exception as e: 164 logging.error(ERROR: execute {0} causes error.format(sql)) 165 sys.exit(ERROR: insert data from database error caused {0}.format(str(e))) 166 finally: 167 cursor.close() 168 conn.commit() 169 conn.close() 170 return result 171 172 def pg_update_operator(self, sql): 173 174 result = False 175 try: 176 conn = self.get_pool_conn() 177 cursor = conn.cursor() 178 cursor.execute(sql) 179 result = True 180 except Exception as e: 181 logging.error(ERROR: execute {0} causes error.format(sql)) 182 sys.exit(ERROR: update data from database error caused {0}.format(str(e))) 183 finally: 184 cursor.close() 185 conn.commit() 186 conn.close() 187 return result 188 189 def pg_delete_operator(self, sql): 190 result = False 191 # 執行查詢 192 try: 193 conn = self.get_pool_conn() 194 cursor = conn.cursor() 195 cursor.execute(sql) 196 result = True 197 except Exception as e: 198 logging.error(ERROR: execute {0} causes error.format(sql)) 199 sys.exit(ERROR: delete data from database error caused {0}.format(str(e))) 200 finally: 201 cursor.close() 202 conn.commit() 203 conn.close() 204 return result 205 206 207 def close_pool(self): 208 ‘‘‘關閉pool 209 參數 210 --------- 211 212 213 返回值 214 -------- 215 216 示例 217 -------- 218 219 ‘‘‘ 220 if self._pool != None: 221 self._pool.close() 222 223 if __name__ == __main__: 224 path = "E:\\Users\\Administrator\\eclipse-workspace\\com.leagsoft.basemodule\\base\\config\\sql_conf.conf" 225 db = DatabaseOperator( 226 database_config_path=path) 227 db.test_pool_con()

二、多線程

原理:創建多個線程類,多個線程類共享一個隊裏Queue,每一個線程類可以操作數據庫

 1 from threading import Thread
 2     
 3 class Worker(Thread):
 4     def __init__(self, queue):
 5         Thread.__init__(self)
 6         self.queue = queue
 7  
 8     def run(self):
 9         while True:
10             # Get the work from the queue and expand the tuple
11             # 從隊列中獲取任務
12             database_operator, device, stand_alone_result = self.queue.get()
13             operateResult(database_operator, device, stand_alone_result)
14             # 任務執行完之後要通知隊列
15             self.queue.task_done()

填充隊列

 1     # 使用隊列多線程
 2     logging.info(begin to update all device risk score by multi_processing.\n)
 3     from queue import Queue
 4     queue = Queue()
 5     # 六個線程,每個線程共享一個隊列
 6     for _ in range(6):
 7         worker = Worker(queue)
 8         worker.setDaemon(True)
 9         worker.start()
10           
11     for record in all_devid:
12         device = record[0]
13         devtype = record[1]
14         all_countlist = all_dict.get(device)
15         stand_alone_result = device_assess(all_countlist)
16         if (devtype in (server_devtype + computer_devtype)) and (stand_alone_result < 100):
17             stand_alone_result *= 0.8
18         # 將設備風險評分數據保存到數據庫中
19         queue.put((database_operator, device, stand_alone_result))
20      
21     #等待隊列任務執行完
22     queue.join()
23 
24 
25 def operateResult(database_operator, device, stand_alone_result):
26     ‘‘‘
27     函數名稱: device_assess
28     描述:  保存單臺設備分數到數據庫
29     調用: 無
30     被調用:  main
31     被訪問的表: tbl_devprofile
32     被修改的表: 無
33     輸入參數: database_operator, device:設備uid, stand_alone_result:單臺設備風險分數
34     輸出參數:無
35     返回值: 單臺設備風險分數值
36     其它:  無
37     ‘‘‘
38     import time
39     find_profile_sql = "SELECT uiddevrecordid FROM tbl_devprofile WHERE uiddevrecordid=‘{0}‘;".format(device)
40     isExistRecord = database_operator.pg_select_operator(find_profile_sql)
41     #currentTime=datetime.datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘)
42     currentTime=time.strftime(%Y-%m-%d %H:%M:%S,time.localtime(time.time()))
43     if len(isExistRecord) > 0:
44         updata_profile_sql = "UPDATE tbl_devprofile SET irisklevel={0}, dtrisktime=‘{1}‘ 45                               WHERE uiddevrecordid=‘{2}‘;".format(stand_alone_result, currentTime, device)
46         database_operator.pg_update_operator(updata_profile_sql)
47     else:
48         insert_profile_sql = "INSERT INTO tbl_devprofile VALUES(‘{0}‘,NULL,NULL,NULL,NULL,NULL,NULL,NULL,{1},‘{2}‘);".format(
49             device, stand_alone_result, currentTime)
50         database_operator.pg_insert_operator(insert_profile_sql)

使用單線程時,執行完代碼花費20s左右,使用多線程時花費5s左右。

Reference:

[1] https://blog.csdn.net/zhaihaifei/article/details/54016939

[2] https://www.cnblogs.com/hao-ming/p/7215050.html?utm_source=itdadao&utm_medium=referral

[3] https://www.cnblogs.com/wozijisun/p/6160065.html (多線程)

[4] http://www.lpfrx.com/archives/4431/

[5] https://www.cnblogs.com/95lyj/p/9047554.html

python DBUtils 線程池 連接 Postgresql(多線程公用線程池,DB-API : psycopg2)