1. 程式人生 > >Python 9 進程,線程

Python 9 進程,線程

有一個 prior remove option ocs upper ems data 復雜

本節內容

  python GIL全局解釋器鎖

  線程

  進程

Python GIL(Global Interpreter Lock)

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

上面的意思大概就是說,無論你開啟多少個進程,有CPU是幾核,Python在執行的時候會淡定的在同一時刻只允許一個線程運行。這就是CPython的缺點,假的多線程。

首先,需要明確的是GIL並不是Python的特征,它是在實現Python解析器(CPython)時所引入的一個概念。就好比C++是一套語言(語法)標準,但是可以用不同的編譯器來便宜成可執行代碼。有名的編譯器例如GCC, INTEL C++,Visual C++等。Python也一樣,同樣一段代碼可以通過CPython,PyPy,Psyco等不同的Python執行環境來執行,像其中的JPython就沒有GIL。然而因為CPython是大部分環境下默認的Python執行換機,所以的很多人的概念裏面CPython就等同於Python,也就想當然的把GIL歸類為Python語言的缺陷。所以首先要明確一點:GIL並不是Python的特性,Python完全可以不依賴與GIL。

附上一個透徹分析GIL對Python多線程影響的鏈接:http://www.dabeaz.com/python/UnderstandingGIL.pdf

Python threading 模塊

線程有2種調用方式,如下:

直接調用

import threading
import time

def sayhi(num):
    print("test run ==>", num)
    time.sleep(2)

if __name__ == "__main__":
    t1 = threading.Thread(target=sayhi, args=(1,))
    t2 = threading.Thread(target=sayhi, args=(2,))

    t1.start()
    t2.start()
    print(t1.getName())
    print(t2.getName())

繼承式調用

class MyThread(threading.Thread):
    def __init__(self, num):
        threading.Thread.__init__(self)
        self.num = num

    def run(self):
        print("test run ==>", self.num)
        time.sleep(3)

if __name__ == "__main__":
    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()

Join &Daemon

Some threads do background tasks, like sending keepalive packets, or performing periodic garbage collection, or whatever. These are only useful when the main program is running, and it‘s okay to kill them off once the other, non-daemon, threads have exited.

Without daemon threads, you‘d have to keep track of them, and tell them to exit, before your program can completely quit. By setting them as daemon threads, you can let them run and forget about them, and when your program quits, any daemon threads are killed automatically.

import time
import threading
def run(n):
    print("----- run:%s -----" % n)
    time.sleep(2)
    print("----done----")
def main():
    for i in range(5):
        t = threading.Thread(target=run, args=(i, ))
        t.start()
        t.join(1)
        print("start threading", t.getName())
m = threading.Thread(target=main, args=[])
m.setDaemon(True)
m.start()
m.join(timeout=2)
print("----main thread done----")

Join的作用是等待所有線程結束,但是這邊設置了timeout,setDaemon是設置守護線程,守護線程通俗上講就是起輔助作用的線程,不影響主線程與其他線程的執行,待其他線程執行完畢,可以不必考慮守護線程的運行狀態,直接結束。

線程鎖(互斥鎖Mutex)

一個進程下可以啟動多個線程,多個線程共享父進程的內存空間,也就意味著所有線程都可以訪問同一份數據,此時,如果2個線程都要對同一份數據進行修改更新操作,會出現什麽情況?

import time
import threading

def addNum():
    global num
    print("--get num:", num)
    time.sleep(1)
    num +=1

num = 0
thread_list = []
for i in range(100):
    t =  threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)
    
for t in thread_list:
    t.join()
    
print("ending ===>", num)

# 講道理,這個在python2.7上多運行幾次應該可以發現結果不是100, anyway。

這時候就可以引入線程鎖了,在python執行線程的過程中,只要給線程加了線程鎖,在次縣城運行過程中,其他進程便不可訪問這份數據,直到此線程結束。

import time
import threading

def addNum():
    global num
    print("--get num:", num)
    time.sleep(1)
    lock.acquire()  # 先加鎖後處理數據
    num += 1
    lock.release()  # 數據處理完後解除,釋放掉線程鎖(互斥鎖)

num = 0  # 全局變量
thread_list = []
lock = thread_list.Lock()  # 生成線程鎖
for i in range(100):
    t =  threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list:
    t.join()
# 等待所有線程執行完成
print("ending ===>", num)

這時你可能有一點疑惑, 因為之前我們提到GIL保證了在同一時間只有一個線程執行,為什麽這裏還是要Mutex這樣一個互斥鎖呢?

其實這裏的lock是用戶級的lock,跟那個GIL沒關系,具體我們可以根據一張圖來看一下:

技術分享

基本就是說,其實線程是根據python裏面的上下文執行解釋器來串行執行的,當線程1去到count值,但是線程一還沒執行完,這時候就要執行線程2時,線程2對count進行了+1處理,返回給了公共數據池,但是再繼續執行線程1沒有走完的部分,線程一因為已經取到了數據count=0, 會執行繼續+1,count就會=1,這就是問題所在.

也許你又會問,既然用戶程序已經自己加上了互斥鎖,那麽CPython問為什麽還需要GIL呢?加入GIL主要的原因是為了降低程序開發的復雜度,比如現在你寫python不需要關心內存回收的問題,因為python解釋器幫你自動定期進行內存回收,你可以理解為python解釋器裏有一個獨立的線程,美國一段時間,便喚醒它進行一次全局查詢,看看哪些內存數據是可以被清空的,此時你自己程序裏面的線程和Python解釋器自己的線程是並發運行的,假設你的線程刪除了一個變量,py解釋器的垃圾回收線程在清空這個變量的過程中的clearing時刻,可能一個其他線程剛好又重新給這個還沒來得及清空的內存空間賦值了,結果就是,新賦值的數據被刪除,為了解決類似的問題,python就簡單粗暴的加了一個鎖,只允許單線程運行,即當一個線程運行的時候,其他線程都不可以動,這樣就解決了上述問題,這可以說是python早期版本的遺留問題.

RLock(遞歸鎖)

其實就是大鎖裏面的小鎖

#!/user/bin/env python
# -*-coding: utf-8-*-

import threading, time

def run1():
    print("grab the first part data")
    lock.acquire()
    global num
    num += 1
    lock.release()
    return num

def run2():
    print("grab the second part data")
    lock.acquire()
    global num2
    num2 += 1
    lock.release()
    return num2

def run3():
    lock.acquire()
    res = run1()
    print(‘--------between run1 and run2-----‘)
    res2 = run2()
    lock.release()
    print(res, res2)

if __name__ == ‘__main__‘:
    num, num2 = 0, 0
    lock = threading.RLock()
    for i in range(10):
        t = threading.Thread(target=run3)
        t.start()
while threading.active_count() != 1:
    print(threading.active_count())
else:
    print(‘----all threads done---‘)
    print(num, num2)

Semaphore(信號量)

互斥鎖,同時只允許一個線程更改數據,而Semaphore是同事允許一定數量的線程更改數據,比如網吧只有50臺電腦,最多只能50個人上網,後面的人只能等前面的人下機了才能去上網。

#!/user/bin/env python
# -*-coding: utf-8-*-
import threading,time

def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("run ==> %s " % n)
    semaphore.release()

if __name__ == "__main__":
    semaphore = threading.BoundedSemaphore(5)  # 最大同時運行線程的數量
    for i in range(20):
        t = threading.Thread(target=run, args=(i, ))
        t.start()

while threading.active_count()!= 1:
    time.sleep(1)
    print(threading.active_count())
else:
    print("------All Done------")

Timer

This class represents an action that should be run only after a certain amount of time has passed

Timers are started, as with threads, by calling their start() method. The timer can be stopped (before its action has begun) by calling thecancel() method. The interval the timer will wait before executing its action may not be exactly the same as the interval specified by the user.

import threading

def hello():
    print("hello world")

t = threading.Timer(5, hello)
t.start()

Events

An event is a simple synchronization object;

the event represents an internal flag, and threads can wait for the flag to be set, or set or clear the flag themselves.
event = threading.Event()
# a client thread can wait for the flag to be set event.wait()
# a server thread can set or reset it event.set() event.clear() If the flag is set, the wait method doesn’t do anything. If the flag is cleared, wait will block until it becomes set again. Any number of threads may wait for the same event.

可以通過Event來實現兩個獲多個線程的交互,下面我們通過一個紅綠燈例子,來看多個線程之間的執行。

#!/user/bin/env python
# -*-coding: utf-8-*-

import threading,time


def light():
    i = 0
    event.set()
    while True:
        if i >= 10:
            event.set()  # 信號代表綠燈
            i = 0
            print("\033[42;1m綠燈請出行》》》》\033[0m")
        elif i >= 5 and i < 10:
            event.clear()
            print("\033[41;1m紅燈請停步》》》》\033[0m")
        else:
            print("\033[42;1m綠燈請出行》》》》\033[0m")
        time.sleep(1)
        i += 1


def car():
    while True:
        if event.is_set():
            print("car is running")
            time.sleep(1)
        else:
            print("car is waiting for greening light")
            event.wait()

event = threading.Event()


_Light = threading.Thread(target=light)
_Car = threading.Thread(target=car)
_Light.start()
_Car.start()

再給一個例子:

#_*_coding:utf-8_*_
__author__ = ‘Alex Li‘
import threading
import time
import random

def door():
    door_open_time_counter = 0
    while True:
        if door_swiping_event.is_set():
            print("\033[32;1mdoor opening....\033[0m")
            door_open_time_counter +=1

        else:
            print("\033[31;1mdoor closed...., swipe to open.\033[0m")
            door_open_time_counter = 0 #清空計時器
            door_swiping_event.wait()


        if door_open_time_counter > 3:#門開了已經3s了,該關了
            door_swiping_event.clear()

        time.sleep(0.5)


def staff(n):

    print("staff [%s] is comming..." % n )
    while True:
        if door_swiping_event.is_set():
            print("\033[34;1mdoor is opened, passing.....\033[0m")
            break
        else:
            print("staff [%s] sees door got closed, swipping the card....." % n)
            print(door_swiping_event.set())
            door_swiping_event.set()
            print("after set ",door_swiping_event.set())
        time.sleep(0.5)
door_swiping_event  = threading.Event() #設置事件


door_thread = threading.Thread(target=door)
door_thread.start()



for i in range(5):
    p = threading.Thread(target=staff,args=(i,))
    time.sleep(random.randrange(3))
    p.start()

queue隊列

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

class queue.Queue(maxsize=0) #先入先出
class queue.LifoQueue(maxsize=0) #last in fisrt out
class queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列

Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).

exception queue.Empty

Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.

exception queue.Full

Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.

Queue.qsize()
Queue.empty() #return True if empty
Queue.full() # return True if full
Queue.put(item, block=True, timeout=None)

Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).

Queue.put_nowait(item)

Equivalent to put(item, False).

Queue.get(block=True, timeout=None)

Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

Queue.get_nowait()

Equivalent to get(False).

Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.

Queue.task_done()

Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

Queue.join() block直到queue被消費完畢

生產者消費者模型

在並發編程中使用生產者和消費者模式能夠解決絕大多數並發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。

為什麽要使用生產者和消費者模式

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麽生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那麽消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

什麽是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。

舉個栗子:

import threading
import queue
import time
q = queue.Queue()

def producer():
    count = 1
    while True:
        q.put("Pizza %s" % count)
        print("\033[42;1mPizza %s 做好了。。\033[0m" % count )
        time.sleep(1)
        count += 1

def consumer(n):
    while True > 0:
        print("%s 取到" % n, q.get())
        time.sleep(1)

p = threading.Thread(target=producer)
c = threading.Thread(target=consumer, args=("dandy",))
c1 = threading.Thread(target=consumer, args=("renee",))

p.start()
c.start()
c1.start()

多進程multiprocessing

multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.

from multiprocessing import Process
import time
def Foo(name):
    time.sleep(2)
    print("hello ", name)

if __name__ == "__main__":
    p = Process(target=Foo, args=("dandy", ))
    p.start()
    p.join()

下面看看進程的ID:

from multiprocessing import Process
import time
import os


def info(title):
    print(title)
    print("module name:", __name__)
    print("parent process:", os.getppid())
    print("process id:", os.getpid())


def Foo(name):
    info("Here is the title.")
    print("hello", name)

if __name__ == "__main__":
    info("\033[32;1mMain Process Line\033[0m")
    p = Process(target=Foo, args=("dandy", ))
    p.start()
    p.join()

打印一下上面這段的執行結果:

Main Process Line
module name: __main__
parent process: 8404
process id: 9136
Here is the title.
module name: __mp_main__
parent process: 9136
process id: 7300
hello dandy

首先先是運行主程序方法:先傳參給info,然後運行info,這時候打印出來的module 那麽肯定是main,即主程序,我們分析一下裏面的2個進程id,getppid==》get parent process,獲取父進程ID;getpid==》get process ID獲取進程ID。這邊需要解釋的也就是順序問題,8404在window裏面可以查到,是PyCharm的進程ID,9136是這個py文件的主程序或者主進程的ID,即每一個子進程都是由一個父進程產生的。然後下面的語法就是調用Process,實例化出來一個進程,我們既然實例化出來了一個進程,那麽這個進程很顯然是由父進程9136,文件主進程起來的子進程:7300.這樣理解起來應該可以輕松很多。

進程間通訊

前面我們已經說過了線程,回顧一下就是,線程之間是共享一份數據的,因為單線程的緣故,GIL全局解釋器鎖保證了同一時間只能有一個現成運行。但是由於線程是共享數據的,所以2個線程在上下文切換執行時就需要鎖來保證數據的準確性。但是進程間能共享一份數據麽?

舉個例子,QQ跟WeChat的數據共享麽?或者說QQ可以動用更改支付寶裏面的數據麽?明顯是不可以的。所以進程間的數據,內存都是是相互獨立的。所以進程間有數據交換就需要管道,需要通訊,我們就來簡單介紹一下這一塊.

Queues

這個使用方法跟線程threading裏的queue差不多

from multiprocessing import Process,Queue

def Foo(qq):
    qq.put("hello dandy")

if __name__ == "__main__":
    q = Queue()
    p = Process(target=Foo, args=(q, ))
    p.start()
    print(q.get())
    p.join()

很簡單,一眼望穿,queue就是一邊put,一邊get,2個不同的進程各在一邊就形成了數據交換。註意進程的實例化,其實跟線程差不多,然後就是process queue的import。

Pipes

Pipes相當於一個管道,更類似於socket的發送接收。

from multiprocessing import  Process,Pipe

def Foo(conn):
    conn.send("hello dandy")
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()

    p = Process(target=Foo, args=(child_conn, ))
    p.start()
    print(parent_conn.recv())
    p.join ()

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way).

The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.

Managers

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example,

#!/user/bin/env python
# -*-coding: utf-8-*-

from multiprocessing import Process,Manager
import os
def Foo(d, l):
    d[os.getpid()] = os.getpid()
    l.append(os.getpid())
if __name__ == "__main__":
    with Manager() as manager:  # manager = Manager( )==>d = Manager(.dict(
        d = manager.dict()  # 通過manager實例化出來一個用於進程通訊的dict
        l = manager.list(range(5))  # 這個就不要說了吧。。
        p_list = []
        for i in range(10):
            p = Process(target=Foo, args=(d,l))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()
        print(d)
        print(l)

進程同步

Without using the lock output from the different processes is liable to get all mixed up.

def Foo(l,i):
    l.acquire()
    try:
        print("hello world", i)
    finally:
        l.release()

if __name__ == "__main__":
    lock = Lock()

    for i in range(10):
        Process(target=Foo,args=(lock, i)).start()

進程池

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進程,那麽程序就會等待直到進程池中有可用的進程為止。

進程池中有2個方法:

apply & apply_async

#!/user/bin/env python
# -*-coding: utf-8-*-
from multiprocessing import Pool
import os,time

def Foo(i):
    time.sleep(2)
    print("In process Foo", os.getpid())
    return i +50

def bar(args):
    print("===> Done:", args, os.getpid())

if __name__ == "__main__":
    pool = Pool(processes=3)  # 允許的最大進程數
    print("主進程", os.getpid())

    for i in range(10):
        # pool.apply(func=Foo, args=(i, ), callback= bar) # 串行
        pool.apply_async(func=Foo, args=(i, ), callback= bar)  # 異步
    print("Ending..")
    pool.close()
    pool.join()
# 這裏需要主要的是close 跟join的位置跟之前的所有遇到的都不一樣,這裏的順序只能是這樣
# 如果註釋掉join,進程池直接關閉,進程被關閉

Python 9 進程,線程