第9課:備份mysql數據庫、重寫父類、unittest框架、多線程
阿新 • • 發佈:2018-03-08
.sql 普通 解鎖 info root size print add AC
1. 寫代碼備份mysql數據庫:
1)Linux下,備份mysql數據庫,在shell下執行命令:mysqldump -uroot -p123456 -A >db_bak.sql即可
import os import datetime class BakDb(object): def __init__(self, ip, username, passwd, port=3306, path=‘/tmp/db_bak‘): self.ip = ip self.username = username self.passwd = passwd self.port = port self.path = path def check_path_exist(self): if not os.path.isdir(self.path): os.mkdir(self.path) def bak_db(self): # mysqldump -u%s -p%s -h%s -A > **.sql filename = str(datetime.date.today()) + ‘.sql‘ self.check_path_exist() abs_file = os.path.join(self.path, filename) # 變為絕對路徑 command = ‘‘‘ mysqldump -u{username} -p{passwd} -P{port} -h{ip} -A > {filename} ‘‘‘.format(username=self.username, passwd=self.passwd, port=self.port, ip=self.ip, filename=abs_file) print(command) os.system(command) print("數據庫備份完成") obj = BakDb(‘**.**.**.**‘, ‘user‘, ‘passwd‘) obj.bak_db()
2. 重寫父類的方法
3. 單元測試-unittest框架:
import unittest import HTMLTestRunner from BeautifulReport import BeautifulReport def calc(x, y): return x + y class TestCalc(unittest.TestCase): def setUp(self): print(‘我是setUp‘) def test_pass_case(self): ‘‘‘這個是通過的測試用例‘‘‘ res = calc(1, 2) self.assertEqual(3, res) def test_fail_case(self): ‘‘‘這個是失敗的測試用例‘‘‘ res = calc(9, 8) self.assertEqual(98, res) def test_a(self): ‘‘‘這是個普通的測試用例‘‘‘ pass def test_haha(self): ‘‘‘這是哈哈哈測試用例‘‘‘ def tearDown(self): print(‘我是tearDown‘) @classmethod def setUpClass(cls): # 所有的用例執行前運行一次 print(‘我是setUpClass‘) @classmethod def tearDownClass(cls): # 所有的用例運行完成後運行一次 print(‘我是tearDownClass‘) if __name__ == ‘__main__‘: unittest.main() # 會運行當前python文件中的所有測試用例 # suite = unittest.TestSuite() # 定義一個測試套件 # suite.addTest(TestCalc(‘test_pass_case‘))# addTest的參數是TestCase實例或TestSu # suite.addTest(TestCalc(‘test_fail_case‘)) # suite.addTest(TestCalc(‘test_a‘)) # 單個添加測試用例 # suite.addTests(unittest.makeSuite(TestCalc)) # addTests的參數是由測試用例或測試套件組成 # f = open(‘report.html‘, ‘wb‘) # runner = HTMLTestRunner.HTMLTestRunner(stream=f,title=‘測試報告‘,descriptio # runner.run(suite) # BeautifulReport包能夠生成界面更好看的測試報告 # result = BeautifulReport(suite) # result.report(filename=‘reportB.html‘,description=‘測試報告‘,log_path=‘.‘)
2)還有一種寫法,將所有的case寫在一個目錄下,然後寫一個運行所有case的代碼:
import unittest from BeautifulReport import BeautifulReport import xmlrunner # pip inttall xmlrunner suite = unittest.TestSuite() # TestLoader是用來加載TestCase到TestSuite中的 all_case = unittest.defaultTestLoader.discover(‘cases‘, ‘test*.py‘) # 第一個參數是目錄,第二個參數是以test開頭的是用例文件 for case in all_case: suite.addTests(case) # case的類型是<class ‘unittest.suite.TestSuite‘> print(suite) # 列表生成式 # [ suite.addTests(case) for case in all_case ] result = BeautifulReport(suite) result.report(filename=‘report_all_case.html‘, description=‘測試報告‘, log_path=‘.‘) # runner = xmlrunner.XMLTestRunner(‘.‘) # 為了產生xml格式的報告給Jenkins用,在當前目錄生成報告 # runner.run(suite) # 運行用例
test_buy.py import unittest class TestBuy(unittest.TestCase): def test_a(self): self.assertEqual(1, 1) def test_b(self): self.assertEqual(1, 2)
4. 多線程:
1)咱們打開的程序都是進程,進程中至少有個一個線程
2)線程包含在進程裏,線程是最小的執行單元,線程之間是相互獨立的
3)主線程起了n個子線程之後,繼續執行至結束,子線程可能仍在執行,子線程幹活時間未統計上,
所以要告訴主線程要等子線程執行完成後再結束程序
import threading import time import requests def sayHi(name): time.sleep(2) print(name) def downHtml(url, name): content = requests.get(url).content f = open(name, ‘wb‘) f.write(content) f.close() urls = [ [‘nnzhp‘, ‘http://www.nnzhp.cn‘], [‘dsx‘, ‘http://www.imdsx.cn‘], [‘besttest‘, ‘http://www.besttest.cn‘] ] # 單線程運行 # start_time = time.time() # for url in urls: # downHtml(url[1], url[0]) # end_time = time.time() # print(end_time-start_time) # 多線程運行 start_time = time.time() threads = [] for url in urls: t = threading.Thread(target=downHtml, args=(url[1], url[0])) # 啟動一個線程 t.start() # 運行 # 等待子線程幹完活 # t.join() # 主線程起了一個子線程後,等待子線程運行結束;再循環下一次起一個新的線程 threads.append(t) for t in threads: # 主線程一直循環等待3個子線程 直到它們都幹完活 t.join() # 主線程等待子線程 end_time = time.time() print(end_time - start_time) # for i in range(10): # t = threading.Thread(target=sayHi, args=(‘小黑‘,)) # 啟動一個線程,這裏小黑後面要加個逗號 # t.start() # 運行
4) 多進程模塊
import multiprocessing import time def run(): time.sleep(2) print("多進程") for i in range(5): p = multiprocessing.Process(target=run2) p.start() def run2(): print("多進程啟動") if __name__ == ‘__main__‘: for i in range(5): p = multiprocessing.Process(target=run) p.start()
5) 守護進程
import threading import time def pz(): time.sleep(2) print(‘跟班‘) threads = [] for i in range(50): t = threading.Thread(target=pz) t.setDaemon(True) # 設置子線程為守護線程,守護線程:一旦主線程立刻結束,那麽子線程立刻結束,不管子線程有沒有運行完, t.start() threads.append(t) # for t in threads: # t.join() # 如果線程調用t.join(),守護線程就不起作用了 time.sleep(3) print(‘done‘) # 不加t.join(), 先打印done,後打印50個跟班
6) 線程鎖
import threading from threading import Lock num = 0 lock = Lock() # 申請一把鎖 def run(): global num lock.acquire() # 加鎖 num += 1 lock.release() # 解鎖 lis = [] for i in range(5): t = threading.Thread(target=run) t.start() lis.append(t) for t in lis: t.join() print(‘over‘, num) # 加鎖是為了防止多線程時同時修改數據,可能會導致數據不正確。 # python3中不加鎖也無所謂,
第9課:備份mysql數據庫、重寫父類、unittest框架、多線程