1. 程式人生 > >爬蟲必備—性能相關(異步非阻塞)

爬蟲必備—性能相關(異步非阻塞)

tornado 異步io 主機 quest cookie article Coding aps 技術

在編寫爬蟲時,性能的消耗主要在IO請求中,當單進程單線程模式下請求URL時必然會引起等待,從而使得請求整體變慢。

1. 同步執行

 1 import requests
 2 
 3 def fetch_async(url):
 4     response = requests.get(url)
 5     return response
 6 
 7 
 8 url_list = ['http://www.github.com', 'http://www.bing.com']
 9 
10 for url in url_list:
11     fetch_async(url)

2. 多線程執行(多個線程並發執行,時間長短取決於最長的URL請求)

 1 from concurrent.futures import ThreadPoolExecutor
 2 import requests
 3 
 4 
 5 def fetch_async(url):
 6     response = requests.get(url)
 7     return response
 8 
 9 
10 url_list = ['http://www.github.com', 'http://www.bing.com']
11 pool = ThreadPoolExecutor(5)
12 for url in url_list:
13     pool.submit(fetch_async, url)
14 pool.shutdown(wait=True)

3. 多進程執行(在CPU核心數足夠的情況下,多個進程並行執行,時間長短取決於最長的URL請求,理論上會快於多線程)

 1 from concurrent.futures import ProcessPoolExecutor
 2 import requests
 3 
 4 def fetch_async(url):
 5     response = requests.get(url)
 6     return response
 7 
 8 
 9 url_list = ['http://www.github.com', 'http://www.bing.com']
10 pool = ProcessPoolExecutor(5)
11 for url in url_list:
12     pool.submit(fetch_async, url)
13 pool.shutdown(wait=True)

4. 多線程+回調函數(實現了異步非阻塞,在IO等待的情況下可以做其它事情)

 1 from concurrent.futures import ThreadPoolExecutor
 2 import requests
 3 
 4 def fetch_async(url):
 5     response = requests.get(url)
 6     return response
 7 
 8 
 9 def callback(future):
10     print(future.result())
11 
12 
13 url_list = ['http://www.github.com', 'http://www.bing.com']
14 pool = ThreadPoolExecutor(5)
15 for url in url_list:
16     v = pool.submit(fetch_async, url)
17     v.add_done_callback(callback)
18 pool.shutdown(wait=True)

5. 多進程+回調函數(實現了異步非阻塞,在IO等待的情況下可以做其它事情)

 1 from concurrent.futures import ProcessPoolExecutor
 2 import requests
 3 
 4 
 5 def fetch_async(url):
 6     response = requests.get(url)
 7     return response
 8 
 9 
10 def callback(future):
11     print(future.result())
12 
13 
14 url_list = ['http://www.github.com', 'http://www.bing.com']
15 pool = ProcessPoolExecutor(5)
16 for url in url_list:
17     v = pool.submit(fetch_async, url)
18     v.add_done_callback(callback)
19 pool.shutdown(wait=True)

通過上述代碼均可以完成對請求性能的提高,對於多線程和多進程的缺點是在IO阻塞時會造成了線程和進程的浪費,所以異步IO會是首選:

1. asyncio示例一

import asyncio


@asyncio.coroutine
def func1():
    print('before...func1......')
    yield from asyncio.sleep(5)
    print('end...func1......')


tasks = [func1(), func1()]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

2. asyncio示例二

 1 import asyncio
 2 
 3 
 4 @asyncio.coroutine
 5 def fetch_async(host, url='/'):
 6     print(host, url)
 7     reader, writer = yield from asyncio.open_connection(host, 80)
 8 
 9     request_header_content = """GET %s HTTP/1.0\r\nHost: %s\r\n\r\n""" % (url, host,)
10     request_header_content = bytes(request_header_content, encoding='utf-8')
11 
12     writer.write(request_header_content)
13     yield from writer.drain()
14     text = yield from reader.read()
15     print(host, url, text)
16     writer.close()
17 
18 tasks = [
19     fetch_async('www.cnblogs.com', '/wupeiqi/'),
20     fetch_async('dig.chouti.com', '/pic/show?nid=4073644713430508&lid=10273091')
21 ]
22 
23 loop = asyncio.get_event_loop()
24 results = loop.run_until_complete(asyncio.gather(*tasks))
25 loop.close()

3. asyncio + aiohttp

 1 import aiohttp
 2 import asyncio
 3 
 4 
 5 @asyncio.coroutine
 6 def fetch_async(url):
 7     print(url)
 8     response = yield from aiohttp.request('GET', url)
 9     # data = yield from response.read()
10     # print(url, data)
11     print(url, response)
12     response.close()
13 
14 
15 tasks = [fetch_async('http://www.google.com/'), fetch_async('http://www.chouti.com/')]
16 
17 event_loop = asyncio.get_event_loop()
18 results = event_loop.run_until_complete(asyncio.gather(*tasks))
19 event_loop.close()

4. asyncio + requests

 1 import asyncio
 2 import requests
 3 
 4 
 5 @asyncio.coroutine
 6 def fetch_async(func, *args):
 7     loop = asyncio.get_event_loop()
 8     future = loop.run_in_executor(None, func, *args)
 9     response = yield from future
10     print(response.url, response.content)
11 
12 
13 tasks = [
14     fetch_async(requests.get, 'http://www.cnblogs.com/wupeiqi/'),
15     fetch_async(requests.get, 'http://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091')
16 ]
17 
18 loop = asyncio.get_event_loop()
19 results = loop.run_until_complete(asyncio.gather(*tasks))
20 loop.close()

5. gevent + requests

 1 import gevent
 2 
 3 import requests
 4 from gevent import monkey
 5 
 6 monkey.patch_all()
 7 
 8 
 9 def fetch_async(method, url, req_kwargs):
10     print(method, url, req_kwargs)
11     response = requests.request(method=method, url=url, **req_kwargs)
12     print(response.url, response.content)
13 
14 # ##### 發送請求 #####
15 gevent.joinall([
16     gevent.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),
17     gevent.spawn(fetch_async, method='get', url='https://www.yahoo.com/', req_kwargs={}),
18     gevent.spawn(fetch_async, method='get', url='https://github.com/', req_kwargs={}),
19 ])
20 
21 # ##### 發送請求(協程池控制最大協程數量) #####
22 # from gevent.pool import Pool
23 # pool = Pool(None)
24 # gevent.joinall([
25 #     pool.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),
26 #     pool.spawn(fetch_async, method='get', url='https://www.yahoo.com/', req_kwargs={}),
27 #     pool.spawn(fetch_async, method='get', url='https://www.github.com/', req_kwargs={}),
28 # ])

6. grequests(封裝的 gevent + requests)

 1 import grequests
 2 
 3 
 4 request_list = [
 5     grequests.get('http://httpbin.org/delay/1', timeout=0.001),
 6     grequests.get('http://fakedomain/'),
 7     grequests.get('http://httpbin.org/status/500')
 8 ]
 9 
10 
11 # ##### 執行並獲取響應列表 #####
12 # response_list = grequests.map(request_list)
13 # print(response_list)
14 
15 
16 # ##### 執行並獲取響應列表(處理異常) #####
17 # def exception_handler(request, exception):
18 # print(request,exception)
19 #     print("Request failed")
20 
21 # response_list = grequests.map(request_list, exception_handler=exception_handler)
22 # print(response_list)

7. Twisted示例

 1 from twisted.web.client import getPage, defer
 2 from twisted.internet import reactor
 3 
 4 
 5 def all_done(arg):
 6     reactor.stop()
 7 
 8 
 9 def callback(contents):
10     print(contents)
11 
12 
13 deferred_list = []
14 
15 url_list = ['http://www.bing.com', 'http://www.baidu.com', ]
16 for url in url_list:
17     deferred = getPage(bytes(url, encoding='utf8'))
18     deferred.addCallback(callback)
19     deferred_list.append(deferred)
20 
21 dlist = defer.DeferredList(deferred_list)
22 dlist.addBoth(all_done)
23 
24 reactor.run()

8. Tornado

 1 from tornado.httpclient import AsyncHTTPClient
 2 from tornado.httpclient import HTTPRequest
 3 from tornado import ioloop
 4 
 5 
 6 def handle_response(response):
 7     """
 8     處理返回值內容(需要維護計數器,來停止IO循環),調用 ioloop.IOLoop.current().stop()
 9     :param response: 
10     :return: 
11     """
12     if response.error:
13         print("Error:", response.error)
14     else:
15         print(response.body)
16 
17 
18 def func():
19     url_list = [
20         'http://www.baidu.com',
21         'http://www.bing.com',
22     ]
23     for url in url_list:
24         print(url)
25         http_client = AsyncHTTPClient()
26         http_client.fetch(HTTPRequest(url), handle_response)
27 
28 
29 ioloop.IOLoop.current().add_callback(func)
30 ioloop.IOLoop.current().start()

9. Twisted更多

 1 from twisted.internet import reactor
 2 from twisted.web.client import getPage
 3 import urllib.parse
 4 
 5 
 6 def one_done(arg):
 7     print(arg)
 8     reactor.stop()
 9 
10 post_data = urllib.parse.urlencode({'check_data': 'adf'})
11 post_data = bytes(post_data, encoding='utf8')
12 headers = {b'Content-Type': b'application/x-www-form-urlencoded'}
13 response = getPage(bytes('http://dig.chouti.com/login', encoding='utf8'),
14                    method=bytes('POST', encoding='utf8'),
15                    postdata=post_data,
16                    cookies={},
17                    headers=headers)
18 response.addBoth(one_done)
19 
20 reactor.run()

以上均是Python內置以及第三方模塊提供異步IO請求模塊,使用簡便大大提高效率,而對於異步IO請求的本質則是【非阻塞Socket】+【IO多路復用】:

技術分享
  1 import select
  2 import socket
  3 import time
  4 
  5 
  6 class AsyncTimeoutException(TimeoutError):
  7     """
  8     請求超時異常類
  9     """
 10 
 11     def __init__(self, msg):
 12         self.msg = msg
 13         super(AsyncTimeoutException, self).__init__(msg)
 14 
 15 
 16 class HttpContext(object):
 17     """封裝請求和相應的基本數據"""
 18 
 19     def __init__(self, sock, host, port, method, url, data, callback, timeout=5):
 20         """
 21         sock: 請求的客戶端socket對象
 22         host: 請求的主機名
 23         port: 請求的端口
 24         port: 請求的端口
 25         method: 請求方式
 26         url: 請求的URL
 27         data: 請求時請求體中的數據
 28         callback: 請求完成後的回調函數
 29         timeout: 請求的超時時間
 30         """
 31         self.sock = sock
 32         self.callback = callback
 33         self.host = host
 34         self.port = port
 35         self.method = method
 36         self.url = url
 37         self.data = data
 38 
 39         self.timeout = timeout
 40 
 41         self.__start_time = time.time()
 42         self.__buffer = []
 43 
 44     def is_timeout(self):
 45         """當前請求是否已經超時"""
 46         current_time = time.time()
 47         if (self.__start_time + self.timeout) < current_time:
 48             return True
 49 
 50     def fileno(self):
 51         """請求sockect對象的文件描述符,用於select監聽"""
 52         return self.sock.fileno()
 53 
 54     def write(self, data):
 55         """在buffer中寫入響應內容"""
 56         self.__buffer.append(data)
 57 
 58     def finish(self, exc=None):
 59         """在buffer中寫入響應內容完成,執行請求的回調函數"""
 60         if not exc:
 61             response = b''.join(self.__buffer)
 62             self.callback(self, response, exc)
 63         else:
 64             self.callback(self, None, exc)
 65 
 66     def send_request_data(self):
 67         content = """%s %s HTTP/1.0\r\nHost: %s\r\n\r\n%s""" % (
 68             self.method.upper(), self.url, self.host, self.data,)
 69 
 70         return content.encode(encoding='utf8')
 71 
 72 
 73 class AsyncRequest(object):
 74     def __init__(self):
 75         self.fds = []
 76         self.connections = []
 77 
 78     def add_request(self, host, port, method, url, data, callback, timeout):
 79         """創建一個要請求"""
 80         client = socket.socket()
 81         client.setblocking(False)
 82         try:
 83             client.connect((host, port))
 84         except BlockingIOError as e:
 85             pass
 86             # print('已經向遠程發送連接的請求')
 87         req = HttpContext(client, host, port, method, url, data, callback, timeout)
 88         self.connections.append(req)
 89         self.fds.append(req)
 90 
 91     def check_conn_timeout(self):
 92         """檢查所有的請求,是否有已經連接超時,如果有則終止"""
 93         timeout_list = []
 94         for context in self.connections:
 95             if context.is_timeout():
 96                 timeout_list.append(context)
 97         for context in timeout_list:
 98             context.finish(AsyncTimeoutException('請求超時'))
 99             self.fds.remove(context)
100             self.connections.remove(context)
101 
102     def running(self):
103         """事件循環,用於檢測請求的socket是否已經就緒,從而執行相關操作"""
104         while True:
105             r, w, e = select.select(self.fds, self.connections, self.fds, 0.05)
106 
107             if not self.fds:
108                 return
109 
110             for context in r:
111                 sock = context.sock
112                 while True:
113                     try:
114                         data = sock.recv(8096)
115                         if not data:
116                             self.fds.remove(context)
117                             context.finish()
118                             break
119                         else:
120                             context.write(data)
121                     except BlockingIOError as e:
122                         break
123                     except TimeoutError as e:
124                         self.fds.remove(context)
125                         self.connections.remove(context)
126                         context.finish(e)
127                         break
128 
129             for context in w:
130                 # 已經連接成功遠程服務器,開始向遠程發送請求數據
131                 if context in self.fds:
132                     data = context.send_request_data()
133                     context.sock.sendall(data)
134                     self.connections.remove(context)
135 
136             self.check_conn_timeout()
137 
138 
139 if __name__ == '__main__':
140     def callback_func(context, response, ex):
141         """
142         :param context: HttpContext對象,內部封裝了請求相關信息
143         :param response: 請求響應內容
144         :param ex: 是否出現異常(如果有異常則值為異常對象;否則值為None)
145         :return:
146         """
147         print(context, response, ex)
148 
149     obj = AsyncRequest()
150     url_list = [
151         {'host': 'www.google.com', 'port': 80, 'method': 'GET', 'url': '/', 'data': '', 'timeout': 5,
152          'callback': callback_func},
153         {'host': 'www.baidu.com', 'port': 80, 'method': 'GET', 'url': '/', 'data': '', 'timeout': 5,
154          'callback': callback_func},
155         {'host': 'www.bing.com', 'port': 80, 'method': 'GET', 'url': '/', 'data': '', 'timeout': 5,
156          'callback': callback_func},
157     ]
158     for item in url_list:
159         print(item)
160         obj.add_request(**item)
161 
162     obj.running()
史上最牛逼的異步IO模塊

本文轉載自: 銀角大王

http://www.cnblogs.com/wupeiqi/articles/6229292.html

爬蟲必備—性能相關(異步非阻塞)