1. 程式人生 > >Django Rest Framework源碼剖析(三)-----頻率控制

Django Rest Framework源碼剖析(三)-----頻率控制

BE elf attr 基本使用 fix ddr integer als min

一、簡介

承接上篇文章Django Rest Framework源碼剖析(二)-----權限,當服務的接口被頻繁調用,導致資源緊張怎麽辦呢?當然或許有很多解決辦法,比如:負載均衡、提高服務器配置、通過代理限制訪問頻率等,但是django rest framework自身就提供了訪問頻率的控制,可以從代碼本身做控制。

二、頻率控制內部原理概述

django rest framework 中頻率控制基本原理基於訪問次數和時間,通過計算實現,當然我們也可以自己定義頻率控制方法。基本原理如下:

啟用頻率,DRF內部會有一個字典記錄來訪者的IP,以及訪問時間最近幾(通過配置)次的訪問時間,這樣確保每次列表中最後一個元素都是該用戶請求的最早時間,形式如下:

{
IP1:[第三次請求時間,第二次請求時間,第一次請求時間,],
IP2:[第二次請求時間,第一次請求時間,],
.....
}

舉例說明,比如我現在配置了5秒內只能訪問2次,每次請求到達頻率控制時候先判斷請求者IP是否已經在這個請求字典中,若存在,在判斷用戶請求5秒內的請求次數,若次數小於等於2,則允許請求,若大於2,則超過頻率,不允許請求。

關於請求頻率的的算法(以5秒內最多訪問兩次為例):

1.首先刪除掉列表裏5秒之前的請求,循環判斷當前請求時間和最早請求時間之差記作t1,若t1大於5則代表列表中最早的請求已經在5秒外了,刪除掉,繼續判斷倒數第二個請求,直到t1小於5.

2.當確保請求列表中只有5秒內請求時候,接著判斷其請求次數(列表長度),若長度大於2,則證明超過5秒內訪問超過2次了,則不允許,否則,通過並將此次訪問時間插入到列表最前面,作為最新訪問時間。

三、基本使用

同樣,先來了解下頻率控制的使用方法,後面在分析源碼

1.在utils目錄下新建立文件,throttle.py,添加頻率控制為每分鐘只能訪問5次

#!/usr/bin/env python3
#_*_ coding:utf-8 _*_
#Author:wd
from rest_framework.throttling import SimpleRateThrottle

class VisitThrottle(SimpleRateThrottle):
    """5秒內最多訪問三次"""
    scope = "WD"  #settings配置文件中的key,用於獲取配置的頻率
def get_cache_key(self, request, view): return self.get_ident(request)

2.settings.py中配置全局頻率控制

REST_FRAMEWORK = {
    #頻率控制配置
    "DEFAULT_THROTTLE_CLASSES":[utils.throttle.VisitThrottle],   #全局配置,
    "DEFAULT_THROTTLE_RATES":{
        WD:5/m,         #速率配置每分鐘不能超過5次訪問,WD是scope定義的值,

    }
}

urls.py

from django.conf.urls import url
from django.contrib import admin
from app01 import views

urlpatterns = [

    url(r^api/v1/auth, views.AuthView.as_view()),
    url(r^api/v1/order, views.OrderView.as_view()),
]

models.py

from django.db import models

class UserInfo(models.Model):
    user_type_choice = (
        (1,"普通用戶"),
        (2,"會員"),
    )
    user_type = models.IntegerField(choices=user_type_choice)
    username = models.CharField(max_length=32,unique=True)
    password = models.CharField(max_length=64)


class UserToken(models.Model):
    user = models.OneToOneField(to=UserInfo)
    token = models.CharField(max_length=64)

訂單視圖

class OrderView(APIView):
    ‘‘‘查看訂單‘‘‘
    from utils.permissions import MyPremission
    authentication_classes = [Authentication,]    #添加認證
    permission_classes = [MyPremission,]           #添加權限控制
    def get(self,request,*args,**kwargs):
        #request.user
        #request.auth
        ret = {code:1000,msg:"你的訂單已經完成",data:"買了一個mac"}
        return JsonResponse(ret,safe=True)

使用postman驗證如下圖,可以看到頻率限制已經起作用了。

技術分享圖片

四、頻率控制源碼剖析

在前面幾篇文章中已經分析了DRF的認證、權限源碼,頻率控制也一樣也從APIView的dispatch方法說起,參考註解:

dispatch()

def dispatch(self, request, *args, **kwargs):
        """
        `.dispatch()` is pretty much the same as Django‘s regular dispatch,
        but with extra hooks for startup, finalize, and exception handling.
        """
        self.args = args
        self.kwargs = kwargs
        #對原始request進行加工,豐富了一些功能
        #Request(
        #     request,
        #     parsers=self.get_parsers(),
        #     authenticators=self.get_authenticators(),
        #     negotiator=self.get_content_negotiator(),
        #     parser_context=parser_context
        # )
        #request(原始request,[BasicAuthentications對象,])
        #獲取原生request,request._request
        #獲取認證類的對象,request.authticators
        #1.封裝request
        request = self.initialize_request(request, *args, **kwargs)
        self.request = request
        self.headers = self.default_response_headers  # deprecate?

        try:
            self.initial(request, *args, **kwargs)

            # Get the appropriate handler method
            if request.method.lower() in self.http_method_names:
                handler = getattr(self, request.method.lower(),
                                  self.http_method_not_allowed)
            else:
                handler = self.http_method_not_allowed

            response = handler(request, *args, **kwargs)

        except Exception as exc:
            response = self.handle_exception(exc)

        self.response = self.finalize_response(request, response, *args, **kwargs)
        return self.response

2.執行inital方法,initial方法中執行check_throttles則開始頻率控制

def initial(self, request, *args, **kwargs):
        """
        Runs anything that needs to occur prior to calling the method handler.
        """
        self.format_kwarg = self.get_format_suffix(**kwargs)

        # Perform content negotiation and store the accepted info on the request
        neg = self.perform_content_negotiation(request)
        request.accepted_renderer, request.accepted_media_type = neg

        # Determine the API version, if versioning is in use.
        version, scheme = self.determine_version(request, *args, **kwargs)
        request.version, request.versioning_scheme = version, scheme

        # Ensure that the incoming request is permitted
        #2.實現認證
        self.perform_authentication(request)
        #3.權限判斷
        self.check_permissions(request)
        #4.頻率限制
        self.check_throttles(request)    

3.下面是check_throttles源碼,與認證、權限一樣采用列表對象方式,通過判斷allow_request方法返回值判斷頻率是否通過

  def check_throttles(self, request):
        """
        Check if request should be throttled.
        Raises an appropriate exception if the request is throttled.
        """
        for throttle in self.get_throttles(): #循環頻率控制類結果
            if not throttle.allow_request(request, self): #判斷其中的allow_requestf返回結果,true則頻率通過,否則返回等待多少秒可以訪問
                self.throttled(request, throttle.wait())

4.get_throttles方法,采用列表生成式生成頻率控制對象,與認證、權限一直

    def get_throttles(self):
        """
        Instantiates and returns the list of throttles that this view uses.
        """
        return [throttle() for throttle in self.throttle_classes] #列表生成式生成控制頻率對象列表

5.self.throttle_classes屬性獲取

class APIView(View):

    # The following policies may be set at either globally, or per-view.
    renderer_classes = api_settings.DEFAULT_RENDERER_CLASSES
    parser_classes = api_settings.DEFAULT_PARSER_CLASSES
    authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES
    throttle_classes = api_settings.DEFAULT_THROTTLE_CLASSES     #頻率控制全局配置
    permission_classes = api_settings.DEFAULT_PERMISSION_CLASSES
    content_negotiation_class = api_settings.DEFAULT_CONTENT_NEGOTIATION_CLASS
    metadata_class = api_settings.DEFAULT_METADATA_CLASS
    versioning_class = api_settings.DEFAULT_VERSIONING_CLASS

6.通過以上分析,知道了頻率控制是通過判斷每個類中的allow_request放法的返回值來判斷頻率是否通過,下面我們來看看我們所使用的SimpleRateThrottle怎麽實現的,分析部分請看註解:

SimpleRateThrottle類源碼:

class SimpleRateThrottle(BaseThrottle):
    """
    A simple cache implementation, that only requires `.get_cache_key()`
    to be overridden.

    The rate (requests / seconds) is set by a `rate` attribute on the View
    class.  The attribute is a string of the form ‘number_of_requests/period‘.

    Period should be one of: (‘s‘, ‘sec‘, ‘m‘, ‘min‘, ‘h‘, ‘hour‘, ‘d‘, ‘day‘)

    Previous request information used for throttling is stored in the cache.
    """
    cache = default_cache  # 存放請求時間,類似與示例中的大字典,這裏使用的是django的緩存
    timer = time.time
    cache_format = throttle_%(scope)s_%(ident)s
    scope = None
    THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES

    def __init__(self):
        if not getattr(self, rate, None):
            self.rate = self.get_rate()
        self.num_requests, self.duration = self.parse_rate(self.rate)

    def get_cache_key(self, request, view):
# 獲取請求的key標識,必須要有否則會報錯,這裏可以重寫,使用用戶的用戶名、或其他作為key,在示例中使用的get_ident方法用戶獲取用戶IP作為key
""" Should return a unique cache-key which can be used for throttling. Must be overridden. May return `None` if the request should not be throttled. """ raise NotImplementedError(.get_cache_key() must be overridden) def get_rate(self): # 獲取配置文件的配置速率 """ Determine the string representation of the allowed request rate. """ if not getattr(self, scope, None): # 通過獲取共有屬性scope來獲取配置的速率 msg = ("You must set either `.scope` or `.rate` for ‘%s‘ throttle" % self.__class__.__name__) raise ImproperlyConfigured(msg) try: return self.THROTTLE_RATES[self.scope] except KeyError: msg = "No default throttle rate set for ‘%s‘ scope" % self.scope raise ImproperlyConfigured(msg) def parse_rate(self, rate): # 格式化速率 """ Given the request rate string, return a two tuple of: <allowed number of requests>, <period of time in seconds> """ if rate is None: return (None, None) num, period = rate.split(/) # 分離字符串 num_requests = int(num) duration = {s: 1, m: 60, h: 3600, d: 86400}[period[0]] # 轉換時間為數字,示例配置的5/m,m轉為60秒 return (num_requests, duration) def allow_request(self, request, view): # 判斷請求的速率是否通過 """ Implement the check to see if the request should be throttled. On success calls `throttle_success`. On failure calls `throttle_failure`. """ if self.rate is None: return True self.key = self.get_cache_key(request, view) if self.key is None: return True self.history = self.cache.get(self.key, []) self.now = self.timer() # Drop any requests from the history which have now passed the # throttle duration while self.history and self.history[-1] <= self.now - self.duration: # 頻率判斷實現原理,已經舉例進行了說明 self.history.pop() if len(self.history) >= self.num_requests: return self.throttle_failure() return self.throttle_success() def throttle_success(self): # 頻率通過返回true """ Inserts the current request‘s timestamp along with the key into the cache. """ self.history.insert(0, self.now) self.cache.set(self.key, self.history, self.duration) return True def throttle_failure(self): # 不通過返回false """ Called when a request to the API has failed due to throttling. """ return False def wait(self): # 返回等待時間 """ Returns the recommended next request time in seconds. """ if self.history: remaining_duration = self.duration - (self.now - self.history[-1]) else: remaining_duration = self.duration available_requests = self.num_requests - len(self.history) + 1 if available_requests <= 0: return None return remaining_duration / float(available_requests)

get_ident方法源碼,該方法用於獲取請求的IP:

    def get_ident(self, request):
        """
        Identify the machine making the request by parsing HTTP_X_FORWARDED_FOR
        if present and number of proxies is > 0. If not use all of
        HTTP_X_FORWARDED_FOR if it is available, if not use REMOTE_ADDR.
        """
        xff = request.META.get(HTTP_X_FORWARDED_FOR)
        remote_addr = request.META.get(REMOTE_ADDR)
        #這裏request是封裝以後的requst,django原生的是request._request.META 這樣也可以獲取
        num_proxies = api_settings.NUM_PROXIES

        if num_proxies is not None:
            if num_proxies == 0 or xff is None:
                return remote_addr
            addrs = xff.split(,)
            client_addr = addrs[-min(num_proxies, len(addrs))]
            return client_addr.strip()

        return ‘‘.join(xff.split()) if xff else remote_addr
五、內置頻率控制類

DRF內置了多種頻率控制類提供我們使用,其核心原理都是通過判斷request_allow方法返回值來判斷頻率是否通過,通過wait方法返回等待時間。

1.BaseThrottle:最基本的頻率控制需要重寫allow_request方法和wait方法

技術分享圖片
class BaseThrottle(object):
    """
    Rate throttling of requests.
    """

    def allow_request(self, request, view):
        """
        Return `True` if the request should be allowed, `False` otherwise.
        """
        raise NotImplementedError(.allow_request() must be overridden)

    def get_ident(self, request):
        """
        Identify the machine making the request by parsing HTTP_X_FORWARDED_FOR
        if present and number of proxies is > 0. If not use all of
        HTTP_X_FORWARDED_FOR if it is available, if not use REMOTE_ADDR.
        """
        xff = request.META.get(HTTP_X_FORWARDED_FOR)
        remote_addr = request.META.get(REMOTE_ADDR)
        num_proxies = api_settings.NUM_PROXIES

        if num_proxies is not None:
            if num_proxies == 0 or xff is None:
                return remote_addr
            addrs = xff.split(,)
            client_addr = addrs[-min(num_proxies, len(addrs))]
            return client_addr.strip()

        return ‘‘.join(xff.split()) if xff else remote_addr

    def wait(self):
        """
        Optionally, return a recommended number of seconds to wait before
        the next request.
        """
        return None
class BaseThrottle(object)

2.SimpleRateThrottle:示例中已經使用,並對源碼和原理進行了分析。

技術分享圖片
class SimpleRateThrottle(BaseThrottle):
    """
    A simple cache implementation, that only requires `.get_cache_key()`
    to be overridden.

    The rate (requests / seconds) is set by a `rate` attribute on the View
    class.  The attribute is a string of the form ‘number_of_requests/period‘.

    Period should be one of: (‘s‘, ‘sec‘, ‘m‘, ‘min‘, ‘h‘, ‘hour‘, ‘d‘, ‘day‘)

    Previous request information used for throttling is stored in the cache.
    """
    cache = default_cache
    timer = time.time
    cache_format = throttle_%(scope)s_%(ident)s
    scope = None
    THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES

    def __init__(self):
        if not getattr(self, rate, None):
            self.rate = self.get_rate()
        self.num_requests, self.duration = self.parse_rate(self.rate)

    def get_cache_key(self, request, view):
        """
        Should return a unique cache-key which can be used for throttling.
        Must be overridden.

        May return `None` if the request should not be throttled.
        """
        raise NotImplementedError(.get_cache_key() must be overridden)

    def get_rate(self):
        """
        Determine the string representation of the allowed request rate.
        """
        if not getattr(self, scope, None):
            msg = ("You must set either `.scope` or `.rate` for ‘%s‘ throttle" %
                   self.__class__.__name__)
            raise ImproperlyConfigured(msg)

        try:
            return self.THROTTLE_RATES[self.scope]
        except KeyError:
            msg = "No default throttle rate set for ‘%s‘ scope" % self.scope
            raise ImproperlyConfigured(msg)

    def parse_rate(self, rate):
        """
        Given the request rate string, return a two tuple of:
        <allowed number of requests>, <period of time in seconds>
        """
        if rate is None:
            return (None, None)
        num, period = rate.split(/)
        num_requests = int(num)
        duration = {s: 1, m: 60, h: 3600, d: 86400}[period[0]]
        return (num_requests, duration)

    def allow_request(self, request, view):
        """
        Implement the check to see if the request should be throttled.

        On success calls `throttle_success`.
        On failure calls `throttle_failure`.
        """
        if self.rate is None:
            return True

        self.key = self.get_cache_key(request, view)
        if self.key is None:
            return True

        self.history = self.cache.get(self.key, [])
        self.now = self.timer()

        # Drop any requests from the history which have now passed the
        # throttle duration
        while self.history and self.history[-1] <= self.now - self.duration:
            self.history.pop()
        if len(self.history) >= self.num_requests:
            return self.throttle_failure()
        return self.throttle_success()

    def throttle_success(self):
        """
        Inserts the current request‘s timestamp along with the key
        into the cache.
        """
        self.history.insert(0, self.now)
        self.cache.set(self.key, self.history, self.duration)
        return True

    def throttle_failure(self):
        """
        Called when a request to the API has failed due to throttling.
        """
        return False

    def wait(self):
        """
        Returns the recommended next request time in seconds.
        """
        if self.history:
            remaining_duration = self.duration - (self.now - self.history[-1])
        else:
            remaining_duration = self.duration

        available_requests = self.num_requests - len(self.history) + 1
        if available_requests <= 0:
            return None

        return remaining_duration / float(available_requests)
View Code

3.AnonRateThrottle:匿名用戶頻率控制

技術分享圖片
class AnonRateThrottle(SimpleRateThrottle):
    """
    Limits the rate of API calls that may be made by a anonymous users.

    The IP address of the request will be used as the unique cache key.
    """
    scope = anon

    def get_cache_key(self, request, view):
        if request.user.is_authenticated:
            return None  # Only throttle unauthenticated requests.

        return self.cache_format % {
            scope: self.scope,
            ident: self.get_ident(request)
        }
AnonRateThrottle

4.UserRateThrottle:基於SimpleRateThrottle,對用戶的頻率控制

技術分享圖片
class UserRateThrottle(SimpleRateThrottle):
    """
    Limits the rate of API calls that may be made by a given user.

    The user id will be used as a unique cache key if the user is
    authenticated.  For anonymous requests, the IP address of the request will
    be used.
    """
    scope = user

    def get_cache_key(self, request, view):
        if request.user.is_authenticated:
            ident = request.user.pk
        else:
            ident = self.get_ident(request)

        return self.cache_format % {
            scope: self.scope,
            ident: ident
        }
UserRateThrottle
六、自定義頻率控制

自定義頻率控制無非實現request_allow方法和wait方法,你可以根據實際需求來定制你的頻率控制,下面是示例:

from rest_framework.throttling import BaseThrottle
import time

REQUEST_RECORD = {}  # 訪問記錄,可使用nosql數據庫


class VisitThrottle(BaseThrottle):
    ‘‘‘60s內最多能訪問5次‘‘‘

    def __init__(self):
        self.history = None

    def allow_request(self, request, view):
        # 獲取用戶ip (get_ident)
        remote_addr = self.get_ident(request)
        ctime = time.time()

        if remote_addr not in REQUEST_RECORD:
            REQUEST_RECORD[remote_addr] = [ctime, ]  # 保持請求的時間,形式{ip:[時間,]}
            return True  # True表示可以訪問
        # 獲取當前ip的歷史訪問記錄
        history = REQUEST_RECORD.get(remote_addr)
       
        self.history = history

       
        while history and history[-1] < ctime - 60:
            # while循環確保每列表中是最新的60秒內的請求
            
            history.pop()
        # 訪問記錄小於5次,將本次請求插入到最前面,作為最新的請求
        if len(history) < 5:
            history.insert(0, ctime)
            return True

    def wait(self):
        ‘‘‘返回等待時間‘‘‘
        ctime = time.time()
        return 60 - (ctime - self.history[-1])
七、總結

1.使用方法:

  • 繼承BaseThrottle類
  • 重寫request_allow方法和wait方法,request_allow方法返回true代表通過,否則拒絕,wait返回等待的時間

2.配置

###全局使用

REST_FRAMEWORK = {
    #頻率控制配置
    "DEFAULT_THROTTLE_CLASSES":[utils.throttle.VisitThrottle],   #全局配置,
    "DEFAULT_THROTTLE_RATES":{
        WD:5/m,         #速率配置每分鐘不能超過5次訪問,WD是scope定義的值

    }
}

##單一視圖使用
throttle_classes = [VisitThrottle,]

##優先級
單一視圖>全局

Django Rest Framework源碼剖析(三)-----頻率控制