1. 程式人生 > >week7:網路程式設計之程序

week7:網路程式設計之程序

一、執行緒與程序

什麼是執行緒?

執行緒是作業系統能夠進行運算排程的最小單位。它被包含在程序之中,是程序中的實際運作單位。一條執行緒指的是程序中一個單一順序的控制流,一個程序中可以併發多個執行緒,每條執行緒並行執行不同的任務。

什麼是程序?

 threads are different from processes. A thread is a context of execution, while a process is a bunch of resources associated with a computation. A process can have one or many threads.

Clarification: the resources associated with a process include memory pages (all the threads in a process have the same view of the memory), file descriptors (e.g., open sockets), and security credentials (e.g., the ID of the user who started the process).

程序與執行緒誰快?不能問,沒有快慢

程序與執行緒的區別?

1.執行緒們共享程序的空間,程序們有自己的地址空間。

2.執行緒之間可以通訊,程序之間不可以。

3.執行緒之間可以互相操作,程序之間不可以。

python GIL(Global Interpreter Lock) 

CPython implementation detail: In CPython, due to the Global Interpreter Lock, only one thread can execute Python code at once (even though certain performance-oriented libraries might overcome this limitation). If you want your application to make better use of the computational resources of multi-core machines, you are advised to use multiprocessing. However, threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously.

結論:在python裡:

如果任務是IO密集型,可以用多執行緒

如果是計算密集型的,序列比並行快,sorry,改c語言

二、threading模組

2.1、執行緒的2種呼叫方式

直接呼叫

import threading
import time
 
def sayhi(num): #定義每個執行緒要執行的函式
 
    print("running on number:%s" %num)
 
    time.sleep(3)
 
if __name__ == '__main__':
 
    t1 = threading.Thread(target=sayhi,args=(1,)) #生成一個執行緒例項
    t2 = threading.Thread(target=sayhi,args=(2,)) #生成另一個執行緒例項
 
    t1.start() #啟動執行緒
    t2.start() #啟動另一個執行緒
 
    print(t1.getName()) #獲取執行緒名
    print(t2.getName())

輸出結果:

running on number:1 Thread-1 Thread-2 running on number:2

繼承式呼叫:

import threading
import time


class MyThread(threading.Thread):
    def __init__(self, num):
        threading.Thread.__init__(self)
        self.num = num

    def run(self):  # 定義每個執行緒要執行的函式

        print("running on number:%s" % self.num)

        time.sleep(3)


if __name__ == '__main__':
    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()

三、Join & Daemon

import threading
from time import ctime, sleep
import time

def music(func):
    for i in range(2):
        print("Begin listening to %s. %s" % (func, ctime()))
        sleep(1)
        print("end listening %s" % ctime())

def move(func):
    for i in range(2):
        print("Begin watching at the %s! %s" % (func, ctime()))
        sleep(5)
        print('end watching %s' % ctime())

threads = []
t1 = threading.Thread(target=music, args=('七里香',))
threads.append(t1)
t2 = threading.Thread(target=move, args=('阿甘正傳',))
threads.append(t2)

if __name__ == '__main__':

    for t in threads:
        t.start()
        # t.join()
    # t1.join()
    t2.join()########考慮這三種join位置下的結果?
    print("all over %s" %ctime())

輸出結果:

Begin listening to 七里香. Tue Sep 11 17:49:15 2018 Begin watching at the 阿甘正傳! Tue Sep 11 17:49:15 2018

end listening Tue Sep 11 17:49:16 2018 Begin listening to 七里香. Tue Sep 11 17:49:16 2018

end listening Tue Sep 11 17:49:17 2018

end watching Tue Sep 11 17:49:20 2018 Begin watching at the 阿甘正傳! Tue Sep 11 17:49:20 2018

end watching Tue Sep 11 17:49:25 2018 all over Tue Sep 11 17:49:25 2018

join():

       在子執行緒完成執行之前,這個子執行緒的父執行緒將一直被阻塞。

import threading
from time import ctime, sleep
import time

def music(func):
    for i in range(2):
        print("Begin listening to %s. %s" % (func, ctime()))
        sleep(1)
        print("end listening %s" % ctime())

def move(func):
    for i in range(2):
        print("Begin watching at the %s! %s" % (func, ctime()))
        sleep(5)
        print('end watching %s' % ctime())

threads = []
t1 = threading.Thread(target=music, args=('七里香',))
threads.append(t1)
t2 = threading.Thread(target=move, args=('阿甘正傳',))
threads.append(t2)

if __name__ == '__main__':
    t2.setDaemon(True)
    for t in threads:
        # t.setDaemon(True)
        t.start()


    print("all over %s" %ctime())

setDaemon(True):

      將執行緒宣告為守護執行緒,必須在start() 方法呼叫之前設定, 如果不設定為守護執行緒程式會被無限掛起。這個方法基本和join是相反的。當我們 在程式執行中,執行一個主執行緒,如果主執行緒又建立一個子執行緒,主執行緒和子執行緒 就分兵兩路,分別執行,那麼當主執行緒完成想退出時,會檢驗子執行緒是否完成。如 果子執行緒未完成,則主執行緒會等待子執行緒完成後再退出。但是有時候我們需要的是 只要主執行緒完成了,不管子執行緒是否完成,都要和主執行緒一起退出,這時就可以 用setDaemon方法啦 

其它方法

thread 模組提供的其他方法:
# threading.currentThread(): 返回當前的執行緒變數。
# threading.enumerate(): 返回一個包含正在執行的執行緒的list。正在執行指執行緒啟動後、結束前,不包括啟動前和終止後的執行緒。
# threading.activeCount(): 返回正在執行的執行緒數量,與len(threading.enumerate())有相同的結果。
# 除了使用方法外,執行緒模組同樣提供了Thread類來處理執行緒,Thread類提供了以下方法:
# run(): 用以表示執行緒活動的方法。
# start():啟動執行緒活動。
# join([time]): 等待至執行緒中止。這阻塞呼叫執行緒直至執行緒的join() 方法被呼叫中止-正常退出或者丟擲未處理的異常-或者是可選的超時發生。
# isAlive(): 返回執行緒是否活動的。
# getName(): 返回執行緒名。
# setName(): 設定執行緒名。

四、 同步鎖(Lock)

# import time
# import threading
#
# def addNum():
#     global num #在每個執行緒中都獲取這個全域性變數
#     # num-=1
#
#     temp=num
#     print('--get num:',num)
#     #time.sleep(0.1)
#     num =temp-1 #對此公共變數進行-1操作
#
#
# num = 100  #設定一個共享變數
# thread_list = []
# for i in range(100):
#     t = threading.Thread(target=addNum)
#     t.start()
      #t.join()
#     thread_list.append(t)
#
# for t in thread_list: #等待所有執行緒執行完畢
#     t.join()
#
# print('final num:', num)

注意:

1:  why num-=1沒問題呢?這是因為動作太快(完成這個動作在切換的時間內)

2: if sleep(1),現象會更明顯,100個執行緒每一個都沒有執行完就進行了切換,我們說過sleep就等效於IO阻塞,1s之內不會再切換回來,所以最後的結果一定是99.

多個執行緒都在同時操作同一個共享資源,所以造成了資源破壞,怎麼辦呢?

用join,但join會把整個執行緒給停住,造成了序列,失去了多執行緒的意義,而我們只需要把計算(涉及到操作公共資料)的時候序列執行。

我們可以通過同步鎖來解決這種問題

import time
import threading

def addNum():
    global num #在每個執行緒中都獲取這個全域性變數
    # num-=1
    lock.acquire()
    temp=num
    print('--get num:',num )
    #time.sleep(0.1)
    num =temp-1 #對此公共變數進行-1操作
    lock.release()

num = 100  #設定一個共享變數
thread_list = []
lock=threading.Lock()

for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待所有執行緒執行完畢
    t.join()

print('final num:', num )

請問:同步鎖與GIL的關係?

Python的執行緒在GIL的控制之下,執行緒之間,對整個python直譯器,對python提供的C API的訪問都是互斥的,這可以看作是Python核心級的互斥機制。但是這種互斥是我們不能控制的,我們還需要另外一種可控的互斥機制———使用者級互斥。核心級通過互斥保護了核心的共享資源,同樣,使用者級互斥保護了使用者程式中的共享資源。

GIL 的作用是:對於一個直譯器,只能有一個thread在執行bytecode。所以每時每刻只有一條bytecode在被執行一個thread。GIL保證了bytecode 這層面上是thread safe的。 但是如果你有個操作比如 x += 1,這個操作需要多個bytecodes操作,在執行這個操作的多條bytecodes期間的時候可能中途就換thread了,這樣就出現了data races的情況了。

那我的同步鎖也是保證同一時刻只有一個執行緒被執行,是不是沒有GIL也可以?是的;那要GIL有什麼鳥用?你沒治;

五、 執行緒死鎖和遞迴鎖

import threading,time

class myThread(threading.Thread):
    def doA(self):
        lockA.acquire()
        print(self.name,"gotlockA",time.ctime())
        time.sleep(3)
        lockB.acquire()
        print(self.name,"gotlockB",time.ctime())
        lockB.release()
        lockA.release()

    def doB(self):
        lockB.acquire()
        print(self.name,"gotlockB",time.ctime())
        time.sleep(2)
        lockA.acquire()
        print(self.name,"gotlockA",time.ctime())
        lockA.release()
        lockB.release()
    def run(self):
        self.doA()
        self.doB()
if __name__=="__main__":

    lockA=threading.Lock()
    lockB=threading.Lock()
    threads=[]
    for i in range(5):
        threads.append(myThread())
    for t in threads:
        t.start()
    for t in threads:
        t.join()#等待執行緒結束,後面再講。

死鎖:

Thread-1 gotlockA Tue Sep 11 21:54:29 2018 Thread-1 gotlockB Tue Sep 11 21:54:32 2018 Thread-1 gotlockB Tue Sep 11 21:54:32 2018 Thread-2 gotlockA Tue Sep 11 21:54:32 2018

解決辦法:使用遞迴鎖,將

1

2

lockA=threading.Lock()

lockB=threading.Lock()<br>#--------------<br>lock=threading.RLock()

應用:

六、訊號量

訊號量用來控制執行緒併發數的,BoundedSemaphore或Semaphore管理一個內建的計數器,每當呼叫acquire()時-1,呼叫release()時+1。

      計數器不能小於0,當計數器為 0時,acquire()將阻塞執行緒至同步鎖定狀態,直到其他執行緒呼叫release()。(類似於停車位的概念)

      BoundedSemaphore與Semaphore的唯一區別在於前者將在呼叫release()時檢查計數器的值是否超過了計數器的初始值,如果超過了將丟擲一個異常。

import threading,time
class myThread(threading.Thread):
    def run(self):
        if semaphore.acquire():
            print(self.name)
            time.sleep(5)
            semaphore.release()
if __name__=="__main__":
    semaphore=threading.Semaphore(5)
    thrs=[]
    for i in range(100):
        thrs.append(myThread())
    for t in thrs:
        t.start()

七、 條件變數同步(Condition)

有一類執行緒需要滿足條件之後才能夠繼續執行,Python提供了threading.Condition 物件用於條件變數執行緒的支援,它除了能提供RLock()或Lock()的方法外,還提供了 wait()、notify()、notifyAll()方法。

      lock_con=threading.Condition([Lock/Rlock]): 鎖是可選選項,不傳人鎖,物件自動建立一個RLock()。

wait():條件不滿足時呼叫,執行緒會釋放鎖並進入等待阻塞;
notify():條件創造後呼叫,通知等待池啟用一個執行緒;
notifyAll():條件創造後呼叫,通知等待池啟用所有執行緒。
import threading,time
from random import randint
class Producer(threading.Thread):
    def run(self):
        global L
        while True:
            val=randint(0,100)
            print('生產者',self.name,":Append"+str(val),L)
            if lock_con.acquire():
                L.append(val)
                lock_con.notify()
                lock_con.release()
            time.sleep(3)
class Consumer(threading.Thread):
    def run(self):
        global L
        while True:
                lock_con.acquire()
                if len(L)==0:
                    lock_con.wait()
                print('消費者',self.name,":Delete"+str(L[0]),L)
                del L[0]
                lock_con.release()
                time.sleep(0.25)

if __name__=="__main__":

    L=[]
    lock_con=threading.Condition()
    threads=[]
    for i in range(5):
        threads.append(Producer())
    threads.append(Consumer())
    for t in threads:
        t.start()
    for t in threads:
        t.join()

八、 同步條件(Event)

條件同步和條件變數同步差不多意思,只是少了鎖功能,因為條件同步設計於不訪問共享資源的條件環境。event=threading.Event():條件環境物件,初始值 為False;

event.isSet():返回event的狀態值;

event.wait():如果 event.isSet()==False將阻塞執行緒;

event.set(): 設定event的狀態值為True,所有阻塞池的執行緒啟用進入就緒狀態, 等待作業系統排程;

event.clear():恢復event的狀態值為False。
import threading,time
class Boss(threading.Thread):
    def run(self):
        print("BOSS:今晚大家都要加班到22:00。")
        event.isSet() or event.set()
        time.sleep(5)
        print("BOSS:<22:00>可以下班了。")
        event.isSet() or event.set()
class Worker(threading.Thread):
    def run(self):
        event.wait()
        print("Worker:哎……命苦啊!")
        time.sleep(0.25)
        event.clear()
        event.wait()
        print("Worker:OhYeah!")
if __name__=="__main__":
    event=threading.Event()
    threads=[]
    for i in range(5):
        threads.append(Worker())
    threads.append(Boss())
    for t in threads:
        t.start()
    for t in threads:
        t.join()

九、 多執行緒利器(queue)

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

queue列隊類的方法

建立一個“佇列”物件 import Queue q = Queue.Queue(maxsize = 10) Queue.Queue類即是一個佇列的同步實現。佇列長度可為無限或者有限。可通過Queue的建構函式的可選引數maxsize來設定佇列長度。如果maxsize小於1就表示佇列長度無限。

將一個值放入佇列中 q.put(10) 呼叫佇列物件的put()方法在隊尾插入一個專案。put()有兩個引數,第一個item為必需的,為插入專案的值;第二個block為可選引數,預設為1。如果隊列當前為空且block為1,put()方法就使呼叫執行緒暫停,直到空出一個數據單元。如果block為0,put方法將引發Full異常。

將一個值從佇列中取出 q.get() 呼叫佇列物件的get()方法從隊頭刪除並返回一個專案。可選引數為block,預設為True。如果佇列為空且block為True,get()就使呼叫執行緒暫停,直至有專案可用。如果佇列為空且block為False,佇列將引發Empty異常。

Python Queue模組有三種佇列及建構函式: 1、Python Queue模組的FIFO佇列先進先出。  class queue.Queue(maxsize) 2、LIFO類似於堆,即先進後出。             class queue.LifoQueue(maxsize) 3、還有一種是優先順序佇列級別越低越先出來。   class queue.PriorityQueue(maxsize)

此包中的常用方法(q = Queue.Queue()): q.qsize() 返回佇列的大小 q.empty() 如果佇列為空,返回True,反之False q.full() 如果佇列滿了,返回True,反之False q.full 與 maxsize 大小對應 q.get([block[, timeout]]) 獲取佇列,timeout等待時間 q.get_nowait() 相當q.get(False) 非阻塞 q.put(item) 寫入佇列,timeout等待時間 q.put_nowait(item) 相當q.put(item, False) q.task_done() 在完成一項工作之後,q.task_done() 函式向任務已經完成的佇列傳送一個訊號 q.join() 實際上意味著等到佇列為空,再執行別的操作

import threading,time

li=[1,2,3,4,5]

def pri():
    while li:
        a=li[-1]
        print(a)
        time.sleep(1)
        try:
            li.remove(a)
        except:
            print('----',a)

t1=threading.Thread(target=pri,args=())
t1.start()
t2=threading.Thread(target=pri,args=())
t2.start()
import threading,queue
from time import sleep
from random import randint
class Production(threading.Thread):
    def run(self):
        while True:
            r=randint(0,100)
            q.put(r)
            print("生產出來%s號包子"%r)
            sleep(1)
class Proces(threading.Thread):
    def run(self):
        while True:
            re=q.get()
            print("吃掉%s號包子"%re)
if __name__=="__main__":
    q=queue.Queue(10)
    threads=[Production(),Production(),Production(),Proces()]
    for t in threads:
        t.start()