Django Restful Framework【第三篇】認證、權限、限制訪問頻率
阿新 • • 發佈:2018-02-21
authent per ted inf png ip限制 users -a python
一、認證
認證請求頭
views.py#!/usr/bin/env python # -*- coding:utf-8 -*- from rest_framework.views import APIView from rest_framework.response import Response from rest_framework.authentication import BaseAuthentication from rest_framework.permissions import BasePermission from rest_framework.request import Request from rest_framework import exceptions token_list = [ ‘sfsfss123kuf3j123‘, ‘asijnfowerkkf9812‘, ] class TestAuthentication(BaseAuthentication): def authenticate(self, request): """ 用戶認證,如果驗證成功後返回元組: (用戶,用戶Token) :param request: :return: None,表示跳過該驗證; 如果跳過了所有認證,默認用戶和Token和使用配置文件進行設置 self._authenticator = None if api_settings.UNAUTHENTICATED_USER: self.user = api_settings.UNAUTHENTICATED_USER() # 默認值為:匿名用戶 else: self.user = None if api_settings.UNAUTHENTICATED_TOKEN: self.auth = api_settings.UNAUTHENTICATED_TOKEN()# 默認值為:None else: self.auth = None (user,token)表示驗證通過並設置用戶名和Token; AuthenticationFailed異常 """ val = request.query_params.get(‘token‘) if val not in token_list: raise exceptions.AuthenticationFailed("用戶認證失敗") return (‘登錄用戶‘, ‘用戶token‘) def authenticate_header(self, request): """ Return a string to be used as the value of the `WWW-Authenticate` header in a `401 Unauthenticated` response, or `None` if the authentication scheme should return `403 Permission Denied` responses. """ pass class TestPermission(BasePermission): message = "權限驗證失敗" def has_permission(self, request, view): """ 判斷是否有權限訪問當前請求 Return `True` if permission is granted, `False` otherwise. :param request: :param view: :return: True有權限;False無權限 """ if request.user == "管理員": return True # GenericAPIView中get_object時調用 def has_object_permission(self, request, view, obj): """ 視圖繼承GenericAPIView,並在其中使用get_object時獲取對象時,觸發單獨對象權限驗證 Return `True` if permission is granted, `False` otherwise. :param request: :param view: :param obj: :return: True有權限;False無權限 """ if request.user == "管理員": return True class TestView(APIView): # 認證的動作是由request.user觸發 authentication_classes = [TestAuthentication, ] # 權限 # 循環執行所有的權限 permission_classes = [TestPermission, ] def get(self, request, *args, **kwargs): # self.dispatch print(request.user) print(request.auth) return Response(‘GET請求,響應內容‘) def post(self, request, *args, **kwargs): return Response(‘POST請求,響應內容‘) def put(self, request, *args, **kwargs): return Response(‘PUT請求,響應內容‘)
自定義認證功能
class MyAuthtication(BasicAuthentication): def authenticate(self, request): token = request.query_params.get(‘token‘) #註意是沒有GET的,用query_params表示 if token == ‘zxxzzxzc‘: return (‘uuuuuu‘,‘afsdsgdf‘) #返回user,auth # raise AuthenticationFailed(‘認證錯誤‘) #只要拋出認證錯誤這樣的異常就會去執行下面的函數 raise APIException(‘認證錯誤‘) def authenticate_header(self, request): #認證不成功的時候執行 return ‘Basic reala="api"‘ class UserView(APIView): authentication_classes = [MyAuthtication,] def get(self,request,*args,**kwargs): print(request.user) print(request.auth) return Response(‘用戶列表‘)
二、權限
1、需求:Host是匿名用戶和用戶都能訪問 #匿名用戶的request.user = none;User只有註冊用戶能訪問
urls.pyfrom app03 import views from django.conf.urls import url urlpatterns = [ # django rest framework url(‘^auth/‘, views.AuthView.as_view()), url(r‘^hosts/‘, views.HostView.as_view()), url(r‘^users/‘, views.UsersView.as_view()), url(r‘^salary/‘, views.SalaryView.as_view()), ]
views.py
from django.shortcuts import render from rest_framework.views import APIView #繼承的view from rest_framework.response import Response #友好的返回 from rest_framework.authentication import BaseAuthentication #認證的類 from rest_framework.authentication import BasicAuthentication from app01 import models from rest_framework import exceptions from rest_framework.permissions import AllowAny #權限在這個類裏面 from rest_framework.throttling import BaseThrottle,SimpleRateThrottle # Create your views here. # +++++++++++++++認證類和權限類======================== class MyAuthentication(BaseAuthentication): def authenticate(self, request): token = request.query_params.get(‘token‘) obj = models.UserInfo.objects.filter(token=token).first() if obj : #如果認證成功,返回用戶名和auth return (obj.username,obj) return None #如果沒有認證成功就不處理,進行下一步 def authenticate_header(self, request): pass class MyPermission(object): message = ‘無權訪問‘ def has_permission(self,request,view): #has_permission裏面的self是view視圖對象 if request.user: return True #如果不是匿名用戶就說明有權限 return False #否則無權限 class AdminPermission(object): message = ‘無權訪問‘ def has_permission(self, request, view): # has_permission裏面的self是view視圖對象 if request.user==‘haiyun‘: return True # 返回True表示有權限 return False #返回False表示無權限 # +++++++++++++++++++++++++++ class AuthView(APIView): authentication_classes = [] #認證頁面不需要認證 def get(self,request): self.dispatch return ‘認證列表‘ class HostView(APIView): ‘‘‘需求: Host是匿名用戶和用戶都能訪問 #匿名用戶的request.user = none User只有註冊用戶能訪問 ‘‘‘ authentication_classes = [MyAuthentication,] permission_classes = [] #都能訪問就沒必要設置權限了 def get(self,request): print(request.user) print(request.auth) return Response(‘主機列表‘) class UsersView(APIView): ‘‘‘用戶能訪問,request.user裏面有值‘‘‘ authentication_classes = [MyAuthentication,] permission_classes = [MyPermission,] def get(self,request): print(request.user,‘111111111‘) return Response(‘用戶列表‘) def permission_denied(self, request, message=None): """ If request is not permitted, determine what kind of exception to raise. """ if request.authenticators and not request.successful_authenticator: ‘‘‘如果沒有通過認證,並且權限中return False了,就會報下面的這個異常了‘‘‘ raise exceptions.NotAuthenticated(detail=‘無權訪問‘) raise exceptions.PermissionDenied(detail=message)
認證和權限配合使用
class SalaryView(APIView): ‘‘‘用戶能訪問‘‘‘ message =‘無權訪問‘ authentication_classes = [MyAuthentication,] #驗證是不是用戶 permission_classes = [MyPermission,AdminPermission,] #再看用戶有沒有權限,如果有權限在判斷有沒有管理員的權限 def get(self,request): return Response(‘薪資列表‘) def permission_denied(self, request, message=None): """ If request is not permitted, determine what kind of exception to raise. """ if request.authenticators and not request.successful_authenticator: ‘‘‘如果沒有通過認證,並且權限中return False了,就會報下面的這個異常了‘‘‘ raise exceptions.NotAuthenticated(detail=‘無權訪問‘) raise exceptions.PermissionDenied(detail=message)
如果遇上這樣的,還可以自定制,參考源碼
def check_permissions(self, request): """ Check if the request should be permitted. Raises an appropriate exception if the request is not permitted. """ for permission in self.get_permissions(): #循環每一個permission對象,調用has_permission #如果False,則拋出異常 #True 說明有權訪問 if not permission.has_permission(request, self): self.permission_denied( request, message=getattr(permission, ‘message‘, None) )
def permission_denied(self, request, message=None): """ If request is not permitted, determine what kind of exception to raise. """ if request.authenticators and not request.successful_authenticator: ‘‘‘如果沒有通過認證,並且權限中return False了,就會報下面的這個異常了‘‘‘ raise exceptions.NotAuthenticated() raise exceptions.PermissionDenied(detail=message)
那麽我們可以重寫permission_denied這個方法,如下:
views.pyclass UsersView(APIView): ‘‘‘用戶能訪問,request.user裏面有值‘‘‘ authentication_classes = [MyAuthentication,] permission_classes = [MyPermission,] def get(self,request): return Response(‘用戶列表‘) def permission_denied(self, request, message=None): """ If request is not permitted, determine what kind of exception to raise. """ if request.authenticators and not request.successful_authenticator: ‘‘‘如果沒有通過認證,並且權限中return False了,就會報下面的這個異常了‘‘‘ raise exceptions.NotAuthenticated(detail=‘無權訪問‘) raise exceptions.PermissionDenied(detail=message)
2. 全局使用
上述操作中均是對單獨視圖進行特殊配置,如果想要對全局進行配置,則需要再配置文件中寫入即可。
settings.pyREST_FRAMEWORK = { ‘UNAUTHENTICATED_USER‘: None, ‘UNAUTHENTICATED_TOKEN‘: None, #將匿名用戶設置為None "DEFAULT_AUTHENTICATION_CLASSES": [ "app01.utils.MyAuthentication", ], ‘DEFAULT_PERMISSION_CLASSES‘:[ "app03.utils.MyPermission",#設置路徑, ] }
Views.py
class AuthView(APIView): authentication_classes = [] #認證頁面不需要認證 def get(self,request): self.dispatch return ‘認證列表‘ class HostView(APIView): ‘‘‘需求: Host是匿名用戶和用戶都能訪問 #匿名用戶的request.user = none User只有註冊用戶能訪問 ‘‘‘ authentication_classes = [MyAuthentication,] permission_classes = [] #都能訪問就沒必要設置權限了 def get(self,request): print(request.user) print(request.auth) return Response(‘主機列表‘) class UsersView(APIView): ‘‘‘用戶能訪問,request.user裏面有值‘‘‘ authentication_classes = [MyAuthentication,] permission_classes = [MyPermission,] def get(self,request): print(request.user,‘111111111‘) return Response(‘用戶列表‘) def permission_denied(self, request, message=None): """ If request is not permitted, determine what kind of exception to raise. """ if request.authenticators and not request.successful_authenticator: ‘‘‘如果沒有通過認證,並且權限中return False了,就會報下面的這個異常了‘‘‘ raise exceptions.NotAuthenticated(detail=‘無權訪問‘) raise exceptions.PermissionDenied(detail=message) class SalaryView(APIView): ‘‘‘用戶能訪問‘‘‘ message =‘無權訪問‘ authentication_classes = [MyAuthentication,] #驗證是不是用戶 permission_classes = [MyPermission,AdminPermission,] #再看用戶有沒有權限,如果有權限在判斷有沒有管理員的權限 def get(self,request): return Response(‘薪資列表‘) def permission_denied(self, request, message=None): """ If request is not permitted, determine what kind of exception to raise. """ if request.authenticators and not request.successful_authenticator: ‘‘‘如果沒有通過認證,並且權限中return False了,就會報下面的這個異常了‘‘‘ raise exceptions.NotAuthenticated(detail=‘無權訪問‘) raise exceptions.PermissionDenied(detail=message)
三、限流
1、為什麽要限流呢?
答:防爬
2、限制訪問頻率源碼分析
self.check_throttles(request)self.check_throttles(request)check_throttles
def check_throttles(self, request): """ Check if request should be throttled. Raises an appropriate exception if the request is throttled. """ for throttle in self.get_throttles(): #循環每一個throttle對象,執行allow_request方法 # allow_request: #返回False,說明限制訪問頻率 #返回True,說明不限制,通行 if not throttle.allow_request(request, self): self.throttled(request, throttle.wait()) #throttle.wait()表示還要等多少秒就能訪問了get_throttles
def get_throttles(self): """ Instantiates and returns the list of throttles that this view uses. """ #返回對象 return [throttle() for throttle in self.throttle_classes]找到類,可自定制類throttle_classes
throttle_classes = api_settings.DEFAULT_THROTTLE_CLASSESBaseThrottle
class BaseThrottle(object): """ Rate throttling of requests. """ def allow_request(self, request, view): """ Return `True` if the request should be allowed, `False` otherwise. """ raise NotImplementedError(‘.allow_request() must be overridden‘) def get_ident(self, request): """ Identify the machine making the request by parsing HTTP_X_FORWARDED_FOR if present and number of proxies is > 0. If not use all of HTTP_X_FORWARDED_FOR if it is available, if not use REMOTE_ADDR. """ xff = request.META.get(‘HTTP_X_FORWARDED_FOR‘) remote_addr = request.META.get(‘REMOTE_ADDR‘) num_proxies = api_settings.NUM_PROXIES if num_proxies is not None: if num_proxies == 0 or xff is None: return remote_addr addrs = xff.split(‘,‘) client_addr = addrs[-min(num_proxies, len(addrs))] return client_addr.strip() return ‘‘.join(xff.split()) if xff else remote_addr def wait(self): """ Optionally, return a recommended number of seconds to wait before the next request. """ return None也可以重寫allow_request方法 可自定制返回的錯誤信息throttled
def throttled(self, request, wait): """ If request is throttled, determine what kind of exception to raise. """ raise exceptions.Throttled(wait)raise exceptions.Throttled(wait)錯誤信息詳情
class Throttled(APIException): status_code = status.HTTP_429_TOO_MANY_REQUESTS default_detail = _(‘Request was throttled.‘) extra_detail_singular = ‘Expected available in {wait} second.‘ extra_detail_plural = ‘Expected available in {wait} seconds.‘ default_code = ‘throttled‘ def __init__(self, wait=None, detail=None, code=None): if detail is None: detail = force_text(self.default_detail) if wait is not None: wait = math.ceil(wait) detail = ‘ ‘.join(( detail, force_text(ungettext(self.extra_detail_singular.format(wait=wait), self.extra_detail_plural.format(wait=wait), wait)))) self.wait = wait super(Throttled, self).__init__(detail, code)
下面來看看最簡單的從源碼中分析的示例,這只是舉例說明了一下
urls.pyfrom django.conf.urls import url from app04 import views urlpatterns = [ url(‘limit/‘,views.LimitView.as_view()), ]views.py
from django.shortcuts import render from rest_framework.views import APIView from rest_framework.response import Response from rest_framework import exceptions # from rest_framewor import # Create your views here. class MyThrottle(object): def allow_request(self,request,view): #返回False,限制 #返回True,不限制 pass def wait(self): return 1000 class LimitView(APIView): authentication_classes = [] #不讓認證用戶 permission_classes = [] #不讓驗證權限 throttle_classes = [MyThrottle, ] def get(self,request): # self.dispatch return Response(‘控制訪問頻率示例‘) def throttled(self, request, wait): ‘‘‘可定制方法設置中文錯誤‘‘‘ # raise exceptions.Throttled(wait) class MyThrottle(exceptions.Throttled): default_detail = ‘請求被限制‘ extra_detail_singular = ‘Expected available in {wait} second.‘ extra_detail_plural = ‘Expected available in {wait} seconds.‘ default_code = ‘還需要再等{wait}秒‘ raise MyThrottle(wait)
3、需求:對匿名用戶進行限制,每個用戶一分鐘允許訪問10次(只針對用戶來說)
a、基於用戶IP限制訪問頻率
流程分析:
- 先獲取用戶信息,如果是匿名用戶,獲取IP。如果不是匿名用戶就可以獲取用戶名。
- 獲取匿名用戶IP,在request裏面獲取,比如IP= 1.1.1.1。
- 吧獲取到的IP添加到到recode字典裏面,需要在添加之前先限制一下。
- 如果時間間隔大於60秒,說明時間久遠了,就把那個時間給剔除 了pop。在timelist列表裏面現在留的是有效的訪問時間段。
- 然後判斷他的訪問次數超過了10次沒有,如果超過了時間就return False。
- 美中不足的是時間是固定的,我們改變他為動態的:列表裏面最開始進來的時間和當前的時間進行比較,看需要等多久。
具體實現:
views初級版本from django.shortcuts import render from rest_framework.views import APIView from rest_framework.response import Response from rest_framework import exceptions from rest_framework.throttling import BaseThrottle,SimpleRateThrottle #限制訪問頻率 import time # Create your views here. RECORD = {} class MyThrottle(BaseThrottle): def allow_request(self,request,view): ‘‘‘對匿名用戶進行限制,每個用戶一分鐘訪問10次 ‘‘‘ ctime = time.time() ip = ‘1.1.1.1‘ if ip not in RECORD: RECORD[ip] = [ctime] else: #[152042123,15204212,3152042,123152042123] time_list = RECORD[ip] #獲取ip裏面的值 while True: val = time_list[-1]#取出最後一個時間,也就是訪問最早的時間 if (ctime-60)>val: #吧時間大於60秒的給剔除了 time_list.pop() #剔除了之後timelist裏面就是有效的時間了,在進行判斷他的訪問次數是不是超過10次 else: break if len(time_list) >10: return False # 返回False,限制 time_list.insert(0, ctime) return True #返回True,不限制 def wait(self): ctime = time.time() first_in_time = RECORD[‘1.1.1.1‘][-1] wt = 60-(ctime-first_in_time) return wt class LimitView(APIView): authentication_classes = [] #不讓認證用戶 permission_classes = [] #不讓驗證權限 throttle_classes = [MyThrottle, ] def get(self,request): # self.dispatch return Response(‘控制訪問頻率示例‘) def throttled(self, request, wait): ‘‘‘可定制方法設置中文錯誤‘‘‘ # raise exceptions.Throttled(wait) class MyThrottle(exceptions.Throttled): default_detail = ‘請求被限制‘ extra_detail_singular = ‘Expected available in {wait} second.‘ extra_detail_plural = ‘Expected available in {wait} seconds.‘ default_code = ‘還需要再等{wait}秒‘ raise MyThrottle(wait)稍微做了改動
# from django.shortcuts import render # from rest_framework.views import APIView # from rest_framework.response import Response # from rest_framework import exceptions # from rest_framework.throttling import BaseThrottle,SimpleRateThrottle #限制訪問頻率 # import time # # Create your views here. # RECORD = {} # class MyThrottle(BaseThrottle): # # def allow_request(self,request,view): # ‘‘‘對匿名用戶進行限制,每個用戶一分鐘訪問10次 ‘‘‘ # ctime = time.time() # ip = ‘1.1.1.1‘ # if ip not in RECORD: # RECORD[ip] = [ctime] # else: # #[152042123,15204212,3152042,123152042123] # time_list = RECORD[ip] #獲取ip裏面的值 # while True: # val = time_list[-1]#取出最後一個時間,也就是訪問最早的時間 # if (ctime-60)>val: #吧時間大於60秒的給剔除了 # time_list.pop() # #剔除了之後timelist裏面就是有效的時間了,在進行判斷他的訪問次數是不是超過10次 # else: # break # if len(time_list) >10: # return False # 返回False,限制 # time_list.insert(0, ctime) # return True #返回True,不限制 # # def wait(self): # ctime = time.time() # first_in_time = RECORD[‘1.1.1.1‘][-1] # wt = 60-(ctime-first_in_time) # return wt # # # class LimitView(APIView): # authentication_classes = [] #不讓認證用戶 # permission_classes = [] #不讓驗證權限 # throttle_classes = [MyThrottle, ] # def get(self,request): # # self.dispatch # return Response(‘控制訪問頻率示例‘) # # def throttled(self, request, wait): # ‘‘‘可定制方法設置中文錯誤‘‘‘ # # raise exceptions.Throttled(wait) # class MyThrottle(exceptions.Throttled): # default_detail = ‘請求被限制‘ # extra_detail_singular = ‘Expected available in {wait} second.‘ # extra_detail_plural = ‘Expected available in {wait} seconds.‘ # default_code = ‘還需要再等{wait}秒‘ # raise MyThrottle(wait) from django.shortcuts import render from rest_framework.views import APIView from rest_framework.response import Response from rest_framework import exceptions from rest_framework.throttling import BaseThrottle,SimpleRateThrottle #限制訪問頻率 import time # Create your views here. RECORD = {} class MyThrottle(BaseThrottle): def allow_request(self,request,view): ‘‘‘對匿名用戶進行限制,每個用戶一分鐘訪問10次 ‘‘‘ ctime = time.time() self.ip =self.get_ident(request) if self.ip not in RECORD: RECORD[self.ip] = [ctime] else: #[152042123,15204212,3152042,123152042123] time_list = RECORD[self.ip] #獲取ip裏面的值 while True: val = time_list[-1]#取出最後一個時間,也就是訪問最早的時間 if (ctime-60)>val: #吧時間大於60秒的給剔除了 time_list.pop() #剔除了之後timelist裏面就是有效的時間了,在進行判斷他的訪問次數是不是超過10次 else: break if len(time_list) >10: return False # 返回False,限制 time_list.insert(0, ctime) return True #返回True,不限制 def wait(self): ctime = time.time() first_in_time = RECORD[self.ip][-1] wt = 60-(ctime-first_in_time) return wt class LimitView(APIView): authentication_classes = [] #不讓認證用戶 permission_classes = [] #不讓驗證權限 throttle_classes = [MyThrottle, ] def get(self,request): # self.dispatch return Response(‘控制訪問頻率示例‘) def throttled(self, request, wait): ‘‘‘可定制方法設置中文錯誤‘‘‘ # raise exceptions.Throttled(wait) class MyThrottle(exceptions.Throttled): default_detail = ‘請求被限制‘ extra_detail_singular = ‘Expected available in {wait} second.‘ extra_detail_plural = ‘Expected available in {wait} seconds.‘ default_code = ‘還需要再等{wait}秒‘ raise MyThrottle(wait)
b、用resetframework內部的限制訪問頻率(利於Django緩存)
源碼分析:
from rest_framework.throttling import BaseThrottle,SimpleRateThrottle #限制訪問頻率BaseThrottle相當於一個抽象類
class BaseThrottle(object): """ Rate throttling of requests. """ def allow_request(self, request, view): """ Return `True` if the request should be allowed, `False` otherwise. """ raise NotImplementedError(‘.allow_request() must be overridden‘) def get_ident(self, request): #唯一標識 """ Identify the machine making the request by parsing HTTP_X_FORWARDED_FOR if present and number of proxies is > 0. If not use all of HTTP_X_FORWARDED_FOR if it is available, if not use REMOTE_ADDR. """ xff = request.META.get(‘HTTP_X_FORWARDED_FOR‘) remote_addr = request.META.get(‘REMOTE_ADDR‘) #獲取IP等 num_proxies = api_settings.NUM_PROXIES if num_proxies is not None: if num_proxies == 0 or xff is None: return remote_addr addrs = xff.split(‘,‘) client_addr = addrs[-min(num_proxies, len(addrs))] return client_addr.strip() return ‘‘.join(xff.split()) if xff else remote_addr def wait(self): """ Optionally, return a recommended number of seconds to wait before the next request. """ return None
SimpleRateThrottle
class SimpleRateThrottle(BaseThrottle): """ 一個簡單的緩存實現,只需要` get_cache_key() `。被覆蓋。 速率(請求/秒)是由視圖上的“速率”屬性設置的。類。該屬性是一個字符串的形式number_of_requests /期。 周期應該是:(的),“秒”,“M”,“min”,“h”,“小時”,“D”,“一天”。 以前用於節流的請求信息存儲在高速緩存中。 A simple cache implementation, that only requires `.get_cache_key()` to be overridden. The rate (requests / seconds) is set by a `rate` attribute on the View class. The attribute is a string of the form ‘number_of_requests/period‘. Period should be one of: (‘s‘, ‘sec‘, ‘m‘, ‘min‘, ‘h‘, ‘hour‘, ‘d‘, ‘day‘) Previous request information used for throttling is stored in the cache. """ cache = default_cache timer = time.time cache_format = ‘throttle_%(scope)s_%(ident)s‘ scope = None THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES def __init__(self): if not getattr(self, ‘rate‘, None): self.rate = self.get_rate() self.num_requests, self.duration = self.parse_rate(self.rate) def get_cache_key(self, request, view):#這個相當於是一個半成品,我們可以來補充它 """ Should return a unique cache-key which can be used for throttling. Must be overridden. May return `None` if the request should not be throttled. """ raise NotImplementedError(‘.get_cache_key() must be overridden‘) def get_rate(self): """ Determine the string representation of the allowed request rate. """ if not getattr(self, ‘scope‘, None): msg = ("You must set either `.scope` or `.rate` for ‘%s‘ throttle" % self.__class__.__name__) raise ImproperlyConfigured(msg) try: return self.THROTTLE_RATES[self.scope] except KeyError: msg = "No default throttle rate set for ‘%s‘ scope" % self.scope raise ImproperlyConfigured(msg) def parse_rate(self, rate): """ Given the request rate string, return a two tuple of: <allowed number of requests>, <period of time in seconds> """ if rate is None: return (None, None) num, period = rate.split(‘/‘) num_requests = int(num) duration = {‘s‘: 1, ‘m‘: 60, ‘h‘: 3600, ‘d‘: 86400}[period[0]] return (num_requests, duration) #1、一進來會先執行他, def allow_request(self, request, view): """ Implement the check to see if the request should be throttled. On success calls `throttle_success`. On failure calls `throttle_failure`. """ if self.rate is None: return True self.key = self.get_cache_key(request, view) #2、執行get_cache_key,這裏的self.key就相當於我們舉例ip if self.key is None: return True self.history = self.cache.get(self.key, []) #3、得到的key,默認是一個列表,賦值給了self.history, # 這時候self.history就是每一個ip對應的訪問記錄 self.now = self.timer() # Drop any requests from the history which have now passed the # throttle duration while self.history and self.history[-1] <= self.now - self.duration: self.history.pop() if len(self.history) >= self.num_requests: return self.throttle_failure() return self.throttle_success() def throttle_success(self): """ Inserts the current request‘s timestamp along with the key into the cache. """ self.history.insert(0, self.now) self.cache.set(self.key, self.history, self.duration) return True def throttle_failure(self): """ Called when a request to the API has failed due to throttling. """ return False def wait(self): """ Returns the recommended next request time in seconds. """ if self.history: remaining_duration = self.duration - (self.now - self.history[-1]) else: remaining_duration = self.duration available_requests = self.num_requests - len(self.history) + 1 if available_requests <= 0: return None return remaining_duration / float(available_requests)
請求一進來會先執行SimpleRateThrottle這個類的構造方法
__init__def __init__(self): if not getattr(self, ‘rate‘, None): self.rate = self.get_rate() #點進去看到需要些一個scope ,2/m self.num_requests, self.duration = self.parse_rate(self.rate)
get_rate
def get_rate(self): """ Determine the string representation of the allowed request rate. """ if not getattr(self, ‘scope‘, None): #檢測必須有scope,沒有就報錯了 msg = ("You must set either `.scope` or `.rate` for ‘%s‘ throttle" % self.__class__.__name__) raise ImproperlyConfigured(msg) try: return self.THROTTLE_RATES[self.scope] except KeyError: msg = "No default throttle rate set for ‘%s‘ scope" % self.scope raise ImproperlyConfigured(msg)
parse_rate
def parse_rate(self, rate): """ Given the request rate string, return a two tuple of: <allowed number of requests>, <period of time in seconds> """ if rate is None: return (None, None) num, period = rate.split(‘/‘) num_requests = int(num) duration = {‘s‘: 1, ‘m‘: 60, ‘h‘: 3600, ‘d‘: 86400}[period[0]] return (num_requests, duration)
allow_request
#2、接下來會先執行他, def allow_request(self, request, view): """ Implement the check to see if the request should be throttled. On success calls `throttle_success`. On failure calls `throttle_failure`. """ if self.rate is None: return True self.key = self.get_cache_key(request, view) #2、執行get_cache_key,這裏的self.key就相當於我們舉例ip if self.key is None: return True #不限制 # [114521212,11452121211,45212121145,21212114,521212] self.history = self.cache.get(self.key, []) #3、得到的key,默認是一個列表,賦值給了self.history, # 這時候self.history就是每一個ip對應的訪問記錄 self.now = self.timer() # Drop any requests from the history which have now passed the # throttle duration while self.history and self.history[-1] <= self.now - self.duration: self.history.pop() if len(self.history) >= self.num_requests: return self.throttle_failure() return self.throttle_success()
wait
def wait(self): """ Returns the recommended next request time in seconds. """ if self.history: remaining_duration = self.duration - (self.now - self.history[-1]) else: remaining_duration = self.duration available_requests = self.num_requests - len(self.history) + 1 if available_requests <= 0: return None return remaining_duration / float(available_requests)
代碼實現:
views.py###########用resetframework內部的限制訪問頻率############## class MySimpleRateThrottle(SimpleRateThrottle): scope = ‘xxx‘ def get_cache_key(self, request, view): return self.get_ident(request) #返回唯一標識IP class LimitView(APIView): authentication_classes = [] #不讓認證用戶 permission_classes = [] #不讓驗證權限 throttle_classes = [MySimpleRateThrottle, ] def get(self,request): # self.dispatch return Response(‘控制訪問頻率示例‘) def throttled(self, request, wait): ‘‘‘可定制方法設置中文錯誤‘‘‘ # raise exceptions.Throttled(wait) class MyThrottle(exceptions.Throttled): default_detail = ‘請求被限制‘ extra_detail_singular = ‘Expected available in {wait} second.‘ extra_detail_plural = ‘Expected available in {wait} seconds.‘ default_code = ‘還需要再等{wait}秒‘ raise MyThrottle(wait)
記得在settings裏面配置
settings.pyREST_FRAMEWORK = { ‘UNAUTHENTICATED_USER‘: None, ‘UNAUTHENTICATED_TOKEN‘: None, #將匿名用戶設置為None "DEFAULT_AUTHENTICATION_CLASSES": [ "app01.utils.MyAuthentication", ], ‘DEFAULT_PERMISSION_CLASSES‘:[ # "app03.utils.MyPermission",#設置路徑, ], ‘DEFAULT_THROTTLE_RATES‘:{ ‘xxx‘:‘2/minute‘ #2分鐘 } } #緩存:放在文件 CACHES = { ‘default‘: { ‘BACKEND‘: ‘django.core.cache.backends.filebased.FileBasedCache‘, ‘LOCATION‘: ‘cache‘, #文件路徑 } }
4、對匿名用戶進行限制,每個用戶1分鐘允許訪問5次,對於登錄的普通用戶1分鐘訪問10次,VIP用戶一分鐘訪問20次
- 比如首頁可以匿名訪問
- #先認證,只有認證了才知道是不是匿名的,
- #權限登錄成功之後才能訪問, ,index頁面就不需要權限了
- If request.user #判斷登錄了沒有
from django.contrib import admin from django.conf.urls import url, include from app05 import views urlpatterns = [ url(‘index/‘,views.IndexView.as_view()), url(‘manage/‘,views.ManageView.as_view()), ]views.py
from django.shortcuts import render from rest_framework.views import APIView from rest_framework.response import Response from rest_framework.authentication import BaseAuthentication #認證需要 from rest_framework.throttling import BaseThrottle,SimpleRateThrottle #限流處理 from rest_framework.permissions import BasePermission from rest_framework import exceptions from app01 import models # Create your views here. ###############3##認證##################### class MyAuthentcate(BaseAuthentication): ‘‘‘檢查用戶是否存在,如果存在就返回user和auth,如果沒有就返回‘‘‘ def authenticate(self, request): token = request.query_params.get(‘token‘) obj = models.UserInfo.objects.filter(token=token).first() if obj: return (obj.username,obj.token) return None #表示我不處理 ##################權限##################### class MyPermission(BasePermission): message=‘無權訪問‘ def has_permission(self, request, view): if request.user: return True #true表示有權限 return False #false表示無權限 class AdminPermission(BasePermission): message = ‘無權訪問‘ def has_permission(self, request, view): if request.user==‘haiyan‘: return True # true表示有權限 return False # false表示無權限 ############3#####限流##################3## class AnonThrottle(SimpleRateThrottle): scope = ‘wdp_anon‘ #相當於設置了最大的訪問次數和時間 def get_cache_key(self, request, view): if request.user: return None #返回None表示我不限制,登錄用戶我不管 #匿名用戶 return self.get_ident(request) #返回一個唯一標識IP class UserThrottle(SimpleRateThrottle): scope = ‘wdp_user‘ def get_cache_key(self, request, view): #登錄用戶 if request.user: return request.user return None #返回NOne表示匿名用戶我不管 ##################視圖##################### #首頁支持匿名訪問, #無需要登錄就可以訪問 class IndexView(APIView): authentication_classes = [MyAuthentcate,] #認證判斷他是不是匿名用戶 permission_classes = [] #一般主頁就不需要權限驗證了 throttle_classes = [AnonThrottle,UserThrottle,] #對匿名用戶和普通用戶的訪問限制 def get(self,request): # self.dispatch return Response(‘訪問首頁‘) def throttled(self, request, wait): ‘‘‘可定制方法設置中文錯誤‘‘‘ # raise exceptions.Throttled(wait) class MyThrottle(exceptions.Throttled): default_detail = ‘請求被限制‘ extra_detail_singular = ‘Expected available in {wait} second.‘ extra_detail_plural = ‘Expected available in {wait} seconds.‘ default_code = ‘還需要再等{wait}秒‘ raise MyThrottle(wait) #需登錄就可以訪問 class ManageView(APIView): authentication_classes = [MyAuthentcate, ] # 認證判斷他是不是匿名用戶 permission_classes = [MyPermission,] # 一般主頁就不需要權限驗證了 throttle_classes = [AnonThrottle, UserThrottle, ] # 對匿名用戶和普通用戶的訪問限制 def get(self, request): # self.dispatch return Response(‘管理人員訪問頁面‘) def throttled(self, request, wait): ‘‘‘可定制方法設置中文錯誤‘‘‘ # raise exceptions.Throttled(wait) class MyThrottle(exceptions.Throttled): default_detail = ‘請求被限制‘ extra_detail_singular = ‘Expected available in {wait} second.‘ extra_detail_plural = ‘Expected available in {wait} seconds.‘ default_code = ‘還需要再等{wait}秒‘ raise MyThrottle(wait)
四、總結
1、認證:就是檢查用戶是否存在;如果存在返回(request.user,request.auth);不存在request.user/request.auth=None
2、權限:進行職責的劃分
3、限制訪問頻率
認證 - 類:authenticate/authenticate_header ##驗證不成功的時候執行的 - 返回值: - return None, - return (user,auth), - raise 異常 - 配置: - 視圖: class IndexView(APIView): authentication_classes = [MyAuthentication,] - 全局: REST_FRAMEWORK = { ‘UNAUTHENTICATED_USER‘: None, ‘UNAUTHENTICATED_TOKEN‘: None, "DEFAULT_AUTHENTICATION_CLASSES": [ # "app02.utils.MyAuthentication", ], } 權限 - 類:has_permission/has_object_permission - 返回值: - True、#有權限 - False、#無權限 - exceptions.PermissionDenied(detail="錯誤信息") #異常自己隨意,想拋就拋,錯誤信息自己指定 - 配置: - 視圖: class IndexView(APIView): permission_classes = [MyPermission,] - 全局: REST_FRAMEWORK = { "DEFAULT_PERMISSION_CLASSES": [ # "app02.utils.MyAuthentication", ], } 限流 - 類:allow_request/wait PS: scope = "wdp_user" - 返回值:
return True、#不限制
return False #限制 - 配置: - 視圖: class IndexView(APIView): throttle_classes=[AnonThrottle,UserThrottle,] def get(self,request,*args,**kwargs): self.dispatch return Response(‘訪問首頁‘) - 全局 REST_FRAMEWORK = { "DEFAULT_THROTTLE_CLASSES":[ ], ‘DEFAULT_THROTTLE_RATES‘:{ ‘wdp_anon‘:‘5/minute‘, ‘wdp_user‘:‘10/minute‘, } }
Django Restful Framework【第三篇】認證、權限、限制訪問頻率