1. 程式人生 > >15.python併發程式設計(執行緒--程序--協程)

15.python併發程式設計(執行緒--程序--協程)

一.程序:
1.定義:程序最小的資源單位,本質就是一個程式在一個數據集上的一次動態執行(執行)的過程
2.組成:程序一般由程式,資料集,程序控制三部分組成:
(1)程式:用來描述程序要完成哪些功能以及如何完成
(2)資料集:是程式在執行過程中所需要使用的一切資源
(3)程序控制塊:用來記錄程序外部特徵,描述程序的執行變化過程,系統可以利用它來控制和管理程序,它是系統感知程序存在的唯一標誌。
3.程序的作用:是想完成多工併發,程序之間的記憶體地址是相互獨立的
二.執行緒:
1.定義:最小的執行單位,執行緒的出現是為了降低上下文切換的消耗,提高系統的併發性,並突破一個程序只能幹一樣事的缺陷,使到進2.程內併發成為可能


2.組成:執行緒也叫輕量級程序,它是一個基本的CPU執行單元,也是程式執行過程中的最小單元,由執行緒ID,程式,計數器,暫存器集合和堆疊共同組成。
3.作用:執行緒的引入減小了程式併發執行的開銷,提高了作業系統併發性,執行緒沒有自己的系統資源
三.併發和並行的關係
1.併發:指系統具有處理多個任務(動作)的能力,一個CPU可以實現併發,因為它能實現多個任務的處理
2.並行:指系統具有同時處理多個任務(動作)的能力,一個程式只能對應一個CPU核數,一核不可能有倆個程式同時進行,只能是併發不是並行
3.併發和並行的關係:並行是併發的一個子集
四.程序和執行緒的關係:
1.一個程式至少有一個程序,一個程序至少有一個執行緒(程序可以理解成執行緒的容器)

2.程序在執行過程中擁有獨立的記憶體單元,而多個執行緒共享記憶體(程序的記憶體),從而極大地提高了程式的執行效率
3.執行緒在執行過程中與程序的區別是,每個獨立的執行緒有一個程式執行的入口,順序執行序列和程式的出口。但是執行緒不能夠獨立執行,必須依存在應用程式中,由應用程式提供多個執行緒執行控制。
4.程序是具有一定獨立功能的程式關於某個資料集合上的一次執行活動,程序是系統進行資源分配和排程的一個獨立單位。執行緒是程序的一個實體,是CPU排程和分派的基本單位,它是比程序更小的能獨立執行的基本單位。執行緒自己基本上不擁有系統資源,只擁有一點在執行中必不可少的資源(如程式計數器,一組暫存器和棧)但是它可與同屬一個程序的其他的執行緒共享程序所擁有的全部資源。一個執行緒可以建立和撤銷另一個執行緒;同一個程序中的多個執行緒之間可以併發執行,一個程序裡邊可以有多個執行緒,CPU執行的是執行緒,程序是做資源管理的,它代表一個過程,管理這些執行緒,一個程序最少有一個執行緒,這個執行緒叫主執行緒,可以在這個執行緒裡開多個子執行緒

五.python的執行緒與threading模組
1.threading模組建立在thread模組之上,thread模組以低階,原始的方式來處理和控制執行緒,而threading模組通過對thread進行二次封裝,提供了更方便的api來處理執行緒。
(1)直接呼叫實現併發

import threading                              #引入執行緒模組
import time

#從上到下按照主執行緒執行的
def Hi(num):
    print("hello %d"%num)
    time.sleep(5)                              #模擬時間消耗

if __name__ == '__main__':
    #建立第一個子執行緒
    t1=threading.Thread(target=Hi,args=(10,))    #threading.給Thread這個類建立了一個子執行緒物件t1----target等於要處理的函式名字
    t1.start()                                   #Thread這個類例項出物件t1通過start啟動第一個子執行緒

    #建立第二個子執行緒
    t2 = threading.Thread(target=Hi, args=(5,))  #threading.給Thread這個類建立了一個子執行緒物件t2----target等於要處理的函式名字
    t2.start()                                   #Thread這個類例項出物件t2通過start啟動第二個子執行緒

    print("結束..........")                      #主執行緒

列印:同時打印出三條後等5秒程式結束
hello 10
hello 5
結束..........
(2)直接呼叫實現併發:

import threading
from time import ctime,sleep
import time

def ListenMusic(name):        #定義函式一

        print ("我是開始 %s. %s" %(name,ctime()))
        sleep(3)
        print("t1結束 %s"%ctime())


def RecordBlog(title):        #定義函式二

        print ("我是開始 %s! %s" %(title,ctime()))
        sleep(5)
        print('t2結束 %s'%ctime())

threads = []                   #定義一個空列表

t1 = threading.Thread(target=ListenMusic,args=('t1',))    #建立t1把t1加到列表threads裡面去
t2 = threading.Thread(target=RecordBlog,args=('t2',))     #建立t2把t2加到列表threads裡面去

threads.append(t1)
threads.append(t2)

if __name__ == '__main__':

    for t in threads:                                      #對列表執行遍歷
        t.start()                                          #把所有執行緒都開啟

    print ("所有都結束 %s" %ctime())                       #主執行緒

列印結果:首先打印出我是開始 t1,我是開始 t2和所有都結束,三秒後列印t1結束,倆秒鐘後列印t2結束,總共耗時5秒實現併發
我是開始 t1. Mon Nov 26 11:42:02 2018
我是開始 t2! Mon Nov 26 11:42:02 2018
所有都結束 Mon Nov 26 11:42:02 2018
t1結束 Mon Nov 26 11:42:05 2018
t2結束 Mon Nov 26 11:42:07 2018
(3)通過繼承式呼叫

import threading
import time

class MyThread(threading.Thread):

    def __init__(self, num):
        threading.Thread.__init__(self)
        self.num = num

    def run(self):                                  #定義每個執行緒要執行的函式,必須是run

        print("running on number:%s" % self.num)

        time.sleep(3)

if __name__ == '__main__':

    t1 = MyThread(1)          #例項類是自己定製的繼承threading.Thread,t1就是執行緒物件
    t2 = MyThread(2)
    t1.start()                #啟用run方法
    t2.start()
    print("結束......")       #主執行緒

返回結果:
running on number:1
running on number:2
結束......
2.join():在子執行緒完成執行之前,這個子執行緒的父執行緒將一直被阻塞(join方法執行緒物件例項的方法)
(1)模擬併發效果之join方法1

import threading
import time

def music():
    print("music開始時間 %s"%time.ctime())
    time.sleep(3)
    print("music結束時間 %s" % time.ctime())

def game():
    print("game開始時間 %s"%time.ctime())
    time.sleep(5)
    print("game結束時間 %s" % time.ctime())

if __name__ == '__main__':
    #建立第一個子執行緒
    t1=  threading.Thread(target=music)   #threading.給Thread這個類建立了一個子執行緒物件t1
    #建立第二個子執行緒
    t2 = threading.Thread(target=game)    #threading.給Thread這個類建立了一個子執行緒物件t2

    t1.start()                           #Thread這個類例項出物件t1通過start啟動
    t2.start()                           #Thread這個類例項出物件t2通過start啟動
    
    #join是t1,t2不執行完主執行緒不往下執行
    t1.join()
    t2.join()

    #主執行緒
    print("結束")

列印流程:首先打印出music開始時間和game開始時間三秒後列印music結束時間倆秒後列印game結束時間和結束,總共耗時5秒實現併發
music開始時間 Sat Nov 24 14:47:40 2018
game開始時間 Sat Nov 24 14:47:40 2018
music結束時間 Sat Nov 24 14:47:43 2018
game結束時間 Sat Nov 24 14:47:45 2018
結束
(2)模擬併發效果之join方法2把t2給join起來

import threading
from time import ctime,sleep
import time

def ListenMusic(name):        #定義函式一

        print ("我是開始 %s. %s" %(name,ctime()))
        sleep(3)
        print("t1結束 %s"%ctime())


def RecordBlog(title):       #定義函式二

        print ("我是開始 %s! %s" %(title,ctime()))
        sleep(5)
        print('t2結束 %s'%ctime())

threads = []                 #定義一個空列表

t1 = threading.Thread(target=ListenMusic,args=('t1',))                #建立t1把t1加到列表threads裡面去
t2 = threading.Thread(target=RecordBlog,args=('t2',))          #建立t2把t2加到列表threads裡面去

threads.append(t1)
threads.append(t2)

if __name__ == '__main__':

    for t in threads:                                                   #對列表執行遍歷
        #t.setDaemon(True) #注意:一定在start之前設
        t.start()                                                       #把所有執行緒都開啟

    t.join()                                                            #把t2給join起來
    print ("所有都結束 %s" %ctime())                                      #主執行緒

列印輸出:首先列印我是開始 t1和我是開始 t2,三秒鐘後列印t1結束,倆秒鐘後列印t2結束和所有都結束
我是開始 t1. Mon Nov 26 11:49:35 2018
我是開始 t2! Mon Nov 26 11:49:35 2018
t1結束 Mon Nov 26 11:49:38 2018
t2結束 Mon Nov 26 11:49:40 2018
所有都結束 Mon Nov 26 11:49:40 2018
(3)模擬併發效果之join無用方法序列

import threading
from time import ctime,sleep
import time

def ListenMusic(name):        #定義函式一

        print ("我是開始 %s. %s" %(name,ctime()))
        sleep(3)
        print("t1結束 %s"%ctime())


def RecordBlog(title):       #定義函式二

        print ("我是開始 %s! %s" %(title,ctime()))
        sleep(5)
        print('t2結束 %s'%ctime())

threads = []                 #定義一個空列表

t1 = threading.Thread(target=ListenMusic,args=('t1',))                #建立t1把t1加到列表threads裡面去
t2 = threading.Thread(target=RecordBlog,args=('t2',))          #建立t2把t2加到列表threads裡面去

threads.append(t1)
threads.append(t2)

if __name__ == '__main__':

    for t in threads:                                                   #對列表執行遍歷
        t.start()                                                        #把所有執行緒都開啟
        t.join()                                                         #序列
        
    print ("所有都結束 %s" %ctime())

列印輸出:首先列印我是開始 t1,三秒鐘後列印t1結束和我是開始 t2,5秒鐘列印t2結束和所有都結束總共用時8秒沒有實現併發
我是開始 t1. Mon Nov 26 11:45:10 2018
t1結束 Mon Nov 26 11:45:13 2018
我是開始 t2! Mon Nov 26 11:45:13 2018
t2結束 Mon Nov 26 11:45:18 2018
所有都結束 Mon Nov 26 11:45:18 2018
3.setDaemon(True)將執行緒宣告為守護執行緒,必須在start()方法呼叫之前設定,如果不設定為守護執行緒程式會被無線掛起。這個方法基本和join是相反的(setDaemon(True)方法執行緒物件例項的方法)。
當我們在程式執行中,執行一個主執行緒,如果主執行緒又建立了一個子執行緒,主執行緒和子執行緒就分兵倆路,分別執行,那麼當主執行緒完成想退出時,會檢驗子執行緒是否完成。如果子執行緒未完成,則主執行緒會等待子執行緒完成後在退出。但是有時候我們需要的是,只要主執行緒完成了,不管子執行緒是否完成,都要和主執行緒一起退出
(1)模擬併發效果值setDaemon守護執行緒把t1和t2設定成守護執行緒跟著主執行緒一起退

import threading
from time import ctime,sleep
import time

def xiancheng1(name):        #定義函式一(執行緒1)

        print ("我是開始 %s %s" %(name,ctime()))
        sleep(3)
        print("t1結束 %s"%ctime())

def xiancheng2(title):       #定義函式二(執行緒2)

        print ("我是開始 %s %s" %(title,ctime()))
        sleep(5)
        print('t2結束 %s'%ctime())

threads = []                                                  #定義一個空列表

t1 = threading.Thread(target=xiancheng1,args=('t1',))         #建立執行緒物件t1
t2 = threading.Thread(target=xiancheng2,args=('t2',))         #建立執行緒物件t2

threads.append(t1)                                             #把t1加到列表threads裡面
threads.append(t2)                                             #把t2加到列表threads裡面

if __name__ == '__main__':

    for t in threads:                                                   #通過for迴圈對列表執t.start
        t.setDaemon(True)                                               #把t1和t2都設定成守護執行緒,子執行緒守護主執行緒(一定在start之前設)
        t.start()                                                        #把所有執行緒都開啟

    print ("我是主執行緒 %s" %ctime())

列印結果:同時輸出我是開始 t1和我是開始 t2和我是主執行緒結束
我是開始 t1 Tue Nov 27 11:59:49 2018
我是開始 t2 Tue Nov 27 11:59:49 2018
我是主執行緒 Tue Nov 27 11:59:49 2018
(2)模擬併發效果值setDaemon守護執行緒把t2設定成守護執行緒跟著主執行緒一起退

import threading
from time import ctime,sleep
import time

def xiancheng1(name):        #定義函式一(執行緒1)

        print ("我是開始 %s %s" %(name,ctime()))
        sleep(3)
        print("t1結束 %s"%ctime())

def xiancheng2(title):       #定義函式二(執行緒2)

        print ("我是開始 %s %s" %(title,ctime()))
        sleep(5)
        print('t2結束 %s'%ctime())

threads = []                                                  #定義一個空列表

t1 = threading.Thread(target=xiancheng1,args=('t1',))         #建立執行緒物件t1
t2 = threading.Thread(target=xiancheng2,args=('t2',))         #建立執行緒物件t2

threads.append(t1)                                             #把t1加到列表threads裡面
threads.append(t2)                                             #把t2加到列表threads裡面

if __name__ == '__main__':

    for t in threads:                                                   #通過for迴圈對列表執t.start
        t2.setDaemon(True)                                              #把t2設定成守護執行緒,t2子執行緒要守護主執行緒(一定在start之前設)
        t.start()                                                        #把所有執行緒都開啟

    print ("我是主執行緒 %s" %ctime())

列印輸出:首先列印輸出我是開始 t1和我是開始 t2和我是主執行緒,3秒鐘後列印t1結束,因為t1不是主執行緒守護,要等待t1子執行緒結束後退出程式
我是開始 t1 Tue Nov 27 12:02:40 2018
我是開始 t2 Tue Nov 27 12:02:40 2018
我是主執行緒 Tue Nov 27 12:02:40 2018
t1結束 Tue Nov 27 12:02:43 2018
4.其它方法
(1)run():用以表示執行緒活動的方法
(2)start():啟動執行緒活動(讓執行緒處於就緒的狀態,等待作業系統調cpu去執行)
(3)isAlive():返回執行緒是否活動的返回布林值
(4)getName():檢視當前執行緒名,(5)setName():設定執行緒名

import threading
from time import ctime,sleep
import time

def ListenMusic(name):        #定義函式一(執行緒1)

        print ("我是開始 %s. %s" %(name,ctime()))
        sleep(3)
        print("我是結束t1 %s"%ctime())

def RecordBlog(title):       #定義函式二(執行緒2)

        print ("我是開始 %s! %s" %(title,ctime()))
        sleep(5)
        print('我是結束t2 %s'%ctime())

threads = []                 #定義一個空列表

t1 = threading.Thread(target=ListenMusic,args=('t1',))         #建立t1把t1加到列表threads裡面去
t2 = threading.Thread(target=RecordBlog,args=('t2',))          #建立t2把t2加到列表threads裡面去

threads.append(t1)
threads.append(t2)

if __name__ == '__main__':
    t2.setDaemon(True)                                         #把t1設定成守護執行緒(一定在start之前設)
    for t in threads:                                          #對列表執行遍歷
        t.start()                                               #把所有執行緒都開啟
        print(t.getName())                                      #預設執行緒名字
        print("count:", threading.active_count())              #修改執行緒名字

    print ("我是主執行緒 %s" %ctime())

返回:
我是開始 t1. Mon Nov 26 14:09:12 2018
Thread-1
count: 2
我是開始 t2! Mon Nov 26 14:09:12 2018
Thread-2
count: 3
我是主執行緒 Mon Nov 26 14:09:12 2018
我是結束t1 Mon Nov 26 14:09:15 2018
5.threading模組提供的一些方法:
(1)threading.currentThread():返回當前的執行緒變數
(2)threading.enumerate():返回一個包含正在執行的執行緒的list。正在執行指執行緒啟動後,結束前,不包括啟動前和終止後的執行緒
(3)threading.activeCount():返回正在執行的執行緒數量,與len(threading.enumerate)有相同結果

import threading
from time import ctime,sleep
import time

def ListenMusic(name):        #定義函式一(執行緒1)

        print ("我是開始 %s. %s" %(name,ctime()))
        sleep(3)
        print("我是結束t1 %s"%ctime())

def RecordBlog(title):       #定義函式二(執行緒2)

        print ("我是開始 %s! %s" %(title,ctime()))
        sleep(5)
        print('我是結束t2 %s'%ctime())

threads = []                 #定義一個空列表

t1 = threading.Thread(target=ListenMusic,args=('t1',))         #建立t1把t1加到列表threads裡面去
t2 = threading.Thread(target=RecordBlog,args=('t2',))          #建立t2把t2加到列表threads裡面去

threads.append(t1)
threads.append(t2)

if __name__ == '__main__':

    for t in threads:                                          #對列表執行遍歷
        t.setDaemon(True)  # 把t1和t2都設定成守護執行緒(一定在start之前設)
        t.start()                                               #把所有執行緒都開啟

    while threading.active_count() == 1:                       #表示此時此刻就一個主執行緒
        print ("我是主執行緒 %s" %ctime())

輸出結果:
我是開始 t1. Mon Nov 26 14:22:07 2018
我是開始 t2! Mon Nov 26 14:22:07 2018
六.python的GIL(全域性解釋鎖)
1.GIL:全域性解釋鎖:無論你啟動多少個執行緒,你有多少CPU,python在執行的時候在同一時刻只允許一個執行緒被CPU執行
2.任務:分IO密集型和計算密集型,對於IO密集型的任務python的多執行緒不會被GIL影響,對於計算密集型的任務python的多執行緒會被GIL影響
3.任務解決方法:多程序+協程解決GIL
4.同步鎖
5.需求開100個執行緒對數字100做累減100的操作
(1)未使用同步鎖

import threading
import time
def sub():                            #累減函式sub
    global num                        #在每個執行緒中都獲取這個全域性變數

    temp=num                          #把num賦值給temp
    time.sleep(0.001)                 #等待0.001秒
    num=temp-1                        #對此公共變數進行-1操作賦值給num

num=100                               #設定一個共享變數

l=[]                                       #定義空列表

for i in range(100):
    t=threading.Thread(target=sub)         #建立執行緒執行sub賦值給t
    t.start()                              #執行t
    l.append(t)                            #建立t的時候加到列表l裡

for t in l:
    t.join()                              #等著主執行緒,讓主執行緒在這裡不要去執行

print(num)

列印輸出:多個執行緒都在同時操作同一個共享資源,所以造成資源損壞導致輸入結果不是0
87
(2)使用同步鎖

import threading
import time
def sub():                            #累減函式sub
    global num                        #在每個執行緒中都獲取這個全域性變數
    #使用同步鎖把這部分改為序列
    lock.acquire()                    #獲得一把鎖,在鎖下面的程式碼誰都不可以有CPU的切換,只能有一個執行緒被執行
    temp=num
    time.sleep(0.001)
    num=temp-1                        #對此公共變數進行-1操作賦值給num
    lock.release()                    #釋放這把鎖,獲取鎖中間的程式碼不可以有CPU的切換

num=100                               #設定一個共享變數

l=[]                                       #定義空列表
lock=threading.Lock()                      #建立同步鎖lock

for i in range(100):
    t=threading.Thread(target=sub)         #建立執行緒執行sub賦值給t
    t.start()                              #執行t
    l.append(t)                            #建立t的時候加到列表l裡

for t in l:                            #等待所有子執行緒執行完畢
    t.join()                              

print(num)

列印輸出:
0
5.執行緒死鎖和遞迴鎖
線上程間共享多個資源的時候,如果倆個執行緒分別佔有一部分資源並且同時等待對方的資源,就會造成死鎖,因為系統判斷這部分資源都正在使用,所有這倆個執行緒在無外力作用下將一直等待下去。
(1)執行緒死鎖

import  threading
import time

class MyThread(threading.Thread):               #建立一個類MyThread

    def actionA(self):    #例項方法1
        #五個執行緒同時搶佔
        A.acquire()                               #建立A鎖
        print(self.name,"A鎖",time.ctime())      #self.name是執行緒的名字列印A錯名字
        time.sleep(2)

        B.acquire()                               #建立B鎖
        print(self.name, "B鎖", time.ctime())
        time.sleep(1)

        B.release()                               #結束B鎖
        A.release()                               #結束A鎖

    def actionB(self):     #例項方法2

        B.acquire()                                #建立B鎖
        print(self.name, "B鎖", time.ctime())
        time.sleep(2)

        A.acquire()                                #建立A鎖
        print(self.name, "A鎖", time.ctime())
        time.sleep(1)

        A.release()
        B.release()

    def run(self):                   #先執行run方法(開了5分執行緒同時去執行下面倆函式)
        self.actionA()               #run方法執行actionA()
        self.actionB()               #run方法執行actionB()


if __name__ == '__main__':         #類的繼承方式來啟動多執行緒

    #建立同步鎖A
    A=threading.Lock()
    #建立同步鎖B
    B=threading.Lock()

    L=[]                        #定義空列表L

    for i in range(5):         #建立5個執行緒物件同時執行run方法
        t=MyThread()            #繼承方式建立執行緒物件例項自己的類
        t.start()               #執行t
        L.append(t)             #建立t的時候加到列表L裡

    for i in L:                #等待所有子執行緒執行完畢
        i.join()

    print("結束....")

列印結果:
Thread-1 A鎖 Tue Nov 27 17:09:54 2018
Thread-1 B鎖 Tue Nov 27 17:09:56 2018
Thread-1 B鎖 Tue Nov 27 17:09:57 2018
Thread-2 A鎖 Tue Nov 27 17:09:57 2018
造成死鎖:第一個執行緒在執行actionB(self):的A.acquire()這位置想要一把A鎖的時候,第二個執行緒已經把A鎖拿走了,需要等待A鎖什麼時候被釋放什麼時候才可以拿來,第二個執行緒在執行actionA(self)的B.acquire()這位置想獲得B鎖但B鎖已經被第一個執行緒拿著,倆邊都不釋放造成了死鎖
(2)遞迴鎖解決死鎖

import  threading
import time

class MyThread(threading.Thread):               #建立一個類MyThread

    def actionA(self):    #例項方法1
        #count大於1其它執行緒是無法進來不會造成死鎖
        r_lcok.acquire()                               #acquire一次加一個1  內部count計數器=1
        print(self.name,"A鎖",time.ctime())           #self.name是執行緒的名字列印A錯名字
        time.sleep(2)

        r_lcok.acquire()                               #acquire一次加一個1  內部count計數器=2
        print(self.name, "B鎖", time.ctime())
        time.sleep(1)

        r_lcok.release()                               #release減一個1   內部count計數器=1
        r_lcok.release()                               #count計數器=0的時候第二個執行緒就可以再用這把鎖

    def actionB(self):     #例項方法2

        r_lcok.acquire()
        print(self.name, "B鎖", time.ctime())
        time.sleep(2)

        r_lcok.acquire()
        print(self.name, "A鎖", time.ctime())
        time.sleep(1)

        r_lcok.release()
        r_lcok.release()

    def run(self):                   #先執行run方法(開了5分執行緒同時去執行下面倆函式)
        self.actionA()               #run方法執行actionA()
        self.actionB()               #run方法執行actionB()

if __name__ == '__main__':         #類的繼承方式來啟動多執行緒

    r_lcok = threading.RLock()       #建立遞迴鎖

    L=[]                             #定義空列表L

    for i in range(5):         #建立5個執行緒物件同時執行run方法
        t=MyThread()            #繼承方式建立執行緒物件例項自己的類
        t.start()               #執行t
        L.append(t)             #建立t的時候加到列表L裡

    for i in L:                #等待所有子執行緒執行完畢
        i.join()

    print("結束....")

列印結果:
Thread-1 A鎖 Tue Nov 27 17:47:27 2018
Thread-1 B鎖 Tue Nov 27 17:47:29 2018
Thread-2 A鎖 Tue Nov 27 17:47:30 2018
Thread-2 B鎖 Tue Nov 27 17:47:32 2018
Thread-3 A鎖 Tue Nov 27 17:47:33 2018
Thread-3 B鎖 Tue Nov 27 17:47:35 2018
Thread-4 A鎖 Tue Nov 27 17:47:36 2018
Thread-4 B鎖 Tue Nov 27 17:47:38 2018
Thread-5 A鎖 Tue Nov 27 17:47:39 2018
Thread-5 B鎖 Tue Nov 27 17:47:41 2018
Thread-1 B鎖 Tue Nov 27 17:47:42 2018
Thread-1 A鎖 Tue Nov 27 17:47:44 2018
Thread-2 B鎖 Tue Nov 27 17:47:45 2018
Thread-2 A鎖 Tue Nov 27 17:47:47 2018
Thread-3 B鎖 Tue Nov 27 17:47:48 2018
Thread-3 A鎖 Tue Nov 27 17:47:50 2018
Thread-4 B鎖 Tue Nov 27 17:47:51 2018
Thread-4 A鎖 Tue Nov 27 17:47:53 2018
Thread-5 B鎖 Tue Nov 27 17:47:54 2018
Thread-5 A鎖 Tue Nov 27 17:47:56 2018
結束....
七.同步條件物件(Event)
1.同步:當你一個程序執行到一個IO操作(等待外部資料)的時候,這個等的過程就是同步
2.非同步:當你一個程序執行到一個IO操作(等待外部資料)的時候,不等待,一直等到資料接收成功,再回來處理
3.讓兩個執行緒之間處於同步操作
(1)event.wati():如果flag被設定,就不等待了繼續往下執行
(2)event.set():設定Event
(3)event.clear():清除Event
(4)如果flag被設定了,wait方法就不會做任何事情,如果flag被清除(沒有被設定)了,wait會阻塞,一直等待被設定為止,一個Event物件可以用在多個執行緒裡去

import threading,time
class Teacher(threading.Thread):

    def run(self):
        print("Teacher:今天放學都不要走啊。")
        #print(event.isSet())           #列印當前是否被設定event,返回False
        event.set()                     #event.set()設定event,第一個event.wait()以下的程式碼就可以繼續執行
        time.sleep(3)
        print("Teacher:可以下課了。")
        #print(event.isSet())           #列印當前是否被設定event,返回False
        event.set()                     #event.set()設定event

class Student(threading.Thread):
    def run(self):

        event.wait()                     #遇到wait卡住返回到threads.append(Teacher())繼續執行。一旦event被設定,等同於pass,下面可以繼續執行

        print("Student:命苦啊!")
        time.sleep(1)
        event.clear()                    #event.clear()清空event
        event.wait()                     #遇到wait卡住返回到time.sleep(3)繼續執行。一旦event被設定,等同於pass,下面可以繼續執行
        print("Student:OhYeah!")

if __name__=="__main__":
    event=threading.Event()              #第一步:建立event物件

    threads=[]                           #第二步:建立列表
    for i in range(3):                  #建立3個Worker執行緒物件加到列表裡
        threads.append(Student())        #Student類繼承了threading.Thread,裡面有run方法,一旦有這個執行緒會執行run方法
    threads.append(Teacher())            #第三步:建立一個Teacher物件,也是繼承是呼叫
    for t in threads:                   #第四步:
        t.start()
    for t in threads:                   #第五步:
        t.join()

    print("結束.....")

返回結果:
Teacher:今天放學都不要走啊。
Student:命苦啊!
Student:命苦啊!
Student:命苦啊!
Teacher:可以下課了。
Student:OhYeah!
Student:OhYeah!
Student:OhYeah!
結束.....
八.訊號量(Semaphore)
1.訊號量:用來控制執行緒併發數的,BoundedSemaphore或Semaphore管理一個內建的計數器,沒當呼叫acquire()時-1,呼叫release()時+1。計數器不能小於0,當計數器為0時,acquire()將阻塞執行緒至同步鎖定狀態,直到其他執行緒呼叫release()。
2.BoundedSemaphore或Semaphore的唯一區別在於前者將在呼叫release()時檢查計數器是否超過了計數器的初始值,如果超過了將丟擲一個異常。

import threading,time

class myThread(threading.Thread):
    def run(self):

        if semaphore.acquire():     #同時有5個執行緒進去
            print(self.name)
            time.sleep(3)
            semaphore.release()     #把semaphore釋放又可以進來5個執行緒

if __name__=="__main__":
    semaphore=threading.Semaphore(5)   #semaphore也是鎖的一種,threading.Semaphore()建立鎖物件,5是每次只能進去5個執行緒

    thrs=[]                            #空列表
    for i in range(15):               #50執行緒物件分別去啟動執行run方法
        thrs.append(myThread())
    for t in thrs:
        t.start()

返回結果:每隔3秒列印5個執行緒
Thread-1
Thread-2
Thread-3
Thread-4
Thread-5
Thread-7
Thread-8
Thread-9
Thread-6
Thread-10
Thread-11
Thread-12
Thread-13
Thread-14
Thread-15
九.多執行緒利器--佇列(queue)
1.先進先出模式

import queue              #執行緒佇列

q=queue.Queue(3)          #建立queue物件,3代表最多放3個數據(預設按照先進先出)

q.put(18)                 #將18放入佇列中
q.put("hello")            #將hello放入佇列中
q.put({"name":"xi"})      #將{"name":"xi"}放入佇列中
#q.put(34,False)            #加上引數False引數如果佇列滿了報錯

while 1:                    
    data=q.get()             #通過q.get()將值從佇列中取出賦值給data
    #data = q.get(block=False)    #加上引數False引數如果佇列滿了報錯
    print(data)
    print("----------")

列印結果:
18
----------
hello
----------
{'name': 'xi'}
----------
2.先進後出模式

import queue              #執行緒佇列

q=queue.LifoQueue(3)            #建立queue物件,3代表最多放3個數據(LifoQueue先進後出)

q.put(12)                   #建立格子放資料12
q.put("hello")             #建立格子放資料hello
q.put({"name":"xi"})      #建立格子放資料{"name":"xi"}

while 1:
    data=q.get()             #通過q.get()取值q的值賦值給data
    print(data)
    print("----------")

列印結果:
{'name': 'xi'}
----------
hello
----------
12
----------
3.優先順序模式

import queue              #執行緒佇列

q=queue.PriorityQueue(3)            #建立queue物件,3代表最多放3個數據(PriorityQueue優先順序)

q.put([2,12])                   #建立格子放資料12,優先順序2
q.put([3,"hello"])             #建立格子放資料hello,優先順序3
q.put([1,{"name":"xi"}])       #建立格子放資料{"name":"xi"},優先順序1

while 1:
    data=q.get()             #通過q.get()取值q的值賦值給data
    print(data[1])
    print("----------")

列印輸出:
{'name': 'xi'}
----------
12
----------
hello
----------
4.佇列中常用的方法
(1)q.qsize():返回佇列的大小
(2)q.empty():如果佇列為空,返回True,反之False
(3)q.full():如果佇列滿了,返回True,反之False
(4)q.full與maxsize大小對應
(5)q.get([block[,timeout]])獲取佇列,timeout等待時間
(6)q.get_nowait()相當q.get(False)
(7)非阻塞q.put(item)寫入佇列,timeout等待時間
(8)q.task_done():在完成一項工作之後,q.task_done()函式向任務已經完成的佇列傳送一個訊號
(9)q.join():等著列隊為空在執行別的操作

import queue              #執行緒佇列

q=queue.Queue(3)          #建立queue物件,3代表最多放3個數據(預設按照先進先出)

q.put(18)                 #將18放入佇列中
q.put("hello")           #將hello放入佇列中
q.put({"name":"xi"})     #將{"name":"xi"}放入佇列中

#q.put_nowait(11)        相當於q.get(False)

print(q.qsize())          #按照具體有多少值來的
print(q.empty())          #是否為空
print(q.full())           #是否是滿的

while 1:
    data=q.get()          #通過q.get()將值從佇列中取出賦值給data
    print(data)
    print("----------")

返回:
3
False
True
18
----------
hello
----------
{'name': 'xi'}
----------
5.生產者消費者模型
(1)為什麼要使用生產者和消費者
線上程世界裡,生產者就是生產資料的執行緒,消費者就是消費資料的執行緒。在多執行緒開發當中,如果生產者處理速度很快,而消費者處理速度很慢那麼生產者就必須等待消費者處理完,才能繼續生產資料。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題於是引入了生產者消費者模式。
(2)什麼是生產者消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強行解除耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力
(3)通過佇列完成生產者消費者模型

import time,random
import queue,threading

q = queue.Queue()                                            #首先建立佇列物件q

def Producer(name):                                          #生產者函式Producer
  count = 0                        #生產者定義count = 0
  while count < 5:                #當0小於5的時候
    print("開始生產........")    #執行
    time.sleep(3)
    q.put(count)                    #把當前count的值放到佇列q裡去
    print('生產者 %s 已經生產 %s 號包子..' %(name, count))
    count +=1                       #給count加一個1
    q.join()                        #遇到q.join生產者執行緒等待佇列為,開始執行消費者執行緒,當包子被吃了(佇列為空)的時候再往下執行
    print("結束......")

def Consumer(name):                  #消費者函式Consumer
  count = 0
  while count <5:
        time.sleep(random.randrange(4))
        data = q.get()                #q.get()拿到包子
        print("開始吃包子....")
        time.sleep(2)
        q.task_done()                 #倆秒鐘後把吃完包子的訊息告訴生產者裡的q.join(),
        print('\033[32;1m消費者 %s 已吃 %s 號包子...\033[0m' %(name, data))

        count +=1

p1 = threading.Thread(target=Producer, args=('王大廚',))    #p1執行緒作為生產者執行的Producer函式
c1 = threading.Thread(target=Consumer, args=('西西',))      #c1執行緒作為消費者執行的Consumer函式
c2 = threading.Thread(target=Consumer, args=('一一',))      #c2執行緒作為消費者執行的Consumer函式

#來了3個執行緒去執行p1,c1,c2
p1.start()
c1.start()
c2.start()

列印輸出:
開始生產........
生產者 王大廚 已經生產 0 號包子..
開始吃....
消費者 西西 已吃 0 號包子...
結束......
開始生產........
生產者 王大廚 已經生產 1 號包子..
開始吃....
消費者 一一 已吃 1 號包子...
結束......
開始生產........
生產者 王大廚 已經生產 2 號包子..
開始吃....
消費者 西西 已吃 2 號包子...
結束......
開始生產........
生產者 王大廚 已經生產 3 號包子..
開始吃....
消費者 一一 已吃 3 號包子...
結束......
開始生產........
生產者 王大廚 已經生產 4 號包子..
開始吃....
消費者 西西 已吃 4 號包子...
結束......
十.多程序模組(multiprocessing)
由於GIL的存在,python中的多執行緒其實並不是真正的多執行緒,如果要充分地使用多核CPU的資源,在python中大部分情況需要使用多程序
multiprocessing包是python中的多程序管理包。與threading.Thread類似,它可以利用multiprocessing.Process物件來建立一個程序。該程序可以執行在python程式內部編寫的函式。該Process物件與Thread物件的用法相同。也有start(),run(),join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類(這些物件可以像多執行緒那樣,通過引數傳遞給各個程序),用以同步程序,其用法與threading包中的同名類一致。所以,multiprocessing的很大一部分與threading使用同一套API,只不過換到了多程序的情境。
1.呼叫方式一(直接呼叫)

from multiprocessing import Process
import time

def f(name):                                       #定義函式f
    time.sleep(1)
    print('hello', name,time.ctime())             #1秒鐘後列印引數和時間

if __name__ == '__main__':
    p_list=[]
    for i in range(3):                            #迴圈建立3個子程序

        p = Process(target=f, args=('xixi',))     #Process例項化程序物件
        p_list.append(p)                           #加到列表p_list裡
        p.start()                                  #啟動

    for i in p_list:
        i.join()
    print('結束')                                 #主程序

列印輸出:並行
hello xixi Thu Nov 29 11:05:08 2018
hello xixi Thu Nov 29 11:05:08 2018
hello xixi Thu Nov 29 11:05:08 2018
結束
2.呼叫方式二(繼承的方式)

from multiprocessing import Process
import time

class MyProcess(Process):

    def run(self):
        time.sleep(1)
        print ('hello', self.name,time.ctime())    #self.name是程序的名字

if __name__ == '__main__':
    p_list=[]

    for i in range(3):
        p = MyProcess()
        p.start()                 #啟動執行def run(self):裡的run方法
        p_list.append(p)

    for p in p_list:
        p.join()                  #等待子程序執行完主程序執行

    print('結束')                 #主程序

列印輸出:並行
hello MyProcess-1 Thu Nov 29 11:17:29 2018
hello MyProcess-3 Thu Nov 29 11:17:29 2018
hello MyProcess-2 Thu Nov 29 11:17:29 2018
結束
3.daemon守護程序

from multiprocessing import Process
import time

class MyProcess(Process):

    def run(self):
        time.sleep(1)
        print ('hello', self.name,time.ctime())    #self.name是程序的名字

if __name__ == '__main__':
    p_list=[]

    for i in range(3):
        p = MyProcess()
        p.daemon=True               #加上守護程序,主程序完了就結束了,不管子程序是否執行完
        p.start()
        p_list.append(p)

    print('結束')                   #主程序

列印結果:
結束
4.顯示涉及的各個程序ID

from multiprocessing import Process
import os
import time

def info(title):                                           #定義函式info
    print("title:", title)
    print('當前執行程式父程序的程序號:', os.getppid())    
    print('當前程式執行的程序號:', os.getpid())           

def f(name):                                                #定義函式f,f函式也呼叫了info
    info('function f')                                     #呼叫了info函式
    print('hello', name)


if __name__ == '__main__':

    info('主程序')                             #執行給info傳一個字串

    time.sleep(1)
    print("------------------")
    p = Process(target=info, args=('子程序',))   #建立一個子程序p
    p.start()                                   #開啟
    p.join()

返回結果:父程序號:37320,子程序號:109704,子子程序號:111536
title: 主程序
當前執行程式父程序的程序號: 37320
當前程式執行的程序號: 109704
------------------
title: 子程序
當前執行程式父程序的程序號: 109704
當前程式執行的程序號: 111536
3.程序間通訊
(1)程序佇列