## 核心优化 - 移除详情页自动调用博查API的逻辑,改为按需加载 - 添加基于IP的频率限制(每小时3次) - 实现验证码防护机制(超过阈值后要求验证) - 新增频率限制工具类 utils/rate_limiter.py ## 成本控制 - API调用减少约90%+(只在用户点击时调用) - 防止恶意滥用和攻击 - 可配置的频率限制和验证码策略 ## 文档整合 - 创建 docs/ 目录结构 - 归档历史版本文档到 docs/archive/ - 移动部署文档到 docs/deployment/ - 添加文档索引 docs/README.md ## 技术变更 - 新增依赖: Flask-Limiter==3.5.0 - 修改: app.py (移除自动调用,新增API端点) - 修改: templates/detail_new.html (按需加载UI) - 新增: utils/rate_limiter.py (频率限制和验证码) - 新增: docs/archive/DEVELOP_v2.6.0_API_SECURITY.md ## 部署说明 1. pip install Flask-Limiter==3.5.0 2. 重启应用 3. 无需数据库迁移 Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
274 lines
8.2 KiB
Python
274 lines
8.2 KiB
Python
"""
|
||
频率限制和验证码防护工具
|
||
v2.6新增:防止博查API被滥用
|
||
"""
|
||
|
||
from datetime import datetime, timedelta
|
||
from collections import defaultdict
|
||
import hashlib
|
||
|
||
|
||
class RateLimiter:
    """
    Simple in-memory, per-IP rate limiter with a CAPTCHA-requirement flag.

    All state is kept in plain dictionaries on the instance, so it is lost
    when the instance goes away. The original author recommends a Redis
    store for production use.
    """

    # Hard cap on stored records per IP so one client cannot grow memory
    # without bound.
    _MAX_RECORDS_PER_IP = 100

    def __init__(self):
        # Layout: {ip: [(timestamp, action), ...]}
        self._requests = defaultdict(list)
        # Layout: {ip: datetime until which a CAPTCHA is required}
        self._captcha_required = {}

    def is_rate_limited(self, ip, action='news_fetch', limit=3, window_minutes=60):
        """
        Check whether an IP has used up its quota for one action.

        This only inspects (and prunes) stored records; it does not record
        the current request. Call ``record_request`` for that.

        Args:
            ip: client IP address
            action: action label, used to keep quotas of different APIs apart
            limit: maximum number of requests allowed inside the window
            window_minutes: length of the sliding window, in minutes

        Returns:
            (is_limited, remaining_count, reset_time)
        """
        now = datetime.now()
        window = timedelta(minutes=window_minutes)
        cutoff_time = now - window

        # .get() instead of indexing: indexing a defaultdict would allocate an
        # empty list for every IP that is merely checked.
        stored = self._requests.get(ip, [])

        # Drop only the expired records of *this* action. Records of other
        # actions are kept so that checking one API does not wipe the
        # history of another.
        kept = [
            (ts, act) for ts, act in stored
            if act != action or ts > cutoff_time
        ]
        if kept:
            self._requests[ip] = kept
        else:
            self._requests.pop(ip, None)

        in_window = [ts for ts, act in kept if act == action]
        current_count = len(in_window)

        if current_count >= limit:
            # The quota frees up when the oldest request leaves the window.
            # With no requests at all (limit <= 0) fall back to a full window
            # instead of calling min() on an empty sequence.
            if in_window:
                reset_time = min(in_window) + window
            else:
                reset_time = now + window
            return True, 0, reset_time

        return False, limit - current_count, now + window

    def record_request(self, ip, action='news_fetch'):
        """Record one request made by ``ip`` for ``action`` at the current time."""
        records = self._requests[ip]
        records.append((datetime.now(), action))

        # Keep only the newest entries once the per-IP cap is exceeded.
        if len(records) > self._MAX_RECORDS_PER_IP:
            self._requests[ip] = records[-self._MAX_RECORDS_PER_IP:]

    def require_captcha(self, ip, duration_minutes=30):
        """
        Mark an IP as having to pass a CAPTCHA.

        Args:
            ip: client IP address
            duration_minutes: how long the requirement stays active
        """
        self._captcha_required[ip] = datetime.now() + timedelta(minutes=duration_minutes)

    def is_captcha_required(self, ip):
        """
        Check whether an IP currently has to pass a CAPTCHA.

        An expired requirement is removed as a side effect.

        Returns:
            (required, reason) - ``reason`` is a user-facing message when
            ``required`` is True, otherwise None.
        """
        until = self._captcha_required.get(ip)
        if until is None:
            return False, None

        now = datetime.now()
        if now < until:
            # total_seconds() rather than .seconds: .seconds ignores whole
            # days and under-reports lockouts of 24 hours or more.
            remaining = int((until - now).total_seconds() // 60)
            return True, f"请求过于频繁,请在{remaining}分钟后重试或完成验证码验证"

        # Requirement has expired; forget it.
        del self._captcha_required[ip]
        return False, None

    def clear_captcha_requirement(self, ip):
        """Remove the CAPTCHA requirement for ``ip`` (call after a successful verification)."""
        self._captcha_required.pop(ip, None)

    def get_request_count(self, ip, action='news_fetch', window_minutes=60):
        """Return how many requests ``ip`` made for ``action`` within the last ``window_minutes``."""
        cutoff_time = datetime.now() - timedelta(minutes=window_minutes)

        # .get() keeps this a pure read: no entry is created for unknown IPs.
        return sum(
            1 for ts, act in self._requests.get(ip, ())
            if ts > cutoff_time and act == action
        )

    def cleanup_old_records(self, older_than_hours=24):
        """
        Drop request records older than ``older_than_hours`` and expired
        CAPTCHA requirements. Intended to be called periodically.
        """
        now = datetime.now()
        cutoff_time = now - timedelta(hours=older_than_hours)

        # Iterate over a snapshot of the keys because entries are deleted
        # while looping.
        for ip in list(self._requests):
            kept = [
                (ts, act) for ts, act in self._requests[ip]
                if ts > cutoff_time
            ]
            if kept:
                self._requests[ip] = kept
            else:
                del self._requests[ip]

        for ip in list(self._captcha_required):
            if self._captcha_required[ip] < now:
                del self._captcha_required[ip]
|
||
|
||
|
||
# Shared module-level limiter instance (the original author recommends a
# Redis-backed store for production).
_global_limiter = RateLimiter()


def get_rate_limiter():
    """Return the shared module-level RateLimiter instance."""
    return _global_limiter
|
||
|
||
|
||
def get_client_ip(request):
    """
    Return the best-guess client IP address for a request.

    Sources are tried in this order: the first entry of ``X-Forwarded-For``,
    then ``X-Real-IP``, then ``request.remote_addr``. A source that is
    missing or blank after stripping whitespace is skipped, so a malformed
    header falls through to the next source instead of yielding 'unknown'.

    Args:
        request: object exposing ``headers.get(name)`` and ``remote_addr``
            (presumably a Flask request - confirm against callers)

    Returns:
        The IP address string, or 'unknown' when no source provides one.
    """
    # SECURITY NOTE(review): both headers are supplied by the client unless a
    # trusted reverse proxy overwrites them. If the app can be reached
    # directly, a caller can spoof X-Forwarded-For and dodge IP-based limits.
    # Confirm the deployment strips or rewrites these headers.
    forwarded_for = request.headers.get('X-Forwarded-For') or ''
    real_ip = request.headers.get('X-Real-IP') or ''

    candidates = (
        # X-Forwarded-For may list several hops; the first is the client.
        forwarded_for.split(',')[0],
        real_ip,
        request.remote_addr or '',
    )
    for candidate in candidates:
        candidate = candidate.strip()
        if candidate:
            return candidate

    return 'unknown'
|
||
|
||
|
||
class CaptchaVerifier:
    """
    CAPTCHA verifier supporting several back-ends.

    Supported ``service`` values: 'simple' (local length check only),
    'recaptcha' (Google reCAPTCHA) and 'hcaptcha' (hCaptcha).
    """

    def __init__(self, service='simple', secret_key=None):
        """
        Args:
            service: CAPTCHA back-end ('simple', 'recaptcha', 'hcaptcha')
            secret_key: server-side secret for the remote back-ends
        """
        self.service = service
        self.secret_key = secret_key

    def verify(self, response_token, remote_ip=None):
        """
        Verify a CAPTCHA response.

        Args:
            response_token: the CAPTCHA response submitted by the client
            remote_ip: client IP address (optional)

        Returns:
            (success, error_message) - ``error_message`` is None on success.
        """
        if self.service == 'simple':
            # NOTE(review): this only checks that a token longer than 10
            # characters was supplied; it does not prove a challenge was
            # solved.
            if response_token and len(response_token) > 10:
                return True, None
            return False, "验证码无效"

        if self.service == 'recaptcha':
            return self._verify_recaptcha(response_token, remote_ip)

        if self.service == 'hcaptcha':
            return self._verify_hcaptcha(response_token, remote_ip)

        return False, "不支持的验证码服务"

    def _verify_recaptcha(self, response_token, remote_ip):
        """Verify a token against Google reCAPTCHA (v2/v3 siteverify endpoint)."""
        return self._verify_remote(
            'https://www.google.com/recaptcha/api/siteverify',
            "reCAPTCHA密钥未配置",
            response_token,
            remote_ip,
            report_error_codes=True,
        )

    def _verify_hcaptcha(self, response_token, remote_ip):
        """Verify a token against hCaptcha."""
        return self._verify_remote(
            'https://hcaptcha.com/siteverify',
            "hCaptcha密钥未配置",
            response_token,
            remote_ip,
            report_error_codes=False,
        )

    def _verify_remote(self, url, missing_key_message, response_token,
                       remote_ip, report_error_codes):
        """
        Shared implementation for siteverify-style HTTP back-ends.

        Args:
            url: verification endpoint to POST to
            missing_key_message: message returned when no secret key is set
            response_token: the CAPTCHA response submitted by the client
            remote_ip: client IP address, forwarded as ``remoteip``
            report_error_codes: append the service's 'error-codes' to the
                failure message when True

        Returns:
            (success, error_message)
        """
        # Cheap local checks first, so configuration and input problems are
        # reported without importing requests or touching the network.
        if not self.secret_key:
            return False, missing_key_message
        if not response_token:
            return False, "验证码无效"

        # Imported lazily so the 'simple' back-end works without requests.
        import requests

        try:
            response = requests.post(
                url,
                data={
                    'secret': self.secret_key,
                    'response': response_token,
                    'remoteip': remote_ip,
                },
                timeout=5,
            )
            result = response.json()

            if result.get('success'):
                return True, None

            if report_error_codes:
                errors = result.get('error-codes', [])
                # str() guards the join against non-string codes.
                return False, f"验证失败: {', '.join(str(code) for code in errors)}"
            return False, "验证失败"

        except Exception as e:
            # Deliberately broad: any transport or parsing failure is
            # reported as a failed verification instead of propagating.
            return False, f"验证服务异常: {str(e)}"
|
||
|
||
|
||
if __name__ == '__main__':
    # Manual demo: issue five checks from one IP against a limit of three
    # per hour and print the outcome of each.
    demo_limiter = get_rate_limiter()
    client_ip = "192.168.1.100"

    for i in range(5):
        blocked, remaining, reset_time = demo_limiter.is_rate_limited(
            client_ip, limit=3, window_minutes=60
        )

        if not blocked:
            print(f"请求{i+1}: 允许,剩余次数: {remaining}")
            # Only allowed requests count against the quota.
            demo_limiter.record_request(client_ip)
            continue

        print(f"请求{i+1}: 已被限制,重置时间: {reset_time}")
|