1. 程式人生 > >網站搭建筆記精簡版---廖雪峰WebApp實戰-Day5:編寫Web框架筆記

網站搭建筆記精簡版---廖雪峰WebApp實戰-Day5:編寫Web框架筆記

網站搭建筆記精簡版-廖雪峰教程學習@[三川水祭] 僅作學習交流使用,將來的你會感謝現在拼命努力的自己!!!

本文首先對web框架進行了程式碼上的解釋,之後對編輯middleware部分進行了程式碼的分析,最後講述瞭如何測試從開始到現在所有程式碼的流程。

web框架定義

在編寫過程中,由於aiohttp太過於底層,因此自己定義一個web框架,以實現自動化URL資訊提取與函式的註冊,增加的檔案為coroweb.py,如下程式碼

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# 匯入非同步工具包
import asyncio, os, inspect, logging, functools
# 匯入網頁處理工具包
from urllib import parse
# 匯入底層web框架
from aiohttp import web

from apis import APIError

# 將函式對映為URL處理函式,使得get函式附帶URL資訊
def get(path):
    '''
    Define decorator @get('/path')
    '''
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            return func(*args, **kw)
        wrapper.__method__ = 'GET'
        wrapper.__route__ = path
        return wrapper
    return decorator

# 將函式對映為URL處理函式,使得post函式附帶URL資訊
def post(path):
    '''
    Define decorator @post('/path')
    '''
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            return func(*args, **kw)
        wrapper.__method__ = 'POST' # 儲存方法資訊
        wrapper.__route__ = path # 儲存路徑資訊
        return wrapper
    return decorator

# 運用inspect模組,建立幾個函式用以獲取URL處理函式與request引數之間的關係
def get_required_kw_args(fn): # 收集沒有預設值的命名關鍵字引數
    args = []
    params = inspect.signature(fn).parameters # inspect模組是用來分析模組,函式
    for name, param in params.items():
        if param.kind == inspect.Parameter.KEYWORD_ONLY and param.default == inspect.Parameter.empty:
            args.append(name)
    return tuple(args)

def get_named_kw_args(fn): # 獲取命名關鍵字引數
    args = []
    params = inspect.signature(fn).parameters
    for name, param in params.items():
        if param.kind == inspect.Parameter.KEYWORD_ONLY:
            args.append(name)
    return tuple(args)

def has_named_kw_args(fn): # 判斷有沒有命名關鍵字引數
    params = inspect.signature(fn).parameters
    for name, param in params.items():
        if param.kind == inspect.Parameter.KEYWORD_ONLY:
            return True

def has_var_kw_arg(fn): # 判斷有沒有關鍵字引數
    params = inspect.signature(fn).parameters
    for name, param in params.items():
        if param.kind == inspect.Parameter.VAR_KEYWORD:
            return True

def has_request_arg(fn): # 判斷是否含有名字叫做'request'引數,且該引數是否為最後一個引數
    sig = inspect.signature(fn)
    params = sig.parameters
    found = False
    for name, param in params.items():
        if name == 'request':
            found = True
            continue
        if found and (param.kind != inspect.Parameter.VAR_POSITIONAL and param.kind != inspect.Parameter.KEYWORD_ONLY and param.kind != inspect.Parameter.VAR_KEYWORD):
            raise ValueError('request parameter must be the last named parameter in function: %s%s' % (fn.__name__, str(sig)))
    return found

# 使用RequestHandler函式封裝一個URL處理函式,向request引數獲取URL處理函式所需要的引數
class RequestHandler(object):

    def __init__(self, app, fn): # 接收app引數
        self._app = app
        self._func = fn
        self._has_request_arg = has_request_arg(fn)
        self._has_var_kw_arg = has_var_kw_arg(fn)
        self._has_named_kw_args = has_named_kw_args(fn)
        self._named_kw_args = get_named_kw_args(fn)
        self._required_kw_args = get_required_kw_args(fn)

	# RequestHandler本身是一個類,由於定義了__call__方法,因此將其例項視為函式
    # 該函式從request中獲取必要引數,之後呼叫URL函式
	# 最後將結果轉換為web.Response物件。上述比較符合aiohttp框架
	async def __call__(self, request): # 構造協程
        kw = None
        if self._has_var_kw_arg or self._has_named_kw_args or self._required_kw_args:
            if request.method == 'POST': # 判斷客戶端發來的方法是否是POST
                if not request.content_type: # 查詢有沒提交資料的格式(EncType)
                    return web.HTTPBadRequest(text='Missing Content-Type.')
                ct = request.content_type.lower()
                if ct.startswith('application/json'):
                    params = await request.json() # 讀取請求的body程式碼作為json檔案
                    if not isinstance(params, dict):
                        return web.HTTPBadRequest(text='JSON body must be object.')
                    kw = params
                elif ct.startswith('application/x-www-form-urlencoded') or ct.startswith('multipart/form-data'):
                    params = await request.post()
                    kw = dict(**params)
                else:
                    return web.HTTPBadRequest('Unsupported Content-Type: %s' % request.content_type)
            if request.method == 'GET': # 判斷客戶端發來的方法是否是GET
                qs = request.query_string
                if qs:
                    kw = dict()
                    for k, v in parse.parse_qs(qs, True).items():
                        kw[k] = v[0]
        if kw is None:
            kw = dict(**request.match_info)
        else:
			# 當函式引數沒有關鍵字引數時,移去request除命名關鍵字引數外所有的引數資訊
            if not self._has_var_kw_arg and self._named_kw_args:
                # remove all unamed kw:
                copy = dict()
                for name in self._named_kw_args:
                    if name in kw:
                        copy[name] = kw[name]
                kw = copy
            # check named arg:
            for k, v in request.match_info.items():
                if k in kw:
                    logging.warning('Duplicate arg name in named arg and kw args: %s' % k)
                kw[k] = v
        if self._has_request_arg:
            kw['request'] = request
        # check required kw:即加入命名關鍵字引數(沒有附加預設值),request沒有提供相應的數值,報錯
        if self._required_kw_args:
            for name in self._required_kw_args:
                if not name in kw:
                    return web.HTTPBadRequest('Missing argument: %s' % name)
        logging.info('call with args: %s' % str(kw))
        try:
            r = await self._func(**kw)
            return r
        except APIError as e: # APIError另外建立
            return dict(error=e.error, data=e.data, message=e.message)

# 新增靜態資料夾的路徑
def add_static(app):
    path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static')
    app.router.add_static('/static/', path)
    logging.info('add static %s => %s' % ('/static/', path))

# 用來註冊一個URL處理函式,主要起驗證函式是否包含URL的相應方法與路徑資訊,並將其函式變為協程
def add_route(app, fn):
    method = getattr(fn, '__method__', None)
    path = getattr(fn, '__route__', None)
    if path is None or method is None:
        raise ValueError('@get or @post not defined in %s.' % str(fn))
    if not asyncio.iscoroutinefunction(fn) and not inspect.isgeneratorfunction(fn):
        fn = asyncio.coroutine(fn)
    logging.info('add route %s %s => %s(%s)' % (method, path, fn.__name__, ', '.join(inspect.signature(fn).parameters.keys())))
    app.router.add_route(method, path, RequestHandler(app, fn))

# 自動將module_name模組中所有符合條件的函式進行註冊
# 只需要向這個函式提供要批量註冊函式的檔案路徑,新編寫的函式就會篩選,註冊檔案內所有符合註冊條件的函式
def add_routes(app, module_name):
    n = module_name.rfind('.')
    if n == (-1):
        mod = __import__(module_name, globals(), locals())
    else:
        name = module_name[n+1:]
        mod = getattr(__import__(module_name[:n], globals(), locals(), [name]), name)
    for attr in dir(mod):
        if attr.startswith('_'):
            continue
        fn = getattr(mod, attr)
        if callable(fn):
            method = getattr(fn, '__method__', None)
            path = getattr(fn, '__route__', None)
            if method and path: # 此處查詢path以及method是否存在而不是等待add_route函式查詢
		add_route(app, fn)

編輯middleware

將函式返回值變為web.response(),修改的檔案為app.py,程式碼如下:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

__author__ = 'Michael Liao'

'''
async web application.
'''

import logging; logging.basicConfig(level=logging.INFO)

import asyncio, os, json, time
from datetime import datetime

from aiohttp import web
from jinja2 import Environment, FileSystemLoader

import orm
from coroweb import add_routes, add_static

# 初始化jinja2模板,以便其他函式使用jinja2模板
def init_jinja2(app, **kw):
    logging.info('init jinja2...')
    options = dict(
        autoescape = kw.get('autoescape', True),
        block_start_string = kw.get('block_start_string', '{%'),
        block_end_string = kw.get('block_end_string', '%}'),
        variable_start_string = kw.get('variable_start_string', '{{'),
        variable_end_string = kw.get('variable_end_string', '}}'),
        auto_reload = kw.get('auto_reload', True)
    )
    path = kw.get('path', None)
    if path is None:
        path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'templates')
    logging.info('set jinja2 template path: %s' % path)
    env = Environment(loader=FileSystemLoader(path), **options)
    filters = kw.get('filters', None)
    if filters is not None:
        for name, f in filters.items():
            env.filters[name] = f
    app['__templating__'] = env

# 使用middleware引數將函式返回值轉化為web.response物件
# middlewares是一個攔截器、中間鍵,即在URL真正被處理之前,需要經過一系列middleware的處理。
async def logger_factory(app, handler): # 協程,兩個引數
    async def logger(request):
        logging.info('Request: %s %s' % (request.method, request.path))
        # await asyncio.sleep(0.3)
        return (await handler(request))
    return logger

async def data_factory(app, handler):
    async def parse_data(request):
        if request.method == 'POST':
            if request.content_type.startswith('application/json'):
                request.__data__ = await request.json()
                logging.info('request json: %s' % str(request.__data__))
            elif request.content_type.startswith('application/x-www-form-urlencoded'):
                request.__data__ = await request.post()
                logging.info('request form: %s' % str(request.__data__))
        return (await handler(request))
    return parse_data

# 函式返回值轉化為'web.response'物件
async def response_factory(app, handler):
    async def response(request):
        logging.info('Response handler...')
        r = await handler(request)
        if isinstance(r, web.StreamResponse):
            return r
        if isinstance(r, bytes):
            resp = web.Response(body=r)
            resp.content_type = 'application/octet-stream'
            return resp
        if isinstance(r, str):
            if r.startswith('redirect:'): # 重定向
                return web.HTTPFound(r[9:]) # 轉入別的網站
            resp = web.Response(body=r.encode('utf-8'))
            resp.content_type = 'text/html;charset=utf-8'
            return resp
        if isinstance(r, dict):
            template = r.get('__template__')
            if template is None:
                resp = web.Response(body=json.dumps(r, ensure_ascii=False, default=lambda o: o.__dict__).encode('utf-8'))
                resp.content_type = 'application/json;charset=utf-8'
                return resp
            else: # jinja2模板
                resp = web.Response(body=app['__templating__'].get_template(template).render(**r).encode('utf-8'))
                resp.content_type = 'text/html;charset=utf-8'
                return resp
        if isinstance(r, int) and r >= 100 and r < 600:
            return web.Response(r)
        if isinstance(r, tuple) and len(r) == 2:
            t, m = r
            if isinstance(t, int) and t >= 100 and t < 600:
                return web.Response(t, str(m))
        # default:
        resp = web.Response(body=str(r).encode('utf-8'))
        resp.content_type = 'text/plain;charset=utf-8'
        return resp
    return response

def datetime_filter(t):
    delta = int(time.time() - t)
    if delta < 60:
        return u'1分鐘前'
    if delta < 3600:
        return u'%s分鐘前' % (delta // 60)
    if delta < 86400:
        return u'%s小時前' % (delta // 3600)
    if delta < 604800:
        return u'%s天前' % (delta // 86400)
    dt = datetime.fromtimestamp(t)
    return u'%s年%s月%s日' % (dt.year, dt.month, dt.day)

編寫測試程式碼

資料庫服務啟動

開啟cmd命令列,輸入net start mysql,如果未安裝mysql,參考本部落格

資料表建立

在cmd命令列輸入:mysql -u root -p,進而輸入密碼進入資料庫,之後輸入以下命令建立網站的資料表。

drop database if exists awesome;
# 建立資料庫
create database awesome;
# 使用資料庫
use awesome;
# 此處改為自己的主機名和密碼
grant select, insert, update, delete on awesome.* to 'root'@'localhost' identified by 'password';

create table users (
    `id` varchar(50) not null,
    `email` varchar(50) not null,
    `passwd` varchar(50) not null,
    `admin` bool not null,
    `name` varchar(50) not null,
    `image` varchar(500) not null,
    `created_at` real not null,
    unique key `idx_email` (`email`),
    key `idx_created_at` (`created_at`),
    primary key (`id`)
) engine=innodb default charset=utf8;

create table blogs (
    `id` varchar(50) not null,
    `user_id` varchar(50) not null,
    `user_name` varchar(50) not null,
    `user_image` varchar(500) not null,
    `name` varchar(50) not null,
    `summary` varchar(200) not null,
    `content` mediumtext not null,
    `created_at` real not null,
    key `idx_created_at` (`created_at`),
    primary key (`id`)
) engine=innodb default charset=utf8;

create table comments (
    `id` varchar(50) not null,
    `blog_id` varchar(50) not null,
    `user_id` varchar(50) not null,
    `user_name` varchar(50) not null,
    `user_image` varchar(500) not null,
    `content` mediumtext not null,
    `created_at` real not null,
    key `idx_created_at` (`created_at`),
    primary key (`id`)
) engine=innodb default charset=utf8;

app.py部分的程式碼:

async def init(loop):
	# 這裡的user和password改成自己的使用者名稱和密碼
    await orm.create_pool(loop=loop, host='127.0.0.1', port=3306, user='root', password='www', db='awesome')
    app = web.Application(loop=loop, middlewares=[
        logger_factory, response_factory
    ])
    init_jinja2(app, filters=dict(datetime=datetime_filter))
    # 對接收到的不同型別的瀏覽器請求語言具體處理的程式碼放在'handlers.py'檔案中
    add_routes(app, 'handlers')
    add_static(app)
    srv = await loop.create_server(app.make_handler(), '127.0.0.1', 9000)
    logging.info('server started at http://127.0.0.1:9000...')
    return srv

loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()

Handlers.py部分的程式碼

import asyncio 
from coroweb import get,post

#編寫用於測試的URL處理函式 
@get('/') 
async def handler_url_blog(request): 
	body='<h1>Awesome</h1>' 
	return body