1. 程式人生 > >Python操作MySQL與Python多程序

Python操作MySQL與Python多程序

一、Python操作MySQL資料庫

利用Python語言操作資料庫,需要先下載pymysql,由於我之前下載了Anaconda並配置了系統變數,直接在命令列輸出:

conda install pymysql

如果沒有安裝過Anaconda,可通過以下命令列安裝:

pip install pymysql

安裝完畢後,通過以下程式碼訪問並操作資料庫MySQL。

import pymysql
# user為資料庫使用者名稱,password為登入密碼,db為目標資料庫名
conn = pymysql.connect(host="127.0.0.1",user="root",password="123456",db="easyvideo")
cs = conn.cursor()
cs.execute("select * from admin")
for i in cs:
    print("當前是第" + str(cs.rownumber) + "行")
    print("id:" + i[0]) #輸出資料庫表中對應該行的id
    print("username:" + i[1]) #輸出資料庫表中對應該行的username

二、Python多程序

建立程序的multiprocessing.Process類

我們來看看這個類的原型:

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={})

引數說明:

  • target:表示呼叫物件,一般為函式,也可以為類。
  • args:表示呼叫物件的置引數元組。
  • kwargs:表示呼叫物件的字典。
  • name:為程序的別名。
  • group:引數不使用,可忽略。

類提供的常用方法:

  • is_alive():返回程序是否是啟用的。
  • join([timeout]) :可以等待子程序結束後再繼續往下執行,通常用於程序間的同步。進一步地解釋,哪個子程序呼叫了join方法,主程序就要等該子程序執行完後才能繼續向下執行。
  • run() :代表程序執行的任務函式,可被重寫。
  • start() :啟用程序。
  • terminate():終止程序。

屬性:

  • authkey:位元組碼,程序的誰金鑰.
  • daemon:父程序終止後自動終止而不會等待子程序,且自己不能產生新程序,必須在start()之前設定。
  • exitcode:退出碼,程序在執行時為None,如果為–N,表示被訊號N結束。
  • name:獲取程序名稱.
  • pid:程序id。

multiprocessing模組提供了一個建立程序的Process類,其建立程序有兩種方法

  1. 建立一個Process類的例項,並指定目標任務函式。
  2. 自定義一個類,並繼承Process類,重寫其init ()方法和run ()方法。

首先我們使用第一種方法建立兩個程序,並與單程序執行的時間做比較:

import multiprocessing
import os
import time
#子程序執行的程式碼
def child_process(num):
    result = 0
    for i in range(num * 10000000):
        result += i
    print("程序為:{0:d}".format(os.getpid()))
if __name__ == '__main__':
    print("父程序為:{0:d}".format(os.getpid()))
    t0 = time.time()
    child_process(5)
    child_process(5)
    t1 = time.time()
    print("順序執行耗時:{0:.2f}".format(t1 - t0))
    p1 = multiprocessing.Process(target=child_process,args=(5,))
    p2 = multiprocessing.Process(target=child_process,args=(5,))
    t2 = time.time()
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    t3 = time.time()
    print("多程序執行耗時:{0:.2f}".format(t3 - t2))

上面的程式碼首先定義了一個千萬次資料累加的耗時函式,先通過單程序順序執行兩個耗時函式,然後輸出所用的時間;接著通過多程序併發執行,並指定目標函式為child_process,執行完成後列印耗時。其執行結果如下所示:

很明顯發現,通過多程序執行同樣的耗時函式,所用時間更少。

我們再用第二種方法對上面的耗時函式進行測試:

import multiprocessing
import os
import time
class MyProcess(multiprocessing.Process):
    def __init__(self,num):
        super().__init__()
        self.num = num
    #子程序執行的程式碼
    def run(self):
        result = 0;
        for i in range(self.num * 10000000):
            result += i
        print("程序為:{0:d}".format(os.getpid()))
if __name__ == '__main__':
    print("父程序為:{0:d}".format(os.getpid()))
    p1 = MyProcess(5)
    p2 = MyProcess(5)
    t1 = time.time()
    #程序p1,p2呼叫start()時,自動呼叫其run()方法
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    t2 = time.time()
    print("多程序執行耗時:{0:.2f}".format(t2 - t1))

執行結果如下:

daemon屬性

import multiprocessing
import os
import time
# 子程序要執行的程式碼
def child_process(delay):
    print(time.strftime('%Y-%m-%d %H:%M:%S',time.gmtime()) + ":子程序執行開始。")
    print("sleep {0:d}s".format(delay))
    time.sleep(delay)
    print(time.strftime('%Y-%m-%d %H:%M:%S',time.gmtime()) + ":子程序執行結束。")

if __name__=='__main__':
    print(time.strftime('%Y-%m-%d %H:%M:%S',time.gmtime()) + ":父程序執行開始。")
    p1 = multiprocessing.Process(target=child_process, args=(3,))
    #設定 daemon屬性為True
    p1.daemon = True
    p1.start()
    # p1.join() #如果此行程式碼被註釋,那麼父程序不會等待子程序而提前結束,子程序會因為父程序的結束而結束
    print(time.strftime('%Y-%m-%d %H:%M:%S',time.gmtime()) + ":父程序執行結束。")

如果p1.join()註釋,則結果為:

如果p1.join()保留,則結果為:

總結:

在多執行緒模型中,預設情況下daemon=False,主執行緒會等待子執行緒退出然後再退出。而如果將多程序的daemon設定為True時,主執行緒不會等待子執行緒,直接退出,而此時子執行緒會隨著主執行緒的退出而退出。為避免這種情況,主執行緒中需要對子執行緒進行join,等待子執行緒執行完畢後再退出。

併發控制之Semaphore

Semaphore用來控制對共享資源的訪問數量,即每一時刻允許同時執行的最大程序數。

import multiprocessing
import time

def f(s, i):
    s.acquire()
    print(time.strftime('%H:%M:%S',time.gmtime()),multiprocessing.current_process().name + " 獲得鎖執行");
    time.sleep(i)
    print(time.strftime('%H:%M:%S',time.gmtime()),multiprocessing.current_process().name + " 釋放鎖結束");
    s.release()

if __name__ == "__main__":
    s = multiprocessing.Semaphore(3)
    for i in range(5):
        p = multiprocessing.Process(target = f, args=(s, 2))
        p.start()

執行結果如下:

可以看出,由於我設定了s = multiprocessing.Semaphore(3),所以同一時刻最多有三個程序執行。

程序同步之Lock

在某些情況下某些時刻,我們只需要一個程序訪問某個資源,這時我們就需要使用鎖Lock。

不加鎖:

import multiprocessing
import time
def work1():
    num = 4;
    while num > 1:
        print(time.strftime('%H:%M:%S',time.gmtime()) + " work1")
        time.sleep(1)
        num -= 1
def work2():
    num = 4;
    while num > 1:
        print(time.strftime('%H:%M:%S',time.gmtime()) + " work2")
        time.sleep(1)
        num -= 1
if __name__ == '__main__':
    p1 = multiprocessing.Process(target=work1)
    p2 = multiprocessing.Process(target=work2)
    p1.start()
    p2.start()

執行結果如下:

可以看出,同一時刻不同的work被輸出,每個子程序各自列印自己的資訊,在實際應用中,容易造成資訊混亂,這時就要用到Lock,保證同一時刻只有一個程序執行。

加鎖:

import multiprocessing
import time
def work1(lock):
    with lock:
        num = 4;
        while num > 1:
            print(time.strftime('%H:%M:%S',time.gmtime()) + " work1")
            time.sleep(1)
            num -= 1
def work2(lock):
    lock.acquire()
    num = 4;
    while num > 1:
        print(time.strftime('%H:%M:%S',time.gmtime()) + " work2")
        time.sleep(1)
        num -= 1
    lock.release()
if __name__ == '__main__':
    lock = multiprocessing.Lock()
    p1 = multiprocessing.Process(target=work1,args=(lock,))
    p2 = multiprocessing.Process(target=work2,args=(lock,))
    p1.start()
    p2.start()

執行結果如下:

每一個子程序函式中都加了鎖Lock:首先初始化一個鎖的例項lock = multiprocessing.Lock(),然後在需要獨佔的程式碼前後加鎖:先獲取鎖,即呼叫lock.acquire()方法,執行完成後釋放鎖,即呼叫lock.release()方法;也可以簡單地使用上下文關鍵字with (見work1的程式碼)。

程序池Pool

在利用Python進行系統管理的時候,特別是同時操作多個檔案目錄,或者遠端控制多臺主機,並行操作可以節約大量的時間。當被操作物件數目不大時,可以直接利用multiprocessing中的Process類動態生成多個程序。但如果是生成上百個、上千個目標,手動地去限制程序數量太過繁瑣,此時可以發揮程序池的功效。

Pool可以提供指定數量的程序,供使用者呼叫,當有新的請求提交到pool中時,如果池還沒有滿,那麼就會建立一個新的程序用來執行該請求;但如果池中的程序數已經達到規定最大值,那麼該請求就會等待,直到池中有程序結束,才會建立新的程序。

import multiprocessing
import time

def task(name):
    print(f"{time.strftime('%H:%M:%S')}:{name} 開始執行")
    time.sleep(3)

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in range(10):
        #維持執行的程序總數為processes,當一個程序執行完畢後會新增新的程序進去
        pool.apply_async(func = task, args=(i,))
    pool.close()
    pool.join()
    print("hello")

執行結果如下:

從上面結果可以看出,同一時刻,只有執行緒池中的三個程序執行。

程序同步之Event

Event用來實現程序間同步通訊。

import multiprocessing
import time
def wait_for_event(e):
    e.wait() #等待
    time.sleep(1)
    # 喚醒後清除Event狀態,為後續繼續等待
    e.clear()
    print(f"{time.strftime('%H:%M:%S')} 程序 A: 我們是兄弟,我等你...")
    e.wait()
    print(f"{time.strftime('%H:%M:%S')} 程序 A: 好的,是兄弟一起走")

def wait_for_event_timeout(e, t):
    e.wait() #等待
    time.sleep(1)
    # 喚醒後清除Event狀態,為後續繼續等待
    e.clear()
    print(f"{time.strftime('%H:%M:%S')} 程序 B: 好吧,最多等你 {t} 秒")
    e.wait(t)
    print(f"{time.strftime('%H:%M:%S')} 程序 B: 我繼續往前走了")

if __name__ == "__main__":
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(target=wait_for_event, args=(e,))
    w2 = multiprocessing.Process( target=wait_for_event_timeout, args=(e, 5) )
    w1.start()
    w2.start()
    # 主程序發話
    print(f"{time.strftime('%H:%M:%S')} 主程序: 誰等我下,我需要 8 s 時間")
    # 喚醒等待的程序
    e.set()
    time.sleep(8)
    print(f"{time.strftime('%H:%M:%S')} 主程序: 好了,我趕上了")
    # 再次喚醒等待的程序
    e.set()
    w1.join()
    w2.join()
    print(f"{time.strftime('%H:%M:%S')} 主程序:退出")

上面的程式碼定義了兩個程序函式,一個是等待事件發生函式,一個等待事件發生並設定了超時時間的函式。主程序呼叫事件的set()方法喚醒等待事件的程序,事件喚醒後呼叫clear()方法清除事件的狀態,並重新等待,以此達到程序的同步。執行結果如下:

優先順序佇列Queue

Queue是多程序安全的佇列,可以使用Queue實現多程序之間的資料傳遞。

put方法插入資料到佇列中,put方法還有兩個可選引數:blocked和timeout。如果blocked為True(預設值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該佇列有剩餘的空間。如果超時,會丟擲Queue.Full異常。如果blocked為False,但該Queue已滿,會立即丟擲Queue.Full異常。

get方法可以從佇列讀取並且刪除一個元素。同樣,get方法有兩個可選引數:blocked和timeout。如果blocked為True(預設值),並且timeout為正值,那麼在等待時間內沒有取到任何元素,會丟擲Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果佇列為空,則立即丟擲Queue.Empty異常。

import multiprocessing
import time

def ProducerA(q):
    count = 1
    while True:
        q.put(f"冷飲 {count}")
        print(f"{time.strftime('%H:%M:%S')} A 放入:[冷飲 {count}]")
        count +=1
        time.sleep(1)

def  ConsumerB(q):
    while True:
        print(f"{time.strftime('%H:%M:%S')} B 取出 [{q.get()}]")
        time.sleep(2)
if __name__ == '__main__':
    q = multiprocessing.Queue(maxsize=5)
    p = multiprocessing.Process(target=ProducerA,args=(q,))
    c = multiprocessing.Process(target=ConsumerB,args=(q,))
    p.start()
    c.start()
    p.join()
    c.join()

上面的程式碼定義了生產者函式和消費者函式,設定其佇列的最大容量是5,生產者生產冷飲,消費者取出冷飲消費,當佇列滿時,生產者等待,當佇列空時,消費者等待。他們放入和取出的速度可能不一致,但使用Queue可以讓生產者和消費者有條不紊的一直程序下去,執行結果如下所示:

資料交換Pipe

有時候,我們需要將一個程序的輸出作為另一個程序的輸入,multiprocessing.Pipe()方法返回一個管道的兩個埠,埠1的輸入可作為另一個埠2的輸出。如果反過來,讓埠2的輸出作為埠1的輸入,這就是全雙工管道,預設是全雙工管道,如果想設定半雙工管理,只需要給方法 Pipe()傳遞引數duplex = False即可。

Pipe()方法返回的物件具有傳送訊息send()方法和接收訊息recv()方法,如果沒有訊息可接收, recv()方法會一直阻塞。如果管道已經被關閉,那麼 recv()方法會丟擲異常。

import multiprocessing
import time

def task1(pipe):
    for i in range(4):
        str = f"task1-{i}"
        print(f"{time.strftime('%H:%M:%S')} task1 傳送:{str}")
        pipe.send(str)
    time.sleep(2)
    for i in range(4):
        print(f"{time.strftime('%H:%M:%S')} task1 接收: { pipe.recv() }")

def task2(pipe):
    for i in range(4):
        print(f"{time.strftime('%H:%M:%S')} task2 接收: { pipe.recv() }")
    time.sleep(1)
    for i in range(4):
        str = f"task2-{i}"
        print(f"{time.strftime('%H:%M:%S')} task2 傳送:{str}")
        pipe.send(str)

if __name__ == "__main__":
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=task1, args=(pipe[0],))
    p2 = multiprocessing.Process(target=task2, args=(pipe[1],))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

上面的程式碼定義了兩個子程序函式,task1先發送4條訊息,再接收訊息,task2先接收訊息,再發送訊息,執行結果如下: