前言

有時候我們傳送手機驗證碼,會發現1分鐘只能傳送1次,這是做了頻率限制,限制的時間次數,都由開發者自己決定

頻率認證原始碼分析

  1. def check_throttles(self, request):
  2. """
  3. 檢查是否應限制請求。如果請求受到限制,則引發適當的異常。
  4. """
  5. throttle_durations = []
  6. # 1.遍歷配置的頻率認證類,初始化一個個頻率認證類物件(會呼叫頻率認證類的__init__()方法)
  7. # 2.頻率認證類物件呼叫allow_request()方法,頻率是否限次(沒有限次可訪問,限次不可訪問)
  8. # 3.頻率認證類限次後,呼叫wait方法,獲取還需多長時間可以進行下一次訪問
  9. for throttle in self.get_throttles():
  10. if not throttle.allow_request(request, self):
  11. throttle_durations.append(throttle.wait())
  12. if throttle_durations:
  13. # Filter out `None` values which may happen in case of config / rate
  14. # changes, see #1438
  15. durations = [
  16. duration for duration in throttle_durations
  17. if duration is not None
  18. ]
  19. duration = max(durations, default=None)
  20. self.throttled(request, duration)

get_throttles()

我們首先來檢視get_throttles()原始碼

  1. def get_throttles(self):
  2. """
  3. 例項化並返回此檢視使用的節流閥列表。
  4. """
  5. return [throttle() for throttle in self.throttle_classes]

然後點選throttle_classes,跳轉到APIView後檢視原始碼

  1. throttle_classes = api_settings.DEFAULT_THROTTLE_CLASSES

接著我們去settings.py檔案中檢視,發現'DEFAULT_THROTTLE_CLASSES': [],預設是一個空列表,那麼我們就知道了for throttle in self.get_throttles()其實是去遍歷列表中配置的頻率認證,至於列表中需要填寫什麼,我們後續再看

allow_request

接下來我們檢視allow_request方法,它是drf中的throtting.py檔案中BaseThrottle類中的方法,我們檢視下BaseThrottle原始碼

  1. class BaseThrottle:
  2. """
  3. Rate throttling of requests.
  4. """
  5. def allow_request(self, request, view):
  6. """
  7. 如果應該允許請求,則返回 `True`,否則返回 `False`。
  8. """
  9. raise NotImplementedError('.allow_request() must be overridden')
  10. def get_ident(self, request):
  11. """
  12. Identify the machine making the request by parsing HTTP_X_FORWARDED_FOR
  13. if present and number of proxies is > 0. If not use all of
  14. HTTP_X_FORWARDED_FOR if it is available, if not use REMOTE_ADDR.
  15. """
  16. xff = request.META.get('HTTP_X_FORWARDED_FOR')
  17. remote_addr = request.META.get('REMOTE_ADDR')
  18. num_proxies = api_settings.NUM_PROXIES
  19. if num_proxies is not None:
  20. if num_proxies == 0 or xff is None:
  21. return remote_addr
  22. addrs = xff.split(',')
  23. client_addr = addrs[-min(num_proxies, len(addrs))]
  24. return client_addr.strip()
  25. return ''.join(xff.split()) if xff else remote_addr
  26. def wait(self):
  27. """
  28. 返回推薦的在下一個請求之前等待的秒數
  29. """
  30. return None

可以看到BaseThrottle類下有3個方法

  • allow_request:如果需要繼承該類,必須重寫此方法
  • get_ident:獲取身份
  • wait:返回等待的秒數

SimpleRateThrottle

throtting中有個SimpleRateThrottle繼承自BaseThrottle,我們大多數情況下都會自定義SimpleRateThrottle類,讓我們檢視下原始碼,看他幹了哪些事情

  1. class SimpleRateThrottle(BaseThrottle):
  2. """
  3. 一個簡單的快取實現,只需要提供get_cache_key方法即可
  4. 速率(requests / seconds)由 View 類上的 `rate` 屬性設定。該屬性是“number_of_requests/period”形式的字串。
  5. period應該是以下之一:('s', 'sec', 'm', 'min', 'h', 'hour', 'd', 'day')
  6. 用於限制的先前請求資訊儲存在快取中
  7. """
  8. cache = default_cache
  9. timer = time.time
  10. cache_format = 'throttle_%(scope)s_%(ident)s'
  11. scope = None
  12. THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES
  13. def __init__(self):
  14. if not getattr(self, 'rate', None):
  15. self.rate = self.get_rate()
  16. self.num_requests, self.duration = self.parse_rate(self.rate)
  17. def get_cache_key(self, request, view):
  18. def get_rate(self):
  19. def parse_rate(self, rate):
  20. def allow_request(self, request, view):
  21. def throttle_success(self):
  22. def throttle_failure(self):
  23. def wait(self):

我們可以看到SimpleRateThrottle有5個屬性

  • cache:預設的django中的快取
  • timer:當前時間
  • cache_format:快取的格式throttle_%(scope)s_%(ident)s
  • scope:範圍
  • THROTTLE_RATES:預設的頻率

除了屬性,還有8個方法,我們依次檢視原始碼

init

  1. def __init__(self):
  2. if not getattr(self, 'rate', None):
  3. self.rate = self.get_rate()
  4. self.num_requests, self.duration = self.parse_rate(self.rate)

程式碼講解:如果沒有獲取到rate屬性,那麼rate屬性就從get_rate()方法中獲取,拿到後,從parse_rate方法中解析出一個元組,包含2個元素num_requestsduration

  • num_request:請求次數
  • duration:持續時間

get_rate

既然上面用到了此方法,我們就來看看

  1. def get_rate(self):
  2. """
  3. 確定允許的請求速率用字串表示形式。
  4. """
  5. if not getattr(self, 'scope', None):
  6. msg = ("You must set either `.scope` or `.rate` for '%s' throttle" %
  7. self.__class__.__name__)
  8. raise ImproperlyConfigured(msg)
  9. try:
  10. return self.THROTTLE_RATES[self.scope]
  11. except KeyError:
  12. msg = "No default throttle rate set for '%s' scope" % self.scope
  13. raise ImproperlyConfigured(msg)

程式碼講解:如果沒有獲取到scope屬性,會丟擲異常資訊,如果有scope就從THROTTLE_RATES[self.scope]中返回它,THROTTLE_RATES預設值如下:

  1. 'DEFAULT_THROTTLE_RATES': {
  2. 'user': None,
  3. 'anon': None,
  4. },

所以get_rate方法返回的是THROTTLE_RATESkeyscope所對應的值,scope屬性我們可以自定義的時候隨意設定,如果我們自定義scopeuser,那麼get_rate方法返回的就是None,所以self.rate也就為None

parse_rate

獲取到rate,用此方法解析

  1. def parse_rate(self, rate):
  2. """
  3. 提供請求速率字串,返回一個二元組
  4. 允許請求的次數, 以秒為單位的時間段
  5. """
  6. if rate is None:
  7. return (None, None)
  8. num, period = rate.split('/')
  9. num_requests = int(num)
  10. duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]
  11. return (num_requests, duration)

程式碼講解:如果rateNone,那麼就返回(None, None),如果不為None,會把rate/分割,從這裡我們就知道了rate的字串的形式就是num/period,比如3/min,最終會把他分割,然後返回一個元組

  • num_requests:請求的次數
  • duration:取period中的下標為0的,然後從字典中取出對應的key的值,比如min,第一個開頭字母為m,最後從字典中取m的值,就是60

所以示例3/min代表的就是1分鐘可以訪問3次

get_cache_key

  1. def get_cache_key(self, request, view):
  2. """
  3. 應該返回可用於限制的唯一cache-key。必須被覆蓋。
  4. 如果不限制請求,則可能返回“None”。
  5. """
  6. raise NotImplementedError('.get_cache_key() must be overridden')

這個方法很簡單,就是獲取唯一的快取key,如果請求不做限制,則返回None

allow_request

由於父類BaseThrottleallow_request方法沒有實現具體的邏輯,所以SimpleRateThrottle中實現了具體的細節

  1. def allow_request(self, request, view):
  2. """
  3. 如果請求應該被節流,那麼實行檢查以便檢視
  4. 成功時呼叫`throttle_success`.
  5. 失敗時呼叫`throttle_failure`.
  6. """
  7. if self.rate is None:
  8. return True
  9. self.key = self.get_cache_key(request, view)
  10. if self.key is None:
  11. return True
  12. self.history = self.cache.get(self.key, [])
  13. self.now = self.timer()
  14. # 從歷史記錄中刪除現在已經超過節流持續時間的任何請求
  15. while self.history and self.history[-1] <= self.now - self.duration:
  16. self.history.pop()
  17. if len(self.history) >= self.num_requests:
  18. return self.throttle_failure()
  19. return self.throttle_success()

程式碼講解:如果rateNone就返回True,代表允許請求,如果key也為None則返回True,代表允許請求,如果ratekey都有值,history就從快取中獲取key所對應的列表,now代表當前時間。如果history有值,並且列表history的最後一個元素≤當前時間-持續時間,那麼history列表就會刪除這個元素,如果列表長度≥請求次數,就會呼叫throttle_failure,如果列表長度<請求次數,則呼叫throttle_success

舉例:如果self.now假設為晚上20:00,durationnum_requests就用之前3/min的示例,duration表示60s,num_requests表示3次,那麼self.now-self.duration就代表19:59分,如果history列表中的最後一個元素的時間值≤19:59,那麼就刪除它,我們的需求是3/min一分鐘只能訪問3次,而你超過了1分鐘,就沒必要限制了,所以將時間從history刪除,如果history列表長度≥3,一開始是空列表的時候不滿足條件,會返回throttle_success,第二次訪問列表長度會增加到1,但還是不滿足條件,會繼續呼叫throttle_success,第三次訪問列表長度為2,仍然不滿足會繼續呼叫throttle_success,第四次訪問滿足條件,就會呼叫throttle_failure,代表不能再請求了

throttle_success

  1. def throttle_success(self):
  2. """
  3. 將當前請求的時間戳與鍵一起插入快取中。
  4. """
  5. self.history.insert(0, self.now)
  6. self.cache.set(self.key, self.history, self.duration)
  7. return True

程式碼詳解:將當前時間插入到history列表的頭部,給快取設定key的值為當前時間,超時時間為duration,最後返回True,代表可以訪問

throttle_failure

  1. def throttle_failure(self):
  2. """
  3. 當對 API 的請求由於節流而失敗時呼叫。
  4. """
  5. return False

返回False,代表請求節流失敗,不允許訪問

wait

  1. def wait(self):
  2. """
  3. 以秒為單位返回推薦的下一個請求時間。
  4. """
  5. if self.history:
  6. remaining_duration = self.duration - (self.now - self.history[-1])
  7. else:
  8. remaining_duration = self.duration
  9. available_requests = self.num_requests - len(self.history) + 1
  10. if available_requests <= 0:
  11. return None
  12. return remaining_duration / float(available_requests)

程式碼解析:如果history列表存在,remaining_duration剩餘時間就等於持續時間減去(當期時間-列表最後一個元素的時間),如果self.now為晚上20:00,history的最後一個元素值為19:59:30,而持續時間duration設定為60s,那麼remaining_duration就代表還剩30s就可以進行訪問了,而available_requests可用請求等於(設定好的請求次數-history列表+1)

自定義頻率認證

  1. 自定義一個繼承SimpleRateThrottle類的頻率類
  2. 設定一個scope類屬性,屬性值為任意見名知意的字串
  3. settings配置檔案中,配置drfDEFAULT_THROTTLE_RATES,格式為{scope對應的字串值:'次數/時間'}
  4. 在自定義頻率類中重寫get_cache_key方法

    限制的物件返回與限制資訊有關的字串

    不限制的物件返回None

需求:使用者訪問簡訊驗證碼1分鐘只能傳送1次驗證碼

我們建立一個throttles.py檔案,然後定義SMSRateThrottle類,程式碼如下:

  1. from rest_framework.throttling import SimpleRateThrottle
  2. class SMSRateThrottle(SimpleRateThrottle):
  3. scope = "sms"
  4. def get_cache_key(self, request, view):
  5. phone = request.query_params.get('phone') or request.data.get('phone')
  6. # 沒有手機號,就不做頻率限制
  7. if not phone:
  8. return None
  9. # 返回可以根據手機號動態變化,且不易重複的字串,作為操作快取的key
  10. return f"throttle_{self.scope}_{phone}"

settings.py檔案中配置DEFAULT_THROTTLE_RATES,程式碼如下:

  1. 'DEFAULT_THROTTLE_RATES': {
  2. 'sms': '1/min'
  3. },

最後再檢視函式中,區域性配置自定義認證類

  1. class TestView(APIView):
  2. throttle_classes = [SMSRateThrottle]
  3. def get(self, request, *args, **kwargs):
  4. return APIResponse(data_msg="get 獲取驗證碼")
  5. def post(self, request, *args, **kwargs):
  6. return APIResponse(data_msg="post 獲取驗證碼")

具體測試細節過程就不再描述了,這裡只講述結果,當我們使用get或者post請求時,攜帶請求引數phone第一次傳送請求,請求成功,第二次就會出現以下提示

  1. {
  2. "detail": "請求超過了限速。 Expected available in 58 seconds."
  3. }

58 seconds代表還剩58秒可以再次訪問,至於58s是怎麼算出來的,就是SimpleRateThrottle類中的wait方法實現的