一個簡單的tornado案例(post方法)
阿新 • • 發佈:2018-11-14
之前講了tornado簡單案例( get方法)。
個人理解:一般地,get方法只是為了通過url傳參(不是傳json資料),伺服器作出響應,get可以向伺服器獲取到資料(想要的資料)。而post方法可以向伺服器傳入一個數據進行處理(你想讓伺服器對資料做什麼,達到你的目的),它的資料是放在request body中需要獲取的。
下面介紹一個簡單的tornado案例(post方法的)------>
1. service.py (開啟服務)
這裡設定了埠號,指定了要執行哪一個handler處理器,處理響應。
#!/usr/bin/env python # -*- coding:utf-8 -*- # @author: 小何 import tornado.web from tornado.web import URLSpec from tornado.ioloop import IOLoop from scpy.logger import get_logger from handler import PostHandler logger = get_logger(__file__) HANDLERS = [ URLSpec(r'/example', PostHandler, name=PostHandler.__name__) ] if __name__ == '__main__': SERVER_PORT = 2333 app = tornado.web.Application(handlers=HANDLERS, debug=True) app.listen(SERVER_PORT) logger.info("Clean Panel server started on port {SERVER_PORT}".format(SERVER_PORT=SERVER_PORT)) IOLoop.current().start()
2. handler.py (處理)
#!/usr/bin/env python # -*- coding:utf-8 -*- # @author: 小何 import sys from controller_example import ControllerExample reload(sys) sys.setdefaultencoding('utf-8') import abc import json import tornado.web from tornado import gen from tornado.web import HTTPError from tornado.concurrent import run_on_executor from concurrent.futures import ThreadPoolExecutor import scpy.logger logger = scpy.logger.get_logger(__file__) NUMBER_OF_EXECUTOR = 6 ''' handler --- post方法測試 ''' class BasePostRequestHandler(tornado.web.RequestHandler): executor = ThreadPoolExecutor(NUMBER_OF_EXECUTOR) @tornado.web.asynchronous @gen.coroutine def post(self, *args, **kwargs): try: result = yield self._post(*args, **kwargs) self.write(result) except HTTPError as e: self.write(e) except Exception as e: logger.error(e) raise HTTPError(404, "No results") @run_on_executor def _post(self, *args, **kwargs): request = self._post_request_arguments(*args, **kwargs) res = self._request_service(**request) return res @abc.abstractmethod def _post_request_arguments(self, *args, **kwargs): raise NotImplementedError('call to abstract method %s._get_request_arguments' % self.__class__) @abc.abstractmethod def _request_service(self, **kwargs): raise NotImplementedError('call to abstract method %s._request_service' % self.__class__) class PostHandler(BasePostRequestHandler): def _post_request_arguments(self, *args, **kwargs): ''' 獲取資料 :param args: :param kwargs: :return: ''' logger.info(self.__class__.__name__) data = json.loads(self.request.body) if not data: raise HTTPError(400, "Query argument cannot be empty string") return data def _request_service(self, **kwargs): ''' 處理資料 :param kwargs: :return: ''' if kwargs: c_e = ControllerExample(kwargs) res = c_e.do_add() else: raise HTTPError(400, "Query argument cannot be empty string") return res
這裡邊基類中,實現了post請求方法,postHandler實現了獲取資料,並將資料傳入Controller業務層進行處理。
3 .controller_example.py (業務執行層)
#!/usr/bin/env python # -*- coding: utf-8 -*- # @author : he class ControllerExample(object): ''' 控制層業務舉例 ''' def __init__(self, data): self.test_data = data def do_add(self): self.test_data['aaa'] = self.test_data.get('aaa') + 100 self.test_data['bbb'] = self.test_data.get('bbb') + 100 return self.test_data
這裡邊執行了一點簡單的add操作
4. 測試整個環節 --- test_service.py (一般get方法採用瀏覽器位址列輸入url訪問,post做不到,會報405err,post方法只有使用postman工具檢視,或者寫指令碼測試,我選擇了後者)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @createTime : 18-6-19 17:00
# @author : 小何
from unittest import TestCase
import requests
import json
class TestPostHandler(TestCase):
def test_handler_result(self):
content = {'aaa': 123, 'bbb': 456}
json_data = json.dumps(content)
resp = requests.post('http://localhost:2333/example', json_data)
print resp.json()
5. 測試效果,訪問通過,併成功執行了業務層中的簡單操作!!!
整個tornado簡單的過程,也算個經典而簡潔的MVC結構了。。。。。。。。。。。。