1. 程式人生 > >重修課程day34(網絡編程八之線程二)

重修課程day34(網絡編程八之線程二)

append .cn 自己 als 元組 timeout priority dom back

一 concurrent.futures模塊

 開啟進程池和線程池模塊。

  線程池和進程池的區別:進程池開啟的數量範圍只能在CPU對的核數以內。而線程池的數量可以自己隨便的定義,默認為是核數的五倍。

 相關用法:

  ThreadPoolExecutor:創建一個線程池

  ProcessPoolExecutor:創建一個進程池

  Executor:是一個抽象類,上面的兩個都是方法繼承這個抽象類的

  submit:異步的提交方式。

  map:簡化提交的方式,map自帶循環,只能單純的提交,用於int類型,並且沒有放回的結果。

  shutdown:提供的一個借口,等待進程池或著線程池執行完畢過後,回收調那裏的資源。它裏面的wait=True的時候,要等待執行解釋後才會有返回結果;如果wait=False,就會理解返回結果,等到任務執行結束後才能回收調進程池或者線程池裏面的資源。但是不能提交任務了。

  result:拿到結果。

# import concurrent.futures
# import os
# import time
# import random
#
# def work(n):
#     print(‘%s is working‘%os.getpid())
#     time.sleep(random.random())
#     return n
#
# if __name__==‘__main__‘:
#     excutor=concurrent.futures.ProcessPoolExecutor(4)
#     futures=[]
#     for i in range(20):
#         future=excutor.submit(work,i)
#         futures.append(future)
#     excutor.shutdown(wait=True)
#     print(‘%s is zhuing‘%os.getpid())
#     for future in futures:
#         print(future.result())

簡寫如下:

# import concurrent.futures
# import os
# import time
# import random
# def work(n):
#     print(‘%s is working‘ % os.getpid())
#     time.sleep(random.random())
#     return n
# if __name__ == ‘__main__‘:
#     excutor = concurrent.futures.ThreadPoolExecutor(4)
#     futures = [excutor.submit(work, i) for i in range(20)]
#     excutor.shutdown(wait=True)
#     print(‘%s is zhuing‘ % os.getpid())
#     for future in futures:
#         print(future.result())

回調函數:add_done_callback後面括號裏加上一個回調函數,回調函數接收的就是第一個函數返回的一個對像,使用時,還需要在回調函數的內部sunmit提交一下。

如下:

# import concurrent.futures
# import requests
# import time
# import random
# import os
# def work(url):
#     print(‘%s is %s‘%(os.getpid(),url))
#     ret=requests.get(url)
#     time.sleep(3)
#     if ret.status_code==200:
#         print(‘%s DONE %s‘%(os.getpid(),url))
#         return {‘url‘:url,‘text‘:ret.text}
# def foo(ret):
#     ret = ret.result()
#     print(‘%s FOO %s‘%(os.getpid(),ret[‘url‘]))
#     time.sleep(1)
#     res=‘%s;長度:%s‘%(ret[‘url‘],len(ret[‘text‘]))
#     with open(‘a.txt‘,‘a‘,encoding=‘utf-8‘)as f:
#         f.write(res+‘\n‘)
#
# if __name__==‘__main__‘:
#     url_list = [
#         ‘http://tool.chinaz.com/regex/‘,
#         ‘http://www.cnblogs.com/fangjie0410/‘,
#         "http://www.cnblogs.com/xuanan",
#         "http://www.cnblogs.com/bg0131/p/6430943.html",
#         "http://www.cnblogs.com/wupeiqi/",
#         "http://www.cnblogs.com/linhaifeng/",
#         "http://www.cnblogs.com/Eva-J/articles/7125925.html",
#         "http://www.cnblogs.com/Eva-J/articles/6993515.html",
#     ]
#     excutres=concurrent.futures.ProcessPoolExecutor()
#     for i in url_list:
#         excutres.submit(work,i).add_done_callback(foo)
#
#     print(‘主‘,os.getpid())

異常處理:exception:異常接口

concurrent.futures:自帶異常,所以python內置的異常是不能識別出來的。

raise:拋出異常

import concurrent.futures
import os
import time
import random
def work(n):
    print(‘%s is working‘ % os.getpid())
    time.sleep(random.random())
    raise Exception
    return n
if __name__ == ‘__main__‘:
    excutor = concurrent.futures.ThreadPoolExecutor(4)
    futures = []
    for i in range(20):
        future = excutor.submit(work, i).result()
        futures.append(future)
    excutor.shutdown(wait=True)
    print(‘%s is zhuing‘ % os.getpid())
    for future in futures:
        print(future)

cuncel:取消終止異常

二 死鎖現象和遞歸鎖現象

 什麽是死鎖:各自拿著對方想要的一把鎖,但是各自缺一把鎖而不能釋放

 Lock:就是互斥鎖但是容易出現死鎖。只能夠acquire一次,只要鎖不釋放(selease),就不能acquire了。

死鎖現象如下:

# import threading
# import time
# import random
# l1=threading.Lock()
# l2=threading.Lock()
# class Func(threading.Thread):
#     def run(self):
#         self.aa()
#         self.bb()
#
#     def aa(self):
#         l1.acquire()
#         print(111)
#         l2.acquire()
#         print(222)
#         l2.release()
#         l1.release()
#
#     def bb(self):
#         l2.acquire()
#         print(333)
#         time.sleep(random.random())
#         l1.acquire()
#         print(444)
#         l1.release()
#         l2.release()
#
# if __name__==‘__main__‘:
#     for i in range(10):
#         ret=Func()
#         ret.start()

RLock:遞歸鎖,可以賦值多個變量。

遞歸鎖現象如下:

# import threading
# import time
# import random
# r1=r2=threading.RLock()
# class Func(threading.Thread):
#     def run(self):
#         self.aa()
#         self.bb()
#
#     def aa(self):
#         r1.acquire()
#         print(111)
#         r2.acquire()
#         print(222)
#         r2.release()
#         r1.release()
#
#     def bb(self):
#         r1.acquire()
#         print(333)
#         time.sleep(random.random())
#         r2.acquire()
#         print(444)
#         r2.release()
#         r1.release()
#
# if __name__==‘__main__‘:
#     for i in range(10):
#         ret=Func()
#         ret.start()

遞歸鎖:沒加一次鎖,引用技術就會加1,沒減一次鎖,引用技術就會減一,並且可以同時acquire多次,只要技術不為0,就不能被其他的線程搶到。

三 信號量

 什麽是信號量:其實就是鎖,同時能夠創建多個鎖,實現了一個並發的效果。超出鎖的範圍的任務就只有等待所得釋放,才能夠搶到鎖。相當於產生一堆新的線程和進程。

  Semaphore:創建信號量,同時管理一個內置的計數器

# import threading
# import time
# import random
# import os
# def task(n):
#     with sm:
#         time.sleep(random.randint(1, 5))
#         print(‘%s is tasking‘%threading.current_thread().getName())
#
# if __name__==‘__main__‘:
#     sm=threading.Semaphore(5)
#     for i in range(20):
#         t=threading.Thread(target=task,args=(i,))
#         t.start()

可以指定信號量的個數。

四 事件Event

  Event:創建事件。用於線程之間的通信。Event對象實現了簡單的線程之間通信機制,它提供了設置信號,清除信號,等待等用於實現線程間的通信。

  set:設置Event內部的信號為真。

  wait:只有在內部信號為真的時候才會很快的執行並完成返回。當Event對象的內部信號標誌位假時,則wait方法一直等待到其為真時才返回。

  timeout:wait裏面的一個參數,時間參數,等待的時間範圍。

  使用Event對象的clear()方法可以清除Event對象內部的信號標誌,即將其設為假,當使用Event的clear方法後,isSet()方法返回假

  is_set:判斷是否傳入了信號

  isSet:返回event的狀態值

# import threading
# import time
# import random
# e=threading.Event()
# def work():
#     print(‘%s 正在檢測‘%threading.current_thread().getName())
#     time.sleep(random.randint(1,5))
#     e.set()
#
# def foo():
#     count=1
#     while not e.is_set():
#         if count > 3:
#             raise TimeoutError(‘等待超時‘)
#         print(‘%s 正在等待%s次連接‘%(threading.current_thread().getName(),count))
#         e.wait(timeout=random.randint(1,5))
#         count+=1
#     print(‘%s 正在連接‘%threading.current_thread().getName())
# if __name__==‘__main__‘:
#     t1=threading.Thread(target=work)
#     t2=threading.Thread(target=foo)
#     t1.start()
#     t2.start()

五 定時器

 Timer:定時器,是thread的一個派生類,用於在指定的時間後調用調用某個方法。

# import threading
# import random
# def hello(n):
#     print(‘hello‘,n)
#
# if __name__==‘__main__‘:
#     for i in range(20):
#         t=threading.Timer(random.random(),hello,args=(i,))
#         t.start()

六 線程queue:隊列

 Queue:先放入隊列裏的數據先讀取出來

# import queue
# import time
# import random
# import threading
# q=queue.Queue(5)
# def work(n):
#     time.sleep(random.randint(1,5))
#     q.put(‘%s is working‘%n)
#     print(n)
#
# def foo():
#     time.sleep(random.randint(1,3))
#     print(q.get())
#
# if __name__==‘__main__‘:
#     for i in range(20):
#         t1=threading.Thread(target=work,args=(i,))
#         t2=threading.Thread(target=foo)
#         t1.start()
#         t2.start()

 PriorityQueue:先讀取優先級最高的數據,傳參時,以元組的格式傳入,前面傳入數字,後面傳入內容。數字從小到大排的優先級,如果是數字相同,就會按照ascii碼從小到大排序

# import queue
# import time
# import random
# import threading
# q=queue.PriorityQueue()
# def work(n):
#     time.sleep(random.randint(1,5))
#     q.put(‘%s is working‘%n)
#     print(n)
#
# def foo():
#     time.sleep(random.randint(1,3))
#     print(q.get())
#
# if __name__==‘__main__‘:
#     for i in range(20):
#         t1=threading.Thread(target=work,args=(i,))
#         t2=threading.Thread(target=foo)
#         t1.start()
#         t2.start()

 LifoQueue:先進後出,也叫做堆棧。讀取數據時,按照時間短的先讀取。

# import queue
# import random
# import time
# import threading
# q=queue.LifoQueue()
# def work(n):
#     time.sleep(random.randint(1, 5))
#     q.put((random.randint(1,20),‘jie_%s‘%n))
#     print(n)
#
# def foo():
#     print(q.get())
#     time.sleep(random.randint(1,3))
#
# if __name__==‘__main__‘:
#     for i in range(30):
#         t1=threading.Thread(target=work,args=(1,))
#         t2=threading.Thread(target=foo)
#         t1.start()
#         t2.start()

重修課程day34(網絡編程八之線程二)