1. 程式人生 > >Python中使用threading.Condition交替列印兩個字元的程式

Python中使用threading.Condition交替列印兩個字元的程式

這個程式涉及到兩個執行緒的的協調問題,兩個執行緒為了能夠相互協調執行,必須持有一個共同的狀態,通過這個狀態來維護兩個執行緒的執行,通過使用threading.Condition物件就能夠完成兩個執行緒之間的這種協調工作。

threading.Condition預設情況下會通過持有一個ReentrantLock來協調執行緒之間的工作,所謂可重入鎖,是隻一個可以由一個執行緒遞迴獲取的鎖,此鎖物件會維護當前鎖的所有者(執行緒)和當前所有者遞迴獲取鎖的次數(本文在邏輯上和可重入鎖沒有任何關係,完全可以用一個普通鎖替代)。

Python文件中給出的描述是:它是一個與某個鎖相聯絡的變數。同時它實現了上下文管理協議。其物件中除了acquire和release方法之外,其它方法的呼叫的前提是,當前執行緒必須是這個鎖的所有者。

通過程式碼和其中的註釋,能夠非常明白地弄清楚Condition的原理是怎樣的:

import threading
import time
import functools


def worker(cond, name):
    """worker running in different thread"""
    with cond:  # 通過__enter__方法,獲取cond物件中的鎖,預設是一個ReentrantLock物件
        print('...{}-{}-{}'.format(name, threading.current_thread().getName(), cond._is_owned()))
        cond.wait()  # 建立一個新的鎖NEWLOCK,呼叫acquire將NEWLOCK獲取,然後將NEWLOCK放入等待列表中,\
        # 釋放cond._lock鎖(_release_save),最後再次呼叫acquire讓NEWLOCK阻塞
    print('wait returned in {}'.format(name))


if __name__ == '__main__':
    condition = threading.Condition()
    t1 = threading.Thread(target=functools.partial(worker, condition, 't1'))
    t2 = threading.Thread(target=functools.partial(worker, condition, 't2'))

    t2.start()  # 啟動執行緒2
    t1.start()  # 啟動執行緒1

    time.sleep(2)
    with condition:
        condition.notify(1)  # 按照FIFO順序(wait呼叫順序),釋放一個鎖,並將其從等待列表中刪除

    time.sleep(2)

    with condition:
        condition.notify(1)  # 按照FIFO順序(wait呼叫順序),釋放另一個鎖,並將其從等待佇列中刪除

    t1.join()  # 主執行緒等待子執行緒結束
    t2.join()  # 主執行緒等待子執行緒結束

    print('All done')

其輸出為:

...t2-Thread-2-True
...t1-Thread-1-True
wait returned in t2
wait returned in t1
All done

其中wait方法要求獲取到threading.Condition物件中的鎖(如果沒有提供,預設使用一個可重入鎖),然後自己建立一個新的普通鎖(NEWLOCK),並獲取這個NEWLOCK;之後呼叫_release_save方法釋放threading.Condition物件中的鎖,讓其它執行緒能夠獲取到;最後再次呼叫NEWLOCK上的acquire方法,由於在建立時已經acquire過,所以此執行緒會阻塞在此。而wait想要繼續執行,必須等待其它執行緒將產生阻塞的這個NEWLOCK給release掉,當然,這就是notify方法的責任了。

notify方法接收一個數字n,從等待列表中取出相應數量的等待物件(讓wait方法阻塞的鎖物件),呼叫其release方法,讓對應的wait方法能夠返回。而notify_all方法僅僅就是將n設定為等待列表的總長度而已。

在理解了threading.Condition物件中wait和notify的工作原理之後,我們就可以利用它們來實現兩個執行緒交替列印字元的功能了:

import threading
import functools
import time


def print_a(state):
    while True:
        if state.closed:
            print('Close a')
            return
        print('A')
        time.sleep(2)
        state.set_current_is_a(True)
        state.wait_for_b()


def print_b(state):
    while True:
        if state.closed:
            print('Close b')
            return
        state.wait_for_a()
        print('B')
        time.sleep(2)
        state.set_current_is_a(False)


if __name__ == '__main__':
    class State(object):
        """state used to coordinate multiple(two here) threads"""
        def __init__(self):
            self.condition = threading.Condition()
            self.current_is_a = False
            self.closed = False

        def wait_for_a(self):
            with self.condition:
                while not self.current_is_a:
                    self.condition.wait()

        def wait_for_b(self):
            with self.condition:
                while self.current_is_a:
                    self.condition.wait()

        def set_current_is_a(self, flag):
            self.current_is_a = flag
            with self.condition:
                self.condition.notify_all()


    state = State()
    t1 = threading.Thread(target=functools.partial(print_a, state))
    t2 = threading.Thread(target=functools.partial(print_b, state))

    try:
        t1.start()
        t2.start()

        t1.join()
        t2.join()
    except KeyboardInterrupt:
        state.closed = True
        print('Closed')

可以看到有兩種型別的任務,一個用於列印字元A,一個用於列印字元B,我們的實現種讓A先於B列印,所以在print_a中,先列印A,再設定當前字元狀態並釋放等待列表中的所有鎖(set_current_is_a),如果沒有這一步,current_is_a將一直是False,wait_for_b能夠返回,而wait_for_a卻永遠不會返回,最終效果就是每隔兩秒就列印一個字元A,而B永遠不會列印。另一個副作用是如果wait_for_a永遠不會返回,那print_b所線上程的關閉邏輯也就無法執行,最終會成為殭屍執行緒(這裡的關閉邏輯只用作示例,生產環境需要更加完善的關閉機制)。

考慮另一種情況,print_a種將set_current_is_a和wait_for_b交換一下位置會怎麼樣。從觀察到的輸出我們看到,程式首先輸出了一個字元A,以後,每隔2秒鐘,就會同時輸出A和B,而不是交替輸出。原因在於,由於current_is_a還是False,我們先呼叫的wait_for_b其會立即返回,之後呼叫set_current_is_a,將current_is_a設定為True,並釋放所有的阻塞wait的鎖(notify_all),這個過程中沒有阻塞,print_a緊接著進入了下一個列印迴圈;與此同時,print_b中的wait_for_a也返回了,進入到B的列印迴圈,故最終我們看到A和B總是一起列印。

可見對於threading.Condition的使用需要多加小心,要注意邏輯上的嚴謹性。

附一個佇列版本:

import threading
import functools
import time
from queue import Queue


def print_a(q_a, q_b):
    while True:
        char_a = q_a.get()
        if char_a == 'closed':
            return
        print(char_a)
        time.sleep(2)
        q_b.put('B')


def print_b(q_a, q_b):
    while True:
        char_b = q_b.get()
        if char_b == 'closed':
            return
        print(char_b)
        time.sleep(2)
        q_a.put('A')


if __name__ == '__main__':
    q_a = Queue()
    q_b = Queue()

    t1 = threading.Thread(target=functools.partial(print_a, q_a, q_b))
    t2 = threading.Thread(target=functools.partial(print_b, q_a, q_b))

    try:
        t1.start()
        t2.start()

        q_a.put('A')

        t1.join()
        t2.join()
    except KeyboardInterrupt:
        q_a.put('closed')
        q_b.put('closed')

    print('Done')

佇列版本邏輯更清晰,更不容易出錯,實際應用中應該選用佇列。

附一個協程版本(Python 3.5+):

import time
import asyncio


async def print_a():
    while True:
        print('a')
        time.sleep(2)   # simulate the CPU block time
        await asyncio.sleep(0)  # release control to event loop


async def print_b():
    while True:
        print('b')
        time.sleep(2)  # simulate the CPU block time
        await asyncio.sleep(0)  # release control to event loop


async def main():
    await asyncio.wait([print_a(), print_b()])


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())


協程的執行需要依附於一個事件迴圈(select/poll/epoll/kqueue),通過async def將一個函式定義為協程,通過await主動讓渡控制權,通過相互讓渡控制權完成交替列印字元。整個程式運行於一個執行緒中,這樣就沒有執行緒間協調的工作,僅僅是控制權的讓渡邏輯。對於IO密集型操作,而沒有明顯的CPU阻塞(計算複雜,以致出現明顯的延時,比如複雜加解密演算法)的情況下非常合適。

附一個Java版本:

PrintMain類,用於管理和協調列印A和列印B的兩個執行緒:

package com.cuttyfox.tests.self.version1;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PrintMain {
    private boolean currentIsA = false;

    public synchronized void waitingForPrintingA() throws InterruptedException {
        while (this.currentIsA == false) {
            wait();
        }
    }

    public synchronized void waitingForPrintingB() throws InterruptedException {
        while (this.currentIsA == true) {
            wait();
        }
    }

    public synchronized void setCurrentIsA(boolean flag) {
        this.currentIsA = flag;
        notifyAll();
    }

    public static void main(String[] args) throws Exception {
        PrintMain state = new PrintMain();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(new PrintB(state));
        executorService.execute(new PrintA(state));
        executorService.shutdown();
        executorService.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("Done");
        System.exit(0);
    }
}

列印A的執行緒(首先列印A):

package com.cuttyfox.tests.self.version1;

import java.util.concurrent.TimeUnit;

public class PrintA implements Runnable{
    private PrintMain state;

    public PrintA(PrintMain state) {
        this.state = state;
    }

    public void run() {
        try {
            while (!Thread.interrupted()){
                System.out.println("Print A");
                TimeUnit.SECONDS.sleep(1);
                this.state.setCurrentIsA(true);
                this.state.waitingForPrintingB();
            }
        } catch (InterruptedException e) {
            System.out.println("Exit through Interrupting.");
        }

    }
}

列印B的執行緒:

package com.cuttyfox.tests.self.version1;

import java.util.concurrent.TimeUnit;

public class PrintB implements Runnable{
    private PrintMain state;

    public PrintB(PrintMain state) {
        this.state = state;
    }

    public void run() {
        try{
            while (!Thread.interrupted()) {
                this.state.waitingForPrintingA();
                System.out.println("Print B");
                TimeUnit.SECONDS.sleep(1);
                this.state.setCurrentIsA(false);
            }
        } catch (InterruptedException e) {
            System.out.println("Exit through Interrupting.");
        }

    }
}

Java物件本身有物件鎖,故這裡沒有像Python中那樣需要顯式通過建立一個Condition物件來得到一把鎖。

使用Python實現交替列印abcdef的過程:

    import threading
    import time
    import functools
    from collections import deque

    LETTERS = [chr(code) for code in range(97, 97+6)]
    LENGTH = len(LETTERS)


    class State(object):
        def __init__(self):
            self.condition = threading.Condition()
            self.index_value = 0

        def set_next_index(self, index):
            with self.condition:
                self.index_value = index
                self.condition.notify_all()

        def wait_for(self, index_value):
            with self.condition:
                while not self.index_value == index_value:
                    self.condition.wait()


    def print_letter(state: State, wait_ident: int):
        print('Got: {}!'.format(wait_ident))
        while True:
            state.wait_for(wait_ident)
            time.sleep(2)
            print(LETTERS[state.index_value])
            print('PRINT: {} AND SET NEXT: {}'.format(state.index_value,
                                                      (state.index_value + 1) % LENGTH
                                                      ))
            state.set_next_index((state.index_value + 1) % LENGTH)


    state = State()
    d = deque()
    d.extend(range(LENGTH))
    d.rotate(1)
    print(d)

    threads = []
    for wait_ident in d:
        t = threading.Thread(target=functools.partial(print_letter, state, wait_ident))
        threads.append(t)

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()