1. 程式人生 > >Python中併發、多執行緒等

Python中併發、多執行緒等

1、基本概念

併發和並行的區別:

1)並行,parallel

同時做某些事,可以互不干擾的同一時刻做幾件事。(解決併發的一種方法)

高速公路多個車道,車輛都在跑。同一時刻。

2)併發 concurrency

同時做某些事,一個時段內有事情要處理。(遇到的問題)

高併發,同一時刻內,有很多事情要處理。

2、併發的解決

1)佇列、緩衝區

排隊就是把人排成佇列,先進先出,解決了資源使用的問題。

排成的佇列,其實就是一個緩衝地帶,就是緩衝區。

Queue模組的類queue、lifoqueue、priorityqueue。

2)爭搶的

會有一個人佔據視窗,其他人會繼續爭搶,可以鎖定視窗,視窗不在為其他人服務,這就是鎖機制。(鎖的概念,排他性鎖,非排他性鎖)。

 

3)預處理

一種提前載入使用者需要的資料的思路,預處理思想,快取常用。

 

4)並行

日常可以通過購買更多的伺服器,或者開多執行緒,實現並行處理,來解決併發問題。

水平擴充套件思想。

如果在但CPU上處理,就不是並行了。

但是多數服務都是多CPU的,服務的部署就是多機、分散式的,都是並行處理。

(序列比並行快)

5)提速

提高單個CPU效能,或單個伺服器安裝更多的CPU

這就是一種垂直擴充套件思想。

 

6)訊息中介軟體

例如地跌站外的九曲迴腸的走廊,緩衝人流。

常見的訊息中介軟體有RabbitMQ,ActiveMQ(Apache)、RocketMQ(Apache)。

 

3、程序和執行緒

在實現了執行緒的作業系統中,執行緒是作業系統能夠進行運算排程的最小單位。他包含在程序中,是程序中的實際運作單位。一個程式執行例項就是一個程序。

 

程序(process)是計算機中的程式關於某資料集合上的一次執行活動,是系統進行資源分配和排程的基本單位,是作業系統結構的基礎。

(可執行,可執行的載入到記憶體中。程式是有一定格式的,Python直譯器載入,所有程序都是有入口的。偏移多少位。主執行緒達不到要求,就會啟用多執行緒。

多核。排程到不同的CPU上面去,虛擬的計算單元。)

資源爭搶問題:鎖,排他性鎖。佇列,不爭搶的人排隊。預載入,減少資料處理速度,提前載入到記憶體中。一變多。

 

 

程序和程式的關係

程式是原始碼編譯後的檔案,而這些檔案存放在磁碟上。當程式被作業系統載入到記憶體

中,就是程序,程序中存放著指令和資料(資源),也是執行緒的容器。

 

Linux程序有父程序、子程序,Windows的程序是平等關係。

 

執行緒,有時被稱為輕量級程序,是程式執行流的最小單元,一個標準的執行緒由執行緒ID,當前指令指標(pc暫存器集合堆疊組成。每個執行緒有自己獨立的棧。

在許多系統中,建立一個執行緒比建立一個程序快10-100倍。

 

程序、執行緒的理解

現代作業系統提出的程序的概念,每一個程序都認為自己是獨佔所有的計算機硬體資源。

程序就是獨立的王國,程序間不可以隨便的共享資料。

執行緒就是省份,同一個程序內的執行緒可以共享程序的資源,每一個執行緒擁有自己獨立的堆疊。

 

4、執行緒狀態

狀態

含義

就緒(ready)

執行緒能夠執行,但在等待被排程,可能執行緒剛剛建立啟動,或剛剛從阻塞恢復,或者被其他執行緒搶佔。

執行(running)

執行緒正在執行

阻塞(Blocked)

執行緒等待外部事件發生而無法執行,如I/O操作。

終止(Terminated)

執行緒完成,或退出,或被取消。

 

 

5、Python中的執行緒和程序

程序會啟動一個直譯器程序,執行緒會共享一個直譯器程序。

 

1)Python的執行緒開發

Python的執行緒開發使用標準庫threading

 

2)Thread類

簽名:

def __init__(self, group: None = ...,
             target: Optional[Callable[..., None]] = ...,
             name: Optional[str] = ...,
             args: Iterable = ...,
             kwargs: Mapping[str, Any] = ...,
             *, daemon: Optional[bool] = ...) -> None: ...

引數名

含義

target

執行緒呼叫物件,就是目標函式

name

為執行緒起名字

args

為目標函式傳遞實參,元組

Kwargs

為目標函式關鍵詞傳參,字典

3)執行緒啟動

import threading
import time

def worker():
    print('before')
    time.sleep(3)
    print('finished')

t = threading.Thread(target=worker)  #執行緒物件
t.start()   #啟動

 

通過threading.Thread建立一個執行緒物件,target是目標函式,name可以指定名稱。

需要呼叫start方法啟動函式。

執行緒之所以執行函式,是因為執行緒中就是用來執行程式碼的,所以還是函式呼叫。

 

函式執行完畢後,執行緒也就退出了。

如果想讓一個執行緒一直工作,不讓執行緒退出就要利用到while迴圈。

import threading
import time

def worker():
    count = 0
    while True:
        count += 1
        print('before')
        time.sleep(3)
        if count >5:
            print('finished')
            break

t = threading.Thread(target=worker)  #執行緒物件
t.start()   #啟動

 

4)執行緒退出

Python中沒有提供終止執行緒的方法。執行緒在下面情況下退出。

(1)執行緒函式內語句執行完畢

(2)執行緒函式中丟擲未處理的異常。

import threading
import time

def worker():
    count = 0
    while True:
        if count >5:
            break
            #return
            #raise RuntimeError(count)
        time.sleep(3)
        print('before')
        count += 1
        print('finished')

t = threading.Thread(target=worker)  #執行緒物件
t.start()   #啟動
print('end')

 

執行緒沒有優先順序,沒有執行緒組的概念。也不能被銷燬、停止、掛起,那麼就是沒有恢復和中斷了。

5)執行緒的傳參

import threading
import time

def add(x,y):
    print('{}+{}={}'.format(x,y,x+y))


t1 = threading.Thread(target=add,name='1',args=(4,5))
t1.start()
time.sleep(2)

t2 = threading.Thread(target=add,name = '2',args=(4,),kwargs={'y':6})
t2.start()
time.sleep(2)
t3 = threading.Thread(target=add,name='3',kwargs={'x':4,'y':7})
t3.start()

 

執行緒中的傳參,和函式傳參沒有什麼區別,本質上就是函式傳承。

 

6)threading的屬性和方法

名稱

含義

current_thread()

返回當前主執行緒

main_thread()

返回主執行緒物件

active_count()

當前處於alive狀態的執行緒個數

enumerate()

返回所有活著的執行緒的列表,不包括已經終止的執行緒和未開始的執行緒

git_ident()

返回當前執行緒的ID,非0整數。

active_count、enumerate方法返回的值還包括主執行緒。


import threading
import time


def showinfo():
    print('currentthread = {}'.format(threading.current_thread()))
    print('main thread = {}'.format(threading.main_thread()))
    print('active count = {}'.format(threading.active_count()))

def worker():
    count = 0
    showinfo()
    while True:
        if count>5:
            break
        time.sleep(5)
        count += 1
        print('finsh')

t = threading.Thread(target=worker,name='work')
showinfo()
t.start()

print('end')

 

 

currentthread = <_MainThread(MainThread, started 4048)>

main thread = <_MainThread(MainThread, started 4048)>

active count = 1

currentthread = <Thread(work, started 9084)>

end

main thread = <_MainThread(MainThread, stopped 4048)>

active count = 2

finsh

finsh

finsh

finsh

finsh

Finsh

 

名稱

含義

Name

他只是一個名字,只是一個識別符號,名字可以重名,getname()獲取,setname()設定這個名詞

Ident

執行緒id,是非0的整數,執行緒啟動後才會有ID,否則為None,執行緒退出,此id依舊可以訪問,此id可以重複訪問。

Is_alive()

返回執行緒是否或者

執行緒的name只是一個名稱,可以重複;id必須唯一,但可以線上程退出後在利用。

 

import threading

import time

 

 

def worker():

    count = 0

    while True:

        if count > 5:

            break

        time.sleep(2)

        count += 1

        print(threading.current_thread().name)

 

t = threading.Thread(name='work',target=worker)

print(t.ident)

t.start()

 

while True:

    time.sleep(1)

    if t.is_alive():

        print('{}{}alive'.format(t.name,t.ident))

    else:

        print('{}{}dead'.format(t.name,t.ident))

名稱

含義

Start()

啟動執行緒,每一個執行緒必須且只能執行該方法一次

Run()

執行執行緒函式

 

Start()啟動執行緒,只能執行一次。作業系統。開闢新的執行緒。

Run()直接做的是主執行緒。函式呼叫。

(1)start()
import threading
import time

def worker():
    count = 0
    while True:
        if count > 5:
            break
        time.sleep(3)
        count += 1
        print('running')
class Mythread(threading.Thread):
    def start(self):
        print('start----')
        super().start()

    def run(self):
        print('run----')
        super().run()

t = Mythread(target=worker,name='work')

t.start()

start方法執行結果是start----

run----

Running

按照執行緒進行執行。

(2)run()
import threading
import time


def worker():
    count = 0
    while True:
        if count>3:
            break
        time.sleep(2)
        count += 1
        print('runing')
class Mythread(threading.Thread):
    def start(self):
        print('start----')
        super().start()

    def run(self):
        print('run----')
        super().run()

t = Mythread(target=worker,name='work1')
t.run()

# run----
# runing

總結:run()執行結果就是直接是函式,呼叫,呼叫run函式。

Start()方法會呼叫run()方法,而run()方法可以執行函式。

 

(3)start和run的區別

Start方法啟動執行緒,啟動了一個新的執行緒,名字叫做worker執行,但是run方法,並沒有啟動新的執行緒,只是在主執行緒內呼叫了一個普通的函式。

 

7)多執行緒

 

多執行緒,一個程序中如果有多個執行緒,就是多執行緒,是先一種併發。

import threading
import time


def worker():
    count = 0
    while True:
        if count>3:
            break
        time.sleep(2)
        count += 1
        print('runing')
        print(threading.current_thread().name,threading.current_thread().ident)
class Mythread(threading.Thread):
    def start(self):
        print('start----')
        super().start()

    def run(self):
        print('run----')
        super().run()

t1 = Mythread(target=worker,name='work1')
t2 = Mythread(target=worker,name='work2')

# t1.run()
# t2.run()
####runing
# MainThread 1380
# runing
# MainThread 1380
# runing
# MainThread 1380
t1.start()
t2.start()
# start----
# run----
# start----
# run----
# runing
# work2 5048
# runing
# work1 9048

 

Start()方法work1和work2交替執行。啟動執行緒後,程序內多個活動的執行緒並行工作,就是多執行緒。

Run()方法中沒有開啟新的執行緒,就是普通函式呼叫,所以執行完t1.run()

,然後執行t2.run(),run()方法就不是多執行緒。

 

一個程序中至少有一個執行緒,並作為程式的入口,這個執行緒就是主執行緒,一個執行緒必須有一個主執行緒。

其他執行緒成為工作執行緒。

 

8)執行緒安全

import threading

def worker():
    for x in range(100):
        print('{}is running'.format(threading.current_thread().name))


for x in range(1,4):
    name = 'worker{}'.format(x)
    t = threading.Thread(name=name,target=worker)
    t.start()

 

利用ipython執行的結果是不是一行行的列印,而是很多字串列印在了一起。

這樣說明了print函式被打斷了,被執行緒切換打斷了,print函式分為兩步,第一步是列印字串,第二部是換行,就在這個期間,發生了執行緒的切換,說明了print函式是執行緒不安全的。

 

執行緒安全:執行緒執行一段程式碼,不會產生不確定的結果,那麼這段程式碼是執行緒安全的。

也是要用鎖,程序的鎖是管程序內的執行緒。獨佔資源。

 

解決上面列印的問題:

(1)不讓print列印換行

import threading

 

def worker():

    for x in range(100):

        print('{} is running.\n'.format(threading.current_thread().name),end='')

 

for x in range(1,5):

    name = 'worker{}'.format(x)

    t = threading.Thread(name=name,target=worker)

    t.start()

利用字串是不可變型別,可以作為一個整體不可分割輸出,end=’’就不在print輸出換行了。

 

(2)使用logging

標準庫裡面的logging模組,是日誌處理模組,執行緒安全的,生產環境程式碼都使用logging。

import threading
import logging


def worker():
    for x in range(100):
        # print('{} is running.\n'.format(threading.current_thread().name),end='')
        logging.warning('{}is running'.format(threading.current_thread().name))
for x in range(1,5):
    name = 'worker{}'.format(x)
    t = threading.Thread(name=name,target=worker)
    t.start()

 

 

 

9)daemon執行緒和non-daemon執行緒

daemon不是Linux裡面的守護程序。

 

程序靠執行緒執行程式碼,至少有一個主執行緒,其他執行緒是工作執行緒。

主執行緒是第一個啟動的執行緒。

父執行緒:如果A中啟動了一個執行緒B,那麼A就是B的父執行緒。

子執行緒:B就是A的子執行緒。

 

原始碼Thread的__init__ 方法中。

If deamon is not None:

Self._daemonic = daemon

else:

Self._daemonic = current_thread().daemon

Self._ident = None

執行緒daemon屬性,如果設定就是使用者的設定,否則,就取當前執行緒的daemon的值。

主執行緒是non-daemon執行緒,即daemon = False。

import time
import threading


def foo():
    time.sleep(5)
    for i in range(20):
        print(i)

t = threading.Thread(target=foo,daemon=False)
t.start()
print('end')

 

daemon設定False值,主執行緒執行完畢後,等待工作執行緒。

import time
import threading


def foo():
    time.sleep(5)
    for i in range(20):
        print(i)

t = threading.Thread(target=foo,daemon=True)
t.start()
print('end')

Daemon值改為true,主執行緒執行完畢後直接退出。

名稱

含義

Daemon

表示執行緒是否是daemon,這個值必須在start()之前設定,否則引發RuntimeError異常

IsDaemon()

是否是daemon執行緒

SetDaemon

設定daemon執行緒,必須在start方法之前設定。

總結:執行緒具有一個daemon屬性,可以顯示設定為True或者False,也可以不設定,則取預設值None。

 

如果不設定daemon,就取當前執行緒的daemon來設定他。

主執行緒是non-daemon執行緒,即daemon = False。

從主執行緒建立的所有執行緒的不設定daemon屬性,則預設daemon = False,也就是non-daemon執行緒。

程式在沒有活著的non-daemon執行緒執行時推出,也是就剩下的只是daemon執行緒,主執行緒才能推出。否則主執行緒只能等待。

 

構造執行緒的時候,可以設定daemon屬性,這個屬性必須在start方法前設定好。

daemon=True主執行緒不等。工作執行緒

daemon=False主執行緒等。只要有一個non-daemon就會等待。

控制一個屬性的。

在start之前。

 

只是有一個non-daemon就會等待,沒有的話直接不等,直接結束執行緒。

總結:

執行緒具有daemon屬性,可以設定為True或者False。

(啟用的non-daemon,主執行緒才會等待工作執行緒。)

import time
import threading


def bar():
    time.sleep(10)
    print('bar')

def foo():
    for i in range(20):
        print(i)
    t = threading.Thread(target=bar,daemon=False)
    t.start()
t = threading.Thread(target=foo,daemon=True)
t.start()

print('end')

 

這樣不會執行bar的,因為主執行緒的daemon設定的值為True,改為False就好了。

活著讓主執行緒sleep幾秒。

import time
import threading


def bar():
    time.sleep(10)
    print('bar')

def foo(n):
    for i in range(n):
        print(i)
t1 = threading.Thread(target=foo,args=(10,),daemon=True)
t1.start()
t2 = threading.Thread(target=foo,args=(20,),daemon=False)
t2.start()

time.sleep(6)
print('end')

 

如果non-daemon執行緒的時候,主執行緒退出,也不會結束所有的daemon執行緒,直到所有的non-daemon執行緒全部結束,如果還有daemon執行緒,主執行緒需要退出,會結束所有的daemon執行緒,退出。

 

主執行緒是non-daemon。其他執行緒靠傳參。

決定的是是否需要等待。如果有啟用的non-daemon,就需要等待,沒有啟用的,主執行緒直接退出。

 

10)join方法

import time
import threading
def foo(n):
    for i in range(n):
        print(i)
        time.sleep(1)

t1 = threading.Thread(target=foo,args=(10,),daemon=True)
t1.start()
t1.join()

 

 

 利用join,主執行緒被迫等待他。把當前執行緒阻塞住了,x.join就等待誰。保證程式碼的執行順序。

 

使用了join方法後,daemon執行緒執行完了,主執行緒才退出了。

 

Join(timeout= None),是執行緒的標準方法之一。

一個執行緒中呼叫另一個執行緒的join方法,呼叫者將被阻塞,直到被呼叫執行緒終止。

一個執行緒可以被join多次。

Timeout引數指定呼叫者等待多久,沒有設定超時的,就會一直等待到呼叫的執行緒結束。

呼叫誰的join方法,就是join誰,就要等睡。

 

 

 

 

11)daemon執行緒應用場景

簡單來說,本來並沒有daemon Thread,這個概念唯一的作用是,當把一個執行緒設定為daemon,他會隨著主執行緒的退出而退出。

主要應用場景為:

(1)後臺任務。傳送心跳包,監控。

(2)主執行緒工作才有用的執行緒。如主執行緒中維護著公共的資源,主執行緒已經清理了,準備退出,而工作執行緒使用這些資源工作沒有意義了,一起退出最合適。

(3)隨時可以被終止的執行緒。

 

如果主執行緒退出,想所有其他工作執行緒一起退出,就使用daemon=True來建立工作執行緒。

import time
import threading


def bar():
    while True:
        time.sleep(1)
        print('bar')

def foo():
    print('t1 daemon = {}'.format(threading.current_thread().isDaemon()))
    t2 = threading.Thread(target=bar)
    t2.start()
   
    print('t2 daemon = {}'.format(t2.isDaemon()))

t1 = threading.Thread(target=foo,daemon=True)
t1.start()


time.sleep(3)
print('Main end')

 

改造成一直執行的:

import time
import threading


def bar():
    while True:
        time.sleep(1)
        print('bar')

def foo():
    print('t1 daemon = {}'.format(threading.current_thread().isDaemon()))
    t2 = threading.Thread(target=bar)
    t2.start()
    t2.join()
    print('t2 daemon = {}'.format(t2.isDaemon()))

t1 = threading.Thread(target=foo,daemon=True)
t1.start()
t1.join()

time.sleep(3)
print('Main end')

 

Daemon執行緒,簡化了手動關閉執行緒的工作。

 

12)threading.local 類

 

區域性變數的實現:

import threading
import time

def worker():
    x = 0
    for i in range(10):
        time.sleep(0.01)
        x += 1
        print(threading.current_thread(),x)


for i in  range(10):
    threading.Thread(target=worker).start()

 

利用全域性變數實現:


import threading
import time

globals_data = threading.local()

def worker():
    globals_data.x = 0
    for i in range(10):
        time.sleep(0.01)
        globals_data.x += 1
        print(threading.current_thread(),globals_data.x)


for i in  range(10):
    threading.Thread(target=worker).start()

 

import threading


X = 'abc'
ctx = threading.local()
ctx.x = 123

print(ctx,type(ctx),ctx.x)

def worker():
    print(X)
    print(ctx)
    print(ctx.x)   #列印的時候出錯,表示x不能跨執行緒
    print('working')

worker()
print()
threading.Thread(target=worker).start() #另一個執行緒啟動

 

threading.local類構建了一個大字典,其元素是每一執行緒例項地址為key和執行緒物件引用執行緒單獨的字典的對映。

 

通過threading.local例項就可在不同的執行緒中,安全的使用執行緒獨有的資料,做到了執行緒間資料隔離,如同本地變數一樣安全。

 

Local和執行緒相關的大字典,每次利用的時候利用執行緒的小字典來頂替local例項的大字典。

不利用的話,全域性變數的話直接就是threading.local和本地執行緒相關的資料。

 

 

13)定時器timer延遲執行

Threading.Timer繼承自thread,這個類用來另一多久執行一個函式。

Class threading.Timer(interval,function,args=None,kwargs=None)

Start方法執行以後,Timer物件會處於等待狀態,等待了interval之後,開始執行function函式的。如果在執行函式之前的等待階段,使用了cancel方法,就會跳過執行函式結果。

 

本質上就是一個Thread,只是沒有提供name,daemon。

import threading
import logging
import time


def worker():
    logging.info('in worker')
    time.sleep(2)


t = threading.Timer(5,worker)
t.start()  #啟動
print(threading.enumerate())
t.cancel()   #取消
time.sleep(1)
print(threading.enumerate())

 

[<_MainThread(MainThread, started 7512)>, <Timer(Thread-1, started 6644)>]

[<_MainThread(MainThread, started 7512)>]

 

import threading
import logging
import time


def worker():
    logging.info('in worker')
    time.sleep(2)


t = threading.Timer(5,worker)
t.cancel()   #取消
t.start()  #啟動
print(threading.enumerate())
time.sleep(1)
print(threading.enumerate())

 

[<_MainThread(MainThread, started 7512)>]

[<_MainThread(MainThread, started 7512)>]

 

 

 

二、執行緒同步

1、概念

執行緒同步,執行緒間協同,通過某種技術,讓一個執行緒訪問某些資料時候,其他執行緒不能訪問這些資料,直到該執行緒完成對資料的操作。

 

不同作業系統實現技術有所不同,有臨界區、互斥量、訊號量、事件Event。

 

2、Event

Event事件,是執行緒間通訊機制中最簡單的實現,使用一個內部的標記flag,通過flag的True或False的變化來進行操作。

名稱

含義

set()

標記為True

clear()

標記為False

is_set()

標記是否為True

Wait(timeout=None)

設定等待標記為True的時長,None為無限等待,等到返回True,未等到超時了返回False。

 

課堂練習:老闆僱傭了一個工人,讓他生產杯子,老闆一直等著這個工人,直到上產了十個杯子。

1)利用join

import threading
import time
import logging


def worker(count=10):
    cups = []
    while len(cups)<count:
        logging.info('wprking')
        time.sleep(0.01)
        cups.append(1)
        print(len(cups))
    logging.info('I am finished')
w = threading.Thread(target=worker)
w.start()
w.join()

 

 

 

 

2)利用event

import threading
import logging
import time


def boss(event:threading.Event):
    logging.info('I am boss,waiting')
    event.wait()
    logging.info('good job')

def worker(event:threading.Event,count=10):
    logging.info('I am working for u')
    cups = []
    while True:
        logging.info('makeing')
        time.sleep(1)
        cups.append(1)
        if len(cups) >= count:
            print(len(cups))
            event.set()
            break
    logging.info('finished my job.cups={}'.format(cups))

event = threading.Event()
w = threading.Thread(target=worker,args=(event,))
b = threading.Thread(target=boss,args=(event,))
w.start()
b.start()

 

 

 

 

 

 

3)wait的應用

import threading
import logging
logging.basicConfig(level=logging.INFO)

def do(event:threading.Event,interval:int):
    while not event.wait(interval):  #沒有置set,所以是False。   不是False的時候就不能進入迴圈了。
        logging.info('do sth')    #沒三秒列印一次。   not False執行此語句

e = threading.Event()
threading.Thread(target=do,args=(e,10)).start()

e.wait(12)  #整體停留了十秒。
e.set()    #重置為True。
print('end')

 

4)練習,實現timer。

 

總結:

使用同一個Event用來做標記。

Event的wait優於time.sleep,更快的切換到其他執行緒,提高併發效率。

 

 

import threading
import time


class MyTimer:
    def __init__(self,interval,function,args,kwargs):
        self.interval = interval
        self.target = function
        self.args = args
        self.kwargs = kwargs
        self.event = threading.Event()
        self.thread = threading.Thread(target=self.target,args=self.args,kwargs=self.kwargs)

    def start(self):
        self.event.wait(self.interval)
        if not self.event.is_set():   #如果沒有置False,那麼就是False,not False為True,執行run語句。
            self.run()
   
    def run(self):
        self.start()
       
        self.event.set()

    def cancel(self):
        self.event.set()

 

 

Lock鎖

1)鎖,凡是存在共享資源爭搶的地方都可以使用鎖。從而保證只有一個使用者可以完全使用這個資源。

 

lock.acquire  上鎖    lock.release  解鎖

import threading
import logging
import time
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT,level=logging.INFO)


cups = []
def worker(count=10):
    logging.info('i am work')
    while len(cups) < count:
        time.sleep(0.1)
        cups.append(1)
    logging.info('i am finsh.cups={}'.format(len(cups)))


for _ in range(10):
    threading.Thread(target=worker,args=(1000,)).start()

 

2018-05-26 15:38:25,913 Thread-1 32 i am work

2018-05-26 15:38:25,913 Thread-2 4332 i am work

2018-05-26 15:38:25,913 Thread-3 9992 i am work

2018-05-26 15:38:25,914 Thread-4 8464 i am work

2018-05-26 15:38:25,914 Thread-5 9968 i am work

2018-05-26 15:38:25,915 Thread-6 8712 i am work

2018-05-26 15:38:25,915 Thread-7 4412 i am work

2018-05-26 15:38:25,915 Thread-8 8456 i am work

2018-05-26 15:38:25,915 Thread-9 8316 i am work

2018-05-26 15:38:25,915 Thread-10 9772 i am work

2018-05-26 15:38:35,925 Thread-8 8456 i am finsh.cups=1000

2018-05-26 15:38:36,023 Thread-7 4412 i am finsh.cups=1001

2018-05-26 15:38:36,023 Thread-1 32 i am finsh.cups=1002

2018-05-26 15:38:36,023 Thread-6 8712 i am finsh.cups=1003

2018-05-26 15:38:36,024 Thread-5 9968 i am finsh.cups=1004

2018-05-26 15:38:36,024 Thread-4 8464 i am finsh.cups=1005

2018-05-26 15:38:36,024 Thread-10 9772 i am finsh.cups=1006

2018-05-26 15:38:36,024 Thread-2 4332 i am finsh.cups=1007

2018-05-26 15:38:36,025 Thread-3 9992 i am finsh.cups=1008

2018-05-26 15:38:36,025 Thread-9 8316 i am finsh.cups=1009

 

執行結果來看,多執行緒排程,導致了判斷失誤,多生產了杯子只有用到了鎖。

Lock,鎖,一旦執行緒獲得鎖,其他要獲得鎖的執行緒將被阻塞。

名稱

含義

acquire(blocking=True,timeout=-1)

預設阻塞,阻塞可以設定超時時間,非阻塞時,timeout禁止設定,成果獲取鎖,返回True,否則返回None

Release

釋放鎖,可以從任何執行緒呼叫釋放,

已上鎖的鎖,會被重置到unlocked未上鎖的鎖上呼叫,丟擲RuntimeError異常。

import threading
import logging
import time
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT,level=logging.INFO)


cups = []
lock = threading.Lock()
def worker(count=10):
    logging.info('i am work')
    lock.acquire()
    while len(cups) < count:
        print(threading.current_thread(),len(cups))
        time.sleep(0.000001)
        cups.append(1)
    logging.info('i am finsh.cups={}'.format(len(cups)))
    lock.release()

for _ in range(10):
    threading.Thread(target=worker,args=(1000,)).start()

 

上鎖位置不對,由一個執行緒搶佔,並獨自佔鎖並完成任務。

import threading
import logging
import time
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT,level=logging.INFO)


cups = []
lock = threading.Lock()
def worker(count=10):
    logging.info('i am work')
    flag= False
    while True:
        lock.acquire() #獲取鎖

        if len(cups) >= count:
            flag = True
        # print(threading.current_thread(),len(cups))
        time.sleep(0.000001)
        if not flag:
            cups.append(1)
            print(threading.current_thread(),len(cups))
        lock.release()   #追加後釋放鎖
        if flag:
            break
    logging.info('i am finsh.cups={}'.format(len(cups)))


for _ in range(10):
    threading.Thread(target=worker,args=(1000,)).start()

 

鎖保證了資料完整性,但是效能下降好多。

If flag:break是為了保證release方法被執行,否則就出現了死鎖,得到鎖的永遠沒有釋放。

 

計數器類,可以加可以減。

2)加鎖、解鎖

 一般加鎖就需要解鎖,但是加鎖後解鎖前,還要有一些程式碼執行,就有可能丟擲異常,一旦出現異常鎖是無法釋放的,但是當前執行緒可能因為這個就異常終止了,這就產生了死鎖。

 

加鎖。解鎖常用語句:

(1)使用try...finally語句保證鎖的釋放。

(2)With上下文管理,鎖物件支援上下文管理。

import threading
import time


class Counter:
    def __init__(self):
        self._val = 0
        self.__lock = threading.Lock()

    @property
    def value(self):
        return self._val

    def inc(self):
        try:
            self.__lock.acquire()
            self._val += 1
        finally:
            self.__lock.release()

    def dec(self):
        with self.__lock:
            self._val -= 1

def run(c:Counter,count=1000):
    for _ in range(10):
        for i in range(-50,50):
            if i<0:
                c.dec()
            else:
                c.inc()

c = Counter()
c1 = 10
c2 = 10
for i in range(c1):
    threading.Thread(target=run,args=(c,c2)).start()

while True:
    time.sleep(1)
    if threading.active_count() == 1:
        print(threading.enumerate())
        print(c.value)
        break
    else:
        print(threading.enumerate())

 

 

不影響其他執行緒的切換,但是上鎖後其他執行緒被阻塞了。只能等待。

 

 

 

3)鎖的應用場景

適用於訪問和修改同一個共享資源的時候,讀寫同一個資源的時候。

 

全部是讀取同一個共享資源需要鎖嗎?

因為共享資源是不可變的,每一次讀取都是一樣的值,所以不用加鎖。

 

使用鎖的注意事項:

 

少用鎖必要時用鎖,使用了鎖,多執行緒訪問被鎖的資源時候,就成了序列,要麼排隊執行,要麼爭搶執行。

 

加鎖時間越短越好,不需要拍就立即釋放鎖。

一定要避免死鎖。(死鎖,打不開,解不開,A有鎖,B也鎖,佔有這把鎖的人遲遲不釋放鎖。沒有使用上下文,持有鎖的的執行緒異常退出了)

 

不使用鎖,有了效率,但是結果是錯的。

使用了鎖,效率低下,但是結果是對的。

 

4)非阻塞鎖使用

import threading
import logging
import time

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT,level=logging.INFO)

def worker(tasks):
    for task in tasks:
        time.sleep(0.01)
        if task.lock.acquire(False):
            logging.info('{}{}begin to start'.format(threading.current_thread(),task.name))
        else:
            logging.info('{}{}is working'.format(threading.current_thread(),task.name))

class Task:
    def __init__(self,name):
        self.name = name
        self.lock = threading.Lock()


tasks = [Task('task-{}'.format(x))for x in range(10)]

for i in range(5):
    threading.Thread(target=worker,name='worker-{}'.format(i),args=(tasks,)).start()

 

 

5)可重入鎖RLock:

是執行緒相關的鎖

執行緒A可重複鎖,並可以多次成功獲取,不會阻塞 ,最後要線上程A中做和acquire次數相同的release。

 

拿到這把鎖的執行緒可以多次使用。

別的執行緒拿到的話也是被阻塞的。

一個執行緒佔用鎖的時候,其他執行緒不能拿到,只能的是阻塞。直到當前執行緒次有的鎖全部釋放完,其他執行緒才可以獲取。

 

可重入鎖,與執行緒相關,可在一個執行緒中獲取鎖,並可繼續在同一執行緒中不阻塞獲取鎖,當鎖未釋放完,其他執行緒獲取鎖就會阻塞。直到當前持有鎖的執行緒釋放完了鎖。

 

四、Condition

構造方法:condition(lock=None),可以傳入一個lock物件或Rlock物件,預設是Rlock。

名稱

含義

Acquire(*args)

獲取鎖

Wait(self,timeout=None)

等待超時

Notify(n=1)

喚醒之多指定書目個數的等待的執行緒,沒有等待的執行緒就沒有任何操作

Notify_all()

喚醒所有等待的執行緒。

 

用於生產者、消費者模型,為了解決生產者消費者速度匹配的問題:

 

import threading
import logging
import random

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT,level=logging.INFO)

class Dispatcher:
    def __init__(self):
        self.data = None
        self.event = threading.Event()

    def produce(self,total):
        for _ in range(total):
            data = random.randint(0,100)
            logging.info(data)
            self.data = data
            self.event.wait(1)
        self.event.set()

    def consume(self):
        while not self.event.is_set():
            data = self.data
            logging.info('recieved{}'.format(data))
            self.data = None
            self.event.wait(0.5)

d = Dispatcher()
p = threading.Thread(target=d.produce,args=(10,),name='producer')
c = threading.Thread(target=d.consume,name='consume')
c.start()
p.start()

 

消費者採用主動消費,消費者浪費了大量的時間,主動來檢視有沒有資料。換成通知的機制。

import threading
import logging
import random

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT,level=logging.INFO)

class Dispatcher:
    def __init__(self):
        self.data = None
        self.event = threading.Event()
        self.cond = threading.Condition()

    def produce(self,total):
        for _ in range(total):
            data = random.randint(0,100)
            with self.cond:
                logging.info(data)
                self.data = data
                self.cond.notify_all()
            self.event.wait(1)
        self.event.set()

    def consume(self):
        while not self.event.is_set():
            with self.cond:
                self.cond.wait()
                logging.info('recieved{}'.format(self.data))
                self.data = None
            self.event.wait(0.5)

d = Dispatcher()
p = threading.Thread(target=d.produce,args=(10,),name='producer')
c = threading.Thread(target=d.consume,name='consume')
c.start()
p.start()

 

如果是一個生產者,多個消費者呢:

import threading
import logging
import random

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT,level=logging.INFO)

class Dispatcher:
    def __init__(self):
        self.data = None
        self.event = threading.Event()
        self.cond = threading.Condition()

    def produce(self,total):
        for _ in range(total):
            data = random.randint(0,100)
            with self.cond:
                logging.info(data)
                self.data = data
                self.cond.notify_all()
            self.event.wait(1)  #模擬生產速度
        self.event.set()

    def consume(self):
        while not self.event.is_set():
            with self.cond:
                self.cond.wait()  #阻塞等通知
                logging.info('recieved{}'.format(self.data))
            self.event.wait(0.5)  #模擬消費 的速度

d = Dispatcher()
p = threading.Thread(target=d.produce,args=(10,),name='producer')


for i in range(5):
    c = threading.Thread(target=d.consume, name='consume{}'.format(i))
    c.start()
p.start()

 

 

Self.cond.notify_all()發通知:

修改為self.cond.notify(n=2)  隨機通知兩個消費者。

Condition總結:

用於生產者消費者模型中,解決生產者,消費者速度匹配的問題。

採用了通知機制,非常有效率。

 

使用方式:

使用condition,必須先acquire,用完了要release。因為內部實現了鎖,預設使用了RLock鎖。最好的方式就是使用上下文。

消費者wait,等待通知。

生產者生產好訊息,對消費者發出通知,可以使用notify或者notify_all方法。

 

 

 

 

 

作業系統中基本單位是程序,程序是獨立的王國,作業系統中不可呼叫執行緒,執行緒是輕量級程序,有獨立自己棧。資源就是獨立的棧。

載入到記憶體中是程序管理,子系統之一。變成為一個例項,程序ID號。

驅動管理,

協議,

Tcp udp,

http協議。

 

 

 

Linux:

Unix:b語言基礎上c語言寫的。

Windows:

 

資料在哪裡,計算就在哪裡。

 

 

五、Barrier

 

1、柵欄,屏障、為路障、道閘

達到一定的條件,才會開啟barrier。

名稱

含義

Barrier(parties,action=None,timeout=None)

構建barrier物件,指定參與方數目,timeout是wait方法未指定超時的預設值。

n_waiting

當前在屏障中等待的執行緒數

Parties

各方數,就是需要多少個等待

Wait(timeout=None)

等待通過屏障,返回0到執行緒數-1的整數,每個執行緒返回不同,如果wait方法設定了超時,並超時傳送,屏障將處於broken狀態。

 

import threading
import logging


FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT,level=logging.INFO)


def worker(barrier:threading.Barrier):
    logging.info('waiting for {}threads'.format(barrier.n_waiting))
    try:
        barrier_id = barrier.wait()
        logging.info('after barrier{}'.format(barrier_id))
    except threading.BrokenBarrierError:
        logging.info('Broken Barrier')

barrier = threading.Barrier(3)

for x in range(3):
    threading.Thread(target=worker,name='worker-{}'.format(x),args=(barrier,)).start()

logging.info('started')

 

 

2018-06-11 21:29:45,173 worker-0 8804 waiting for 0threads

2018-06-11 21:29:45,198 worker-1 2668 waiting for 1threads

2018-06-11 21:29:45,199 worker-2 2716 waiting for 2threads

2018-06-11 21:29:45,199 MainThread 10160 started

2018-06-11 21:29:45,199 worker-2 2716 after barrier2

2018-06-11 21:29:45,199 worker-0 8804 after barrier0

2018-06-11 21:29:45,199 worker-1 2668 after barrier1

 

 

如果Barrier()的值設定為3,開啟5個執行緒,前三個執行緒執行後,後面兩個執行緒不夠三個執行緒,所以一直在等待,直到湊到三個barrier才打開。

 

上面的執行結果,所有執行緒衝到了barrier前等待,直到到達parties的數目,屏障才打開,所有執行緒停止等待,繼續執行。

再有執行緒wait,屏障就就緒等待到達引數方數目。

 

例如就是賽馬需要的馬匹全部就位,開閘,下一批陸續來到繼續等待比賽。。

 

 

 

名稱

含義

Broken

如果屏障處於打破的狀態  返回true。

Abort()

將屏障至於broken狀態,等待中的執行緒或者呼叫等待方法的執行緒中都會丟擲brokenbarriererror異常,直達reset方法來恢復屏障

Reset()

重置,恢復屏障,重新開始攔截

 

import threading
import logging


FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT,level=logging.INFO)


def worker(barrier:threading.Barrier):
    logging.info('waiting for {}threads'.format(barrier.n_waiting))
    try:
        barrier_id = barrier.wait()
        logging.info('after barri