Python操作MySQL與Python多程序
一、Python操作MySQL資料庫
利用Python語言操作資料庫,需要先下載pymysql,由於我之前下載了Anaconda並配置了系統變數,直接在命令列輸出:
conda install pymysql
如果沒有安裝過Anaconda,可通過以下命令列安裝:
pip install pymysql
安裝完畢後,通過以下程式碼訪問並操作資料庫MySQL。
import pymysql # user為資料庫使用者名稱,password為登入密碼,db為目標資料庫名 conn = pymysql.connect(host="127.0.0.1",user="root",password="123456",db="easyvideo") cs = conn.cursor() cs.execute("select * from admin") for i in cs: print("當前是第" + str(cs.rownumber) + "行") print("id:" + i[0]) #輸出資料庫表中對應該行的id print("username:" + i[1]) #輸出資料庫表中對應該行的username
二、Python多程序
建立程序的multiprocessing.Process類
我們來看看這個類的原型:
class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={})
引數說明:
- target:表示呼叫物件,一般為函式,也可以為類。
- args:表示呼叫物件的置引數元組。
- kwargs:表示呼叫物件的字典。
- name:為程序的別名。
- group:引數不使用,可忽略。
類提供的常用方法:
- is_alive():返回程序是否是啟用的。
- join([timeout]) :可以等待子程序結束後再繼續往下執行,通常用於程序間的同步。進一步地解釋,哪個子程序呼叫了join方法,主程序就要等該子程序執行完後才能繼續向下執行。
- run() :代表程序執行的任務函式,可被重寫。
- start() :啟用程序。
- terminate():終止程序。
屬性:
- authkey:位元組碼,程序的誰金鑰.
- daemon:父程序終止後自動終止而不會等待子程序,且自己不能產生新程序,必須在start()之前設定。
- exitcode:退出碼,程序在執行時為None,如果為–N,表示被訊號N結束。
- name:獲取程序名稱.
- pid:程序id。
multiprocessing模組提供了一個建立程序的Process類,其建立程序有兩種方法:
- 建立一個Process類的例項,並指定目標任務函式。
- 自定義一個類,並繼承Process類,重寫其init ()方法和run ()方法。
首先我們使用第一種方法建立兩個程序,並與單程序執行的時間做比較:
import multiprocessing
import os
import time
#子程序執行的程式碼
def child_process(num):
result = 0
for i in range(num * 10000000):
result += i
print("程序為:{0:d}".format(os.getpid()))
if __name__ == '__main__':
print("父程序為:{0:d}".format(os.getpid()))
t0 = time.time()
child_process(5)
child_process(5)
t1 = time.time()
print("順序執行耗時:{0:.2f}".format(t1 - t0))
p1 = multiprocessing.Process(target=child_process,args=(5,))
p2 = multiprocessing.Process(target=child_process,args=(5,))
t2 = time.time()
p1.start()
p2.start()
p1.join()
p2.join()
t3 = time.time()
print("多程序執行耗時:{0:.2f}".format(t3 - t2))
上面的程式碼首先定義了一個千萬次資料累加的耗時函式,先通過單程序順序執行兩個耗時函式,然後輸出所用的時間;接著通過多程序併發執行,並指定目標函式為child_process,執行完成後列印耗時。其執行結果如下所示:
很明顯發現,通過多程序執行同樣的耗時函式,所用時間更少。
我們再用第二種方法對上面的耗時函式進行測試:
import multiprocessing
import os
import time
class MyProcess(multiprocessing.Process):
def __init__(self,num):
super().__init__()
self.num = num
#子程序執行的程式碼
def run(self):
result = 0;
for i in range(self.num * 10000000):
result += i
print("程序為:{0:d}".format(os.getpid()))
if __name__ == '__main__':
print("父程序為:{0:d}".format(os.getpid()))
p1 = MyProcess(5)
p2 = MyProcess(5)
t1 = time.time()
#程序p1,p2呼叫start()時,自動呼叫其run()方法
p1.start()
p2.start()
p1.join()
p2.join()
t2 = time.time()
print("多程序執行耗時:{0:.2f}".format(t2 - t1))
執行結果如下:
daemon屬性
import multiprocessing
import os
import time
# 子程序要執行的程式碼
def child_process(delay):
print(time.strftime('%Y-%m-%d %H:%M:%S',time.gmtime()) + ":子程序執行開始。")
print("sleep {0:d}s".format(delay))
time.sleep(delay)
print(time.strftime('%Y-%m-%d %H:%M:%S',time.gmtime()) + ":子程序執行結束。")
if __name__=='__main__':
print(time.strftime('%Y-%m-%d %H:%M:%S',time.gmtime()) + ":父程序執行開始。")
p1 = multiprocessing.Process(target=child_process, args=(3,))
#設定 daemon屬性為True
p1.daemon = True
p1.start()
# p1.join() #如果此行程式碼被註釋,那麼父程序不會等待子程序而提前結束,子程序會因為父程序的結束而結束
print(time.strftime('%Y-%m-%d %H:%M:%S',time.gmtime()) + ":父程序執行結束。")
如果p1.join()註釋,則結果為:
如果p1.join()保留,則結果為:
總結:
在多執行緒模型中,預設情況下daemon=False,主執行緒會等待子執行緒退出然後再退出。而如果將多程序的daemon設定為True時,主執行緒不會等待子執行緒,直接退出,而此時子執行緒會隨著主執行緒的退出而退出。為避免這種情況,主執行緒中需要對子執行緒進行join,等待子執行緒執行完畢後再退出。
併發控制之Semaphore
Semaphore用來控制對共享資源的訪問數量,即每一時刻允許同時執行的最大程序數。
import multiprocessing
import time
def f(s, i):
s.acquire()
print(time.strftime('%H:%M:%S',time.gmtime()),multiprocessing.current_process().name + " 獲得鎖執行");
time.sleep(i)
print(time.strftime('%H:%M:%S',time.gmtime()),multiprocessing.current_process().name + " 釋放鎖結束");
s.release()
if __name__ == "__main__":
s = multiprocessing.Semaphore(3)
for i in range(5):
p = multiprocessing.Process(target = f, args=(s, 2))
p.start()
執行結果如下:
可以看出,由於我設定了s = multiprocessing.Semaphore(3),所以同一時刻最多有三個程序執行。
程序同步之Lock
在某些情況下某些時刻,我們只需要一個程序訪問某個資源,這時我們就需要使用鎖Lock。
不加鎖:
import multiprocessing
import time
def work1():
num = 4;
while num > 1:
print(time.strftime('%H:%M:%S',time.gmtime()) + " work1")
time.sleep(1)
num -= 1
def work2():
num = 4;
while num > 1:
print(time.strftime('%H:%M:%S',time.gmtime()) + " work2")
time.sleep(1)
num -= 1
if __name__ == '__main__':
p1 = multiprocessing.Process(target=work1)
p2 = multiprocessing.Process(target=work2)
p1.start()
p2.start()
執行結果如下:
可以看出,同一時刻不同的work被輸出,每個子程序各自列印自己的資訊,在實際應用中,容易造成資訊混亂,這時就要用到Lock,保證同一時刻只有一個程序執行。
加鎖:
import multiprocessing
import time
def work1(lock):
with lock:
num = 4;
while num > 1:
print(time.strftime('%H:%M:%S',time.gmtime()) + " work1")
time.sleep(1)
num -= 1
def work2(lock):
lock.acquire()
num = 4;
while num > 1:
print(time.strftime('%H:%M:%S',time.gmtime()) + " work2")
time.sleep(1)
num -= 1
lock.release()
if __name__ == '__main__':
lock = multiprocessing.Lock()
p1 = multiprocessing.Process(target=work1,args=(lock,))
p2 = multiprocessing.Process(target=work2,args=(lock,))
p1.start()
p2.start()
執行結果如下:
每一個子程序函式中都加了鎖Lock:首先初始化一個鎖的例項lock = multiprocessing.Lock(),然後在需要獨佔的程式碼前後加鎖:先獲取鎖,即呼叫lock.acquire()方法,執行完成後釋放鎖,即呼叫lock.release()方法;也可以簡單地使用上下文關鍵字with (見work1的程式碼)。
程序池Pool
在利用Python進行系統管理的時候,特別是同時操作多個檔案目錄,或者遠端控制多臺主機,並行操作可以節約大量的時間。當被操作物件數目不大時,可以直接利用multiprocessing中的Process類動態生成多個程序。但如果是生成上百個、上千個目標,手動地去限制程序數量太過繁瑣,此時可以發揮程序池的功效。
Pool可以提供指定數量的程序,供使用者呼叫,當有新的請求提交到pool中時,如果池還沒有滿,那麼就會建立一個新的程序用來執行該請求;但如果池中的程序數已經達到規定最大值,那麼該請求就會等待,直到池中有程序結束,才會建立新的程序。
import multiprocessing
import time
def task(name):
print(f"{time.strftime('%H:%M:%S')}:{name} 開始執行")
time.sleep(3)
if __name__ == "__main__":
pool = multiprocessing.Pool(processes = 3)
for i in range(10):
#維持執行的程序總數為processes,當一個程序執行完畢後會新增新的程序進去
pool.apply_async(func = task, args=(i,))
pool.close()
pool.join()
print("hello")
執行結果如下:
從上面結果可以看出,同一時刻,只有執行緒池中的三個程序執行。
程序同步之Event
Event用來實現程序間同步通訊。
import multiprocessing
import time
def wait_for_event(e):
e.wait() #等待
time.sleep(1)
# 喚醒後清除Event狀態,為後續繼續等待
e.clear()
print(f"{time.strftime('%H:%M:%S')} 程序 A: 我們是兄弟,我等你...")
e.wait()
print(f"{time.strftime('%H:%M:%S')} 程序 A: 好的,是兄弟一起走")
def wait_for_event_timeout(e, t):
e.wait() #等待
time.sleep(1)
# 喚醒後清除Event狀態,為後續繼續等待
e.clear()
print(f"{time.strftime('%H:%M:%S')} 程序 B: 好吧,最多等你 {t} 秒")
e.wait(t)
print(f"{time.strftime('%H:%M:%S')} 程序 B: 我繼續往前走了")
if __name__ == "__main__":
e = multiprocessing.Event()
w1 = multiprocessing.Process(target=wait_for_event, args=(e,))
w2 = multiprocessing.Process( target=wait_for_event_timeout, args=(e, 5) )
w1.start()
w2.start()
# 主程序發話
print(f"{time.strftime('%H:%M:%S')} 主程序: 誰等我下,我需要 8 s 時間")
# 喚醒等待的程序
e.set()
time.sleep(8)
print(f"{time.strftime('%H:%M:%S')} 主程序: 好了,我趕上了")
# 再次喚醒等待的程序
e.set()
w1.join()
w2.join()
print(f"{time.strftime('%H:%M:%S')} 主程序:退出")
上面的程式碼定義了兩個程序函式,一個是等待事件發生函式,一個等待事件發生並設定了超時時間的函式。主程序呼叫事件的set()方法喚醒等待事件的程序,事件喚醒後呼叫clear()方法清除事件的狀態,並重新等待,以此達到程序的同步。執行結果如下:
優先順序佇列Queue
Queue是多程序安全的佇列,可以使用Queue實現多程序之間的資料傳遞。
put方法插入資料到佇列中,put方法還有兩個可選引數:blocked和timeout。如果blocked為True(預設值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該佇列有剩餘的空間。如果超時,會丟擲Queue.Full異常。如果blocked為False,但該Queue已滿,會立即丟擲Queue.Full異常。
get方法可以從佇列讀取並且刪除一個元素。同樣,get方法有兩個可選引數:blocked和timeout。如果blocked為True(預設值),並且timeout為正值,那麼在等待時間內沒有取到任何元素,會丟擲Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果佇列為空,則立即丟擲Queue.Empty異常。
import multiprocessing
import time
def ProducerA(q):
count = 1
while True:
q.put(f"冷飲 {count}")
print(f"{time.strftime('%H:%M:%S')} A 放入:[冷飲 {count}]")
count +=1
time.sleep(1)
def ConsumerB(q):
while True:
print(f"{time.strftime('%H:%M:%S')} B 取出 [{q.get()}]")
time.sleep(2)
if __name__ == '__main__':
q = multiprocessing.Queue(maxsize=5)
p = multiprocessing.Process(target=ProducerA,args=(q,))
c = multiprocessing.Process(target=ConsumerB,args=(q,))
p.start()
c.start()
p.join()
c.join()
上面的程式碼定義了生產者函式和消費者函式,設定其佇列的最大容量是5,生產者生產冷飲,消費者取出冷飲消費,當佇列滿時,生產者等待,當佇列空時,消費者等待。他們放入和取出的速度可能不一致,但使用Queue可以讓生產者和消費者有條不紊的一直程序下去,執行結果如下所示:
資料交換Pipe
有時候,我們需要將一個程序的輸出作為另一個程序的輸入,multiprocessing.Pipe()方法返回一個管道的兩個埠,埠1的輸入可作為另一個埠2的輸出。如果反過來,讓埠2的輸出作為埠1的輸入,這就是全雙工管道,預設是全雙工管道,如果想設定半雙工管理,只需要給方法 Pipe()傳遞引數duplex = False即可。
Pipe()方法返回的物件具有傳送訊息send()方法和接收訊息recv()方法,如果沒有訊息可接收, recv()方法會一直阻塞。如果管道已經被關閉,那麼 recv()方法會丟擲異常。
import multiprocessing
import time
def task1(pipe):
for i in range(4):
str = f"task1-{i}"
print(f"{time.strftime('%H:%M:%S')} task1 傳送:{str}")
pipe.send(str)
time.sleep(2)
for i in range(4):
print(f"{time.strftime('%H:%M:%S')} task1 接收: { pipe.recv() }")
def task2(pipe):
for i in range(4):
print(f"{time.strftime('%H:%M:%S')} task2 接收: { pipe.recv() }")
time.sleep(1)
for i in range(4):
str = f"task2-{i}"
print(f"{time.strftime('%H:%M:%S')} task2 傳送:{str}")
pipe.send(str)
if __name__ == "__main__":
pipe = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=task1, args=(pipe[0],))
p2 = multiprocessing.Process(target=task2, args=(pipe[1],))
p1.start()
p2.start()
p1.join()
p2.join()
上面的程式碼定義了兩個子程序函式,task1先發送4條訊息,再接收訊息,task2先接收訊息,再發送訊息,執行結果如下: