1. 程式人生 > >Python分散式程序報錯:pickle模組不能序列化lambda函式

Python分散式程序報錯:pickle模組不能序列化lambda函式

今天在學習到廖老師Python教程的分散式程序時,遇到了一個錯誤:_pickle.PicklingError: Can't pickle <function <lambda> at 0x000001710FDC2EA0>: attribute lookup <lambda> on __main__ failed(pickle模組不能序列化lambda函式)

程式碼如下:

#!/usr/bin/env python
# _*_ coding:utf-8 _*_

import random, queue
from multiprocessing.managers import BaseManager

# 傳送任務的佇列
task_queue = queue.Queue()
# 接收結果的佇列
result_queue = queue.Queue()
# 從BaseManager繼承的QueueManager


class QueueManager(BaseManager):
    pass


# 吧兩個Queue都註冊到網路上,callable引數關聯了Queue物件
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)

# 繫結埠5000,設定驗證碼abc
manager = QueueManager(address=('', 5000), authkey=b'abc')

# 啟動queue
manager.start()

# 獲得通過網路訪問的Queue物件
task = manager.get_task_queue()
result = manager.get_result_queue()

# 放幾個任務
for i in range(10):
    n = random.randint(0, 1000)
    print('新增任務 %d' % n)
    task.put(n)

# 從result佇列讀取結果
print('嘗試獲取結果')
for i in range(10):
    r = result.get(timeout=10)
    print('結果是:%s' % r)

# 關閉
manager.shutdown()
print('master exit')

在教程中我記得有關pickle的事兒(有印象看來思想還在線上,哈哈),翻了一下,看到:

 原來是系統問題造成的,那麼,如何解決呢?在教程中我也看到,遇到這樣的情況,需要我們自己定義函式,實現序列化。

所以對程式碼稍加修改,定義兩個函式return_task_queuereturn_result_queue實現序列化:

#!/usr/bin/env python
# _*_ coding:utf-8 _*_

import random, queue
from multiprocessing.managers import BaseManager

# 傳送任務的佇列
task_queue = queue.Queue()
# 接收結果的佇列
result_queue = queue.Queue()


def return_task_queue():
    global task_queue
    return task_queue


def return_result_queue():
    global result_queue
    return result_queue


# 從BaseManager繼承的QueueManager


class QueueManager(BaseManager):
    pass


if __name__ == '__main__':
    # 吧兩個Queue都註冊到網路上,callable引數關聯了Queue物件
    QueueManager.register('get_task_queue', callable=return_task_queue)
    QueueManager.register('get_result_queue', callable=return_result_queue)

    # 繫結埠5000,設定驗證碼abc
    manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')

    # 啟動queue
    manager.start()

    # 獲得通過網路訪問的Queue物件
    task = manager.get_task_queue()
    result = manager.get_result_queue()

    # 放幾個任務
    for i in range(10):
        n = random.randint(0, 1000)
        print('新增任務 %d' % n)
        task.put(n)

    # 從result佇列讀取結果
    print('嘗試獲取結果')
    for i in range(10):
        r = result.get(timeout=10)
        print('結果是:%s' % r)

    # 關閉
    manager.shutdown()
    print('master exit')

執行結果:

歡迎各位來和我一起交流技術,分享學習心得