1. 程式人生 > >Python_並發編程(線程 進程 協程)

Python_並發編程(線程 進程 協程)

eating oba ive 程序 setname rod 執行過程 生產者消費者模式 random

一、進程和線程

進程

進程就是一個程序在一個數據集上的一次動態執行過程。

進程一般由程序、數據集、進程控制塊三部分組成。我們編寫的程序用來描述進程要完成哪些功能以及如何完成;

數據集則是程序在執行過程中所需要使用的資源;

進程控制塊用來記錄進程的外部特征,描述進程的執行變化過程,系統可以利用它來控制和管理進程,它是系 統感知進程存在的唯一標誌。

進程的本質:本質上就是一段程序的運行過程(抽象的概念)

意義:為了能夠同時運行多個任務的並發而不是一次只能幹一件事情

線程

線程的出現是為了降低上下文切換的消耗,提高系統的並發性,並突破一個進程只能幹一樣事的缺陷, 使到進程內並發成為可能。

線程也叫輕量級進程,它是一個基本的CPU執行單元,也是程序執行過程中的最小單元,由線程ID、程序 計數器、寄存器集合和堆棧共同組成。

線程的引入減小了程序並發執行時的開銷,提高了操作系統的並發 性能。線程沒有自己的系統資源。

線程的本質:提高切換的效率,提高系統的並發性,並突破一個進程只能幹一件事的缺陷

意義:使得進程內並發成為可能

二者之間的關系

  1、一個程序至少有一個進程,一個進程至少有一個線程(進程可以理解為線程的容器)

  2、進程是最小的資源單位,線程是最小的執行單位

  3、進程的作用是一個資源管理

  4、進程在執行過程中擁有獨立的內存單元,而多個線程共享內存,從而極大地提高了程序運行效率

  5、一個線程可以創建和撤銷另一個線程,同一個進程中的多個線程之間可以並發執行

並發&並行

並發: 是指系統具有處理多個任務(動作)的能力 可通過CPU的切換實現


並行: 是指系統具有 同時 處理多個任務(動作)的能力


二者之間的關系:並行是並發的一個子集,並行的一定是並發,並發的不一定是並行

Python的線程與threading模塊

線程的調用方式

threading 模塊建立在thread 模塊之上。thread模塊以低級、原始的方式來處理和控制線程,而threading 模塊通過對thread進行二次封裝,

提供了更方便的api來處理線程。

一、直接調用

import threading
import time

def add():#定義每個線程要運行的函數
    sum=0

    for i in range(10000000):
        sum
+=i time.sleep(1) print("sum",sum) def sayhi(num): # 定義每個線程要運行的函數 print("running on number:%s" % num) time.sleep(3) if __name__ ==__main__: start=time.time() t1=threading.Thread(target=add)#生成一個線程實例 t2=threading.Thread(target=sayhi,args=(1,))#生成另一個線程實例,args是參數 t1.start() #啟動線程 t2.start() print(t1.getName()) #獲取線程名 print(t2.getName()) #獲取線程名

二、繼承式調用

import threading
import time

class Mythread(threading.Thread):
    def __init__(self,num):
        threading.Thread.__init__(self)
        self.num = num

    def run(self):#定義每個線程要運行的函數
        print("running on number:%s" % self.num)

        time.sleep(3)

if __name__ == __main__:
    t1 = Mythread(1)
    t2 = Mythread(2)

    t1.start()
    t2.start()
    print("ending......")

三、threading.Thread的實例方法

join&aemon方法

import threading
from time import ctime,sleep
import time


def ListenMusic(name):
    print("Begin listening to %s. %s" % (name, ctime()))
    sleep(2)  # sleep等同於IO操作
    print("end listening %s" % ctime())


def RecordBlog(title):
    print("Begin recording the %s! %s" % (title, ctime()))
    sleep(5)
    print(end recording %s % ctime())


l = []
t1 = threading.Thread(target=ListenMusic,args=("A",))
t2 = threading.Thread(target=RecordBlog,args=("B",))

l.append(t1)
l.append(t2)
if __name__ == __main__:
    for i in l :
        # i.setDaemon(True) # 兩個都只執行第一句然後結束
        i.start()
        # i.join() #線程一個一個執行

print ("all over %s" %ctime())

join():在子線程完成運行之前,這個子線程的父線程將一直被阻塞。

setDaemon(True):

將線程聲明為守護線程,必須在start() 方法調用之前設置, 如果不設置為守護線程程序會被無限掛起。這個方法基本和join是相反的。

當我們 在程序運行中,執行一個主線程,如果主線程又創建一個子線程,主線程和子線程 就分兵兩路,分別運行,那麽當主線程完成

想退出時,會檢驗子線程是否完成。如 果子線程未完成,則主線程會等待子線程完成後再退出。但是有時候我們需要的是 只要主線程

完成了,不管子線程是否完成,都要和主線程一起退出,這時就可以 用setDaemon方法啦

其他方法

# run():  線程被cpu調度後自動執行線程對象的run方法
# start():啟動線程活動。
# isAlive(): 返回線程是否活動的。
# getName(): 返回線程名。
# setName(): 設置線程名。

threading模塊提供的一些方法:
# threading.currentThread(): 返回當前的線程變量。
# threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動後、結束前,不包括啟動前和終止後的線程。
# threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。

四、同步鎖(Lock)

import time
import threading

def addNum():
    global num #在每個線程中都獲取這個全局變量
    #num-=1

    temp=num
    #print(‘--get num:‘,num )
    time.sleep(0.001)
    num =temp-1 #對此公共變量進行-1操作

num = 100  #設定一個共享變量
thread_list = []
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待所有線程執行完畢
    t.join()

print(final num:, num )

#執行結果不固定,不為0

多個線程都在同時操作同一個共享資源,所以造成了資源破壞,怎麽辦呢?(join會造成串行,失去所線程的意義)

我們可以通過同步鎖來解決這種問題

import time
import threading

R = threading.Lock()


####
def sub():
    global num
    R.acquire()   #同步鎖開始
    temp = num - 1
    time.sleep(0.01) 
    num = temp
    R.release() #同步鎖結束

num = 100  #設定一個共享變量
thread_list = []
for i in range(100):
    t = threading.Thread(target=sub)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待所有線程執行完畢
    t.join()

print(‘final num:‘, num )

#運行結果為0

  

五、線程死鎖和遞歸鎖

在線程間共享多個資源的時候,如果兩個線程分別占有一部分資源並且同時等待對方的資源,就會造成死鎖,因為系統判斷這部分資源都正在使用,所有這兩個線程在無外力作用下將一直等待下去。下面是一個死鎖的例子:

import threading,time

class myThread(threading.Thread):
    def doA(self):
        lockA.acquire()
        print(self.name,"gotlockA",time.ctime())
        time.sleep(3)
        lockB.acquire()
        print(self.name,"gotlockB",time.ctime())
        lockB.release()
        lockA.release()

    def doB(self):
        lockB.acquire()
        print(self.name,"gotlockB",time.ctime())
        time.sleep(2)
        lockA.acquire()
        print(self.name,"gotlockA",time.ctime())
        lockA.release()
        lockB.release()

    def run(self):
        self.doA()
        self.doB()
if __name__=="__main__":

    lockA=threading.Lock()
    lockB=threading.Lock()
    threads=[]
    for i in range(5):
        threads.append(myThread())
    for t in threads:
        t.start()
    for t in threads:
        t.join()

解決辦法:使用遞歸鎖,將

lockA=threading.Lock()
lockB=threading.Lock()<br>#--------------<br>lock=threading.RLock()
技術分享圖片
import  threading
import time


class MyThread(threading.Thread):

    def actionA(self):

        r_lcok.acquire() #count=1
        print(self.name,"gotA",time.ctime())
        time.sleep(2)
        r_lcok.acquire() #count=2

        print(self.name, "gotB", time.ctime())
        time.sleep(1)

        r_lcok.release() #count=1
        r_lcok.release() #count=0


    def actionB(self):

        r_lcok.acquire()
        print(self.name, "gotB", time.ctime())
        time.sleep(2)

        r_lcok.acquire()
        print(self.name, "gotA", time.ctime())
        time.sleep(1)

        r_lcok.release()
        r_lcok.release()


    def run(self):

        self.actionA()
        self.actionB()


if __name__ == __main__:

    # A=threading.Lock() 同步鎖會造成死鎖
    # B=threading.Lock()

    r_lcok=threading.RLock() #使用遞歸鎖解決這個問題
    L=[]

    for i in range(5):
        t=MyThread()
        t.start()
        L.append(t)


    for i in L:
        i.join()

    print("ending....")
遞歸鎖

多線程利器------隊列

隊列Queue是python標準庫中的線程安全的隊列(FIFO)實現,提供了一個適用於多線程編程的先進先出的數據結構,即隊列,用來在生產者和消費者線程之間的信息傳遞

queue隊列的方法:

創建一個“隊列”對象
import Queue
q = Queue.Queue(maxsize = 10)
Queue.Queue類即是一個隊列的同步實現。隊列長度可為無限或者有限。可通過Queue的構造函數的可選參數maxsize來設定隊列長度。如果maxsize小於1就表示隊列長度無限。

將一個值放入隊列中
q.put(10)
調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item為必需的,為插入項目的值;第二個block為可選參數,默認為
1。如果隊列當前為空且block為1,put()方法就使調用線程暫停,直到空出一個數據單元。如果block為0,put方法將引發Full異常。

將一個值從隊列中取出
q.get()
調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數為block,默認為True。如果隊列為空且block為True,
get()就使調用線程暫停,直至有項目可用。如果隊列為空且block為False,隊列將引發Empty異常。
Python Queue模塊有三種隊列及構造函數:
1、Python Queue模塊的FIFO隊列先進先出。   class queue.Queue(maxsize)
2、LIFO類似於堆,即先進後出。               class queue.LifoQueue(maxsize)
3、還有一種是優先級隊列級別越低越先出來。        class queue.PriorityQueue(maxsize)

此包中的常用方法(q = Queue.Queue()):
q.qsize() 返回隊列的大小
q.empty() 如果隊列為空,返回True,反之False
q.full() 如果隊列滿了,返回True,反之False
q.full 與 maxsize 大小對應
q.get([block[, timeout]]) 獲取隊列,timeout等待時間
q.get_nowait() 相當q.get(False)
非阻塞 q.put(item) 寫入隊列,timeout等待時間
q.put_nowait(item) 相當q.put(item, False)
q.task_done() 在完成一項工作之後,q.task_done() 函數向任務已經完成的隊列發送一個信號
q.join() 實際上意味著等到隊列為空,再執行別的操作

其他模式:

技術分享圖片
import queue

#先進後出

q=queue.LifoQueue()

q.put(34)
q.put(56)
q.put(12)

#優先級
# q=queue.PriorityQueue()
# q.put([5,100])
# q.put([7,200])
# q.put([3,"hello"])
# q.put([4,{"name":"alex"}])

while 1:

  data=q.get()
  print(data)
其他模式例子

生產者消費者模型

為什麽要使用生產者和消費者模式

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麽生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那麽消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

什麽是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。

這就像,在餐廳,廚師做好菜,不需要直接和客戶交流,而是交給前臺,而客戶去飯菜也不需要不找廚師,直接去前臺領取即可,這也是一個結耦的過程。

import time,random
import queue,threading

q = queue.Queue()

def Producer(name):
  count = 0
  while count <10:   #做十個包子
    print("making........")
    time.sleep(5)  #正在做包子
    q.put(count)  #做好包子放入籠屜(隊列)裏
    print(Producer %s has produced %s baozi.. %(name, count))
    count +=1
    q.join() #等到隊列為空,再執行別的操作
    print("ok......")


def Consumer(name):
  count = 0
  while count <10: #吃包子
        time.sleep(random.randrange(4)) #先等待包子

        data = q.get()   #拿包子  如果沒有包子等待包子的到來
        print("eating....")
        time.sleep(3)  #吃包子ing....

        q.task_done()#在完成一項工作之後,q.task_done() 函數向任務已經完成的隊列發送一個信號

        print(\033[32;1mConsumer %s has eat %s baozi...\033[0m %(name, data))

        count +=1
        # q.join()
p1 = threading.Thread(target=Producer, args=(A君,))
c1 = threading.Thread(target=Consumer, args=(B君,))
# c2 = threading.Thread(target=Consumer, args=(‘C君‘,))
# c3 = threading.Thread(target=Consumer, args=(‘D君‘,))

p1.start()
c1.start()
# c2.start()
# c3.start()

多進程模塊multiprocessing

multiprocessing包是Python中的多進程管理包。與threading.Thread類似,它可以利用multiprocessing.Process對象來創建一個進程。該進程可以運行在Python程序內部編寫的函數。該Process對象與Thread對象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類 (這些對象可以像多線程那樣,通過參數傳遞給各個進程),用以同步進程,其用法與threading包中的同名類一致。所以,multiprocessing的很大一部份與threading使用同一套API,只不過換到了多進程的情境。

一、進程的調用

直接調用

from multiprocessing import Process
import time
def f(name):
    time.sleep(1)
    print(hello, name,time.ctime())

if __name__ == __main__:
    p_list=[]
    for i in range(3):
        p = Process(target=f, args=(alvin,))
        p_list.append(p)
        p.start()
    for i in p_list:
        p.join()
    print(end)

繼承式調用

from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self):
        super(MyProcess, self).__init__()
        #self.name = name

    def run(self):
        time.sleep(1)
        print (hello, self.name,time.ctime())


if __name__ == __main__:
    p_list=[]
    for i in range(3):
        p = MyProcess()
        p.start()
        p_list.append(p)

    for p in p_list:
        p.join()

    print(end)

二、Process類

構造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 線程組,目前還沒有實現,庫引用中提示必須是None;
  target: 要執行的方法;
  name: 進程名;
  args/kwargs: 要傳入方法的參數。

實例方法:

  is_alive():返回進程是否在運行。

  join([timeout]):阻塞當前上下文環境的進程程,直到調用此方法的進程終止或到達指定的timeout(可選參數)。

  start():進程準備就緒,等待CPU調度

  run():strat()調用run方法,如果實例進程時未制定傳入target,這star執行t默認run()方法。

  terminate():不管任務是否完成,立即停止工作進程

屬性:

  daemon:和線程的setDeamon功能一樣

  name:進程名字。

  pid:進程號。

三、進程間通訊

3.1進程隊列Queue

from multiprocessing import Process, Queue
import queue

def f(q,n):
    #q.put([123, 456, ‘hello‘])
    q.put(n*n+1)
    print("son process",id(q))

if __name__ == __main__:
    q = Queue()  #try: q=queue.Queue()
    print("main process",id(q))

    for i in range(3):
        p = Process(target=f, args=(q,i))
        p.start()

    print(q.get())
    print(q.get())
    print(q.get())

3.2管道

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([12, {"name":"yuan"}, hello])
    response=conn.recv()
    print("response",response)
    conn.close()
    print("q_ID2:",id(child_conn))

if __name__ == __main__:

    parent_conn, child_conn = Pipe()
    print("q_ID1:",id(child_conn))
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, ‘hello‘]"
    parent_conn.send("兒子你好!")
    p.join()

3.3 Managers

Queue和pipe只是實現了數據交互,並沒實現數據共享,即一個進程去更改另一個進程的數據。

from multiprocessing import Process, Manager

def f(d, l,n):

    d[n] = 1    #{0:"1"}
    d[2] = 2    #{0:"1","2":2}

    l.append(n)    #[0,1,2,3,4,   0,1,2,3,4,5,6,7,8,9]
    #print(l)


if __name__ == __main__:

    with Manager() as manager:

        d = manager.dict()#{}

        l = manager.list(range(5))#[0,1,2,3,4]


        p_list = []

        for i in range(10):
            p = Process(target=f, args=(d,l,i))
            p.start()
            p_list.append(p)

        for res in p_list:
            res.join()

        print(d)
        print(l)

四、進程同步

在創建多個進程的過程中,可能會出現多個進程爭相輸出資源(打印,顯示等)的情況,前一個進程沒來得及輸出換行符,該資源就被下一個進程調用,導致兩個進程打印在一行之中,為了避免這種情況,我們可以使用加鎖

from multiprocessing import Process, Lock
import time

def f(l, i):

        l.acquire()  #加鎖
        time.sleep(1)
        print(hello world %s % i)
        l.release() #釋放鎖

if __name__ == __main__:
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

五、進程池

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進進程,那麽程序就會等待,直到進程池中有可用進程為止。

進程池中有兩個方法:

  • apply   同步方法 
  • apply_async 異步方法
from  multiprocessing import Process,Pool
import time,os

def Foo(i):

    time.sleep(1)
    print(i)
    print("son",os.getpid())

    return "HELLO %s"%i


def Bar(arg):
    print(arg)
    # print("hello")
    # print("Bar:",os.getpid())

if __name__ == __main__:

    pool = Pool(5)  #創建進程池  進程數為5
    print("main pid",os.getpid())
    for i in range(100):   #開啟100個任務
        #pool.apply(func=Foo, args=(i,))  #同步接口
        #pool.apply_async(func=Foo, args=(i,))

        #回調函數:  就是某個動作或者函數執行成功後再去執行的函數

        pool.apply_async(func=Foo, args=(i,),callback=Bar) #相當於threading.Thread實例化對象  func=target
                                                          # callback 是回調函數

    pool.close()
    pool.join()         # 進程池中join與close調用順序是固定的

    print(end)

協程

協程,又稱微線程,纖程。英文名Coroutine。

優點1: 協程極高的執行效率。因為子程序切換不是線程切換,而是由程序自身控制,因此,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優勢就越明顯。

優點2: 不需要多線程的鎖機制,因為只有一個線程,也不存在同時寫變量沖突,在協程中控制共享資源不加鎖,只需要判斷狀態就好了,所以執行效率比多線程高很多。

因為協程是一個線程執行,那怎麽利用多核CPU呢?最簡單的方法是多進程+協程,既充分利用多核,又充分發揮協程的高效率,可獲得極高的性能。

import  time
import  queue

def  consumer(name):
    print(準備做包子)

    while True:
        new_baozi = yield #協程的核心
        print("%s正在吃包子%s"%(name,new_baozi))


def  producer():
    n = 0

    con.__next__()   #執行函數到yield
    con2.__next__()

    while n<10:
        time.sleep(1)
        print("正在做包子%s 和 %s"%(n,n+1))
        con.send(n)    #把做的包子發送給yield
        con2.send(n+1) #把做的包子發送給yield
        n += 2


if __name__ == __main__:
    con = consumer(alex)
    con2 = consumer(selina)
    producer()

Greenlet

greenlet是一個用C實現的協程模塊,相比與python自帶的yield,它可以使你在任意函數之間隨意切換,而不需把這個函數先聲明為generator

from greenlet import greenlet
 
 
def test1():
    print(12)
    gr2.switch() #切換到gr2 並保存狀態
    print(34)
    gr2.switch() #切換到gr2 並保存狀態
 
 
def test2():
    print(56)
    gr1.switch()#切換到gr1 並保存狀態
    print(78)
 
 
gr1 = greenlet(test1)  #將test1封裝為gr1
gr2 = greenlet(test2) #將test2封裝為gr2
gr1.switch() #執行test1

#運行結果為
#12
#34
#56
#78

Python_並發編程(線程 進程 協程)