Files
shopback/core/throttling.py
Beyhan Oğur d9f1ea341e first commit
2026-04-26 22:27:56 +03:00

351 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Custom throttling classes for API rate limiting
"""
import logging
import ipaddress
from rest_framework.throttling import AnonRateThrottle, UserRateThrottle
# Logger configuration: a dedicated 'throttling' logger at INFO level.
logger = logging.getLogger('throttling')
logger.setLevel(logging.INFO)

# Imports used by the handler setup below.
import os
import sys
from pathlib import Path

# Log files live in a 'logs' directory two levels above this file.
BASE_DIR = Path(__file__).resolve().parent.parent
LOG_DIR = BASE_DIR / 'logs'

# One formatter shared by both handlers.
formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Console handler: writes to stdout (useful when running under Docker).
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)

# File handler: writes to logs/throttling.log.
# Creating the directory or opening the file can fail (read-only filesystem,
# missing permissions). That must not make this module unimportable, so fall
# back to console-only logging; `file_handler` is then None.
try:
    LOG_DIR.mkdir(exist_ok=True)
    file_handler = logging.FileHandler(LOG_DIR / 'throttling.log', encoding='utf-8')
except OSError as exc:
    file_handler = None
    _file_handler_error = exc
else:
    _file_handler_error = None
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(formatter)

# Attach the handlers only once, so a logger that is already configured does
# not emit every record twice.
if not logger.handlers:
    if file_handler is not None:
        logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    if file_handler is None:
        logger.warning(
            "[LOGGING] File logging disabled, cannot write to %s: %s",
            LOG_DIR,
            _file_handler_error,
        )
# Cloudflare edge ranges (https://www.cloudflare.com/ips/), parsed once at
# import time instead of on every call.
_CLOUDFLARE_NETWORKS = tuple(
    ipaddress.ip_network(cidr)
    for cidr in (
        # IPv4 ranges
        '173.245.48.0/20',
        '103.21.244.0/22',
        '103.22.200.0/22',
        '103.31.4.0/22',
        '141.101.64.0/18',
        '108.162.192.0/18',
        '190.93.240.0/20',
        '188.114.96.0/20',
        '197.234.240.0/22',
        '198.41.128.0/17',
        '162.158.0.0/15',
        '104.16.0.0/13',
        '104.24.0.0/14',
        '172.64.0.0/13',
        '131.0.72.0/22',
        # IPv6 ranges
        '2400:cb00::/32',
        '2606:4700::/32',
        '2803:f800::/32',
        '2405:b500::/32',
        '2405:8100::/32',
        '2a06:98c0::/29',
        '2c0f:f248::/32',
    )
)


def is_cloudflare_ip(ip):
    """
    Return True when ``ip`` lies inside one of the Cloudflare ranges above.

    Args:
        ip: The address to test, normally a string. Surrounding whitespace
            is ignored.

    Returns:
        bool: True for an address inside a listed range; False otherwise,
        including for values that are not valid IP addresses (``None``,
        ``''``, ``'unknown'``, ...).
    """
    if isinstance(ip, str):
        ip = ip.strip()
    try:
        address = ipaddress.ip_address(ip)
    except ValueError:
        # AddressValueError is a ValueError subclass, so this covers both.
        return False

    # An IPv4-mapped IPv6 address (::ffff:a.b.c.d) must be compared against
    # the IPv4 ranges, so unwrap the embedded IPv4 address first.
    mapped = getattr(address, 'ipv4_mapped', None)
    if mapped is not None:
        address = mapped

    return any(address in network for network in _CLOUDFLARE_NETWORKS)
class CustomAnonRateThrottle(AnonRateThrottle):
    """
    Anonymous-scope throttle that skips throttling for a fixed set of IPs.

    The client address is resolved by ``get_client_ip`` and used both for the
    exemption check and as the cache-key identity. Every decision (bypass,
    allowed, blocked) is written to the module logger.
    """

    # Client IPs that are never throttled.
    # NOTE(review): 'localhost' can never equal a resolved IP string, and
    # '162.158.210.254' is inside the Cloudflare range 162.158.0.0/15, so when
    # the forwarding header is missing every client behind that edge node is
    # exempt. Confirm both entries are intended.
    EXEMPT_IPS = [
        '127.0.0.1',
        'localhost',
        '::1',
        '212.64.215.243',
        '162.158.210.254',
        '188.132.232.119',
    ]

    # Forwarding headers consulted, in priority order, when the request came
    # through a trusted proxy.
    _FORWARDED_IP_HEADERS = (
        'HTTP_CF_CONNECTING_IP',   # set by Cloudflare
        'HTTP_TRUE_CLIENT_IP',     # used by some Cloudflare configurations
        'HTTP_X_FORWARDED_FOR',    # generic fallback; first entry is used
    )

    @staticmethod
    def _parse_ip(value):
        """Return ``value`` as a normalised IP string, or None if invalid."""
        try:
            return str(ipaddress.ip_address(value.strip()))
        except (AttributeError, ValueError):
            return None

    @staticmethod
    def _is_trusted_proxy(remote_addr):
        """
        Tell whether forwarding headers from this direct peer may be believed.

        Trusted peers are Cloudflare edge addresses and private/loopback
        addresses (a reverse proxy on the same host or container network).
        A peer address that does not parse (REMOTE_ADDR missing or empty) is
        also treated as trusted, which keeps the previous behaviour for such
        setups. Presumably these are unix-socket upstreams; confirm.
        """
        try:
            peer = ipaddress.ip_address(remote_addr)
        except ValueError:
            return True
        return peer.is_private or peer.is_loopback or is_cloudflare_ip(remote_addr)

    def get_client_ip(self, request):
        """
        Resolve the client IP address for ``request``.

        Forwarding headers are client-controlled, so they are honoured only
        when the direct peer (REMOTE_ADDR) is a trusted proxy, and only when
        the value parses as an IP address. Otherwise a direct client could
        send ``CF-Connecting-IP: 127.0.0.1`` to match ``EXEMPT_IPS``, or
        change the header on every request to escape its throttle bucket.

        NOTE(review): if the local reverse proxy is reachable without going
        through Cloudflare, it must strip these headers itself. This code
        cannot detect that topology.

        Returns:
            str: The resolved IP, or REMOTE_ADDR ('unknown' when absent).
        """
        meta = request.META
        remote_addr = meta.get('REMOTE_ADDR', 'unknown')

        if self._is_trusted_proxy(remote_addr):
            for header in self._FORWARDED_IP_HEADERS:
                raw = meta.get(header)
                if not raw:
                    continue
                # X-Forwarded-For may be a comma-separated chain; the first
                # entry is taken as the client.
                candidate = self._parse_ip(raw.split(',')[0])
                if candidate:
                    return candidate

        # The peer is Cloudflare but no usable header told us who the real
        # client is: warn, then fall back to the edge address.
        if is_cloudflare_ip(remote_addr):
            logger.warning(
                "[IP DETECTION] Cloudflare IP tespit edildi (%s) "
                "ama gerçek client IP bulunamadı. CF-Connecting-IP header'ı eksik olabilir.",
                remote_addr,
            )
        return remote_addr

    def get_cache_key(self, request, view):
        """
        Build the throttle cache key from the resolved client IP.

        NOTE(review): the stock ``AnonRateThrottle.get_cache_key`` returns
        None for authenticated users so they are not throttled by the anon
        scope. This override keys every request by IP, authenticated or not.
        Confirm this is intended.
        """
        return self.cache_format % {
            'scope': self.scope,
            'ident': self.get_client_ip(request)
        }

    def allow_request(self, request, view):
        """
        Decide whether ``request`` may proceed, and log the decision.

        Returns:
            True for exempt IPs; otherwise the result of the parent class's
            ``allow_request``.
        """
        ip = self.get_client_ip(request)
        remote_addr = request.META.get('REMOTE_ADDR', 'unknown')
        host = request.get_host().split(':')[0]
        user_agent = request.META.get('HTTP_USER_AGENT', 'unknown')[:100]
        view_name = view.__class__.__name__

        # Record the Cloudflare edge address when the peer is a Cloudflare IP.
        cf_info = f"CF-IP: {remote_addr}" if is_cloudflare_ip(remote_addr) else ""

        # Fields shared by every log line below.
        context = (
            f"Real-IP: {ip} | {cf_info} | Host: {host} | "
            f"Path: {request.path} | Method: {request.method} | View: {view_name}"
        )

        # Exempt IPs skip throttling entirely.
        if ip in self.EXEMPT_IPS:
            logger.info(
                "[ANON THROTTLE - BYPASS] %s | User-Agent: %s",
                context, user_agent,
            )
            return True

        # Apply the normal throttling rules.
        allowed = super().allow_request(request, view)

        # rate / num_requests / duration are set by SimpleRateThrottle.__init__,
        # so they need not be re-derived with get_rate(), which can raise when
        # the rate comes from a class attribute instead of settings.
        rate = self.rate
        limit = self.num_requests

        if not allowed:
            logger.warning(
                "[ANON THROTTLE - BLOCKED] %s | Rate: %s | Limit: %s/%s | User-Agent: %s",
                context, rate, limit, self.duration, user_agent,
            )
            return allowed

        if limit is None:
            # No rate configured: the parent returns early and there is no
            # history to count (previously this raised TypeError).
            remaining = 'unlimited'
        else:
            # The parent stores this request's history on self.history, so no
            # second cache read is needed.
            history = getattr(self, 'history', [])
            remaining = f"{max(0, limit - len(history))}/{limit}"

        logger.info(
            "[ANON THROTTLE - ALLOWED] %s | Rate: %s | Remaining: %s | User-Agent: %s",
            context, rate, remaining, user_agent,
        )
        return allowed
class CustomUserRateThrottle(UserRateThrottle):
    """
    User-scope throttle that skips throttling for exempt hosts and staff users.

    Authenticated users are keyed by primary key and anonymous callers by the
    IP resolved in ``get_client_ip``. Every decision (bypass, allowed,
    blocked) is written to the module logger.
    """

    # Host names for which throttling is skipped.
    # NOTE(review): these are compared against request.get_host(), the host
    # the request was addressed to, not the caller's origin. Any request sent
    # to one of these names therefore bypasses this throttle. Confirm this is
    # intended ('denizour.com.tr' may also be a typo of 'denizogur.com.tr').
    EXEMPT_HOSTS = [
        'api.denizogur.com.tr',
        'back.beyhan.gen.tr',
        'shop.beyhan.gen.tr',
        'beyhan.gen.tr',
        'denizogur.com.tr',
        'denizour.com.tr',
        'localhost',
        '127.0.0.1',
    ]

    # Forwarding headers consulted, in priority order, when the request came
    # through a trusted proxy.
    _FORWARDED_IP_HEADERS = (
        'HTTP_CF_CONNECTING_IP',   # set by Cloudflare
        'HTTP_TRUE_CLIENT_IP',     # used by some Cloudflare configurations
        'HTTP_X_FORWARDED_FOR',    # generic fallback; first entry is used
    )

    @staticmethod
    def _parse_ip(value):
        """Return ``value`` as a normalised IP string, or None if invalid."""
        try:
            return str(ipaddress.ip_address(value.strip()))
        except (AttributeError, ValueError):
            return None

    @staticmethod
    def _is_trusted_proxy(remote_addr):
        """
        Tell whether forwarding headers from this direct peer may be believed.

        Trusted peers are Cloudflare edge addresses and private/loopback
        addresses (a reverse proxy on the same host or container network).
        A peer address that does not parse (REMOTE_ADDR missing or empty) is
        also treated as trusted, which keeps the previous behaviour for such
        setups. Presumably these are unix-socket upstreams; confirm.
        """
        try:
            peer = ipaddress.ip_address(remote_addr)
        except ValueError:
            return True
        return peer.is_private or peer.is_loopback or is_cloudflare_ip(remote_addr)

    def get_client_ip(self, request):
        """
        Resolve the client IP address for ``request``.

        Forwarding headers are client-controlled, so they are honoured only
        when the direct peer (REMOTE_ADDR) is a trusted proxy, and only when
        the value parses as an IP address. Otherwise an anonymous caller
        could change the header on every request to escape its throttle
        bucket.

        Returns:
            str: The resolved IP, or REMOTE_ADDR ('unknown' when absent).
        """
        meta = request.META
        remote_addr = meta.get('REMOTE_ADDR', 'unknown')

        if self._is_trusted_proxy(remote_addr):
            for header in self._FORWARDED_IP_HEADERS:
                raw = meta.get(header)
                if not raw:
                    continue
                # X-Forwarded-For may be a comma-separated chain; the first
                # entry is taken as the client.
                candidate = self._parse_ip(raw.split(',')[0])
                if candidate:
                    return candidate

        # The peer is Cloudflare but no usable header told us who the real
        # client is: warn, then fall back to the edge address.
        if is_cloudflare_ip(remote_addr):
            logger.warning(
                "[IP DETECTION] Cloudflare IP tespit edildi (%s) "
                "ama gerçek client IP bulunamadı. CF-Connecting-IP header'ı eksik olabilir.",
                remote_addr,
            )
        return remote_addr

    def get_cache_key(self, request, view):
        """
        Build the throttle cache key.

        Authenticated users are identified by primary key; anonymous callers
        by the resolved client IP.
        """
        if request.user and request.user.is_authenticated:
            ident = request.user.pk
        else:
            ident = self.get_client_ip(request)
        return self.cache_format % {
            'scope': self.scope,
            'ident': ident
        }

    def allow_request(self, request, view):
        """
        Decide whether ``request`` may proceed, and log the decision.

        Returns:
            True for exempt hosts and authenticated staff users; otherwise
            the result of the parent class's ``allow_request``.
        """
        ip = self.get_client_ip(request)
        remote_addr = request.META.get('REMOTE_ADDR', 'unknown')
        host = request.get_host().split(':')[0]
        user_agent = request.META.get('HTTP_USER_AGENT', 'unknown')[:100]
        view_name = view.__class__.__name__

        user = request.user
        authenticated = bool(user and user.is_authenticated)

        # NOTE(review): this writes the user's e-mail address to the log file.
        # Confirm that is acceptable under the project's privacy requirements.
        user_info = 'anonymous'
        if authenticated:
            user_info = f"user_id:{user.id} | email:{getattr(user, 'email', 'N/A')} | staff:{user.is_staff}"

        # Record the Cloudflare edge address when the peer is a Cloudflare IP.
        cf_info = f"CF-IP: {remote_addr}" if is_cloudflare_ip(remote_addr) else ""

        # Fields shared by every log line below.
        context = (
            f"Real-IP: {ip} | {cf_info} | Host: {host} | "
            f"Path: {request.path} | Method: {request.method} | View: {view_name} | "
            f"User: {user_info}"
        )

        # Exempt hosts skip throttling.
        if host in self.EXEMPT_HOSTS:
            logger.info(
                "[USER THROTTLE - BYPASS (HOST)] %s | User-Agent: %s",
                context, user_agent,
            )
            return True

        # Authenticated staff users skip throttling.
        if authenticated and user.is_staff:
            logger.info(
                "[USER THROTTLE - BYPASS (STAFF)] %s | User-Agent: %s",
                context, user_agent,
            )
            return True

        # Apply the normal throttling rules.
        allowed = super().allow_request(request, view)

        # rate / num_requests / duration are set by SimpleRateThrottle.__init__,
        # so they need not be re-derived with get_rate(), which can raise when
        # the rate comes from a class attribute instead of settings.
        rate = self.rate
        limit = self.num_requests

        if not allowed:
            logger.warning(
                "[USER THROTTLE - BLOCKED] %s | Rate: %s | Limit: %s/%s | User-Agent: %s",
                context, rate, limit, self.duration, user_agent,
            )
            return allowed

        if limit is None:
            # No rate configured: the parent returns early and there is no
            # history to count (previously this raised TypeError).
            remaining = 'unlimited'
        else:
            # The parent stores this request's history on self.history, so no
            # second cache read is needed.
            history = getattr(self, 'history', [])
            remaining = f"{max(0, limit - len(history))}/{limit}"

        logger.info(
            "[USER THROTTLE - ALLOWED] %s | Rate: %s | Remaining: %s | User-Agent: %s",
            context, rate, remaining, user_agent,
        )
        return allowed