2018年5月22日筆記
- Python共享內存
共享內存有兩個結構,一個是 Value
, 一個是 Array
,這兩個結構內部都實現了鎖機制,因此是多進程安全的。
Value 和 Array 都需要設置其中存放值的類型,d 是 double 類型,i 是 int 類型,具體的對應關系在Python 標準庫的 sharedctypes 模塊中查看。
- 習題1
1 from multiprocessing import Value, Array, Process 2 3 def woker(arr): 4 for i in range(len(arr)): 5 arr[i] = -arr[i]6 7 if __name__ == ‘__main__‘: 8 arr = Array(‘i‘, [x for x in range(10)]) 9 print(arr) 10 print(arr[:]) 11 p = Process(target=woker, args=(arr,)) 12 p.start() 13 p.join() # 等子進程先執行完,否則兩次print結果相同 14 print(arr[:])
<SynchronizedArray wrapper for <multiprocessing.sharedctypes.c_int_Array_10 object at 0x102c76510>> [0,1, 2, 3, 4, 5, 6, 7, 8, 9] [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
- Manager -- 專門用來做數據共享的模塊
上面的共享內存支持兩種結構 Value 和 Array。 Python 中還有一個強大的Manager,專門用來做數據共享。
其支持的類型非常多,比如list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value 和 Array。
一個 Manager 對象是一個服務進程,推薦多進程程序中,數據共享就用一個 manager 管理。
- 習題2
1 from multiprocessing import Manager, Process 2 3 def worker(dt, lt): 4 for i in range(10): 5 dt[i] = i*i 6 lt += [x for x in range(11, 20)] 7 8 if __name__ == ‘__main__‘: 9 manager = Manager() 10 dt = manager.dict() 11 lt = manager.list() 12 p = Process(target=worker, args=(dt, lt)) 13 p.start() 14 p.join(timeout=3) 15 print(type(dt)) 16 print(dt) 17 print(type(lt)) 18 print(lt)
<class ‘multiprocessing.managers.DictProxy‘> {0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25, 6: 36, 7: 49, 8: 64, 9: 81} <class ‘multiprocessing.managers.ListProxy‘> [11, 12, 13, 14, 15, 16, 17, 18, 19]
- pool -- 進程管理
如果有50個任務要執行, 但是 CPU 只有4核, 你可以創建50個進程來做這個事情。但是大可不必,徒增管理開銷。如果你只想創建4個進程,讓他們輪流替你完成任務,不用自己去管理具體的進程的創建銷毀,那 Pool 是非常有用的。
Pool 是進程池,進程池能夠管理一定的進程,當有空閑進程時,則利用空閑進程完成任務,直到所有任務完成為止。
Pool 進程池創建4個進程,不管有沒有任務,都一直在進程池中等候,等到有數據的時候就開始執行。
Pool 的 API 列表如下:
- apply(func[, args[, kwds]])
- apply_async(func[, args[, kwds[, callback]]])
- close()
- terminate()
- join()
註意:在調用pool.join()之前,必須先調用pool.close(),否則會出錯。執行完close()後不會有新的進程加入到pool中,join()會等待所有子進程結束。
pool.apply_async 非阻塞(異步執行),定義的進程池最大進程數可同時執行
pool.apply 阻塞,一個進程結束後釋放回池,下一個進程才開始執行
- 習題3
1 import multiprocessing 2 import time 3 4 5 def fun(msg): 6 print("#########start#### {0}".format(msg)) 7 time.sleep(3) 8 print("#########end###### {0}".format(msg)) 9 10 11 if __name__ == ‘__main__‘: 12 print("start main") 13 pool = multiprocessing.Pool(processes=3) 14 for i in range(1, 7): 15 msg = "hello {0}".format(i) 16 # pool.apply_async(fun, (msg,))# 執行時間 6s+ 17 pool.apply(fun, (msg,)) #執行時間 6*3=18+ 18 pool.close()#在調用join之前,要先調用close,否則會報錯,close執行完不會有新的進程加入到pool 19 pool.join()#join 是等待所有的子進程結束 20 print("end main")
start main #########start#### hello 1 #########end###### hello 1 #########start#### hello 2 #########end###### hello 2 #########start#### hello 3 #########end###### hello 3 #########start#### hello 4 #########end###### hello 4 #########start#### hello 5 #########end###### hello 5 #########start#### hello 6 #########end###### hello 6 end main
1 import multiprocessing 2 import time 3 4 5 def fun(msg): 6 print("#########start#### {0}".format(msg)) 7 time.sleep(3) 8 print("#########end###### {0}".format(msg)) 9 10 11 if __name__ == ‘__main__‘: 12 print("start main") 13 pool = multiprocessing.Pool(processes=3) 14 for i in range(1, 7): 15 msg = "hello {0}".format(i) 16 pool.apply_async(fun, (msg,))# 執行時間 6s+ 17 # pool.apply(fun, (msg,)) #執行時間 6*3=18+ 18 pool.close()#在調用join之前,要先調用close,否則會報錯,close執行完不會有新的進程加入到pool 19 pool.join()#join 是等待所有的子進程結束 20 print("end main")
start main #########start#### hello 1 #########start#### hello 2 #########start#### hello 3 #########end###### hello 1 #########end###### hello 2 #########end###### hello 3 #########start#### hello 4 #########start#### hello 5 #########start#### hello 6 #########end###### hello 5 #########end###### hello 6 #########end###### hello 4 end main
- threading -- 多線程,IO密集型操作
多線程實現方式有兩種:
方法一:將要執行的方法作為參數傳給Thread的構造方法(和多進程類似)
t = threading.Thread(target=func, args=(i,))
方法二:從Thread()繼承,並重寫run()
線程 < 進程,1個父進程中含多個線程。
多線程和多進程的不同之處在於:多線程本身是可以和父進程共享內存的。這也是為什麽其中一個線程掛掉會導致其他線程也死掉的道理。
- 習題4
1 import threading 2 import time 3 4 def worker(args): 5 print("開始子進程 {0}".format(args)) 6 time.sleep(args) 7 print("結束子進程 {0}".format(args)) 8 9 if __name__ == ‘__main__‘: 10 11 print("start main") 12 t1 = threading.Thread(target=worker, args=(1,)) 13 t2 = threading.Thread(target=worker, args=(2,)) 14 t1.start() 15 t2.start() 16 t1.join() 17 t2.join() 18 print("end main")
start main 開始子進程 1 開始子進程 2 結束子進程 1 結束子進程 2 end main
- 習題5
1 import threading 2 import time 3 4 class Hello(threading.Thread): 5 def __init__(self, args): 6 super(Hello, self).__init__() 7 self.args = args 8 9 def run(self): 10 print("開始子進程 {0}".format(self.args)) 11 time.sleep(1) 12 print("結束子進程 {0}".format(self.args)) 13 14 if __name__ == ‘__main__‘: 15 16 a = 1 17 print("start main") 18 t1 = Hello(1) 19 t2 = Hello(2) 20 t1.start() 21 t2.start() 22 print("end main")
start main 開始子進程 1 開始子進程 2 end main 結束子進程 1 結束子進程 2
- 習題6
1 import threading 2 import time 3 4 class Hello(threading.Thread): 5 def __init__(self, args): 6 super(Hello, self).__init__() 7 self.args = args 8 global a 9 print("a = {0}".format(a)) 10 a += 1 11 12 def run(self): 13 print("開始子進程 {0}".format(self.args)) 14 print("結束子進程 {0}".format(self.args)) 15 16 if __name__ == ‘__main__‘: 17 a = 1 18 print("start main") 19 t1 = Hello(5) 20 time.sleep(3) 21 t2 = Hello(5) 22 t1.start() 23 t2.start() 24 print("#####a = {0}####".format(a)) 25 print("end main")
start main a = 1 a = 2 開始子進程 5 結束子進程 5 開始子進程 5 結束子進程 5 #####a = 3#### end main
- 習題7 (線程池)
1 import threadpool 2 3 def hello(m, n, o): 4 print("m = {0} n={1} o={2}".format(m, n, o)) 5 6 if __name__ == ‘__main__‘: 7 # 方法1 8 lst_vars_1 = [‘1‘, ‘2‘, ‘3‘] 9 lst_vars_2 = [‘4‘, ‘5‘, ‘6‘] 10 func_var = [(lst_vars_1, None), (lst_vars_2, None)] 11 # 方法2 12 # dict_vars_1 = {‘m‘: ‘1‘, ‘n‘: ‘2‘, ‘o‘: ‘3‘} 13 # dict_vars_2 = {‘m‘: ‘4‘, ‘n‘: ‘5‘, ‘o‘: ‘6‘} 14 # func_var = [(None, dict_vars_1), (None, dict_vars_2)] 15 16 pool = threadpool.ThreadPool(2) 17 requests = threadpool.makeRequests(hello, func_var) 18 [pool.putRequest(req) for req in requests] 19 pool.wait()
m = 1 n=2 o=3
m = 4 n=5 o=6
2018年5月22日筆記