Skip to content

Instantly share code, notes, and snippets.

@DArmstrong87
Created May 9, 2025 15:54
Show Gist options
  • Save DArmstrong87/27ad5d4beb727b713bd4425cdb2cc846 to your computer and use it in GitHub Desktop.
Save DArmstrong87/27ad5d4beb727b713bd4425cdb2cc846 to your computer and use it in GitHub Desktop.
Dynamic Rate Limit Decorator
"""
To use this decorator, add it to the view function you want to rate limit.
from authentication.decorators import dynamic_rate_limit
@dynamic_rate_limit()
def login_view(request):
The values can be changed based on a Config model that stores values in json.
"""
import ipaddress
import logging
import requests
from enum import Enum
from functools import wraps
from django.conf import settings
from django.contrib import messages
from django.core.cache import cache
from django.http import HttpRequest
from django.shortcuts import redirect
from app.views import rate_limited
from authentication.models import User, Config
logger = logging.getLogger("DefaultLogging")
def dynamic_rate_limit():
"""
A decorator for rate limiting requests based on IP or entered email address to login.
Values can be changed dynamically by updating the Config for "login_config".
This decorator rate limits using the following default values:
Login Screen
20 requests/min, lockout time is 5 minutes
10 email POST requests per 5 minutes
OTP Login
20 requests/min, lockout time is 5 minutes
3 OTP POST requests per 15 minutes
Enforces OTP when there are 5 failed login attempts (POST request for the same email)
Attempts are cached by either IP or entered email
Early return to show rate limit screen if user IP is locked out
"""
def decorator(view_func):
@wraps(view_func)
def _wrapped_view(request, *args, **kwargs):
# Clear cache during local testing
# cache.clear()
# ==================== USER IP ====================
user_ip = get_client_ip(request)
if user_ip is None:
return view_func(request, *args, **kwargs)
# Check if this IP is Google owned and skip rate limit logic if so.
ip_is_gcp_subnet = check_ip_is_google_owned(user_ip)
if ip_is_gcp_subnet:
logger.warning(
f"LOGIN RATE LIMIT: Client IP {user_ip} is a Google-owned IP. Skipping rate limit logic."
)
return view_func(request, *args, **kwargs)
# ==================== LOGIN CONFIG ====================
login_config = LoginConfig()
view_name = view_func.__name__
# Handle Max requests of login screens by IP.
# Resend OTP requests are by entered email.
rate_limit_screen_functions = {
"login_view": handle_login_screen_requests_by_ip,
"sso_login_view": handle_login_screen_requests_by_ip,
"otp_login_view": handle_login_screen_requests_by_ip,
"resend_otp": handle_otp_resend_requests,
}
handle_login_screen_rate_limit = rate_limit_screen_functions[view_name]
rate_limit_met, msg = handle_login_screen_rate_limit(
request, view_func, login_config, user_ip
)
# Return rate limit screen if limit met
if rate_limit_met:
if view_name != "resend_otp":
return rate_limited(request, msg)
# For rate limit of resending OTP, we simply redirect to OTP login screen
entered_email = request.resolver_match.kwargs.get("email")
return redirect(f"../../otp-login/{entered_email}/")
# Handle POST requests for Login and OTP Login
if request.method == "POST":
rate_limit_view_post_functions = {
"login_view": handle_login_post_rate_limit,
"sso_login_view": handle_login_post_rate_limit,
"otp_login_view": handle_otp_login_post_rate_limit,
}
handle_rate_limit = rate_limit_view_post_functions[view_name]
rate_limit_met, msg = handle_rate_limit(request, login_config)
if rate_limit_met:
return rate_limited(request, msg)
# Proceed to the view function if the rate limit is not exceeded
return view_func(request, *args, **kwargs)
return _wrapped_view
return decorator
def handle_login_screen_requests_by_ip(
request, view_func, login_config, user_ip
) -> tuple[bool, str]:
"""
Handle max requests of any request type. Track count by IP, regardless of entered email.
"""
# ================== LOGIN SCREEN or OTP LOGIN SCREEN REQUESTS ==================
if user_ip is None:
# Fallback IPs will not work as we don't want to ratelimit
# the load balancer or a Google IP.
logger.warning("LOGIN RATE LIMIT: User IP not found in X-Forwarded-For header.")
return False, None
# Get cache key for either Login screen or OTP Login screen
cache_key = LoginCacheKeys.RATE_LIMIT_IP_LOGIN_SCREEN_COUNT.value
if view_func.__name__ == "otp_login_view":
cache_key = LoginCacheKeys.RATE_LIMIT_IP_OTP_LOGIN_SCREEN_COUNT.value
user_ip_login_attempts_cache_key = f"{cache_key}{user_ip}"
cache.default_timeout = 60
login_screen_request_count = cache.get_or_set(user_ip_login_attempts_cache_key, 0)
# Do not increment once max requests are met to prevent the cache timeout from resetting
# and prolonging lockout period with subsequent requests.
MAX_LOGIN_REQUESTS_MIN = login_config.MAX_LOGIN_REQUESTS_MIN
if login_screen_request_count < MAX_LOGIN_REQUESTS_MIN:
login_screen_request_count = cache.incr(user_ip_login_attempts_cache_key)
logger.info(
f"LOGIN RATE LIMIT: Request count on {view_func.__name__} for user IP {user_ip}: {login_screen_request_count}"
)
if login_screen_request_count >= MAX_LOGIN_REQUESTS_MIN:
logger.warning(
f"LOGIN RATE LIMIT: Max requests on {view_func.__name__} of {MAX_LOGIN_REQUESTS_MIN} exceeded for user IP {user_ip}."
)
return True, None
return False, None
# ======================================================
def handle_otp_resend_requests(request, view_func, login_config, user_ip) -> tuple[bool, str]:
"""
Track OTP Resend Requests
Return boolean, error msg
"""
entered_email = request.resolver_match.kwargs.get("email")
MAX_OTP_RESENDS_INTERVAL = login_config.MAX_OTP_RESENDS_INTERVAL
MAX_OTP_RESENDS = login_config.MAX_OTP_RESENDS
# Get or set OTP resend count from cache
rate_limit_email_otp_resends_cache_key = (
f"{LoginCacheKeys.RATE_LIMIT_EMAIL_OTP_RESEND_COUNT.value}{entered_email}"
)
cache.default_timeout = MAX_OTP_RESENDS_INTERVAL * 60
otp_resend_count_email = cache.get_or_set(rate_limit_email_otp_resends_cache_key, 0)
# Increment count if max not met
if otp_resend_count_email < MAX_OTP_RESENDS:
otp_resend_count_email = cache.incr(rate_limit_email_otp_resends_cache_key)
logger.info(
f"LOGIN RATE LIMIT: OTP Resend count for user {entered_email}: {otp_resend_count_email}"
)
# If max resend, prevent user from resending any more
if otp_resend_count_email >= MAX_OTP_RESENDS:
logger.warning(
f"LOGIN RATE LIMIT: Max OTP Resend count met for user {entered_email}: {otp_resend_count_email}"
)
max_otp_resends_sent_error = f"You have reached the maximum number of One-Time Passcode requests. Please wait {MAX_OTP_RESENDS_INTERVAL} minutes before requesting another."
messages.warning(request, max_otp_resends_sent_error)
return True, None
return False, None
def handle_login_post_rate_limit(request, login_config) -> tuple[bool, str]:
"""
Rate limit POST requests for entered email
Enforces MFA if 5 failed login attempts
"""
# ================== POSTS PER EMAIL ==================
# Handle login rate limit of entered email value
# Email Post Request values
MAX_EMAIL_POST_REQUESTS_INTERVAL = login_config.MAX_EMAIL_POST_REQUESTS_INTERVAL
MAX_EMAIL_POST_REQUESTS = login_config.MAX_EMAIL_POST_REQUESTS
entered_email = request.POST.get("email")
rate_limit_email_login_post_requests_cache_key = (
f"{LoginCacheKeys.RATE_LIMIT_EMAIL_LOGIN_POST_COUNT.value}{entered_email}"
)
cache.default_timeout = MAX_EMAIL_POST_REQUESTS_INTERVAL * 60
rate_limit_email_login_post_request_count = cache.get_or_set(
rate_limit_email_login_post_requests_cache_key, 0
)
# Increment if request count has not met the max.
if rate_limit_email_login_post_request_count < MAX_EMAIL_POST_REQUESTS:
rate_limit_email_login_post_request_count = cache.incr(
rate_limit_email_login_post_requests_cache_key
)
logger.info(
f"LOGIN RATE LIMIT: Login POST attempts for email {entered_email}: {rate_limit_email_login_post_request_count}"
)
# ================== ENFORCE OTP IF MET MAX_BASIC_LOGIN_ATTEMPTS_TO_ENFORCE_MFA ==================
# If 5 failed login attempts, reset last_mfa_login to enforce OTP
# This will not rate limit the email.
MAX_BASIC_LOGIN_ATTEMPTS_TO_ENFORCE_MFA = login_config.MAX_BASIC_LOGIN_ATTEMPTS_TO_ENFORCE_MFA
if rate_limit_email_login_post_request_count == MAX_BASIC_LOGIN_ATTEMPTS_TO_ENFORCE_MFA:
try:
user = User.objects.get(email=entered_email)
logger.warning(
f"LOGIN RATE LIMIT: Max basic login POST attempts of {MAX_BASIC_LOGIN_ATTEMPTS_TO_ENFORCE_MFA} reached for email: {entered_email} - Enforcing MFA for this email."
)
user.last_mfa_login = None
user.save()
except User.DoesNotExist:
pass
# ======================================================
# Check if user has met or exceeded the max number of requests
if rate_limit_email_login_post_request_count >= MAX_EMAIL_POST_REQUESTS:
logger.warning(
f"LOGIN RATE LIMIT: Basic login POST requests exceeded for email {entered_email}"
)
msg = f"Too many requests. Please wait {MAX_EMAIL_POST_REQUESTS_INTERVAL} minutes and try again."
return True, msg
return False, None
def handle_otp_login_post_rate_limit(request, login_config) -> tuple[bool, str]:
"""
Return boolean and error message if max OTP logins are exceeded
If met or exceeded, invalidate OTP and valid credentials last entered
"""
# ================== POSTS PER EMAIL ==================
# Handle OTP login rate limit of entered email value
# Email Post Request values
MAX_OTP_ATTEMPTS_INTERVAL = login_config.MAX_OTP_ATTEMPTS_INTERVAL
MAX_OTP_ATTEMPTS = login_config.MAX_OTP_ATTEMPTS
entered_email = request.POST.get("email")
rate_limit_email_otp_post_count_cache_key = (
f"{LoginCacheKeys.RATE_LIMIT_EMAIL_OTP_POST_COUNT.value}{entered_email}"
)
cache.default_timeout = MAX_OTP_ATTEMPTS_INTERVAL * 60
rate_limit_email_otp_post_count = cache.get_or_set(rate_limit_email_otp_post_count_cache_key, 0)
# Increment if request count has not met the max.
if rate_limit_email_otp_post_count < MAX_OTP_ATTEMPTS:
rate_limit_email_otp_post_count = cache.incr(rate_limit_email_otp_post_count_cache_key)
logger.info(
f"LOGIN RATE LIMIT: OTP attempts for email {entered_email}: {rate_limit_email_otp_post_count}"
)
# Check if user has met or exceeded the max number of requests
if rate_limit_email_otp_post_count >= MAX_OTP_ATTEMPTS:
invalidate_OTP_and_valid_last_login(entered_email)
logger.warning(
f"LOGIN RATE LIMIT: OTP login POST requests exceeded for email {entered_email}"
)
msg = f"Too many requests. Please wait {MAX_OTP_ATTEMPTS_INTERVAL} minutes and try again."
return True, msg
return False, None
def invalidate_OTP_and_valid_last_login(entered_email):
"""
If user exists with the entered email, invalidate OTP and recent valid login.
This will force the user to enter correct credentials and OTP again.
"""
try:
# Invalidate OTP
user = User.objects.get(email=entered_email)
user_otp_cache_key = f"{LoginCacheKeys.USER_OTP.value}{user.id}"
cache.delete(user_otp_cache_key)
# Invalidate recent valid login to force user to login again
valid_creds_last_entered_cache_key = (
f"{LoginCacheKeys.VALID_CREDENTIALS_LAST_ENTERED.value}{user.id}"
)
cache.delete(valid_creds_last_entered_cache_key)
logger.warning(f"LOGIN: Invalidating OTP and recent valid login for user {entered_email}")
except User.DoesNotExist:
pass
def check_ip_is_google_owned(ip: str) -> bool:
"""Check if IP address is google-owned to prevent rate limiting load balancer, etc."""
try:
ip_to_check = ipaddress.ip_address(ip)
# IP ranges that Google makes available to users on the internet
google_public_ips_response = requests.get(GOOGLE_PUBLIC_IP_URL).json()
google_public_ip_prefix_list = google_public_ips_response.get("prefixes")
google_public_ips = []
if google_public_ip_prefix_list:
google_public_ips = [
item.get("ipv4Prefix")
for item in google_public_ip_prefix_list
if item.get("ipv4Prefix")
]
# Global and regional external IP address ranges for customers' Google Cloud resources
google_cloud_ips_response = requests.get(GOOGLE_CLOUD_IP_URL).json()
google_cloud_ip_prefix_list = google_cloud_ips_response.get("prefixes")
google_cloud_ips = []
if google_cloud_ip_prefix_list:
google_cloud_ips = [
item.get("ipv4Prefix")
for item in google_cloud_ip_prefix_list
if item.get("ipv4Prefix")
]
# Total list of Google IPs
google_ip_list = google_public_ips + google_cloud_ips
for ip in google_ip_list:
network = ipaddress.ip_network(ip)
if ip_to_check in network:
return True
except Exception as ex:
logger.error(f"Exception in determining if IP is google-owned: {ex}")
pass
return False
def get_client_ip(request: HttpRequest) -> str:
"""
The load balancer accepts and leaves the X-Forwarded-For header from requests unchanged.
This value is prepended to the beginning of the list and should not be trusted as an
ip can be spoofed.
The Client IP as set by the load balancer is sent as the second to last value in the X-Forwarded-For header
See: https://cloud.google.com/load-balancing/docs/https#x-forwarded-for_header
Do not use a fallback such as request.META.get("REMOTE_ADDR") as it is a Google IP.
Only use REMOTE_ADDR for local dev as the X-Forwarded-For header isn't available.
"""
ip = None
if settings.IS_LOCAL_DEV:
return request.META.get("REMOTE_ADDR")
x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR")
# x_forwarded_for = "8.8.8.8, 127.0.0.1, 34.152.86.0"
if x_forwarded_for:
# Remove all spaces
x_forwarded_for = x_forwarded_for.replace(" ", "")
try:
ip = x_forwarded_for.strip().split(",")[-2]
except IndexError:
logger.error(
f"LOGIN RATE LIMIT: Unable to find the client IP in the X-Forwarded-For header: {x_forwarded_for}"
)
return ip
"""
These would go in a types.py file
"""
class ConfigName(Enum):
LOGIN_CONFIG = "login_config"
class LoginConfigOptions(Enum):
MAX_BASIC_LOGIN_ATTEMPTS_TO_ENFORCE_MFA = "MAX_BASIC_LOGIN_ATTEMPTS_TO_ENFORCE_MFA"
MAX_EMAIL_POST_REQUESTS = "MAX_EMAIL_POST_REQUESTS"
MAX_EMAIL_POST_REQUESTS_INTERVAL = "MAX_EMAIL_POST_REQUESTS_INTERVAL"
MAX_LOGIN_REQUESTS_MIN = "MAX_LOGIN_REQUESTS_MIN"
MAX_OTP_ATTEMPTS = "MAX_OTP_ATTEMPTS"
MAX_OTP_ATTEMPTS_INTERVAL = "MAX_OTP_ATTEMPTS_INTERVAL"
MAX_OTP_RESENDS = "MAX_OTP_RESENDS"
MAX_OTP_RESENDS_INTERVAL = "MAX_OTP_RESENDS_INTERVAL"
MFA_CACHED_MINS = "MFA_CACHED_MINS"
OTP_EXPIRY_MINS = "OTP_EXPIRY_MINS"
class LoginConfig(object):
MAX_BASIC_LOGIN_ATTEMPTS_TO_ENFORCE_MFA = 5
MAX_EMAIL_POST_REQUESTS = 10
MAX_EMAIL_POST_REQUESTS_INTERVAL = 5
MAX_LOGIN_REQUESTS_MIN = 20
MAX_OTP_ATTEMPTS = 3
MAX_OTP_ATTEMPTS_INTERVAL = 15
MAX_OTP_RESENDS = 3
MAX_OTP_RESENDS_INTERVAL = 15
MFA_CACHED_MINS = 240
OTP_EXPIRY_MINS = 15
def __init__(self) -> None:
try:
login_config = Config.objects.get(name=ConfigName.LOGIN_CONFIG.value).values
self.MAX_BASIC_LOGIN_ATTEMPTS_TO_ENFORCE_MFA = login_config.get(
LoginConfigOptions.MAX_BASIC_LOGIN_ATTEMPTS_TO_ENFORCE_MFA.value
)
self.MAX_EMAIL_POST_REQUESTS = login_config.get(
LoginConfigOptions.MAX_EMAIL_POST_REQUESTS.value
)
self.MAX_EMAIL_POST_REQUESTS_INTERVAL = login_config.get(
LoginConfigOptions.MAX_EMAIL_POST_REQUESTS_INTERVAL.value
)
self.MAX_LOGIN_REQUESTS_MIN = login_config.get(
LoginConfigOptions.MAX_LOGIN_REQUESTS_MIN.value
)
self.MAX_OTP_ATTEMPTS = login_config.get(LoginConfigOptions.MAX_OTP_ATTEMPTS.value)
self.MAX_OTP_ATTEMPTS_INTERVAL = login_config.get(
LoginConfigOptions.MAX_OTP_ATTEMPTS_INTERVAL.value
)
self.MAX_OTP_RESENDS = login_config.get(LoginConfigOptions.MAX_OTP_RESENDS.value)
self.MAX_OTP_RESENDS_INTERVAL = login_config.get(
LoginConfigOptions.MAX_OTP_RESENDS_INTERVAL.value
)
self.MFA_CACHED_MINS = login_config.get(LoginConfigOptions.MFA_CACHED_MINS.value)
self.OTP_EXPIRY_MINS = login_config.get(LoginConfigOptions.OTP_EXPIRY_MINS.value)
except Config.DoesNotExist:
# Use default values if no config is found
pass
class LoginCacheKeys(Enum):
RATE_LIMIT_EMAIL_LOGIN_POST_COUNT = "rate_limit:login_view:email-post:email-"
RATE_LIMIT_EMAIL_OTP_POST_COUNT = "otp-attempts:email-"
RATE_LIMIT_EMAIL_OTP_RESEND_COUNT = "otp-resend-count:email-"
RATE_LIMIT_IP_LOGIN_SCREEN_COUNT = "rate_limit:login_view:ip-"
RATE_LIMIT_IP_OTP_LOGIN_SCREEN_COUNT = "rate_limit:otp_login_view:ip-"
VALID_CREDENTIALS_LAST_ENTERED = "valid_credentials_last_entered:user_id-"
USER_OTP = "user-otp:user_id-"
GOOGLE_PUBLIC_IP_URL = "https://www.gstatic.com/ipranges/goog.json"
GOOGLE_CLOUD_IP_URL = "https://www.gstatic.com/ipranges/goog.json"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment