1. 程式人生 > >[python](windows)分布式進程問題:pickle模塊不能序列化lambda函數

[python](windows)分布式進程問題:pickle模塊不能序列化lambda函數

trace ttr 繼承 turn error ase col UC ret

運行錯誤:_pickle.PicklingError: Can‘t pickle <function <lambda> at 0x000002BAAEF12F28>: attribute lookup <lambda> on __main__ failed

代碼如下:

 1 #!/usr/bin/env python3
 2 # -*- coding: utf-8 -*-
 3 
 4 import random, time, queue
 5 from multiprocessing.managers import BaseManager
 6 
 7 # 發送任務的隊列:
8 task_queue = queue.Queue() 9 # 接收結果的隊列: 10 result_queue = queue.Queue() 11 12 # 從BaseManager繼承的QueueManager: 13 class QueueManager(BaseManager): 14 pass 15 16 # 把兩個Queue都註冊到網絡上, callable參數關聯了Queue對象: 17 QueueManager.register(get_task_queue, callable=lambda: task_queue) 18 QueueManager.register(
get_result_queue, callable=lambda: result_queue) 19 20 # 綁定端口5000, 設置驗證碼‘abc‘: 21 manager = QueueManager(address=(‘‘, 5000), authkey=babc) 22 23 # 啟動Queue: 24 manager.start() 25 26 # 獲得通過網絡訪問的Queue對象: 27 task = manager.get_task_queue() 28 result = manager.get_result_queue() 29 30 # 放幾個任務進去: 31 for i in
range(10): 32 n = random.randint(0, 10000) 33 print(Put task %d... % n) 34 task.put(n) 35 36 # 從result隊列讀取結果: 37 print(Try get results...) 38 for i in range(10): 39 r = result.get(timeout=10) 40 print(Result: %s % r) 41 42 # 關閉: 43 manager.shutdown() 44 print(master exit.)

報錯信息:

 1 Traceback (most recent call last):
 2   File "task_master.py", line 22, in <module>
 3     manager.start()
 4   File "E:\Anaconda\Anaconda3\lib\multiprocessing\managers.py", line 513, in start
 5     self._process.start()
 6   File "E:\Anaconda\Anaconda3\lib\multiprocessing\process.py", line 105, in start
 7     self._popen = self._Popen(self)
 8   File "E:\Anaconda\Anaconda3\lib\multiprocessing\context.py", line 322, in _Popen
 9     return Popen(process_obj)
10   File "E:\Anaconda\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
11     reduction.dump(process_obj, to_child)
12   File "E:\Anaconda\Anaconda3\lib\multiprocessing\reduction.py", line 60, in dump
13     ForkingPickler(file, protocol).dump(obj)
14 _pickle.PicklingError: Cant pickle <function <lambda> at 0x000002BAAEF12F28>: attribute lookup <lambda> on __main__ failed

錯誤原因:pickle模塊不能序列化lambda,需要自定義函數

修改代碼如下:

 1 #!/usr/bin/env python3
 2 # -*- coding: utf-8 -*-
 3 
 4 import random, time, queue
 5 from multiprocessing.managers import BaseManager
 6 
 7 # 發送任務的隊列:
 8 task_queue = queue.Queue()
 9 # 接收結果的隊列:
10 result_queue = queue.Queue()
11 
12 # 自定義函數re_task_queue
13 def re_task_queue():  
14     global task_queue  
15     return task_queue  
16 
17 # 自定義函數re_result_queue
18 def re_result_queue():  
19     global result_queue  
20     return result_queue
21 
22 # 從BaseManager繼承的QueueManager:
23 class QueueManager(BaseManager):
24     pass
25 
26 if __name__ == __main__:
27 
28     # 把兩個Queue都註冊到網絡上, callable參數關聯了Queue對象:
29     QueueManager.register(get_task_queue, callable=re_task_queue)
30     QueueManager.register(get_result_queue, callable=re_result_queue)
31 
32     # 綁定端口5000, 設置驗證碼‘abc‘:
33     manager = QueueManager(address=(127.0.0.1, 5000), authkey=babc)
34 
35     # 啟動Queue:
36     manager.start()
37 
38     # 獲得通過網絡訪問的Queue對象:
39     task = manager.get_task_queue()
40     result = manager.get_result_queue()
41 
42     # 放幾個任務進去:
43     for i in range(10):
44         n = random.randint(0, 10000)
45         print(Put task %d... % n)
46         task.put(n)
47 
48     # 從result隊列讀取結果:
49     print(Try get results...)
50     for i in range(10):
51         r = result.get(timeout=10)
52         print(Result: %s % r)
53         
54     # 關閉:
55     manager.shutdown()
56     print(master exit.)

運行結果:

C:\Users\Lucky丶M\python>python task_master.py
Put task 4962...
Put task 3460...
Put task 4774...
Put task 4301...
Put task 9120...
Put task 7183...
Put task 4915...
Put task 3173...
Put task 9138...
Put task 5798...
Try get results...

[python](windows)分布式進程問題:pickle模塊不能序列化lambda函數