""" 频率限制和验证码防护工具 v2.6新增:防止博查API被滥用 """ from datetime import datetime, timedelta from collections import defaultdict import hashlib class RateLimiter: """ 简单的基于内存的频率限制器 生产环境建议使用Redis存储 """ def __init__(self): # 存储格式: {ip: [(timestamp, action), ...]} self._requests = defaultdict(list) # 存储格式: {ip: require_captcha_until_timestamp} self._captcha_required = {} def is_rate_limited(self, ip, action='news_fetch', limit=3, window_minutes=60): """ 检查IP是否超过频率限制 Args: ip: 客户端IP地址 action: 操作类型(用于区分不同的API) limit: 时间窗口内允许的最大请求次数 window_minutes: 时间窗口(分钟) Returns: (is_limited, remaining_count, reset_time) """ now = datetime.now() cutoff_time = now - timedelta(minutes=window_minutes) # 清理过期记录 if ip in self._requests: self._requests[ip] = [ (ts, act) for ts, act in self._requests[ip] if ts > cutoff_time and act == action ] # 计算当前窗口内的请求次数 current_count = len(self._requests[ip]) if current_count >= limit: # 计算重置时间(最早的请求时间 + 窗口时间) oldest_request = min(ts for ts, _ in self._requests[ip]) reset_time = oldest_request + timedelta(minutes=window_minutes) return True, 0, reset_time return False, limit - current_count, now + timedelta(minutes=window_minutes) def record_request(self, ip, action='news_fetch'): """记录一次请求""" self._requests[ip].append((datetime.now(), action)) # 防止内存泄漏:每个IP最多保留100条记录 if len(self._requests[ip]) > 100: self._requests[ip] = self._requests[ip][-100:] def require_captcha(self, ip, duration_minutes=30): """ 标记某个IP需要验证码验证 Args: ip: 客户端IP duration_minutes: 需要验证码的持续时间 """ until = datetime.now() + timedelta(minutes=duration_minutes) self._captcha_required[ip] = until def is_captcha_required(self, ip): """ 检查IP是否需要验证码 Returns: (required, reason) """ if ip not in self._captcha_required: return False, None until = self._captcha_required[ip] if datetime.now() < until: remaining = (until - datetime.now()).seconds // 60 return True, f"请求过于频繁,请在{remaining}分钟后重试或完成验证码验证" else: # 过期,清理 del self._captcha_required[ip] return False, None def clear_captcha_requirement(self, ip): """清除验证码要求(验证通过后调用)""" if ip in self._captcha_required: del self._captcha_required[ip] def get_request_count(self, ip, action='news_fetch', window_minutes=60): """获取指定时间窗口内的请求次数""" now = datetime.now() cutoff_time = now - timedelta(minutes=window_minutes) if ip not in self._requests: return 0 return sum( 1 for ts, act in self._requests[ip] if ts > cutoff_time and act == action ) def cleanup_old_records(self, older_than_hours=24): """清理旧记录(建议定期调用)""" cutoff_time = datetime.now() - timedelta(hours=older_than_hours) # 清理请求记录 for ip in list(self._requests.keys()): self._requests[ip] = [ (ts, act) for ts, act in self._requests[ip] if ts > cutoff_time ] if not self._requests[ip]: del self._requests[ip] # 清理验证码要求 now = datetime.now() for ip in list(self._captcha_required.keys()): if self._captcha_required[ip] < now: del self._captcha_required[ip] # 全局实例(生产环境建议使用Redis) _global_limiter = RateLimiter() def get_rate_limiter(): """获取全局频率限制器实例""" return _global_limiter def get_client_ip(request): """ 获取客户端真实IP 考虑了代理和CDN的情况 """ # 优先从X-Forwarded-For获取(考虑CDN/代理) if request.headers.get('X-Forwarded-For'): # X-Forwarded-For可能包含多个IP,取第一个 ip = request.headers.get('X-Forwarded-For').split(',')[0].strip() elif request.headers.get('X-Real-IP'): ip = request.headers.get('X-Real-IP') else: ip = request.remote_addr return ip or 'unknown' class CaptchaVerifier: """ 验证码验证器(支持多种验证码服务) """ def __init__(self, service='simple', secret_key=None): """ Args: service: 验证码服务类型 ('simple', 'recaptcha', 'hcaptcha') secret_key: 验证码服务的密钥 """ self.service = service self.secret_key = secret_key def verify(self, response_token, remote_ip=None): """ 验证验证码 Args: response_token: 客户端提交的验证码响应 remote_ip: 客户端IP(可选) Returns: (success, error_message) """ if self.service == 'simple': # 简单验证:检查是否提供了token if response_token and len(response_token) > 10: return True, None return False, "验证码无效" elif self.service == 'recaptcha': # Google reCAPTCHA v2/v3 return self._verify_recaptcha(response_token, remote_ip) elif self.service == 'hcaptcha': # hCaptcha return self._verify_hcaptcha(response_token, remote_ip) return False, "不支持的验证码服务" def _verify_recaptcha(self, response_token, remote_ip): """验证Google reCAPTCHA""" import requests if not self.secret_key: return False, "reCAPTCHA密钥未配置" try: response = requests.post( 'https://www.google.com/recaptcha/api/siteverify', data={ 'secret': self.secret_key, 'response': response_token, 'remoteip': remote_ip }, timeout=5 ) result = response.json() if result.get('success'): return True, None else: errors = result.get('error-codes', []) return False, f"验证失败: {', '.join(errors)}" except Exception as e: return False, f"验证服务异常: {str(e)}" def _verify_hcaptcha(self, response_token, remote_ip): """验证hCaptcha""" import requests if not self.secret_key: return False, "hCaptcha密钥未配置" try: response = requests.post( 'https://hcaptcha.com/siteverify', data={ 'secret': self.secret_key, 'response': response_token, 'remoteip': remote_ip }, timeout=5 ) result = response.json() if result.get('success'): return True, None else: return False, "验证失败" except Exception as e: return False, f"验证服务异常: {str(e)}" if __name__ == '__main__': # 测试代码 limiter = get_rate_limiter() # 模拟多次请求 test_ip = "192.168.1.100" for i in range(5): is_limited, remaining, reset_time = limiter.is_rate_limited( test_ip, limit=3, window_minutes=60 ) if is_limited: print(f"请求{i+1}: 已被限制,重置时间: {reset_time}") else: print(f"请求{i+1}: 允许,剩余次数: {remaining}") limiter.record_request(test_ip)