1. 程式人生 > >python中多執行緒,多程序,多協程概念及程式設計上的應用

python中多執行緒,多程序,多協程概念及程式設計上的應用

1, 多執行緒

 

  1.  執行緒是程序的一個實體,是CPU進行排程的最小單位,他是比程序更小能獨立執行的基本單位。
  2.  執行緒基本不擁有系統資源,只佔用一點執行中的資源(如程式計數器,一組暫存器和棧),但是它可以與同屬於一個程序的其他執行緒共享全部的資源。
  3.  提高程式的執行速率,上下文切換快,開銷比較少,但是不夠穩定,容易丟失資料,形成死鎖。

 

直接上程式碼:

import time
import threading

# 函式1用時2秒
def fun1():
    time.sleep(2)
    print(threading.current_thread().name, time.ctime())

# 函式2用時4秒 def fun2(): time.sleep(4) print(threading.current_thread().name, time.ctime()) # 函式3用時6秒 def fun3(): time.sleep(6) print('hello python', time.ctime()) th1 = threading.Thread(target=fun1) th2 = threading.Thread(target=fun2) th3 = threading.Thread(target=fun3) th1.start() th2.start() th3.start()

列印結果:

Thread-1 Mon Jan  7     11:01:52 2019
Thread-2 Mon Jan  7     11:01:54 2019
hello python Mon Jan  7 11:01:56 2019

解析:從結果看出,他們同一時間 11:01:50開始執行,分別用了不同的時間結束

接著往下看,新增jion阻塞執行緒

''''''

th1.start() th1.join() th2.start() th2.join() th3.start() th3.join()

列印結果:

Thread-1 Mon Jan 7     11:19:00 2019
Thread-2 Mon Jan 7     11:19:04 2019
hello python Mon Jan 7 11:19:10 2019

我們看到這三執行緒按順序依次執行。

我們接著看看執行緒的方法使用:

threading.enumerate()            #列舉執行緒,返回列表,其中裡面會有一條主執行緒
threading.activeCount()          #檢視執行緒執行個數
threading.current_thread().name     #檢視當前執行執行緒名稱
join()                      #阻塞執行緒執行

 

我們接著看第二種開執行緒的方式:

import threading
import time

class MyThread(threading.Thread):
    def run(self):
        for i in range(3):
            time.sleep(1)
            msg = "I'm "+self.name+' @ '+str(i) #name屬性中儲存的是當前執行緒的名字
            print(msg)

if __name__ == '__main__':
    t = MyThread()
    t.setName('yangzhenyu')
    a = t.isAlive()
    print(a)
    print(t.getName())

    t.start()
    b = t.isAlive()
    print(b)

列印結果:

False
yanzghenyu
True
I'm yanzghenyu @ 0
I'm yanzghenyu @ 1
I'm yanzghenyu @ 2

方法總結:

t.setName()    #設定執行執行緒名稱,不指定預設Thread-1 
t.getName()    #獲取執行緒名稱
t.isAlive()        #判斷執行緒是否執行,返回布林型別   

 

執行緒間共享全域性變數:

import threading
import time

n = 100

def work01():
    global n
    for i in range(3):
        n += 1
    print(n)                          //103

def work02():
    global n
    print(n)                         //103

print(n)                             //100

t1 = threading.Thread(target=work01)
t1.start()
time.sleep(
1)
t2
= threading.Thread(target=work02) t2.start()

關於執行緒鎖 

  1. 用threading.Lock()建立鎖,用acquire()申請鎖,每次只有一個執行緒獲得鎖,其他執行緒必須等此執行緒release()後才能獲得鎖
  2. RLock允許在同一執行緒中被多次acquire。而Lock卻不允許這種情況。
  3. 注意:如果使用RLock,那麼acquire和release必須成對出現,即同一執行緒中呼叫了n次acquire,必須呼叫n次的release才能真正釋放所佔用的瑣

下面例子中我們用到的是Lock(),當加完鎖之後,該方法同一時間內只能被一個執行緒呼叫。

import threading
mylock=threading.Lock()#建立鎖
num = 0
def add_num(name):
    global num
    while True:
        mylock.acquire()#申請鎖也就是加鎖
        print('thread %s locked! num=%d'%(name,num))
        if num>=5:
            print('thread %s release! num=%d'%(name,num))
            mylock.release()#釋放鎖
            return 
        num += 1
        print('thread %s release! num = %d'%(name,num))
        mylock.release()

t1 = threading.Thread(target=add_num,args=('A',))
t2 = threading.Thread(target=add_num,args=('B',))
t1.start()
t2.start()

列印結果:

thread A locked! num=0
thread A release! num = 1
thread A locked! num=1
thread A release! num = 2
thread A locked! num=2
thread A release! num = 3
thread A locked! num=3
thread A release! num = 4
thread A locked! num=4
thread A release! num = 5
thread A locked! num=5
thread A release! num=5
thread B locked! num=5
thread B release! num=5

 


 


 

關於程序:

  • 程序是系統進行資源分配的最小單位,每個程序都有自己的獨立記憶體空間,不用程序通過程序間通訊來通訊。
  • 但是程序佔據獨立空間,比較重量級,所以上下文程序間的切換開銷比較大,但是比較穩定安全。

 

程序建立:

第一種建立程序的方式:

from multiprocessing import Process
import time
import random
import os


def piao(name):
    print("%s is piaoping"%name)
    time.sleep(random.randint(0,1))
    print("%s is piao end"%name)

if __name__ == '__main__':
    print("CPU的個數是:%d"%os.cpu_count())
    p1 = Process(target=piao,args=("alex",),name="程序1")
    print(p1.name)
    p1.start()
    print("父程序!") #執行速度要遠快於建立新程序的時間

 

列印結果:

CPU的個數是:2
程序1
父程序!
alex is piaoping
alex is piao end

 

第二種建立程序的方式:

from multiprocessing import Process
import time
import random

#繼承Process類,並實現自己的run方法

class Piao(Process):
    def __init__(self,name):
        #必須呼叫父類的init方法
        super().__init__()
        self.name = name

    def run(self):
        print("%s is piaoing"%self.name)
        time.sleep(random.randint(1,3))
        print("%s is piaoeng"%self.name)


if __name__ == '__main__':
    p1 = Piao("Alex")
    #開闢一個新的程序實際上就是執行本程序所對應的run()方法
    p1.start()
    print("主程序!")

結果:

主程序!
Alex is piaoing
Alex is piaoeng

解析:join括號中不攜帶引數,表示父程序在這個位置要等待p1程序執行完成後,如果指定引數,也就是等待時間s,那麼主程序將在這個時間內結束,

     用is_active()  方法即可檢測程序的狀態,不加join() 返回True,表示程序還在進行。

 

程序的方法,

start()    啟動程序例項(建立子程序);
terminate():不管任務是否完成,立即終止;
name:    當前程序例項別名,預設為Process-N,N為從1開始遞增的整數;
pid:     當前程序例項的PID值;  os.getpid()
is_alive(): 判斷程序例項是否還在執行;
join([timeout]):是否等待程序例項執行結束,或等待多少秒;

 

程序池:

  在程式實際處理問題時,忙時會有成千上萬個任務需要執行,閒時有零星任務,建立時需要消耗時間,銷燬也需要時間,

即使開啟成千上萬個程序,作業系統也不能 讓他同時執行。這裡就用到了程序池,用於管理小塊記憶體的申請與釋放。

,

1,上程式碼:

from multiprocessing.pool import Pool
from time import sleep


def fun(a):
    sleep(1)
    print(a)

if __name__ == '__main__':
    p = Pool()   # 這裡不加引數,但是程序池的預設大小,等於電腦CPU的核數
            # 也是建立子程序的個數,也是每次列印的數字的個數
    for i in range(10):
        p.apply_async(fun, args=(i,))
p.close() p.join()
# 等待所有子程序結束,再往後執行 print("end")

 

2,callback 舉例:

from multiprocessing import Process,Pool


def func(i):
    i+=1
    return i#普通程序處理過的資料返回給主程序p1

def call_back(p1):
    p1+=1
    print(p1)

if __name__ == '__main__':
    p = Pool()
    for i in range(10):
        p1 = p.apply_async(func,args=(i,),callback = call_back)#p呼叫普通程序並且接受其返回值,將返回值給要執行的回撥函式處理
    p.close()
    p.join()

解析:   1,p.apply ( func,args = ())     同步的效率,也就是說池中的程序一個一個的去執行任務

      p.apply_async( func,args = () , callback = None) : 非同步的效率,也就是池中的程序一次性都去執行任務.

    2,非同步處理任務時 : 必須要加上close和join. 程序池的所有程序都是守護程序(主程序程式碼執行結束,守護程序就結束). 

    3,func : 程序池中的程序執行的任務函式

       4,args : 可迭代物件性的引數,是傳給任務函式的引數

       5,callback : 回撥函式,就是每當程序池中有程序處理完任務了,返回的結果可以交給回撥函式,

                由回撥函式進行進一步處理,回撥函式只非同步才有,同步沒有.回撥函式是父程序呼叫.

 

3. map( func,iterable)  (該方法經常用到爬蟲)

from multiprocessing import Pool

def func(num):
    num += 1
    print(num)
    return num

if __name__ == '__main__':
    p = Pool(2)
    res = p.map(func,[i for i in range(100)])
    # p.close()#map方法自帶這兩種功能
    # p.join()
    print('主程序中map的返回值',res)

func : 程序池中的程序執行的任務函式

iterable : 可迭代物件,是把可迭代物件那個中的每個元素一次傳給任務函式當引數.

map方法自帶close和join

 

程序間的通訊:

1)佇列

from multiprocessing import Queue,Process
import os,time,random

#新增資料函式
def proc_write(queue,urls):
    print("程序(%s)正在寫入..."%(os.getpid()))
    for url in urls:
        queue.put(url)
        print("%s被寫入到佇列中"%(url))
        time.sleep(random.random()*3)

#讀取資料函式
def proc_read(queue):
    print("程序(%s)正在讀取..."%(os.getpid()))

    while True:
        url = queue.get()
        print("從佇列中提取到:%s"%(url))

if __name__ =="__main__":
queue
= Queue() proc_writer1 = Process(target=proc_write,args=(queue,["ur1","ur2","ur3","ur4"])) proc_writer2 = Process(target=proc_write,args=(queue,["ur5","ur6","ur7","ur8"])) proc_reader = Process(target=proc_read,args=(queue,)) proc_writer1.start() proc_writer1.join() proc_writer2.start() proc_writer2.join() proc_reader.start() proc_reader.terminate()

生產者與消費者模式(執行緒間的通訊):

from queue import Queue
import threading,time


class Producer(threading.Thread):
    def run(self):
        global queue
        count = 0
        while True:
            if queue.qsize() < 1000:
                for i in range(100):
                    count = count +1
                    msg = '生成產品'+str(count)
                    queue.put(msg)
                    print(msg)
            time.sleep(0.5)

class Consumer(threading.Thread):
    def run(self):
        global queue
        while True:
            if queue.qsize() > 100:
                for i in range(3):
                    msg = self.name + '消費了 '+queue.get()
                    print(msg)
            time.sleep(1)


if __name__ == '__main__':
    queue = Queue()

    for i in range(500):
        queue.put('初始產品'+str(i))
    for i in range(2):
        p = Producer()
        p.start()
    for i in range(5):
        c = Consumer()
        c.start()

 

2) 程序間的通訊(管道)

from multiprocessing import Pipe,Process
import random,time,os

def proc_send(pipe,urls):
    for url in urls:
        print("程序(%s)傳送:%s"%(os.getpid(),url))
        pipe.send(url)
        time.sleep(random.random())

def proc_recv(pipe):
    while True:
        print("程序(%s)接收到:%s"%(os.getpid(),pipe.recv()))
        time.sleep(random.random())

if __name__ == "__main__":
    pipe = Pipe()
    p1 = Process(target=proc_send,args=(pipe[0],["url_"+str(i) for i in range(10)],))   
    p2 = Process(target=proc_recv,args=(pipe[1],))
    p1.start()
    p2.start()
    p1.join()
    p2.terminate()

解析:

  pipe用於兩個程序間的通訊,兩個程序分別位於管道的兩端,Pipe方法返回(conn1,conn2)代表一個管道的兩端,

  Pipe方法有dumplex引數,若該引數為True,管道為全雙工模式,

  若為Fasle,conn1只負責接收訊息,conn2只負責傳送訊息.send和recv方法分別是傳送和接收訊息的方法

 



 

 

協程:

協程:是更小的執行單位,是一種輕量級的執行緒,協程的切換隻是單純的操作CPU的上下文,所以切換速度特別快,且耗能小。

gevent是第三方庫,通過greenlet實現協程,其基本思想是:

當一個greenlet遇到IO操作時,比如訪問網路,就自動切換到其他的greenlet,等到IO操作完成,再在適當的時候切換回來繼續執行。由於IO操作非常耗時,經常使程式處於等待狀態,有了gevent為我們自動切換協程,就保證總有greenlet在執行,而不是等待IO。

由於切換是在IO操作時自動完成,所以gevent需要修改Python自帶的一些標準庫,這一過程在啟動時通過monkey patch完成:

from gevent import monkey

monkey.patch_all()  # 用來在執行時動態修改已有的程式碼,而不需要修改原始程式碼。

import gevent
import requests


def f(url):
    print('GET: %s' % url)
    html = requests.get(url).text
    print(url, len(html))


gevent.joinall([
    gevent.spawn(f, 'http://i.maxthon.cn/'),  # 先執行這個函式,傳送請求,等待的時候傳送第二個請求
    gevent.spawn(f, 'http://www.jianshu.com/u/3cfeb3395a95'),
    gevent.spawn(f, 'http://edu.51cto.com/?jydh')])

執行結果:

GET: http://i.maxthon.cn/
GET: http://www.jianshu.com/u/3cfeb3395a95
GET: http://edu.51cto.com/?jydh
http://i.maxthon.cn/ 461786
http://edu.51cto.com/?jydh 353858
http://www.jianshu.com/u/3cfeb3395a95 597

從結果看,3個網路操作是併發執行的,而且結束順序不同,但只有一個執行緒。

使用gevent,可以獲得極高的併發效能,但gevent只能在Unix/Linux下執行,在Windows下不保證正常安裝和執行。