1. 程式人生 > >python程序和執行緒

python程序和執行緒

程序是執行程式過程中產生一系列內容,是作業系統資源分配的基本單位,執行緒是任務排程和執行的基本單位,每個程序中至少包括一個主執行緒,一個執行緒只能屬於一個程序,而一個程序可以有多個執行緒,程序之間不能直接共享資源

python中常用的執行緒模組有_threadthreading 列印啟用的執行緒個數及執行緒資訊

import threading
import time


def job():
    print("這是一個需要執行的任務")
    # 啟用的執行緒個數
    print("當前執行緒的個數:", threading.active_count())
    # 列印當前執行緒的詳細資訊
    print("當前執行緒資訊:", threading.current_thread())
    time.sleep(100)


job()

在這裡插入圖片描述

  • _thread模組建立多執行緒
import _thread
import threading
import time


def job(name):
    print("這是一個需要執行的任務")
    # 啟用的執行緒個數
    print("當前執行緒的個數:", threading.active_count())
    # 列印當前執行緒的詳細資訊
    print("當前執行緒資訊:", threading.current_thread())
    print(name, time.ctime())
    time.sleep(2)


if __name__ == "__main__":
    # 建立多個執行緒, 但是沒有開始執行任務;
    _thread.start_new_thread(job, ('thread1',))
    _thread.start_new_thread(job, ('thread2',))
    while True:
        pass

在這裡插入圖片描述

  • threading模組中通過例項化物件建立多執行緒
import threading
import time


def job(name):
    print("這是一個需要執行的任務: %s" % (name))
    # 啟用的執行緒個數
    print("當前執行緒的個數:", threading.active_count())
    # 列印當前執行緒的詳細資訊
    print("當前執行緒資訊:", threading.current_thread())
    time.sleep(1)

    print(name, time.ctime())


if __name__ == "__main__":
    # 建立多個執行緒
    t1 = threading.Thread(target=job, name='job1', args=("job1-name",))
    t1.start()
    t2 = threading.Thread(target=job, name='job2', args=("job2-name",))
    t2.start()
    t1.join()
    t2.join()
    print('hello')

在這裡插入圖片描述 可以看到實現了多執行緒,執行緒個數統計包括主執行緒,t1.start()表示啟動執行緒,t1.join()表示阻塞主執行緒,等待子程序 t1 執行結束後,執行主執行緒的內容

  • set_daemon設定守護程序,當主執行緒執行結束,讓沒有執行的執行緒強制結束
import threading
import time


# 任務1:
def music(name):
    for i in range(2):
        print("正在聽音樂%s" % (name))
        time.sleep(1)
        print('完成')


# 任務2:
def code(name):
    for i in range(2):
        print("正在編寫程式碼%s" % (name))
        time.sleep(2)
        print('完成')


if __name__ == '__main__':
    start_time = time.time()
    # music("中國夢")
    # code("爬蟲")

    t1 = threading.Thread(target=music, args=("中國夢",))
    t2 = threading.Thread(target=code, args=("爬蟲",))
    # 將t1執行緒生命為守護執行緒, 如果設定為True, 子執行緒啟動, 當主執行緒執行結束, 子執行緒也結束
    # 設定setDaemon必須在啟動執行緒之前進行設定;
    t1.setDaemon(True)
    t2.setDaemon(True)
    t1.start()
    t2.start()
    # 等待所有的子執行緒執行結束之後, 繼續執行主執行緒的內容;
    # t1.join()
    # t2.join()
    print("花費時間: %s" % (time.time() - start_time))

在這裡插入圖片描述

  • 通過繼承建立執行緒
import threading


# 類的繼承
class IpThread(threading.Thread):
    # 重寫構造方法;
    def __init__(self, jobname):
        super(IpThread, self).__init__()
        self.jobname = jobname

    # 將多執行緒需要執行的任務重寫到run方法中;
    def run(self):
        print("this is  a job")


t1 = IpThread(jobname="new job")
t1.start()

在這裡插入圖片描述

  • 帶引數的繼承
import json
import threading
from urllib.error import HTTPError
from urllib.request import urlopen
import time


class IpThread(threading.Thread):
    # 重寫構造方法;如果執行的任務需要傳遞引數, 那將引數通過建構函式與self繫結;
    def __init__(self, jobname, ip):
        super(IpThread, self).__init__()
        self.jobname = jobname
        self.ip = ip

        # 將多執行緒需要執行的任務重寫到run方法中;

    def run(self):
        try:
            # 需要有一個引數, 傳ip;
            url = "http://ip.taobao.com/service/getIpInfo.php?ip=%s" % (self.ip)
            # 根據url獲取網頁的內容, 並且解碼為utf-8格式, 識別中文;
            text = urlopen(url).read().decode('utf-8')
        except HTTPError as e:
            print("Error: %s獲取地理位置網路錯誤" % (self.ip))
        else:
            # 將獲取的字串型別轉換為字典, 方便處理
            d = json.loads(text)['data']
            country = d['country']
            city = d['city']
            print("%s:" % (self.ip), country, city)


def use_thread():
    start_time = time.time()
    ips = ['172.25.254.250', '8.8.8.8',
           '172.25.254.250', '8.8.8.8',
           '172.25.254.250', '8.8.8.8']
    threads = []
    for ip in ips:
        t = IpThread(jobname="爬蟲", ip=ip)
        threads.append(t)
        t.start()
    # 等待所有的子執行緒執行結束
    [thread.join() for thread in threads]
    print("Success, 執行時間為%s" % (time.time() - start_time))


if __name__ == "__main__":
    use_thread()

在這裡插入圖片描述 通過遍歷引數列表,建立多執行緒並啟動,在遍歷完成後設定阻塞主執行緒,等待子程序全部完成後繼續執行主執行緒

  • 執行緒同步執行緒鎖 當多個執行緒對同一個資料進行修改時,可能會出現不可預料的情況 新增方式
import threading


def add(lock):
    # 2. 操作變數之前進行加鎖
    lock.acquire()
    global money
    for i in range(1000000):
        money += 1
    # 3. 操作變數完成後進行解鎖
    lock.release()


def reduce(lock):
    # 2. 操作變數之前進行加鎖
    lock.acquire()
    global money
    for i in range(1000000):
        money -= 1
    # 3. 操作變數完成後進行解鎖
    lock.release()


if __name__ == '__main__':
    money = 0
    # 1. 例項化一個鎖物件
    lock = threading.Lock()
    t1 = threading.Thread(target=add, args=(lock,))
    t2 = threading.Thread(target=reduce, args=(lock,))
    t1.start()
    t2.start()
    # 等待所有子執行緒執行結束
    t1.join()
    t2.join()
    print("最終金額為:%s" % (money))
  • GIL全域性直譯器鎖 python的直譯器預設每次只允許一個執行緒執行,在執行過程中,會先設定GIL,切換到執行緒去執行對應的任務,在執行階段若是執行完成,或者 time.sleep() ,再或者需要其他資訊才能繼續執行時,會把當前執行緒設定為睡眠狀態,然後解鎖GIL,並重復執行

Python並不支援真正意義上的多執行緒。Python中提供了多執行緒包,但是如果你想通過多執行緒提高程式碼的速度,使用多執行緒包並不是個好主意。Python中有一個被稱為Global Interpreter Lock(GIL)的東西,它會確保任何時候你的多個執行緒中,只有一個被執行。執行緒的執行速度非常之快,會讓你誤以為執行緒是並行執行的,但是實際上都是輪流執行。經過GIL這一道關卡處理,會增加執行的開銷。這意味著,如果你想提高程式碼的執行速度,使用threading包並不是一個很好的方法。

對於I/O密集型操作,適合使用多執行緒操作,對於CPU/計算密集型操作,則不適合使用多執行緒操作

  • 佇列與多執行緒 在多執行緒執行任務中,會產生一些資料,為其他程式執行作鋪墊;在多執行緒中是不能返回任務執行結果的,因此需要一個容器來儲存多執行緒產生的資料
import threading
from collections import Iterable
from queue import Queue


def job(l, queue):
    # 將任務的結果儲存到佇列中;
    queue.put(sum(l))


def use_thread():
    # 例項化一個佇列, 用來儲存每個執行緒執行的結果;
    q = Queue()
    # # 入隊
    # q.put(1)
    li = [[1, 2, 3, 4, 5], [2, 3, 4, 5, 6], [2, 3, 4, 5, 6, 7, 8], [2, 3, 4, 5, 6]]
    threads = []
    for i in li:
        t = threading.Thread(target=job, args=(i, q))
        threads.append(t)
        t.start()
    # join方法等待所有子執行緒之心結束
    [thread.join() for thread in threads]
    # 從佇列裡面拿出所有的執行結果
    result = [q.get() for _ in li]
    print(result)
    print(isinstance(q, Iterable))


if __name__ == "__main__":
    use_thread()

在這裡插入圖片描述

  • ThreadPool執行緒池
from concurrent.futures import ThreadPoolExecutor
import time


# 需要執行的任務
def job():
    print("this is a job")
    return "hello"


if __name__ == '__main__':
    # 例項化物件, 執行緒池包含10個執行緒來處理任務;
    pool = ThreadPoolExecutor(max_workers=10)
    # 往執行緒池裡面扔需要執行的任務, 返回一個物件,( _base.Future例項化出來的)
    f1 = pool.submit(job)
    f2 = pool.submit(job)
    # 判斷任務是否執行結束
    print(f1.done())
    time.sleep(1)
    print(f2.done())
    # 獲取任務執行的結果
    print(f1.result())
    print(f2.result())

在這裡插入圖片描述 線上程池中,submit 提交任務

  • 執行緒池與map的關係 方法一:
from urllib.error import HTTPError
from urllib.request import urlopen
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
import time

URLS = ['http://httpbin.org', 'http://example.com/',
        'https://api.github.com/'] * 10


def get_page(url, timeout=3):
    try:
        content = urlopen(url).read()
        return {'url': url, 'len': len(content)}
    except HTTPError as e:
        return {'url': url, 'len': 0}


# 方法1: submit提交任務
start_time = time.time()
pool = ThreadPoolExecutor(max_workers=20)
futuresObj = [pool.submit(get_page, url) for url in URLS]
# 注意: 傳遞的時包含futures物件的序列, as_complete, 返回已經執行完任務的future物件,
# 直到所有的future對應的任務執行完成, 迴圈結束;
for finish_fs in as_completed(futuresObj):
    print(finish_fs.result() )
for future in futuresObj:
    print(future.result())
print("執行時間:%s" % (time.time() - start_time))
#####
{'url': 'http://example.com/', 'len': 1270}
{'url': 'http://example.com/', 'len': 1270}
{'url': 'http://example.com/', 'len': 1270}
{'url': 'http://example.com/', 'len': 1270}
{'url': 'http://example.com/', 'len': 1270}
{'url': 'http://example.com/', 'len': 1270}
{'url': 'http://httpbin.org', 'len': 10122}
{'url': 'http://httpbin.org', 'len': 10122}
{'url': 'http://httpbin.org', 'len': 10122}
{'url': 'http://httpbin.org', 'len': 10122}
{'url': 'http://httpbin.org', 'len': 10122}
{'url': 'http://httpbin.org', 'len': 10122}
{'url': 'http://httpbin.org', 'len': 10122}
{'url': 'http://httpbin.org', 'len': 10122}
{'url': 'http://httpbin.org', 'len': 10122}
{'url': 'http://httpbin.org', 'len': 10122}
{'url': 'http://example.com/', 'len': 1270}
{'url': 'https://api.github.com/', 'len': 2039}
{'url': 'https://api.github.com/', 'len': 2039}
{'url': 'https://api.github.com/', 'len': 2039}
{'url': 'https://api.github.com/', 'len': 2039}
{'url': 'http://example.com/', 'len': 1270}
{'url': 'https://api.github.com/', 'len': 2039}
{'url': 'https://api.github.com/', 'len': 2039}
{'url': 'http://example.com/', 'len': 1270}
{'url': 'https://api.github.com/', 'len': 2039}
{'url': 'https://api.github.com/', 'len': 2039}
{'url': 'https://api.github.com/', 'len': 2039}
{'url': 'http://example.com/', 'len': 1270}
{'url': 'https://api.github.com/', 'len': 2039}
{'url': 'http://httpbin.org', 'len': 10122}
{'url': 'http://example.com/', 'len': 1270}
{'url': 'https://api.github.com/', 'len': 2039}
{'url': 'http://httpbin.org', 'len': 10122}
{'url': 'http://example.com/', 'len': 1270}
{'url': 'https://api.github.com/', 'len': 2039}
{'url': 'http://httpbin.org', 'len': 10122}
{'url': 'http://example.com/', 'len': 1270}
{'url': 'https://api.github.com/', 'len': 2039}
{'url': 'http://httpbin.org', 'len': 10122}
{'url': 'http://example.com/', 'len': 1270}
{'url': 'https://api.github.com/', 'len': 2039}
{'url': 'http://httpbin.org', 'len': 10122}
{'url': 'http://example.com/', 'len': 1270}
{'url': 'https://api.github.com/', 'len': 2039}
{'url': 'http://httpbin.org', 'len': 10122}
{'url': 'http://example.com/', 'len': 1270}
{'url': 'https://api.github.com/', 'len': 2039}
{'url': 'http://httpbin.org', 'len': 10122}
{'url': 'http://example.com/', 'len': 1270}
{'url': 'https://api.github.com/', 'len': 2039}
{'url': 'http://httpbin.org', 'len': 10122}
{'url': 'http://example.com/', 'len': 1270}
{'url': 'https://api.github.com/', 'len': 2039}
{'url': 'http://httpbin.org', 'len': 10122}
{'url': 'http://example.com/', 'len': 1270}
{'url': 'https://api.github.com/', 'len': 2039}
{'url': 'http://httpbin.org', 'len': 10122}
{'url': 'http://example.com/', 'len': 1270}
{'url': 'https://api.github.com/', 'len': 2039}
執行時間:3.0462844371795654

方法二:

from urllib.error import HTTPError
from urllib.request import urlopen
from concurrent.futures import ThreadPoolExecutor

URLS = ['http://httpbin.org', 'http://example.com/',
        'https://api.github.com/'] * 10


def get_page(url, timeout=3):
    try:
        content = urlopen(url).read()
        return {'url': url, 'len': len(content)}
    except HTTPError as e:
        return {'url': url, 'len': 0}


# 方法2:通過map方式執行
pool = ThreadPoolExecutor(max_workers=20)
for res in pool.map(get_page, URLS):
    print(res)
#####
{'url': 'http://httpbin.org', 'len': 10122}
{'url': 'http://example.com/', 'len': 1270}
{'url': 'https://api.github.com/', 'len': 2039}
{'url': 'http://httpbin.org', 'len': 10122}
{'url': 'http://example.com/', 'len': 1270}
{'url': 'https://api.github.com/', 'len': 2039}
{'url': 'http://httpbin.org', 'len': 10122}
{'url': 'http://example.com/', 'len': 1270}
{'url': 'https://api.github.com/', 'len': 2039}
{'url': 'http://httpbin.org', 'len': 10122}
{'url': 'http://example.com/', 'len': 1270}
{'url': 'https://api.github.com/', 'len': 2039}
{'url': 'http://httpbin.org', 'len': 10122}
{'url': 'http://example.com/', 'len': 1270}
{'url': 'https://api.github.com/', 'len': 2039}
{'url': 'http://httpbin.org', 'len': 10122}
{'url': 'http://example.com/', 'len': 1270}
{'url': 'https://api.github.com/', 'len': 2039}
{'url': 'http://httpbin.org', 'len': 10122}
{'url': 'http://example.com/', 'len': 1270}
{'url': 'https://api.github.com/', 'len': 2039}
{'url': 'http://httpbin.org', 'len': 10122}
{'url': 'http://example.com/', 'len': 1270}
{'url': 'https://api.github.com/', 'len': 2039}
{'url': 'http://httpbin.org', 'len': 10122}
{'url': 'http://example.com/', 'len': 1270}
{'url': 'https://api.github.com/', 'len': 2039}
{'url': 'http://httpbin.org', 'len': 10122}
{'url': 'http://example.com/', 'len': 1270}
{'url': 'https://api.github.com/', 'len': 2039}
  • LINUX建立子程序
import os
import time

print("當前程序(pid=%d)正在執行..." % (os.getpid()))
print("當前程序的父程序(pid=%d)正在執行..." % (os.getppid()))
print("正在建立子程序......")
pid = os.fork()
pid2 = os.fork()
print("第1個:", pid)
print("第2個: ", pid2)

if pid == 0:
    print("這是建立的子程序, 子程序的id為%s, 父程序的id為%s"
          % (os.getpid(), os.getppid()))
else:
    print("當前是父程序[%s]的返回值%s" % (os.getpid(), pid))
time.sleep(100)
#####
當前程序(pid=5714)正在執行...
當前程序的父程序(pid=5378)正在執行...
正在建立子程序......
第1個: 5715
第2個:  5716
當前是父程序[5714]的返回值5715
第1個: 5715
第2個:  0
當前是父程序[5716]的返回值5715
第1個: 0
第2個:  5717
這是建立的子程序, 子程序的id為5715, 父程序的id為5714
第1個: 0
第2個:  0
這是建立的子程序, 子程序的id為5717, 父程序的id為5715

Linux建立子程序的原理:先有父程序,再有子程序,通過 fork() 函式實現,如果父程序結束, 子程序也隨之結束,其中fork函式的返回值:呼叫該方法一次, 返回兩次,子程序返回一個0,父程序返回子程序的pid,os.getpid()獲取當前程序的pid,os.getppid()獲取當前程序的父程序的id

  • 通過例項化物件建立多程序 在面對I/O密集型任務時,使用多執行緒,在處理密集型任務時,則使用多程序,充分利用CPU,將任務提交給多個CPU去執行。
import multiprocessing


def job():
    print("當前子程序的名稱為%s" % (multiprocessing.current_process()))


# 建立一個程序物件
p1 = multiprocessing.Process(target=job)
# 執行多程序, 執行任務
p1.start()
# 建立一個程序物件
p2 = multiprocessing.Process(target=job)
# 執行多程序, 執行任務
p2.start()
# 等待所有的子程序執行結束, 再執行主程序的內容
p1.join()
p2.join()
print("任務執行結束.......")
#####
當前子程序的名稱為<Process(Process-1, started)>
當前子程序的名稱為<Process(Process-2, started)>
任務執行結束.......
  • 通過繼承實現多程序
import multiprocessing


class JobProcess(multiprocessing.Process):
    # 重寫Process的構造方法, 獲取新的屬性
    def __init__(self, queue):
        super(JobProcess, self).__init__()
        self.queue = queue

    # 重寫run方法, 將執行的任務放在裡面即可
    def run(self):
        print("當前子程序的名稱為%s" % (multiprocessing.current_process()))


processes = []
# 啟動10個子程序, 來處理需要執行的任務;
for i in range(10):
    # 例項化物件;
    p = JobProcess(queue=3)
    processes.append(p)
    # 啟動多程序, 處理需要執行的任務;
    p.start()
# 等待所有的子程序執行結束, 再繼續執行主程序
[process.join() for process in processes]
# 執行主程序
print("任務執行結束.......")
#####
當前子程序的名稱為<JobProcess(JobProcess-1, started)>
當前子程序的名稱為<JobProcess(JobProcess-2, started)>
當前子程序的名稱為<JobProcess(JobProcess-3, started)>
當前子程序的名稱為<JobProcess(JobProcess-4, started)>
當前子程序的名稱為<JobProcess(JobProcess-5, started)>
當前子程序的名稱為<JobProcess(JobProcess-6, started)>
當前子程序的名稱為<JobProcess(JobProcess-7, started)>
當前子程序的名稱為<JobProcess(JobProcess-8, started)>
當前子程序的名稱為<JobProcess(JobProcess-9, started)>
當前子程序的名稱為<JobProcess(JobProcess-10, started)>
任務執行結束.......

對於多程序,開啟的程序數有瓶頸,取決於CPU的個數。如果處理的資料比較小, 不建議使用多程序, 因為程序的建立和銷燬需要時間,開啟的程序數越多,不一定效率越高。如果處理的資料量足夠大, 0<開啟的程序數<cpu個數, 開啟的程序數越多, 效率越高。

  • 守護程序
import multiprocessing
import time


def job():
    name = multiprocessing.current_process()
    print("開始執行")
    time.sleep(3)
    print("結束程序")


if __name__ == '__main__':
    # 啟動一個子程序
    p1 = multiprocessing.Process(target=job, name='use deamon')
    # True/False
    p1.daemon = True
    p1.start()
    # join等待所有子程序執行結束, 再執行主程序
    # p1.join()
    # 主程序執行
    print("程式執行結束")

在這裡插入圖片描述 守護程序,主程序執行結束, 子程序不再繼續執行

  • 終止程序
import time
import multiprocessing


def job():
    print("start.....")
    time.sleep(10)
    print('end.......')


if __name__ == '__main__':
    p = multiprocessing.Process(target=job)
    print("Before:", p.is_alive())
    p.start()  # 啟動子程序
    print("During:", p.is_alive())
    p.terminate()  # 終止子程序
    print('terminate:', p.is_alive())
    # p.join()  # 等待子程序徹底終止
    time.sleep(1)
    print("joined:", p.is_alive())
#####
Before: False
During: True
terminate: True
joined: False

在執行terminate()後,程序被終止

  • 生產者消費者
import multiprocessing
from multiprocessing import Queue
import time


class Producer(multiprocessing.Process):
    # 往佇列裡面寫內容
    def __init__(self, queue):
        super(Producer, self).__init__()
        self.queue = queue

    def run(self):
        for i in range(10):
            self.queue.put(i)
            time.sleep(0.1)
            print("傳遞訊息, 內容為:%s" % (i))


class Consumer(multiprocessing.Process):
    # 讀取佇列裡面的內容
    def __init__(self, queue):
        super(Consumer, self).__init__()
        self.queue = queue

    def run(self):
        # 判斷佇列是否為空, 如果是, 跳出迴圈, 不會再去從佇列獲取資料;
        # while not self.queue.empty():
        while True:
            time.sleep(0.1)
            print("讀取程序傳遞的訊息:%s" % (self.queue.get()))


if __name__ == "__main__":
    q = Queue()
    p1 = Producer(q)
    c1 = Consumer(q)
    p1.start()
    c1.start()
    p1.join()
    c1.terminate()
    c1.join()
    print('all done')
#
讀取程序傳遞的訊息:0
傳遞訊息, 內容為:0
讀取程序傳遞的訊息:1
傳遞訊息, 內容為:1
讀取程序傳遞的訊息:2
傳遞訊息, 內容為:2
讀取程序傳遞的訊息:3
傳遞訊息, 內容為:3
讀取程序傳遞的訊息:4
傳遞訊息, 內容為:4
讀取程序傳遞的訊息:5
傳遞訊息, 內容為:5
讀取程序傳遞的訊息:6
傳遞訊息, 內容為:6
讀取程序傳遞的訊息:7
傳遞訊息, 內容為:7
讀取程序傳遞的訊息:8
傳遞訊息, 內容為:8
讀取程序傳遞的訊息:9
傳遞訊息, 內容為:9
all done
  • 分散式程序 當任務需要處理的資料非常大,希望多臺主機共同處理任務 實現方式: multiprocessing.managers子模組裡面可以實現將程序分佈到多臺機器 BaseManager: 提供了不同機器程序之間共享資料的一種方法(ip:port) Master: 管理端, 分配任務給其他主機; Worker1: 被管理端, 處理master給予的任務; Worker2:被管理端, 處理master給予的任務; 管理端:
import random
from queue import Queue
from multiprocessing.managers import  BaseManager
# 1. 建立需要的佇列
# task_queue儲存的是任務需要傳遞的引數
task_queue = Queue()
# result_queue儲存的是任務執行結果
result_queue = Queue()
# 2. 將佇列註冊到網路上
# 需要將兩個佇列註冊到網路上, 使得其他主機可以訪問;
BaseManager.register('get_task_queue',  callable=lambda : task_queue)
BaseManager.register('get_result_queue',  callable=lambda : result_queue)
# 繫結埠為為4000, 暗號/金鑰為hello;
manager = BaseManager(address=('192.168.1.137', 4000), authkey=b'hello')
# 3. 啟動manager, 開始共享佇列;
manager.start()
# 4. 通過網路訪問共享的Queue物件,
# manager.register: 註冊一個佇列, 唯一標識'get_task_queue'
# manager.get_task_queue()呼叫註冊, 呼叫過程中執行的內容為callable只想的函式;
task = manager.get_task_queue()
result =  manager.get_result_queue()
# 5. 開始往佇列裡面放執行任務需要的資料;
for i in range(100):
    n = random.randint(1,1000)
    task.put(n)
    print("任務列表加入任務: %d" %(n))
# 6. 從result佇列裡面讀取各個及機器執行的結果;
for j in range(100):
    res = result.get()
    print("佇列任務的執行結果:%s" %(res))
# 7. 關閉manager, 取消共享佇列;
manager.shutdown()

被管理端:

from multiprocessing.managers import BaseManager
# 1. 連線Master端, 獲取共享的佇列;
# address寫的是master端的ip和共享的埠, authkey與master端保持一致;
import time
worker = BaseManager(address=('192.168.1.137', 4000), authkey=b'hello')
# 2. 註冊佇列, 獲取共享端的佇列內容
BaseManager.register('get_task_queue')
BaseManager.register('get_result_queue')
# 3. 去連線
worker.connect()
# 4. 通過網路訪問共享的Queue物件,
# manager.register: 註冊一個佇列, 唯一標識'get_task_queue'
# manager.get_task_queue()呼叫註冊, 呼叫過程中執行的內容為callable只想的函式;
task = worker.get_task_queue()
result =  worker.get_result_queue()
# 5. 讀取管理端共享的任務, 依次執行;
for i in range(50):
    n = task.get()
    print("執行任務 %d ** 2:" %(n))
    res = "%d ** 2 = %d" %(n, n**2)
    time.sleep(1)
    # 將執行結果放入reesult佇列
    result.put(res)
print("執行結束.....")

管理端: 在這裡插入圖片描述 被管理端: 在這裡插入圖片描述

  • 程序池
import multiprocessing


def job(id):
    print('start %d.....' % (id))
    print('end %d.....' % (id))


# 建立一個程序池物件
pool = multiprocessing.Pool(10)
# 給程序池的程序分配任務
for i in range(10):
    pool.apply_async(job, args=(i,))
pool.close()
# 等待所有的子程序執行結束
pool.join()
print('success')
#
start 0.....
end 0.....
start 1.....
end 1.....
start 3.....
start 2.....
end 3.....
end 2.....
start 5.....
end 5.....
start 4.....
end 4.....
start 6.....
end 6.....
start 7.....
end 7.....
start 8.....
end 8.....
start 9.....
end 9.....
success
  • 程序池—ProcessPoolExecutor submit方法
from concurrent.futures import ProcessPoolExecutor


def job(id):
    print('start %d.....' % (id))
    print('end %d.....' % (id))
    return id


# submit
pool = ProcessPoolExecutor(max_workers=4)
# 分配任務給子程序, 並且返回一個Future物件;
f1 = pool.submit(job, 1)
# 獲取程序是否執行結束;
f1.done()
# 獲取子程序執行的結果
f1.result()
#
start 1.....
end 1.....

map方法

from concurrent.futures import ProcessPoolExecutor


def job(id):
    print('start %d.....' % (id))
    print('end %d.....' % (id))
    return id


pool = ProcessPoolExecutor(max_workers=4)
for res in pool.map(job, range(1, 100)):
    print(res)

在這裡插入圖片描述