Created
May 9, 2025 15:54
-
-
Save DArmstrong87/27ad5d4beb727b713bd4425cdb2cc846 to your computer and use it in GitHub Desktop.
Dynamic Rate Limit Decorator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
To use this decorator, add it to the view function you want to rate limit. | |
from authentication.decorators import dynamic_rate_limit | |
@dynamic_rate_limit() | |
def login_view(request): | |
The values can be changed based on a Config model that stores values in json. | |
""" | |
import ipaddress | |
import logging | |
import requests | |
from enum import Enum | |
from functools import wraps | |
from django.conf import settings | |
from django.contrib import messages | |
from django.core.cache import cache | |
from django.http import HttpRequest | |
from django.shortcuts import redirect | |
from app.views import rate_limited | |
from authentication.models import User, Config | |
logger = logging.getLogger("DefaultLogging") | |
def dynamic_rate_limit(): | |
""" | |
A decorator for rate limiting requests based on IP or entered email address to login. | |
Values can be changed dynamically by updating the Config for "login_config". | |
This decorator rate limits using the following default values: | |
Login Screen | |
20 requests/min, lockout time is 5 minutes | |
10 email POST requests per 5 minutes | |
OTP Login | |
20 requests/min, lockout time is 5 minutes | |
3 OTP POST requests per 15 minutes | |
Enforces OTP when there are 5 failed login attempts (POST request for the same email) | |
Attempts are cached by either IP or entered email | |
Early return to show rate limit screen if user IP is locked out | |
""" | |
def decorator(view_func): | |
@wraps(view_func) | |
def _wrapped_view(request, *args, **kwargs): | |
# Clear cache during local testing | |
# cache.clear() | |
# ==================== USER IP ==================== | |
user_ip = get_client_ip(request) | |
if user_ip is None: | |
return view_func(request, *args, **kwargs) | |
# Check if this IP is Google owned and skip rate limit logic if so. | |
ip_is_gcp_subnet = check_ip_is_google_owned(user_ip) | |
if ip_is_gcp_subnet: | |
logger.warning( | |
f"LOGIN RATE LIMIT: Client IP {user_ip} is a Google-owned IP. Skipping rate limit logic." | |
) | |
return view_func(request, *args, **kwargs) | |
# ==================== LOGIN CONFIG ==================== | |
login_config = LoginConfig() | |
view_name = view_func.__name__ | |
# Handle Max requests of login screens by IP. | |
# Resend OTP requests are by entered email. | |
rate_limit_screen_functions = { | |
"login_view": handle_login_screen_requests_by_ip, | |
"sso_login_view": handle_login_screen_requests_by_ip, | |
"otp_login_view": handle_login_screen_requests_by_ip, | |
"resend_otp": handle_otp_resend_requests, | |
} | |
handle_login_screen_rate_limit = rate_limit_screen_functions[view_name] | |
rate_limit_met, msg = handle_login_screen_rate_limit( | |
request, view_func, login_config, user_ip | |
) | |
# Return rate limit screen if limit met | |
if rate_limit_met: | |
if view_name != "resend_otp": | |
return rate_limited(request, msg) | |
# For rate limit of resending OTP, we simply redirect to OTP login screen | |
entered_email = request.resolver_match.kwargs.get("email") | |
return redirect(f"../../otp-login/{entered_email}/") | |
# Handle POST requests for Login and OTP Login | |
if request.method == "POST": | |
rate_limit_view_post_functions = { | |
"login_view": handle_login_post_rate_limit, | |
"sso_login_view": handle_login_post_rate_limit, | |
"otp_login_view": handle_otp_login_post_rate_limit, | |
} | |
handle_rate_limit = rate_limit_view_post_functions[view_name] | |
rate_limit_met, msg = handle_rate_limit(request, login_config) | |
if rate_limit_met: | |
return rate_limited(request, msg) | |
# Proceed to the view function if the rate limit is not exceeded | |
return view_func(request, *args, **kwargs) | |
return _wrapped_view | |
return decorator | |
def handle_login_screen_requests_by_ip( | |
request, view_func, login_config, user_ip | |
) -> tuple[bool, str]: | |
""" | |
Handle max requests of any request type. Track count by IP, regardless of entered email. | |
""" | |
# ================== LOGIN SCREEN or OTP LOGIN SCREEN REQUESTS ================== | |
if user_ip is None: | |
# Fallback IPs will not work as we don't want to ratelimit | |
# the load balancer or a Google IP. | |
logger.warning("LOGIN RATE LIMIT: User IP not found in X-Forwarded-For header.") | |
return False, None | |
# Get cache key for either Login screen or OTP Login screen | |
cache_key = LoginCacheKeys.RATE_LIMIT_IP_LOGIN_SCREEN_COUNT.value | |
if view_func.__name__ == "otp_login_view": | |
cache_key = LoginCacheKeys.RATE_LIMIT_IP_OTP_LOGIN_SCREEN_COUNT.value | |
user_ip_login_attempts_cache_key = f"{cache_key}{user_ip}" | |
cache.default_timeout = 60 | |
login_screen_request_count = cache.get_or_set(user_ip_login_attempts_cache_key, 0) | |
# Do not increment once max requests are met to prevent the cache timeout from resetting | |
# and prolonging lockout period with subsequent requests. | |
MAX_LOGIN_REQUESTS_MIN = login_config.MAX_LOGIN_REQUESTS_MIN | |
if login_screen_request_count < MAX_LOGIN_REQUESTS_MIN: | |
login_screen_request_count = cache.incr(user_ip_login_attempts_cache_key) | |
logger.info( | |
f"LOGIN RATE LIMIT: Request count on {view_func.__name__} for user IP {user_ip}: {login_screen_request_count}" | |
) | |
if login_screen_request_count >= MAX_LOGIN_REQUESTS_MIN: | |
logger.warning( | |
f"LOGIN RATE LIMIT: Max requests on {view_func.__name__} of {MAX_LOGIN_REQUESTS_MIN} exceeded for user IP {user_ip}." | |
) | |
return True, None | |
return False, None | |
# ====================================================== | |
def handle_otp_resend_requests(request, view_func, login_config, user_ip) -> tuple[bool, str]: | |
""" | |
Track OTP Resend Requests | |
Return boolean, error msg | |
""" | |
entered_email = request.resolver_match.kwargs.get("email") | |
MAX_OTP_RESENDS_INTERVAL = login_config.MAX_OTP_RESENDS_INTERVAL | |
MAX_OTP_RESENDS = login_config.MAX_OTP_RESENDS | |
# Get or set OTP resend count from cache | |
rate_limit_email_otp_resends_cache_key = ( | |
f"{LoginCacheKeys.RATE_LIMIT_EMAIL_OTP_RESEND_COUNT.value}{entered_email}" | |
) | |
cache.default_timeout = MAX_OTP_RESENDS_INTERVAL * 60 | |
otp_resend_count_email = cache.get_or_set(rate_limit_email_otp_resends_cache_key, 0) | |
# Increment count if max not met | |
if otp_resend_count_email < MAX_OTP_RESENDS: | |
otp_resend_count_email = cache.incr(rate_limit_email_otp_resends_cache_key) | |
logger.info( | |
f"LOGIN RATE LIMIT: OTP Resend count for user {entered_email}: {otp_resend_count_email}" | |
) | |
# If max resend, prevent user from resending any more | |
if otp_resend_count_email >= MAX_OTP_RESENDS: | |
logger.warning( | |
f"LOGIN RATE LIMIT: Max OTP Resend count met for user {entered_email}: {otp_resend_count_email}" | |
) | |
max_otp_resends_sent_error = f"You have reached the maximum number of One-Time Passcode requests. Please wait {MAX_OTP_RESENDS_INTERVAL} minutes before requesting another." | |
messages.warning(request, max_otp_resends_sent_error) | |
return True, None | |
return False, None | |
def handle_login_post_rate_limit(request, login_config) -> tuple[bool, str]: | |
""" | |
Rate limit POST requests for entered email | |
Enforces MFA if 5 failed login attempts | |
""" | |
# ================== POSTS PER EMAIL ================== | |
# Handle login rate limit of entered email value | |
# Email Post Request values | |
MAX_EMAIL_POST_REQUESTS_INTERVAL = login_config.MAX_EMAIL_POST_REQUESTS_INTERVAL | |
MAX_EMAIL_POST_REQUESTS = login_config.MAX_EMAIL_POST_REQUESTS | |
entered_email = request.POST.get("email") | |
rate_limit_email_login_post_requests_cache_key = ( | |
f"{LoginCacheKeys.RATE_LIMIT_EMAIL_LOGIN_POST_COUNT.value}{entered_email}" | |
) | |
cache.default_timeout = MAX_EMAIL_POST_REQUESTS_INTERVAL * 60 | |
rate_limit_email_login_post_request_count = cache.get_or_set( | |
rate_limit_email_login_post_requests_cache_key, 0 | |
) | |
# Increment if request count has not met the max. | |
if rate_limit_email_login_post_request_count < MAX_EMAIL_POST_REQUESTS: | |
rate_limit_email_login_post_request_count = cache.incr( | |
rate_limit_email_login_post_requests_cache_key | |
) | |
logger.info( | |
f"LOGIN RATE LIMIT: Login POST attempts for email {entered_email}: {rate_limit_email_login_post_request_count}" | |
) | |
# ================== ENFORCE OTP IF MET MAX_BASIC_LOGIN_ATTEMPTS_TO_ENFORCE_MFA ================== | |
# If 5 failed login attempts, reset last_mfa_login to enforce OTP | |
# This will not rate limit the email. | |
MAX_BASIC_LOGIN_ATTEMPTS_TO_ENFORCE_MFA = login_config.MAX_BASIC_LOGIN_ATTEMPTS_TO_ENFORCE_MFA | |
if rate_limit_email_login_post_request_count == MAX_BASIC_LOGIN_ATTEMPTS_TO_ENFORCE_MFA: | |
try: | |
user = User.objects.get(email=entered_email) | |
logger.warning( | |
f"LOGIN RATE LIMIT: Max basic login POST attempts of {MAX_BASIC_LOGIN_ATTEMPTS_TO_ENFORCE_MFA} reached for email: {entered_email} - Enforcing MFA for this email." | |
) | |
user.last_mfa_login = None | |
user.save() | |
except User.DoesNotExist: | |
pass | |
# ====================================================== | |
# Check if user has met or exceeded the max number of requests | |
if rate_limit_email_login_post_request_count >= MAX_EMAIL_POST_REQUESTS: | |
logger.warning( | |
f"LOGIN RATE LIMIT: Basic login POST requests exceeded for email {entered_email}" | |
) | |
msg = f"Too many requests. Please wait {MAX_EMAIL_POST_REQUESTS_INTERVAL} minutes and try again." | |
return True, msg | |
return False, None | |
def handle_otp_login_post_rate_limit(request, login_config) -> tuple[bool, str]: | |
""" | |
Return boolean and error message if max OTP logins are exceeded | |
If met or exceeded, invalidate OTP and valid credentials last entered | |
""" | |
# ================== POSTS PER EMAIL ================== | |
# Handle OTP login rate limit of entered email value | |
# Email Post Request values | |
MAX_OTP_ATTEMPTS_INTERVAL = login_config.MAX_OTP_ATTEMPTS_INTERVAL | |
MAX_OTP_ATTEMPTS = login_config.MAX_OTP_ATTEMPTS | |
entered_email = request.POST.get("email") | |
rate_limit_email_otp_post_count_cache_key = ( | |
f"{LoginCacheKeys.RATE_LIMIT_EMAIL_OTP_POST_COUNT.value}{entered_email}" | |
) | |
cache.default_timeout = MAX_OTP_ATTEMPTS_INTERVAL * 60 | |
rate_limit_email_otp_post_count = cache.get_or_set(rate_limit_email_otp_post_count_cache_key, 0) | |
# Increment if request count has not met the max. | |
if rate_limit_email_otp_post_count < MAX_OTP_ATTEMPTS: | |
rate_limit_email_otp_post_count = cache.incr(rate_limit_email_otp_post_count_cache_key) | |
logger.info( | |
f"LOGIN RATE LIMIT: OTP attempts for email {entered_email}: {rate_limit_email_otp_post_count}" | |
) | |
# Check if user has met or exceeded the max number of requests | |
if rate_limit_email_otp_post_count >= MAX_OTP_ATTEMPTS: | |
invalidate_OTP_and_valid_last_login(entered_email) | |
logger.warning( | |
f"LOGIN RATE LIMIT: OTP login POST requests exceeded for email {entered_email}" | |
) | |
msg = f"Too many requests. Please wait {MAX_OTP_ATTEMPTS_INTERVAL} minutes and try again." | |
return True, msg | |
return False, None | |
def invalidate_OTP_and_valid_last_login(entered_email): | |
""" | |
If user exists with the entered email, invalidate OTP and recent valid login. | |
This will force the user to enter correct credentials and OTP again. | |
""" | |
try: | |
# Invalidate OTP | |
user = User.objects.get(email=entered_email) | |
user_otp_cache_key = f"{LoginCacheKeys.USER_OTP.value}{user.id}" | |
cache.delete(user_otp_cache_key) | |
# Invalidate recent valid login to force user to login again | |
valid_creds_last_entered_cache_key = ( | |
f"{LoginCacheKeys.VALID_CREDENTIALS_LAST_ENTERED.value}{user.id}" | |
) | |
cache.delete(valid_creds_last_entered_cache_key) | |
logger.warning(f"LOGIN: Invalidating OTP and recent valid login for user {entered_email}") | |
except User.DoesNotExist: | |
pass | |
def check_ip_is_google_owned(ip: str) -> bool: | |
"""Check if IP address is google-owned to prevent rate limiting load balancer, etc.""" | |
try: | |
ip_to_check = ipaddress.ip_address(ip) | |
# IP ranges that Google makes available to users on the internet | |
google_public_ips_response = requests.get(GOOGLE_PUBLIC_IP_URL).json() | |
google_public_ip_prefix_list = google_public_ips_response.get("prefixes") | |
google_public_ips = [] | |
if google_public_ip_prefix_list: | |
google_public_ips = [ | |
item.get("ipv4Prefix") | |
for item in google_public_ip_prefix_list | |
if item.get("ipv4Prefix") | |
] | |
# Global and regional external IP address ranges for customers' Google Cloud resources | |
google_cloud_ips_response = requests.get(GOOGLE_CLOUD_IP_URL).json() | |
google_cloud_ip_prefix_list = google_cloud_ips_response.get("prefixes") | |
google_cloud_ips = [] | |
if google_cloud_ip_prefix_list: | |
google_cloud_ips = [ | |
item.get("ipv4Prefix") | |
for item in google_cloud_ip_prefix_list | |
if item.get("ipv4Prefix") | |
] | |
# Total list of Google IPs | |
google_ip_list = google_public_ips + google_cloud_ips | |
for ip in google_ip_list: | |
network = ipaddress.ip_network(ip) | |
if ip_to_check in network: | |
return True | |
except Exception as ex: | |
logger.error(f"Exception in determining if IP is google-owned: {ex}") | |
pass | |
return False | |
def get_client_ip(request: HttpRequest) -> str: | |
""" | |
The load balancer accepts and leaves the X-Forwarded-For header from requests unchanged. | |
This value is prepended to the beginning of the list and should not be trusted as an | |
ip can be spoofed. | |
The Client IP as set by the load balancer is sent as the second to last value in the X-Forwarded-For header | |
See: https://cloud.google.com/load-balancing/docs/https#x-forwarded-for_header | |
Do not use a fallback such as request.META.get("REMOTE_ADDR") as it is a Google IP. | |
Only use REMOTE_ADDR for local dev as the X-Forwarded-For header isn't available. | |
""" | |
ip = None | |
if settings.IS_LOCAL_DEV: | |
return request.META.get("REMOTE_ADDR") | |
x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR") | |
# x_forwarded_for = "8.8.8.8, 127.0.0.1, 34.152.86.0" | |
if x_forwarded_for: | |
# Remove all spaces | |
x_forwarded_for = x_forwarded_for.replace(" ", "") | |
try: | |
ip = x_forwarded_for.strip().split(",")[-2] | |
except IndexError: | |
logger.error( | |
f"LOGIN RATE LIMIT: Unable to find the client IP in the X-Forwarded-For header: {x_forwarded_for}" | |
) | |
return ip | |
""" | |
These would go in a types.py file | |
""" | |
class ConfigName(Enum): | |
LOGIN_CONFIG = "login_config" | |
class LoginConfigOptions(Enum): | |
MAX_BASIC_LOGIN_ATTEMPTS_TO_ENFORCE_MFA = "MAX_BASIC_LOGIN_ATTEMPTS_TO_ENFORCE_MFA" | |
MAX_EMAIL_POST_REQUESTS = "MAX_EMAIL_POST_REQUESTS" | |
MAX_EMAIL_POST_REQUESTS_INTERVAL = "MAX_EMAIL_POST_REQUESTS_INTERVAL" | |
MAX_LOGIN_REQUESTS_MIN = "MAX_LOGIN_REQUESTS_MIN" | |
MAX_OTP_ATTEMPTS = "MAX_OTP_ATTEMPTS" | |
MAX_OTP_ATTEMPTS_INTERVAL = "MAX_OTP_ATTEMPTS_INTERVAL" | |
MAX_OTP_RESENDS = "MAX_OTP_RESENDS" | |
MAX_OTP_RESENDS_INTERVAL = "MAX_OTP_RESENDS_INTERVAL" | |
MFA_CACHED_MINS = "MFA_CACHED_MINS" | |
OTP_EXPIRY_MINS = "OTP_EXPIRY_MINS" | |
class LoginConfig(object): | |
MAX_BASIC_LOGIN_ATTEMPTS_TO_ENFORCE_MFA = 5 | |
MAX_EMAIL_POST_REQUESTS = 10 | |
MAX_EMAIL_POST_REQUESTS_INTERVAL = 5 | |
MAX_LOGIN_REQUESTS_MIN = 20 | |
MAX_OTP_ATTEMPTS = 3 | |
MAX_OTP_ATTEMPTS_INTERVAL = 15 | |
MAX_OTP_RESENDS = 3 | |
MAX_OTP_RESENDS_INTERVAL = 15 | |
MFA_CACHED_MINS = 240 | |
OTP_EXPIRY_MINS = 15 | |
def __init__(self) -> None: | |
try: | |
login_config = Config.objects.get(name=ConfigName.LOGIN_CONFIG.value).values | |
self.MAX_BASIC_LOGIN_ATTEMPTS_TO_ENFORCE_MFA = login_config.get( | |
LoginConfigOptions.MAX_BASIC_LOGIN_ATTEMPTS_TO_ENFORCE_MFA.value | |
) | |
self.MAX_EMAIL_POST_REQUESTS = login_config.get( | |
LoginConfigOptions.MAX_EMAIL_POST_REQUESTS.value | |
) | |
self.MAX_EMAIL_POST_REQUESTS_INTERVAL = login_config.get( | |
LoginConfigOptions.MAX_EMAIL_POST_REQUESTS_INTERVAL.value | |
) | |
self.MAX_LOGIN_REQUESTS_MIN = login_config.get( | |
LoginConfigOptions.MAX_LOGIN_REQUESTS_MIN.value | |
) | |
self.MAX_OTP_ATTEMPTS = login_config.get(LoginConfigOptions.MAX_OTP_ATTEMPTS.value) | |
self.MAX_OTP_ATTEMPTS_INTERVAL = login_config.get( | |
LoginConfigOptions.MAX_OTP_ATTEMPTS_INTERVAL.value | |
) | |
self.MAX_OTP_RESENDS = login_config.get(LoginConfigOptions.MAX_OTP_RESENDS.value) | |
self.MAX_OTP_RESENDS_INTERVAL = login_config.get( | |
LoginConfigOptions.MAX_OTP_RESENDS_INTERVAL.value | |
) | |
self.MFA_CACHED_MINS = login_config.get(LoginConfigOptions.MFA_CACHED_MINS.value) | |
self.OTP_EXPIRY_MINS = login_config.get(LoginConfigOptions.OTP_EXPIRY_MINS.value) | |
except Config.DoesNotExist: | |
# Use default values if no config is found | |
pass | |
class LoginCacheKeys(Enum): | |
RATE_LIMIT_EMAIL_LOGIN_POST_COUNT = "rate_limit:login_view:email-post:email-" | |
RATE_LIMIT_EMAIL_OTP_POST_COUNT = "otp-attempts:email-" | |
RATE_LIMIT_EMAIL_OTP_RESEND_COUNT = "otp-resend-count:email-" | |
RATE_LIMIT_IP_LOGIN_SCREEN_COUNT = "rate_limit:login_view:ip-" | |
RATE_LIMIT_IP_OTP_LOGIN_SCREEN_COUNT = "rate_limit:otp_login_view:ip-" | |
VALID_CREDENTIALS_LAST_ENTERED = "valid_credentials_last_entered:user_id-" | |
USER_OTP = "user-otp:user_id-" | |
GOOGLE_PUBLIC_IP_URL = "https://www.gstatic.com/ipranges/goog.json" | |
GOOGLE_CLOUD_IP_URL = "https://www.gstatic.com/ipranges/goog.json" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment