1. 程式人生 > >2018年5月22日筆記

2018年5月22日筆記

req 阻塞 OS rom ron fun star 因此 --

  • Python共享內存

共享內存有兩個結構,一個是 Value, 一個是 Array,這兩個結構內部都實現了鎖機制,因此是多進程安全的。

Value 和 Array 都需要設置其中存放值的類型,d 是 double 類型,i 是 int 類型,具體的對應關系在Python 標準庫的 sharedctypes 模塊中查看。

  • 習題1
 1 from multiprocessing import Value, Array, Process
 2 
 3 def woker(arr):
 4     for i in range(len(arr)):
 5         arr[i] = -arr[i]
6 7 if __name__ == __main__: 8 arr = Array(i, [x for x in range(10)]) 9 print(arr) 10 print(arr[:]) 11 p = Process(target=woker, args=(arr,)) 12 p.start() 13 p.join() # 等子進程先執行完,否則兩次print結果相同 14 print(arr[:])
<SynchronizedArray wrapper for <multiprocessing.sharedctypes.c_int_Array_10 object at 0x102c76510>>
[0, 
1, 2, 3, 4, 5, 6, 7, 8, 9] [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

  • Manager -- 專門用來做數據共享的模塊

上面的共享內存支持兩種結構 Value 和 Array。 Python 中還有一個強大的Manager,專門用來做數據共享。

其支持的類型非常多,比如list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value 和 Array。

一個 Manager 對象是一個服務進程,推薦多進程程序中,數據共享就用一個 manager 管理。

  • 習題2
 1 from multiprocessing import Manager, Process
 2 
 3 def worker(dt, lt):
 4     for i in range(10):
 5         dt[i] = i*i
 6     lt += [x for x in range(11, 20)]
 7 
 8 if __name__ == __main__:
 9     manager = Manager()
10     dt = manager.dict()
11     lt = manager.list()
12     p = Process(target=worker, args=(dt, lt))
13     p.start()
14     p.join(timeout=3)
15     print(type(dt))
16     print(dt)
17     print(type(lt))
18     print(lt)
<class multiprocessing.managers.DictProxy>
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25, 6: 36, 7: 49, 8: 64, 9: 81}
<class multiprocessing.managers.ListProxy>
[11, 12, 13, 14, 15, 16, 17, 18, 19]

  • pool -- 進程管理

如果有50個任務要執行, 但是 CPU 只有4核, 你可以創建50個進程來做這個事情。但是大可不必,徒增管理開銷。如果你只想創建4個進程,讓他們輪流替你完成任務,不用自己去管理具體的進程的創建銷毀,那 Pool 是非常有用的。

Pool 是進程池,進程池能夠管理一定的進程,當有空閑進程時,則利用空閑進程完成任務,直到所有任務完成為止。

Pool 進程池創建4個進程,不管有沒有任務,都一直在進程池中等候,等到有數據的時候就開始執行。
Pool 的 API 列表如下:

    • apply(func[, args[, kwds]])
    • apply_async(func[, args[, kwds[, callback]]])
    • close()
    • terminate()
    • join()

註意:在調用pool.join()之前,必須先調用pool.close(),否則會出錯。執行完close()後不會有新的進程加入到pool中,join()會等待所有子進程結束。

pool.apply_async  非阻塞(異步執行),定義的進程池最大進程數可同時執行

pool.apply      阻塞,一個進程結束後釋放回池,下一個進程才開始執行

  • 習題3
 1 import multiprocessing
 2 import time
 3 
 4 
 5 def fun(msg):
 6     print("#########start#### {0}".format(msg))
 7     time.sleep(3)
 8     print("#########end###### {0}".format(msg))
 9 
10 
11 if __name__ == __main__:
12     print("start main")
13     pool = multiprocessing.Pool(processes=3)
14     for i in range(1, 7):
15         msg = "hello {0}".format(i)
16         # pool.apply_async(fun, (msg,))# 執行時間 6s+
17         pool.apply(fun, (msg,))   #執行時間 6*3=18+
18     pool.close()#在調用join之前,要先調用close,否則會報錯,close執行完不會有新的進程加入到pool
19     pool.join()#join 是等待所有的子進程結束
20     print("end main")
start main
#########start#### hello 1
#########end###### hello 1
#########start#### hello 2
#########end###### hello 2
#########start#### hello 3
#########end###### hello 3
#########start#### hello 4
#########end###### hello 4
#########start#### hello 5
#########end###### hello 5
#########start#### hello 6
#########end###### hello 6
end main

 1 import multiprocessing
 2 import time
 3 
 4 
 5 def fun(msg):
 6     print("#########start#### {0}".format(msg))
 7     time.sleep(3)
 8     print("#########end###### {0}".format(msg))
 9 
10 
11 if __name__ == __main__:
12     print("start main")
13     pool = multiprocessing.Pool(processes=3)
14     for i in range(1, 7):
15         msg = "hello {0}".format(i)
16         pool.apply_async(fun, (msg,))# 執行時間 6s+
17         # pool.apply(fun, (msg,))   #執行時間 6*3=18+
18     pool.close()#在調用join之前,要先調用close,否則會報錯,close執行完不會有新的進程加入到pool
19     pool.join()#join 是等待所有的子進程結束
20     print("end main")
start main
#########start#### hello 1
#########start#### hello 2
#########start#### hello 3
#########end###### hello 1
#########end###### hello 2
#########end###### hello 3
#########start#### hello 4
#########start#### hello 5
#########start#### hello 6
#########end###### hello 5
#########end###### hello 6
#########end###### hello 4
end main

  • threading -- 多線程,IO密集型操作

多線程實現方式有兩種:

  方法一:將要執行的方法作為參數傳給Thread的構造方法(和多進程類似)

      t = threading.Thread(target=func, args=(i,))

  方法二:從Thread()繼承,並重寫run()

線程 < 進程,1個父進程中含多個線程。

多線程和多進程的不同之處在於:多線程本身是可以和父進程共享內存的。這也是為什麽其中一個線程掛掉會導致其他線程也死掉的道理。

  • 習題4
 1 import threading
 2 import time
 3 
 4 def worker(args):
 5     print("開始子進程 {0}".format(args))
 6     time.sleep(args)
 7     print("結束子進程 {0}".format(args))
 8 
 9 if __name__ == __main__:
10 
11     print("start main")
12     t1 = threading.Thread(target=worker, args=(1,))
13     t2 = threading.Thread(target=worker, args=(2,))
14     t1.start()
15     t2.start()
16     t1.join()
17     t2.join()
18     print("end main")
start main
開始子進程 1
開始子進程 2
結束子進程 1
結束子進程 2
end main

  • 習題5
 1 import threading
 2 import time
 3 
 4 class Hello(threading.Thread):
 5     def __init__(self, args):
 6         super(Hello, self).__init__()
 7         self.args = args
 8 
 9     def run(self):
10         print("開始子進程 {0}".format(self.args))
11         time.sleep(1)
12         print("結束子進程 {0}".format(self.args))
13 
14 if __name__ == __main__:
15 
16     a = 1
17     print("start main")
18     t1 = Hello(1)
19     t2 = Hello(2)
20     t1.start()
21     t2.start()
22     print("end main")
start main
開始子進程 1
開始子進程 2
end main
結束子進程 1
結束子進程 2

  • 習題6
 1 import threading
 2 import time
 3 
 4 class Hello(threading.Thread):
 5     def __init__(self, args):
 6         super(Hello, self).__init__()
 7         self.args = args
 8         global a
 9         print("a = {0}".format(a))
10         a += 1
11 
12     def run(self):
13         print("開始子進程 {0}".format(self.args))
14         print("結束子進程 {0}".format(self.args))
15 
16 if __name__ == __main__:
17     a = 1
18     print("start main")
19     t1 = Hello(5)
20     time.sleep(3)
21     t2 = Hello(5)
22     t1.start()
23     t2.start()
24     print("#####a = {0}####".format(a))
25     print("end main")
start main
a = 1
a = 2
開始子進程 5
結束子進程 5
開始子進程 5
結束子進程 5
#####a = 3####
end main

  • 習題7 (線程池)
 1 import threadpool
 2 
 3 def hello(m, n, o):
 4     print("m = {0}  n={1}  o={2}".format(m, n, o))
 5 
 6 if __name__ == __main__:
 7     # 方法1
 8     lst_vars_1 = [1, 2, 3]
 9     lst_vars_2 = [4, 5, 6]
10     func_var = [(lst_vars_1, None), (lst_vars_2, None)]
11     # 方法2
12     # dict_vars_1 = {‘m‘: ‘1‘, ‘n‘: ‘2‘, ‘o‘: ‘3‘}
13     # dict_vars_2 = {‘m‘: ‘4‘, ‘n‘: ‘5‘, ‘o‘: ‘6‘}
14     # func_var = [(None, dict_vars_1), (None, dict_vars_2)]
15 
16     pool = threadpool.ThreadPool(2)
17     requests = threadpool.makeRequests(hello, func_var)
18     [pool.putRequest(req) for req in requests]
19     pool.wait()
m = 1  n=2  o=3
m = 4  n=5  o=6

2018年5月22日筆記