1. 程式人生 > >python 學習筆記(九): 資料庫壓測程式設計

python 學習筆記(九): 資料庫壓測程式設計

這個程式碼是利用多執行緒多mysql資料庫批量插入資料,可用於mysql壓測

#!/usr/bin/python
# -*- coding: utf-8 -*-
from __future__ import print_function
import argparse
import threading
import pymysql
from contextlib import contextmanager
import string
import random
import time


DB_NAME = 'test_insert_data_db'
TABLE_NAME = 'test_insert_data_table'
CREATE_TABLE = """create table {0} (id int(10) not null auto_increment,name varchar(255) not null default 0,datetime double not null,PRIMARY KEY (`id`))""".format(TABLE_NAME) def _argparse(): parse = argparse.ArgumentParser(description='benchmark tool for mysql database') parse.add_argument('--host'
,action='store',default='localhost',help='connect to host',dest='host') parse.add_argument('--user',action='store',required=True,dest='user',help='user to login mysql') parse.add_argument('--password',action='store',dest='passwd',required=True,help='login mysql passwd') parse.add_argument('--port'
,action='store',dest='port',default=3306,type=int,help='port number to use for connection or 3306 for default') parse.add_argument('--thread_size',action='store',dest='thread_size',default=5,type=int,help='how much connection for database usage') parse.add_argument('--row_size',action='store',dest='row_size',default=5000,type=int,help='how much rows') parse.add_argument('--v','--version',action='version',version='%(prog)s 0.1') return parse.parse_args(); @contextmanager def get_conn(**kwargs): conn = pymysql.connect(**kwargs) try: yield conn finally: conn.close() def create_table(conn): sqls = [ 'drop database if exists {0};'.format(DB_NAME), 'create database {0};'.format(DB_NAME), 'use {0};'.format(DB_NAME), CREATE_TABLE ] with conn as cur: for sql in sqls: cur.execute(sql) # 生成隨機字串 def random_string(length=10): s = string.letters + string.digits return ''.join(random.sample(s,length)) def add_rows(cur): sql = 'insert into `{0}` (name,datetime) value ("{1}","{2}")'.format(TABLE_NAME,random_string(),time.time()) cur.execute(sql) # 插入多條資料 def insert_data(conn_args,rowsize): with get_conn(**conn_args) as conn: with conn as cur: cur.execute('use {0}'.format(DB_NAME)) with conn as cur: for i in range(rowsize): add_rows(cur) conn.commit() def main(): parser = _argparse() conn_args = dict(host=parser.host,user=parser.user,password=parser.passwd,port=parser.port) with get_conn(**conn_args) as conn: create_table(conn) threads = [] for i in range(parser.thread_size): t = threading.Thread(target=insert_data,args=(conn_args,parser.row_size)) threads.append(t) t.start() for t in threads: t.join() if __name__ == '__main__': main()