1. 程式人生 > >python中socket、程序、執行緒、協程、池的建立方式和應用場景

python中socket、程序、執行緒、協程、池的建立方式和應用場景

複製程式碼 複製程式碼
程序
場景 利用多核、高計算型的程式、啟動數量有限
    程序是計算機中最小的資源分配單位
    程序和執行緒是包含關係 每個程序中都至少有一條執行緒
    可以利用多核,資料隔離
    建立 銷燬 切換 時間開銷都比較大
    隨著開啟的數量增加 給作業系統帶來負擔

執行緒 高IO型 排程是我們不能干預的 我們只能寫我們自己的邏輯
場景 一些協程現有的模組不能完成幫助我們規避IO操作的功能 適合使用多執行緒   urllib
    被CPU排程的最小單位,執行緒的切換時作業系統完成的
    在cpython直譯器下不能利用多核,資料共享
    建立 銷燬 切換 時間開銷都比程序小很多
    隨著開啟的數量增加 給作業系統帶來負擔

協程 高IO型 使用者可以自己控制的 我們能否搶佔更多的資源完全取決於我們切換策略
場景 一些通用的場景 可以用協程現有的模組來規避一些IO操作適合使用協程
    協程的切換工作是使用者完成的
    是一個執行緒,完全不能利用多核,不會產生資料不安全的現象
    多個任務之間互相切換不依賴作業系統,無論開啟多少個協程都不會給作業系統帶來負擔
複製程式碼

一、TCP-socket 服務端: import socket tcp_sk = socket.socket() tcp_sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) tcp_sk.bind(('127.0.0.1',8000)) tcp_sk.listen() conn,addr = tcp_sk.accept() conn.send('你好'.encode('utf-8')) print(conn.recv(1024).decode('utf-8
')) conn.close() tcp_sk.close() 客戶端: import socket sk = socket.socket() sk.connect(('127.0.0.1',8000)) print(sk.recv(1024).decode('utf-8')) sk.send('嘿嘿嘿'.encode('utf-8')) sk.close() 二、UDP-socket 服務端: import socket udp_sk = socket.socket(type=socket.SOCK_DGRAM) udp_sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,
1) udp_sk.bind(('127.0.0.1',8001)) msg,addr = udp_sk.recvfrom(1024) print(msg.decode('utf-8')) udp_sk.sendto('你好'.encode('utf-8'),addr) udp_sk.close() 客戶端: import socket sk = socket.socket(type=socket.SOCK_DGRAM) sk.sendto('哈哈'.encode('utf-8'),('127.0.0.1',8001)) msg,addr = sk.recvfrom(1024) print(msg.decode('utf-8')) sk.close() 三、socketserver 服務端: import socketserver class Myserver(socketserver.BaseRequestHandler): def handle(self): conn = self.request while True: conn.send(b'hello') print(conn.recv(1024).decode('utf-8')) socketserver.TCPServer.allow_reuse_address = True server = socketserver.ThreadingTCPServer(('127.0.0.1',8080),Myserver) server.serve_forever() 客戶端: import socket sk = socket.socket() sk.connect(('127.0.0.1',8080)) while True: ret = sk.recv(1024) print(ret.decode('utf-8')) sk.send(b'hiworld') sk.close() 四、程序 方式一、 from multiprocessing import Process def func(arg): print(arg) if __name__ == '__main__': p = Process(target=func,args=('子程序',)) p.start() p.join() print('主程序') 方式二、 from multiprocessing import Process class MyProcess(Process): def __init__(self,name): super().__init__() self.name = name def run(self): print(self.name) if __name__ == '__main__': p = MyProcess('小明') p.start() 五、執行緒 方式一、 from threading import Thread import time def sleep_boy(name): time.sleep(1) print('%s is sleeping' %name) t = Thread(target=sleep_boy,args=('xiaoming',)) # 這裡可以不需要main,因為現在只是在一個程序內操作,不需要匯入程序就不會import主程序了 t.start() print('主執行緒') 方式二、 from threading import Thread import time class Sleep_boy(Thread): def __init__(self,name): super().__init__() self.name = name def run(self): time.sleep(1) print('%s is sleeping' % self.name) t = Sleep_boy('xiaoming') t.start() print('主執行緒') 六、協程 1、greenlet例子: import time from greenlet import greenlet def cooking(): print('cooking 1') g2.switch() # 切換到g2,讓g2的函式工作 time.sleep(1) print('cooking 2') def watch(): print('watch TV 1') time.sleep(1) print('watch TV 2') g1.switch() # 切換到g1,讓g1的函式工作 g1 = greenlet(cooking) g2 = greenlet(watch) g1.switch() # 切換到g1,讓g1的函式工作 greenlet的缺陷:很顯然greenlet實現了協程的切換功能,可以自己設定什麼時候切,在哪切,但是它遇到阻塞並沒有自動切換, 因此並不能提高效率。所以一般我們都使用gevent模組實現協程 2、gevent例子1: from gevent import monkey monkey.patch_all() import time import gevent def cooking(): print('cooking 1') time.sleep(1) print('cooking 2') def watch(): print('watch TV 1') time.sleep(1) print('watch TV 2') g1 = gevent.spawn(cooking) # 自動檢測阻塞事件,遇見阻塞了就會進行切換 g2 = gevent.spawn(watch) g1.join() # 阻塞直到g1結束 g2.join() # 阻塞直到g2結束

複製程式碼
gevent例子2:
import gevent
def cooking(i):
    print('%s號在煮飯' %i)
    return i

g_lst = []
for i in range(10):
    g = gevent.spawn(cooking,i) # 函式名,引數
    g_lst.append(g)   # 把協程物件放入列表

for g in g_lst:
    g.join()
    print(g.value)  # 列印返回值

# gevent.joinall(g_lst) # joinall一次性把全部物件都阻塞
複製程式碼
 
  

七、程序池 1、同步提交apply: import os import time from multiprocessing import Pool def test(num): time.sleep(1) print('%s:%s' %(num,os.getpid())) return num*2 if __name__ == '__main__': p = Pool() for i in range(20): res = p.apply(test,args=(i,)) # 提交任務的方法 同步提交 print('-->',res) # res就是test的return的值,同步提交的返回值可以直接使用 2、非同步提交apply_async: 2-1無返回值: import time from multiprocessing import Pool def func(num): time.sleep(1) print('做了%s件衣服'%num) if __name__ == '__main__': p = Pool(4) # 程序池中建立4個程序,不寫的話,預設值為你電腦的CUP數量 for i in range(50): p.apply_async(func,args=(i,)) # 非同步提交func到一個子程序中執行,沒有返回值的情況 p.close() # 關閉程序池,使用者不能再向這個池中提交任務了 p.join() # 阻塞,直到程序池中所有的任務都被執行完 2-2有返回值: import time import os from multiprocessing import Pool def test(num): time.sleep(1) print('%s:%s' %(num,os.getpid())) return num*2 if __name__ == '__main__': p = Pool() res_lst = [] for i in range(20): res = p.apply_async(test,args=(i,)) # 提交任務的方法 非同步提交 res_lst.append(res) for res in res_lst: print(res.get()) # 非同步提交的返回值需要get,get有阻塞效果,此時就不需要close和join 2-3map: map接收一個函式和一個可迭代物件,是非同步提交的簡化版本,自帶close和join方法 可迭代物件的每一個值就是函式接收的實參,可迭代物件的長度就是建立的任務數量 map可以直接拿到返回值的可迭代物件(列表),迴圈就可以獲取返回值 import time from multiprocessing import Pool def func(num): print('子程序:',num) # time.sleep(1) return num if __name__ == '__main__': p = Pool() ret = p.map(func,range(10)) # ret是列表 for i in ret: print('返回值:',i) 2-4回撥函式: import os from multiprocessing import Pool def func(i): print('子程序:',os.getpid()) return i def call_back(res): print('回撥函式:',os.getpid()) print('res--->',res) if __name__ == '__main__': p = Pool() print('主程序:',os.getpid()) p.apply_async(func,args=(1,),callback=call_back) # callback關鍵字傳參,引數是回撥函式 p.close() p.join() 八、程序池、執行緒池 執行緒池: 1import time from concurrent.futures import ThreadPoolExecutor def func(i): print('thread',i) time.sleep(1) print('thread %s end'%i) tp = ThreadPoolExecutor(5) # 相當於tp = Pool(5) tp.submit(func,1) # 相當於tp.apply_async(func,args=(1,)) tp.shutdown() # 相當於tp.close() + tp.join() print('主執行緒') 2import time from concurrent.futures import ThreadPoolExecutor from threading import currentThread def func(i): print('thread',i,currentThread().ident) time.sleep(1) print('thread %s end'%i) tp = ThreadPoolExecutor(5) for i in range(20): tp.submit(func,i) tp.shutdown() # shutdown一次就夠了,會自動把所有的執行緒都join() print('主執行緒') 3、返回值 import time from concurrent.futures import ThreadPoolExecutor from threading import currentThread def func(i): print('thread',i,currentThread().ident) time.sleep(1) print('thread %s end' %i) return i * '*' tp = ThreadPoolExecutor(5) ret_lst = [] for i in range(20): ret = tp.submit(func,i) ret_lst.append(ret) for ret in ret_lst: print(ret.result()) # 相當於ret.get() print('主執行緒') 4、map map接收一個函式和一個可迭代物件 可迭代物件的每一個值就是函式接收的實參,可迭代物件的長度就是建立的執行緒數量 map可以直接拿到返回值的可迭代物件(列表),迴圈就可以獲取返回值 import time from concurrent.futures import ThreadPoolExecutor def func(i): print('thread',i) time.sleep(1) print('thread %s end'%i) return i * '*' tp = ThreadPoolExecutor(5) ret = tp.map(func,range(20)) for i in ret: print(i) 5、回撥函式 回撥函式在程序池是由主程序實現的 回撥函式線上程池是由子執行緒實現的 import time from concurrent.futures import ThreadPoolExecutor from threading import currentThread def func(i): print('thread',i,currentThread().ident) time.sleep(1) print('thread %s end'%i) return i * '*' def call_back(arg): print('call back : ',currentThread().ident) print('ret : ',arg.result()) # multiprocessing的Pool回撥函式中的引數不需要get(),這裡需要result() tp = ThreadPoolExecutor(5) ret_lst = [] for i in range(20): tp.submit(func,i).add_done_callback(call_back) # 使用add_done_callback()方法實現回撥函式 print('主執行緒',currentThread().ident)
複製程式碼

 

複製程式碼 複製程式碼
程序
場景 利用多核、高計算型的程式、啟動數量有限
    程序是計算機中最小的資源分配單位
    程序和執行緒是包含關係 每個程序中都至少有一條執行緒
    可以利用多核,資料隔離
    建立 銷燬 切換 時間開銷都比較大
    隨著開啟的數量增加 給作業系統帶來負擔

執行緒 高IO型 排程是我們不能干預的 我們只能寫我們自己的邏輯
場景 一些協程現有的模組不能完成幫助我們規避IO操作的功能 適合使用多執行緒   urllib
    被CPU排程的最小單位,執行緒的切換時作業系統完成的
    在cpython直譯器下不能利用多核,資料共享
    建立 銷燬 切換 時間開銷都比程序小很多
    隨著開啟的數量增加 給作業系統帶來負擔

協程 高IO型 使用者可以自己控制的 我們能否搶佔更多的資源完全取決於我們切換策略
場景 一些通用的場景 可以用協程現有的模組來規避一些IO操作適合使用協程
    協程的切換工作是使用者完成的
    是一個執行緒,完全不能利用多核,不會產生資料不安全的現象
    多個任務之間互相切換不依賴作業系統,無論開啟多少個協程都不會給作業系統帶來負擔
複製程式碼

一、TCP-socket 服務端: import socket tcp_sk = socket.socket() tcp_sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) tcp_sk.bind(('127.0.0.1',8000)) tcp_sk.listen() conn,addr = tcp_sk.accept() conn.send('你好'.encode('utf-8')) print(conn.recv(1024).decode('utf-8')) conn.close() tcp_sk.close() 客戶端: import socket sk = socket.socket() sk.connect(('127.0.0.1',8000)) print(sk.recv(1024).decode('utf-8')) sk.send('嘿嘿嘿'.encode('utf-8')) sk.close() 二、UDP-socket 服務端: import socket udp_sk = socket.socket(type=socket.SOCK_DGRAM) udp_sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) udp_sk.bind(('127.0.0.1',8001)) msg,addr = udp_sk.recvfrom(1024) print(msg.decode('utf-8')) udp_sk.sendto('你好'.encode('utf-8'),addr) udp_sk.close() 客戶端: import socket sk = socket.socket(type=socket.SOCK_DGRAM) sk.sendto('哈哈'.encode('utf-8'),('127.0.0.1',8001)) msg,addr = sk.recvfrom(1024) print(msg.decode('utf-8')) sk.close() 三、socketserver 服務端: import socketserver class Myserver(socketserver.BaseRequestHandler): def handle(self): conn = self.request while True: conn.send(b'hello') print(conn.recv(1024).decode('utf-8')) socketserver.TCPServer.allow_reuse_address = True server = socketserver.ThreadingTCPServer(('127.0.0.1',8080),Myserver) server.serve_forever() 客戶端: import socket sk = socket.socket() sk.connect(('127.0.0.1',8080)) while True: ret = sk.recv(1024) print(ret.decode('utf-8')) sk.send(b'hiworld') sk.close() 四、程序 方式一、 from multiprocessing import Process def func(arg): print(arg) if __name__ == '__main__': p = Process(target=func,args=('子程序',)) p.start() p.join() print('主程序') 方式二、 from multiprocessing import Process class MyProcess(Process): def __init__(self,name): super().__init__() self.name = name def run(self): print(self.name) if __name__ == '__main__': p = MyProcess('小明') p.start() 五、執行緒 方式一、 from threading import Thread import time def sleep_boy(name): time.sleep(1) print('%s is sleeping' %name) t = Thread(target=sleep_boy,args=('xiaoming',)) # 這裡可以不需要main,因為現在只是在一個程序內操作,不需要匯入程序就不會import主程序了 t.start() print('主執行緒') 方式二、 from threading import Thread import time class Sleep_boy(Thread): def __init__(self,name): super().__init__() self.name = name def run(self): time.sleep(1) print('%s is sleeping' % self.name) t = Sleep_boy('xiaoming') t.start() print('主執行緒') 六、協程 1、greenlet例子: import time from greenlet import greenlet def cooking(): print('cooking 1') g2.switch() # 切換到g2,讓g2的函式工作 time.sleep(1) print('cooking 2') def watch(): print('watch TV 1') time.sleep(1) print('watch TV 2') g1.switch() # 切換到g1,讓g1的函式工作 g1 = greenlet(cooking) g2 = greenlet(watch) g1.switch() # 切換到g1,讓g1的函式工作 greenlet的缺陷:很顯然greenlet實現了協程的切換功能,可以自己設定什麼時候切,在哪切,但是它遇到阻塞並沒有自動切換, 因此並不能提高效率。所以一般我們都使用gevent模組實現協程 2、gevent例子1: from gevent import monkey monkey.patch_all() import time import gevent def cooking(): print('cooking 1') time.sleep(1) print('cooking 2') def watch(): print('watch TV 1') time.sleep(1) print('watch TV 2') g1 = gevent.spawn(cooking) # 自動檢測阻塞事件,遇見阻塞了就會進行切換 g2 = gevent.spawn(watch) g1.join() # 阻塞直到g1結束 g2.join() # 阻塞直到g2結束

複製程式碼
gevent例子2:
import gevent
def cooking(i):
    print('%s號在煮飯' %i)
    return i

g_lst = []
for i in range(10):
    g = gevent.spawn(cooking,i) # 函式名,引數
    g_lst.append(g)   # 把協程物件放入列表

for g in g_lst:
    g.join()
    print(g.value)  # 列印返回值

# gevent.joinall(g_lst) # joinall一次性把全部物件都阻塞
複製程式碼
 
 

七、程序池 1、同步提交apply: import os import time from multiprocessing import Pool def test(num): time.sleep(1) print('%s:%s' %(num,os.getpid())) return num*2 if __name__ == '__main__': p = Pool() for i in range(20): res = p.apply(test,args=(i,)) # 提交任務的方法 同步提交 print('-->',res) # res就是test的return的值,同步提交的返回值可以直接使用 2、非同步提交apply_async: 2-1無返回值: import time from multiprocessing import Pool def func(num): time.sleep(1) print('做了%s件衣服'%num) if __name__ == '__main__': p = Pool(4) # 程序池中建立4個程序,不寫的話,預設值為你電腦的CUP數量 for i in range(50): p.apply_async(func,args=(i,)) # 非同步提交func到一個子程序中執行,沒有返回值的情況 p.close() # 關閉程序池,使用者不能再向這個池中提交任務了 p.join() # 阻塞,直到程序池中所有的任務都被執行完 2-2有返回值: import time import os from multiprocessing import Pool def test(num): time.sleep(1) print('%s:%s' %(num,os.getpid())) return num*2 if __name__ == '__main__': p = Pool() res_lst = [] for i in range(20): res = p.apply_async(test,args=(i,)) # 提交任務的方法 非同步提交 res_lst.append(res) for res in res_lst: print(res.get()) # 非同步提交的返回值需要get,get有阻塞效果,此時就不需要close和join 2-3map: map接收一個函式和一個可迭代物件,是非同步提交的簡化版本,自帶close和join方法 可迭代物件的每一個值就是函式接收的實參,可迭代物件的長度就是建立的任務數量 map可以直接拿到返回值的可迭代物件(列表),迴圈就可以獲取返回值 import time from multiprocessing import Pool def func(num): print('子程序:',num) # time.sleep(1) return num if __name__ == '__main__': p = Pool() ret = p.map(func,range(10)) # ret是列表 for i in ret: print('返回值:',i) 2-4回撥函式: import os from multiprocessing import Pool def func(i): print('子程序:',os.getpid()) return i def call_back(res): print('回撥函式:',os.getpid()) print('res--->',res) if __name__ == '__main__': p = Pool() print('主程序:',os.getpid()) p.apply_async(func,args=(1,),callback=call_back) # callback關鍵字傳參,引數是回撥函式 p.close() p.join() 八、程序池、執行緒池 執行緒池: 1import time from concurrent.futures import ThreadPoolExecutor def func(i): print('thread',i) time.sleep(1) print('thread %s end'%i) tp = ThreadPoolExecutor(5) # 相當於tp = Pool(5) tp.submit(func,1) # 相當於tp.apply_async(func,args=(1,)) tp.shutdown() # 相當於tp.close() + tp.join() print('主執行緒') 2import time from concurrent.futures import ThreadPoolExecutor from threading import currentThread def func(i): print('thread',i,currentThread().ident) time.sleep(1) print('thread %s end'%i) tp = ThreadPoolExecutor(5) for i in range(20): tp.submit(func,i) tp.shutdown() # shutdown一次就夠了,會自動把所有的執行緒都join() print('主執行緒') 3、返回值 import time from concurrent.futures import ThreadPoolExecutor from threading import currentThread def func(i): print('thread',i,currentThread().ident) time.sleep(1) print('thread %s end' %i) return i * '*' tp = ThreadPoolExecutor(5) ret_lst = [] for i in range(20): ret = tp.submit(func,i) ret_lst.append(ret) for ret in ret_lst: print(ret.result()) # 相當於ret.get() print('主執行緒') 4、map map接收一個函式和一個可迭代物件 可迭代物件的每一個值就是函式接收的實參,可迭代物件的長度就是建立的執行緒數量 map可以直接拿到返回值的可迭代物件(列表),迴圈就可以獲取返回值 import time from concurrent.futures import ThreadPoolExecutor def func(i): print('thread',i) time.sleep(1) print('thread %s end'%i) return i * '*' tp = ThreadPoolExecutor(5) ret = tp.map(func,range(20)) for i in ret: print(i) 5、回撥函式 回撥函式在程序池是由主程序實現的 回撥函式線上程池是由子執行緒實現的 import time from concurrent.futures import ThreadPoolExecutor from threading import currentThread def func(i): print('thread',i,currentThread().ident) time.sleep(1) print('thread %s end'%i) return i * '*' def call_back(arg): print('call back : ',currentThread().ident) print('ret : ',arg.result()) # multiprocessing的Pool回撥函式中的引數不需要get(),這裡需要result() tp = ThreadPoolExecutor(5) ret_lst = [] for i in range(20): tp.submit(func,i).add_done_callback(call_back) # 使用add_done_callback()方法實現回撥函式 print('主執行緒',currentThread().ident)
複製程式碼