1. 程式人生 > >重修課程day32(網絡編程六之進程三)

重修課程day32(網絡編程六之進程三)

zha 水平擴展 生產者和消費者 就會 介紹 with open class 指向 空間

 什麽叫做水平擴展:增加計算機的數量,並沒有提高計算機的性能

 什麽叫開源:開放源代碼

 什麽叫做虛擬化:同時跑多個系統

一 JoinableQueue模塊

 JoinableQueue模塊介紹:比Queue多了兩個函數,一個是task_done,另一個是join,都是專用於全球進行編程的,大多數用於生產者和消費者之間的。

  task_done:是用在get後面的 ,告訴os已經處理完了內容。

  join:是說Queue裏面的生產數據全部處理完了。

使用方法如下:

# import multiprocessing
# import time
# import random
# def sheng(name,q,wuping):
#     for i in range(5):
#         time.sleep(random.random())
#         ret=‘%s%s‘%(wuping,i)
#         q.put(ret)
#         print(‘廚師%s創建了%s‘%(name,ret))
#     q.join()
# def xiao(name,q):
#     while True:
#         time.sleep(random.random())
#         ret=q.get()
#         print(‘%s吃了%s‘%(name,ret))
#         q.task_done()
# if __name__==‘__main__‘:
#     q=multiprocessing.JoinableQueue()
#     s1=multiprocessing.Process(target=sheng,args=(‘egon1‘,q,‘包子‘))
#     s2=multiprocessing.Process(target=sheng,args=(‘egon2‘,q,‘rou‘))
#     s3=multiprocessing.Process(target=sheng,args=(‘egon3‘,q,‘骨頭‘))
#     x1=multiprocessing.Process(target=xiao,args=(‘alex1‘,q))
#     x2=multiprocessing.Process(target=xiao,args=(‘alex2‘,q))
#     x1.daemon=True
#     x2.daemon=True
#     s1.start()
#     s2.start()
#     s3.start()
#     x1.start()
#     x2.start()
#     s1.join()
#     s2.join()
#     s3.join()
 

二 Manager:共享內存空間

 dict:共享的以惡搞字典,能夠被多個共享

如下1:

# import multiprocessing
# def walk(d):
#     d[‘count‘]-=1
# if __name__==‘__main__‘:
#     m=multiprocessing.Manager()
#     d=m.dict({‘count‘:100})
#     for i in range(100):
#         w=multiprocessing.Process(target=walk,args=(d,))
#         w.start()
#     w.join()
#     print(d)

如下2:

# import multiprocessing
# def walk(d,loak):
#     with loak:
#         d[‘count‘]-=1
# if __name__==‘__main__‘:
#     m=multiprocessing.Manager()
#     d=m.dict({‘count‘:100})
#     lock=multiprocessing.Lock()
#     li=[]
#     for i in range(100):
#         w=multiprocessing.Process(target=walk,args=(d,lock))
#         li.append(w)
#         w.start()
#     for j in li:
#         j.join()
#     print(j)
#     print(d)

三 進程池

 什麽是進程池:創建一定數量的進程個數

 同步和異步:提交任務的兩種方式。

 Pool:創建進程池和控制進程的數目,默認的個數是根據CPU的核數

 apply:傳入兩個參數,第一個是指定任務。向進程池提交一個任務,實現了串行和同步調用。結束任務後,立馬會拿到結果。

  開啟的進程數目有幾個,就會有幾個pid。

 什麽是同步調用:提交一個任務,等到任務結束後才能執行下一個任務。

# import multiprocessing
# import time
# import random
# import os
# def walk(n):
#     print(‘%s is walking‘%os.getpid())
#     time.sleep(random.random())
#     return n
#
# if __name__==‘__main__‘:
#     p=multiprocessing.Pool(4)
#     for i in range(10):
#         q=p.apply(walk,args=(i,))
#         print(q)

 apply_async:向進程池提交任務,提交完任務後就不管了,只管提交任務,不能直接執行任務。實現了一個並發和惡異步調用

  執行方法:先close:關閉任務,好讓任務結束

       然後join:等待一個進程池不在提交任務,並且任務結束和計算任務個數

       最後get:獲取返回值

 什麽是異步調用:提交完一個任務過後不會在原地等待,而是繼續提交下一個任務。等待所有的任務結束後在用get獲取任務

如下:

# import multiprocessing
# import time
# import random
# import os
# def walk(n):
#     print(‘%s is walking‘%os.getpid())
#     time.sleep(random.random())
#     return n
#
# if __name__==‘__main__‘:
#     p=multiprocessing.Pool(4)
#     li=[]
#     for i in range(10):
#         q=p.apply_async(walk,args=(i,))
#         li.append(q)
#     p.close()
#     p.join()
#     for i in li:
#         print(i.get())
#

四 回調函數

 什麽是回調函數:通過一個函數內存調用的函數,如果將這個函數的地址當作參數傳給另一個函數,當這個函數的地址用來調用其所只想的函數是,這個所指向的函數就是回調函數。詳情訪問:http://www.cnblogs.com/hainan-zhang/p/6222552.html

  callback:後面加上的是回調函數

  回調函數的進程其實就是主進程。

 用回調函數實現一個網絡爬蟲;需要用到requests模塊

  get:獲取網址

  status_code:返回狀態碼

  text:查看下載網址的內容

 

from multiprocessing import Pool,Process
import requests
import os
import time,random
def get(url):
    print(‘%s GET %s‘ %(os.getpid(),url))
    response=requests.get(url)
    time.sleep(random.randint(1,3))
    if response.status_code == 200:
        print(‘%s DONE %s‘ % (os.getpid(), url))
        return {‘url‘:url,‘text‘:response.text}

def parse(dic):
    print(‘%s PARSE %s‘ %(os.getpid(),dic[‘url‘]))
    time.sleep(1)
    res=‘%s:%s\n‘ %(dic[‘url‘],len(dic[‘text‘]))
    with open(‘db.txt‘,‘a‘) as f:
        f.write(res)

if __name__ == ‘__main__‘:
    urls=[
        ‘https://www.baidu.com‘,
        ‘https://www.python.org‘,
        ‘https://www.openstack.org‘,
        ‘https://help.github.com/‘,
        ‘http://www.sina.com.cn/‘
    ]
    p=Pool(2)
    start_time=time.time()
    objs=[]
    for url in urls:
        obj=p.apply_async(get,args=(url,),callback=parse) #主進程負責幹回調函數的活
        objs.append(obj)
    p.close()
    p.join()

    print(‘主‘,(time.time()-start_time))

  

重修課程day32(網絡編程六之進程三)