python 學習筆記(九): 資料庫壓測程式設計
阿新 • • 發佈:2018-12-09
這段程式碼利用多執行緒對 MySQL 資料庫批量插入資料,可用於 MySQL 壓力測試。
#!/usr/bin/python
# -*- coding: utf-8 -*-
from __future__ import print_function
import argparse
import threading
import pymysql
from contextlib import contextmanager
import string
import random
import time
# Scratch database used by the benchmark; create_table() drops and
# recreates it on every run.
DB_NAME = 'test_insert_data_db'

# Table that receives the generated rows.
TABLE_NAME = 'test_insert_data_table'

# DDL for the benchmark table, assembled column by column. The pieces are
# adjacent string literals, so they concatenate into one statement before
# the table name is substituted.
CREATE_TABLE = (
    'create table {0} ('
    'id int(10) not null auto_increment,'
    'name varchar(255) not null default 0,'
    'datetime double not null,'
    'PRIMARY KEY (`id`)'
    ')'
).format(TABLE_NAME)
def _argparse(argv=None):
    """Parse the benchmark's command-line options.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse falls back to ``sys.argv[1:]``, so
            existing zero-argument callers behave as before.

    Returns:
        An ``argparse.Namespace`` with the attributes ``host``, ``user``,
        ``passwd``, ``port``, ``thread_size`` and ``row_size``.
    """
    parser = argparse.ArgumentParser(
        description='benchmark tool for mysql database')
    parser.add_argument('--host', dest='host', default='localhost',
                        help='connect to host')
    parser.add_argument('--user', dest='user', required=True,
                        help='user to login mysql')
    parser.add_argument('--password', dest='passwd', required=True,
                        help='login mysql passwd')
    parser.add_argument('--port', dest='port', default=3306, type=int,
                        help='port number to use for connection or 3306 for default')
    parser.add_argument('--thread_size', dest='thread_size', default=5,
                        type=int,
                        help='how much connection for database usage')
    parser.add_argument('--row_size', dest='row_size', default=5000,
                        type=int, help='how much rows')
    # Both spellings are kept so existing invocations keep working.
    parser.add_argument('--v', '--version', action='version',
                        version='%(prog)s 0.1')
    return parser.parse_args(argv)
@contextmanager
def get_conn(**kwargs):
    """Yield a PyMySQL connection and close it when the block exits.

    All keyword arguments are forwarded unchanged to ``pymysql.connect``.
    The connection is closed whether the ``with`` body finishes normally
    or raises.
    """
    connection = pymysql.connect(**kwargs)
    try:
        yield connection
    finally:
        connection.close()
def create_table(conn):
    """Recreate the benchmark database and its table from scratch.

    Drops ``DB_NAME`` if it exists, creates it again, selects it and then
    runs the ``CREATE_TABLE`` statement.

    Args:
        conn: An open PyMySQL connection.
    """
    statements = [
        'drop database if exists {0};'.format(DB_NAME),
        'create database {0};'.format(DB_NAME),
        'use {0};'.format(DB_NAME),
        CREATE_TABLE,
    ]
    # Open the cursor explicitly: ``with conn as cur`` only yielded a cursor
    # in old PyMySQL releases; newer ones yield the connection itself (and
    # close it on exit), so ``cur.execute`` would fail there.
    cur = conn.cursor()
    try:
        for statement in statements:
            cur.execute(statement)
    finally:
        cur.close()
    # The old context manager committed on a clean exit; keep doing so.
    conn.commit()
# Generate a random string.
def random_string(length=10):
    """Return a random string of ASCII letters and digits.

    Characters are drawn independently, so they may repeat and ``length``
    may exceed the 62-character alphabet.

    Args:
        length: Number of characters to generate. Zero or a negative
            value yields an empty string.

    Returns:
        A ``str`` of ``length`` characters.
    """
    # ``string.ascii_letters`` exists on both Python 2 and 3, whereas
    # ``string.letters`` is Python 2 only.
    alphabet = string.ascii_letters + string.digits
    return ''.join(random.choice(alphabet) for _ in range(length))
def add_rows(cur):
    """Insert one row with a random name and the current timestamp.

    Args:
        cur: A cursor object exposing ``execute``; the target database
            must already be selected on its connection.
    """
    name = random_string()
    stamp = time.time()
    statement = 'insert into `{0}` (name,datetime) value ("{1}","{2}")'.format(
        TABLE_NAME, name, stamp)
    cur.execute(statement)
# Insert multiple rows.
def insert_data(conn_args, rowsize):
    """Open a dedicated connection and insert ``rowsize`` random rows.

    Used as the thread target: each worker gets its own connection, selects
    ``DB_NAME``, inserts the rows one by one and commits once at the end.

    Args:
        conn_args: Dict of keyword arguments passed on to ``get_conn``.
        rowsize: Number of rows to insert.
    """
    with get_conn(**conn_args) as conn:
        # Open the cursor explicitly: ``with conn as cur`` only yielded a
        # cursor in old PyMySQL releases; newer ones yield the connection
        # itself and close it on exit, which would break the inserts below.
        cur = conn.cursor()
        try:
            cur.execute('use {0}'.format(DB_NAME))
            for _ in range(rowsize):
                add_rows(cur)
        finally:
            cur.close()
        # Single commit after all rows have been sent.
        conn.commit()
def main():
    """Run the benchmark: reset the schema, then insert rows concurrently.

    Parses the command line, recreates the database and table over one
    connection, then starts ``thread_size`` worker threads that each insert
    ``row_size`` rows, and waits for all of them to finish.
    """
    options = _argparse()
    conn_args = {
        'host': options.host,
        'user': options.user,
        'password': options.passwd,
        'port': options.port,
    }

    # Schema setup runs on its own short-lived connection.
    with get_conn(**conn_args) as conn:
        create_table(conn)

    # One worker thread (and therefore one connection) per requested slot.
    workers = [
        threading.Thread(target=insert_data,
                         args=(conn_args, options.row_size))
        for _ in range(options.thread_size)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()


if __name__ == '__main__':
    main()