1. 程式人生 > >9 應用非同步和協程

9 應用非同步和協程

為什麼要用非同步

一般程式碼的同步執行

同步非同步通常用來形容一次方法呼叫。

  • 同步方法

    呼叫一旦開始,呼叫者必須等到方法呼叫返回後,才能繼續後續的行為。

    每次只能向目標伺服器傳送一個請求,待其返回資料後才能進行下一次請求,若請求較多的情況下易發生阻塞。

  • 非同步方法

    呼叫更像一個訊息傳遞,一旦開始,方法呼叫就會立即返回,呼叫者就可以繼續後續的操作。而非同步方法通常會在另外一個執行緒中執行著。整個過程,不會阻礙呼叫者的工作。

    對於呼叫者來說,非同步呼叫似乎是在一瞬間完成的。如果非同步呼叫需要返回結果,那麼當非同步呼叫真實完成時,則會通知呼叫者。

    可同時傳送多個請求到目標伺服器,較早返回資料的將會被優先處理

舉個例子

打個比方,比如我們去購物,如果你去商場實體店買一臺空調,當你到了商場看中了一款空調,你就想售貨員下單。售貨員去倉庫幫你調配物品。這天你熱的實在不行了。就催著商家趕緊給你配送,於是你就等在商場裡,候著他們,直到商家把你和空調一起送回家,一次愉快的購物就結束了。這就是同步呼叫。

不過,如果我們趕時髦,就坐再家裡開啟電腦,在網上訂購了一臺空調。當你完成網上支付的時候,對你來說購物過程已經結束了。雖然空調還沒有送到家,但是你的任務都已經完成了。商家接到你的訂單後,就會加緊安排送貨,當然這一切已經跟你無關了,你已經支付完成,想什麼就能去幹什麼了,出去溜達幾圈都不成問題。等送貨上門的時候,接到商家電話,回家一趟簽收即可。這就是非同步呼叫。

非同步解決的問題:

非同步處理可以讓應用在長時間的API和資料庫請求中避免阻塞的時間耗費,最終更快地服務更多請求

如果client請求server處理的handler裡面有一個阻塞的耗時操作,那麼整體的server效能就會下降。

比如: 訪問一個耗時的網站請求 www.douban.com/search, 這個結果要在5秒後才返回值。
當我訪問的話,肯定是要等5秒鐘,這時候,要是有別的客戶要連線的別的頁面,(不堵塞的頁面)
你猜他能馬上顯示嗎?不能的。。。 他也是要等當前這個5秒延遲過後,才能訪問的。

幸運的是,tornado提供了一套非同步機制,方便我們實現自己的非同步操作。
當handler處理需要進行其餘的網路操作的時候,tornado提供了一個AsyncHTTPClient來支援非同步。

注意

非同步程式碼增加了複雜度,只在特定場景使用

應用非同步

先從同步版本開始

  • 增加一個儲存 URL 圖片的功能
  • handler 和路由
  • 使用協程 coroutine

tornado 的非同步模組

  • tornado.gen.coroutine + yield
  • tornado.httpclient.AsyncHTTPClient

更多說明文件 非同步和非阻塞I/O — Tornado 4.3 文件

tornado.httpclient.HTTPClient()

tornado內建的HTTP客戶端物件 ( 阻塞 )

後端同步操作,伺服器通過內建的客戶端物件,抓取目標url地址的資料,返回給前端頁面,通過dom操作渲染頁面
http_client = httpclient.HTTPClient()

try:
    response = http_client.fetch("http://www.google.com/")
    print response.body
    
except httpclient.HTTPError as e:
    print("Error: " + str(e))
    
except Exception as e:
    print("Error: " + str(e))
    
http_client.close()

close()

關閉該 HTTPClient, 釋放所有使用的資源.

fetch()

執行一個請求, 返回一個 HTTPResponse物件.

code


class URLSaveHandler(AuthBaseHandler):
	"""儲存指定url的圖片 同步方法"""

	@tornado.web.authenticated
	def get(self, *args, **kwargs):
		url = self.get_argument('url', None)
		response = self.fetch_image(url)  # 獲取指定url的圖片
		if not response.body:  # 資料被封裝在響應物件的body屬性中
			self.write('empty data')
			return

		image_saver = ImageSave(self.settings['static_path'], 'x.jpg')
		image_saver.save_image(response.body)  # body 就是圖片資料 儲存圖片
		image_saver.make_thumbs()  # 做縮圖
		# 新增到資料庫,拿到 post 例項
		post = Posts.add_post_for(self.current_user, image_saver.image_url, image_saver.thumb_url)

		print("-- {} -end fetch:#{}".format(datetime.now(), post.id))

		self.redirect('/post/{}'.format(post.id))  # 跳轉到 post 頁面

	def fetch_image(self, url):
		"""獲取指定url的圖片"""
		client = tornado.httpclient.HTTPClient()  # 獲取同步操作物件
		print("-- {} -going to fetch:{}".format(datetime.now(), url))
		response = client.fetch(url)  # 獲取url對應的內容  得到響應物件
		return response

tornado.httpclient.AsyncHTTPClient

tornado內建的HTTP客戶端物件的非同步操作物件 (非阻塞 )

def handle_request(response):
    if response.error:
        print "Error:", response.error
    else:
        print response.body

http_client = AsyncHTTPClient()
http_client.fetch("http://www.google.com/", handle_request)

code


class AsyncURLSaveHandler(AuthBaseHandler):
	"""儲存指定url的圖片 非同步方法"""

	@tornado.web.authenticated
	@tornado.gen.coroutine
	def get(self, *args, **kwargs):
		url = self.get_argument('url', None)
		response = yield self.fetch_image(url)  # 獲取指定url的圖片
		if not response.body:  # 資料被封裝在響應物件的body屬性中
			self.write('empty data')
			return

		image_saver = ImageSave(self.settings['static_path'], 'x.jpg')
		image_saver.save_image(response.body)  # 儲存圖片
		image_saver.make_thumbs()  # 縮圖

		post = Posts.add_post_for(self.current_user, image_saver.image_url, image_saver.thumb_url)  # 新增到資料庫

		print("-- {} -end fetch:#{}".format(datetime.now(), post.id))

		self.redirect('/post/{}'.format(post.id))

	@tornado.gen.coroutine
	def fetch_image(self, url):
		"""獲取指定url的圖片"""
		client = tornado.httpclient.AsyncHTTPClient()  # 獲取非同步操作物件
		print("-- {} -going to fetch:{}".format(datetime.now(), url))
		yield tornado.gen.sleep(6)
		response = yield client.fetch(url)  # 獲取url對應的內容  得到響應物件
		return response

coroutine 裝飾器

指定改請求為協程模式,說明白點就是能使用 yield 配合 Tornado 編寫非同步程式。


from tornado import gen

@gen.coroutine
def fetch_coroutine(url):
    client = AsyncHTTPClient()
    response = yield client.fetch(url)
	return response.body

  • @gen.coroutine此裝飾器代表的是協程, 與關鍵字yield搭配使用
  • client.fetch(url)請求網路是耗時操作, 通過關鍵字yield來掛起呼叫, 而當client.fetch(url)請求完成時再繼續從函式掛起的位置繼續往下執行.

協程模組tornado.gen

tornado.gen是根據生成器(generator)實現的,用來更加簡單的實現非同步。

tornado.gen.coroutine的實現思路:

generator中的yield語句可以使函式暫停執行,而send()方法則可以恢復函式的執行。

tornado將那些非同步操作(fetch())放置到yield語句後,當這些非同步操作完成後,tornado會將結果send()至generator中恢復函式執行。

在tornado中大多數的非同步操作返回一個Future物件

yield Future物件 會 返回該非同步操作的結果,這句話的意思就是說 假如 response = yield some_future_objsome_future_obj所對應的非同步操作完成後會自動的將該非同步操作的結果賦值給 response

Response 物件

class tornado.httpclient.HTTPResponse()

HTTP 響應物件

屬性:

request: HTTPRequest 物件

body: string 化的響應體 (從 self.buffer 的需求建立)

https://www.cnblogs.com/Erick-L/p/7068112.html

協程詳解:

同步非同步I/O客戶端

import tornado.httpclient


def ssync_visit():
	client = tornado.httpclient.HTTPClient() # 獲取同步操作物件
    # 獲取url對應的內容  得到響應物件
	response = client.fetch('www.baidu.com') # 阻塞,直到網站請求完成
	print(response.body)


def async_visit():
	client = tornado.httpclient.AsyncHTTPClient()  # 獲取非同步操作物件
	response = yield client.fetch('www.baidu.com')  # 非阻塞
	print(response.body)

協程

編寫協程函式

import tornado.httpclient
from tornado import gen # 引入協程庫


@tornado.gen.coroutine
def coroutine_visit():
	client = tornado.httpclient.AsyncHTTPClient()  # 獲取非同步操作物件
	response = yield client.fetch('www.baidu.com')  # 非阻塞
	print(response.body)

呼叫協程函式

由於Tornado協程基於python的yield關鍵字實現,所以不能呼叫普通函式一樣呼叫協程函式

協程函式可通過以下三種方式呼叫

  • 在本身是協程的函式內通過yield關鍵字呼叫

  • 在IOLoop尚未啟動時,通過IOLoop的run_sync()函式呼叫

  • 在IOLoop已經啟動時,通過IOLoop的spawn_callback()函式呼叫

在本身是協程的函式內通過yield關鍵字呼叫

下面是一個通過協程函式呼叫協程函式的例子

@gen.coroutine
def outer_coroutine():
    print('開始呼叫另一個協程')
    yield coroutine_visit()
    print('outer_coroutine 呼叫結束')
outer_coroutine和coroutine_visit都是協程函式,他們之間可以通過yield關鍵字進行呼叫
@tornado.gen.coroutine
def get(self, *args, **kwargs):
   url = self.get_argument('url', None)
   response = yield self.fetch_image(url) 
   print(response.body)

@tornado.gen.coroutine
def fetch_image(self, url):
   """獲取指定url的圖片"""
   client = tornado.httpclient.AsyncHTTPClient()  
   response = yield client.fetch(url)  
   return response
get和fetch_image都是協程函式,他們之間可以通過yield關鍵字進行呼叫
在IOLoop尚未啟動時,通過IOLoop的run_sync()函式呼叫

IOLoop 是Tornado的主事件迴圈物件,Tornado程式通過它監聽外部客戶端的訪問請求,並執行相應的操作,當程式尚未進入IOLoop的runing狀態時,可以通過run_sync()函式呼叫協程函式,比如:

from tornado import gen # 引入協程庫
from tornado.ioloop import IOLoop
from tornado.httpclient import AsyncHTTPClient

@tornado.gen.coroutine
def coroutine_visit():
	client = tornado.httpclient.AsyncHTTPClient() 
	response = yield client.fetch('http://www.baidu.com/')  
	print(response.body)

def func_normal():
    print('開始呼叫協程')
    IOLoop.current().run_sync(lambda: coroutine_visit())
    print('結束協程呼叫')
    
func_normal()

本例中run_sync()函式將當前函式的執行進行阻塞,直到被呼叫的協程執行完成

Tornado 要求協程函式在IOloop的running狀態才能被呼叫,只不過run_sync函式自動完成了啟動,停止IOLoop的步驟,他的實現邏輯為:啟動IOLoop-呼叫被lambda封裝的協程函式-停止IOLoop

在IOLoop已經啟動時,通過IOLoop的spawn_callback()函式呼叫
from tornado import gen # 引入協程庫
from tornado.ioloop import IOLoop
from tornado.httpclient import AsyncHTTPClient

@tornado.gen.coroutine
def coroutine_visit():
	client = tornado.httpclient.AsyncHTTPClient() 
	response = yield client.fetch('http://www.baidu.com/')  
	print(response.body)

def func_normal():
    print('開始呼叫協程')
    IOLoop.current().spawn_callback(coroutine_visit)
    print('結束協程呼叫')
func_normal()

本例中spawn_callback函式不會等待被呼叫的協程執行完成,而協程函式將會由IOLoop在合適的時機進行呼叫,並且spawn_callback函式沒有提供返回值的方法,所以只能用該函式呼叫沒有返回值的協程函式

tornado 協程結合非同步

import tornado.web
import tornado.httpclient

class AsyncURLSaveHandler(tornado.web.RequestHandler):

    @tornado.gen.coroutine
    def get(self):
        http = tornado.httpclient.AsyncHTTPClient()
        response = yield http.fetch('http://www.baidu.com')
        self.write(response.body)

用tornado.gen.coroutine裝飾AsyncURLSaveHandler的get(),post()函式

使用非同步物件處理耗時操作,

呼叫yield關鍵字獲取非同步物件的處理結果

作業

增加 /save 的 handler,實現非同步的功能

請求隨機圖片的網址 : http://source.unsplash.com/random

code

http://127.0.0.1:8000/save?url=http://source.unsplash.com/random

http://127.0.0.1:8000/async?url=http://source.unsplash.com/random

service.py

from datetime import datetime
import time

import tornado.web
import tornado.httpclient
import tornado.gen

from .main import AuthBaseHandler
from utils.photo import ImageSave
from models.account import Posts


class URLSaveHandler(AuthBaseHandler):
	"""儲存指定url的圖片 同步方法"""

	@tornado.web.authenticated
	def get(self, *args, **kwargs):
		url = self.get_argument('url', None)
		response = self.fetch_image(url)  # 獲取指定url的圖片
		if not response.body:  # 資料被封裝在響應物件的body屬性中
			self.write('empty data')
			return

		image_saver = ImageSave(self.settings['static_path'], 'x.jpg')
		image_saver.save_image(response.body)  # body 就是圖片資料 儲存圖片
		image_saver.make_thumbs()  # 做縮圖
		# 新增到資料庫,拿到 post 例項
		post = Posts.add_post_for(self.current_user, image_saver.image_url, image_saver.thumb_url)

		print("-- {} -end fetch:#{}".format(datetime.now(), post.id))

		self.redirect('/post/{}'.format(post.id))  # 跳轉到 post 頁面

	def fetch_image(self, url):
		"""獲取指定url的圖片"""
		client = tornado.httpclient.HTTPClient()  # 獲取同步操作物件
		print("-- {} -going to fetch:{}".format(datetime.now(), url))
		response = client.fetch(url)  # 獲取url對應的內容  得到響應物件
		return response


class AsyncURLSaveHandler(AuthBaseHandler):
	"""儲存指定url的圖片 非同步方法"""

	@tornado.web.authenticated
	@tornado.gen.coroutine
	def get(self, *args, **kwargs):
		url = self.get_argument('url', None)
		response = yield self.fetch_image(url)  # 獲取指定url的圖片
		if not response.body:  # 資料被封裝在響應物件的body屬性中
			self.write('empty data')
			return

		image_saver = ImageSave(self.settings['static_path'], 'x.jpg')
		image_saver.save_image(response.body)  # 儲存圖片
		image_saver.make_thumbs()  # 縮圖

		post = Posts.add_post_for(self.current_user, image_saver.image_url, image_saver.thumb_url)  # 新增到資料庫

		print("-- {} -end fetch:#{}".format(datetime.now(), post.id))

		self.redirect('/post/{}'.format(post.id))

	@tornado.gen.coroutine
	def fetch_image(self, url):
		"""獲取指定url的圖片"""
		client = tornado.httpclient.AsyncHTTPClient()  # 獲取非同步操作物件
		print("-- {} -going to fetch:{}".format(datetime.now(), url))
		yield tornado.gen.sleep(6)
		response = yield client.fetch(url)  # 獲取url對應的內容  得到響應物件
		return response

app.py

import tornado.web
import tornado.options
import tornado.ioloop
from tornado.options import define, options

from handlers import main,auth,chat,service

define(name='port', default='8000', type=int, help='run port')


class Application(tornado.web.Application):
   def __init__(self):
      handlers = [
         (r'/', main.IndexHandler),
         (r'/explore', main.ExploreHandler),
         (r'/post/(?P<post_id>[0-9]+)', main.PostHandler),
         (r'/upload', main.UploadHandler),
         (r'/profile', main.ProfileHandler),
         (r'/login', auth.LoginHandler),
         (r'/logout', auth.LogoutHandler),
         (r'/signup', auth.SignupHandler),
         (r'/room', chat.RoomHandler),
         (r'/ws', chat.ChatSocketHandler),
         (r'/save', service.URLSaveHandler),
         (r'/async', service.AsyncURLSaveHandler),
      ]
      settings = dict(
         debug=True,
         template_path='templates',
         static_path='static',
         login_url='/login',
         cookie_secret='bZJc2sWbQLKos6GkHn/VB9oXwQt8S0R0kRvJ5/xJ89E=',
         pycket={
            'engine': 'redis',
            'storage': {
               'host': 'localhost',
               'port': 6379,
               # 'password': '',
               'db_sessions': 5,  # redis db index
               'db_notifications': 11,
               'max_connections': 2 ** 30,
            },
            'cookies': {
               'expires_days': 30,
            },
         }
      )

      super(Application, self).__init__(handlers, **settings)


application = Application()

if __name__ == '__main__':
   tornado.options.parse_command_line()
   application.listen(options.port)
   print("Server start on port {}".format(str(options.port)))
   tornado.ioloop.IOLoop.current().start()