feat: v2.6.0 - API安全优化和文档整合
## 核心优化
- 移除详情页自动调用博查API的逻辑,改为按需加载
- 添加基于IP的频率限制(每小时3次)
- 实现验证码防护机制(超过阈值后要求验证)
- 新增频率限制工具类 utils/rate_limiter.py

## 成本控制
- API调用减少约90%+(只在用户点击时调用)
- 防止恶意滥用和攻击
- 可配置的频率限制和验证码策略

## 文档整合
- 创建 docs/ 目录结构
- 归档历史版本文档到 docs/archive/
- 移动部署文档到 docs/deployment/
- 添加文档索引 docs/README.md

## 技术变更
- 新增依赖: Flask-Limiter==3.5.0
- 修改: app.py (移除自动调用,新增API端点)
- 修改: templates/detail_new.html (按需加载UI)
- 新增: utils/rate_limiter.py (频率限制和验证码)
- 新增: docs/archive/DEVELOP_v2.6.0_API_SECURITY.md

## 部署说明
1. pip install Flask-Limiter==3.5.0
2. 重启应用
3. 无需数据库迁移

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
273
utils/rate_limiter.py
Normal file
273
utils/rate_limiter.py
Normal file
@@ -0,0 +1,273 @@
|
||||
"""
|
||||
频率限制和验证码防护工具
|
||||
v2.6新增:防止博查API被滥用
|
||||
"""
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from collections import defaultdict
|
||||
import hashlib
|
||||
|
||||
|
||||
class RateLimiter:
    """Simple in-memory, per-IP rate limiter with a "captcha required" flag.

    All state lives in this process's memory, so it is lost on restart and is
    not shared between worker processes. A shared store such as Redis is
    recommended for production use.
    """

    def __init__(self):
        # Request log, shape: {ip: [(timestamp, action), ...]}
        self._requests = defaultdict(list)
        # Captcha flags, shape: {ip: datetime until which a captcha is required}
        self._captcha_required = {}

    def is_rate_limited(self, ip, action='news_fetch', limit=3, window_minutes=60):
        """Check whether ``ip`` has used up its quota for ``action``.

        Expired records of ``action`` are pruned as a side effect. Records
        that belong to other actions are left untouched, so checking one
        action never erases the history of another.

        Args:
            ip: Client IP address.
            action: Operation type (distinguishes different APIs).
            limit: Maximum number of requests allowed inside the window.
            window_minutes: Length of the sliding window, in minutes.

        Returns:
            Tuple ``(is_limited, remaining_count, reset_time)``. When limited,
            ``reset_time`` is the moment the oldest counted request leaves the
            window; otherwise it is ``now + window``.
        """
        now = datetime.now()
        window = timedelta(minutes=window_minutes)
        cutoff_time = now - window

        # .get() instead of indexing: indexing a defaultdict would create an
        # empty entry for every IP that is merely checked.
        records = self._requests.get(ip)
        in_window = []
        if records is not None:
            # Drop only the expired records of *this* action.
            kept = [
                (ts, act) for ts, act in records
                if act != action or ts > cutoff_time
            ]
            if kept:
                self._requests[ip] = kept
            else:
                del self._requests[ip]
            in_window = [ts for ts, act in kept if act == action]

        current_count = len(in_window)
        if current_count >= limit:
            # With limit <= 0 there may be no recorded request at all; fall
            # back to "now" instead of calling min() on an empty sequence.
            oldest_request = min(in_window) if in_window else now
            return True, 0, oldest_request + window

        return False, limit - current_count, now + window

    def record_request(self, ip, action='news_fetch'):
        """Record one request made by ``ip`` for ``action``."""
        self._requests[ip].append((datetime.now(), action))

        # Bound memory per IP: keep only the 100 most recent records.
        if len(self._requests[ip]) > 100:
            self._requests[ip] = self._requests[ip][-100:]

    def require_captcha(self, ip, duration_minutes=30):
        """Mark ``ip`` as requiring captcha verification.

        Args:
            ip: Client IP address.
            duration_minutes: How long the requirement stays in force.
        """
        until = datetime.now() + timedelta(minutes=duration_minutes)
        self._captcha_required[ip] = until

    def is_captcha_required(self, ip):
        """Check whether ``ip`` currently has to pass a captcha.

        An expired requirement is removed as a side effect.

        Returns:
            Tuple ``(required, reason)``; ``reason`` is ``None`` when no
            captcha is required.
        """
        until = self._captcha_required.get(ip)
        if until is None:
            return False, None

        now = datetime.now()
        if now < until:
            # total_seconds() rather than .seconds: .seconds ignores whole
            # days and would under-report durations longer than 24 hours.
            remaining = int((until - now).total_seconds() // 60)
            return True, f"请求过于频繁,请在{remaining}分钟后重试或完成验证码验证"

        # Expired: clean up.
        del self._captcha_required[ip]
        return False, None

    def clear_captcha_requirement(self, ip):
        """Clear the captcha requirement for ``ip`` (call after a successful check)."""
        self._captcha_required.pop(ip, None)

    def get_request_count(self, ip, action='news_fetch', window_minutes=60):
        """Return how many ``action`` requests ``ip`` made in the last ``window_minutes``."""
        cutoff_time = datetime.now() - timedelta(minutes=window_minutes)
        return sum(
            1 for ts, act in self._requests.get(ip, ())
            if ts > cutoff_time and act == action
        )

    def cleanup_old_records(self, older_than_hours=24):
        """Drop stale request records and expired captcha flags.

        Intended to be called periodically.

        Args:
            older_than_hours: Request records older than this are removed.
        """
        cutoff_time = datetime.now() - timedelta(hours=older_than_hours)

        # Request records: iterate over a snapshot of the keys because
        # entries are deleted while looping.
        for ip in list(self._requests):
            fresh = [(ts, act) for ts, act in self._requests[ip] if ts > cutoff_time]
            if fresh:
                self._requests[ip] = fresh
            else:
                del self._requests[ip]

        # Captcha requirements.
        now = datetime.now()
        for ip in list(self._captcha_required):
            if self._captcha_required[ip] < now:
                del self._captcha_required[ip]
|
||||
|
||||
|
||||
# Process-wide shared limiter instance (Redis is recommended for production).
_global_limiter = RateLimiter()
|
||||
|
||||
|
||||
def get_rate_limiter():
    """Return the module-level shared rate limiter instance."""
    return _global_limiter
|
||||
|
||||
|
||||
def get_client_ip(request):
    """Return the best-guess client IP address for ``request``.

    Sources are tried in order: the first entry of ``X-Forwarded-For``, then
    ``X-Real-IP``, then ``request.remote_addr``. A blank or malformed header
    falls through to the next source.

    NOTE(review): both headers are supplied by the client unless a trusted
    reverse proxy overwrites them. If the app can be reached directly, a
    caller could vary these headers to evade per-IP limits; confirm the
    deployment sits behind a proxy that sets them.

    Args:
        request: Request object exposing ``headers.get(name)`` and
            ``remote_addr`` (e.g. a Flask request).

    Returns:
        The client IP as a string, or ``'unknown'`` if none can be determined.
    """
    headers = request.headers

    # X-Forwarded-For may list several hops; the first is the original client.
    forwarded_for = headers.get('X-Forwarded-For')
    if forwarded_for:
        first_hop = forwarded_for.split(',')[0].strip()
        if first_hop:
            return first_hop

    real_ip = headers.get('X-Real-IP')
    if real_ip and real_ip.strip():
        return real_ip.strip()

    return request.remote_addr or 'unknown'
|
||||
|
||||
|
||||
class CaptchaVerifier:
    """Captcha verifier that dispatches to one of several backends.

    Supported ``service`` values are ``'simple'``, ``'recaptcha'`` and
    ``'hcaptcha'``.
    """

    def __init__(self, service='simple', secret_key=None):
        """Store the backend choice and its credentials.

        Args:
            service: Captcha backend ('simple', 'recaptcha', 'hcaptcha').
            secret_key: Server-side secret for the remote backends.
        """
        self.service = service
        self.secret_key = secret_key

    def verify(self, response_token, remote_ip=None):
        """Verify a captcha response.

        Args:
            response_token: Captcha response submitted by the client.
            remote_ip: Client IP (optional), forwarded to the remote service.

        Returns:
            Tuple ``(success, error_message)``; ``error_message`` is ``None``
            on success.
        """
        if self.service == 'simple':
            # Placeholder check only: any token longer than 10 characters is
            # accepted, so this mode proves nothing about the client.
            if response_token and len(response_token) > 10:
                return True, None
            return False, "验证码无效"

        if self.service == 'recaptcha':
            # Google reCAPTCHA v2/v3
            return self._verify_recaptcha(response_token, remote_ip)

        if self.service == 'hcaptcha':
            return self._verify_hcaptcha(response_token, remote_ip)

        return False, "不支持的验证码服务"

    def _siteverify(self, url, response_token, remote_ip):
        """POST the token to a siteverify endpoint and return the parsed JSON body."""
        # Deferred import: `requests` is third-party and only needed for the
        # remote backends, so the 'simple' mode works without it.
        import requests

        response = requests.post(
            url,
            data={
                'secret': self.secret_key,
                'response': response_token,
                'remoteip': remote_ip,
            },
            timeout=5,
        )
        return response.json()

    def _verify_recaptcha(self, response_token, remote_ip):
        """Verify a Google reCAPTCHA token; returns ``(success, error_message)``."""
        # Check configuration first so a missing key is reported as such even
        # when `requests` is not installed.
        if not self.secret_key:
            return False, "reCAPTCHA密钥未配置"

        try:
            result = self._siteverify(
                'https://www.google.com/recaptcha/api/siteverify',
                response_token,
                remote_ip,
            )
            if result.get('success'):
                return True, None
            errors = result.get('error-codes', [])
            return False, f"验证失败: {', '.join(errors)}"
        except ImportError:
            # A missing dependency is a deployment error, not a failed captcha.
            raise
        except Exception as e:
            # Best-effort: network or parsing failures become a failed check.
            return False, f"验证服务异常: {str(e)}"

    def _verify_hcaptcha(self, response_token, remote_ip):
        """Verify an hCaptcha token; returns ``(success, error_message)``."""
        if not self.secret_key:
            return False, "hCaptcha密钥未配置"

        try:
            result = self._siteverify(
                'https://hcaptcha.com/siteverify',
                response_token,
                remote_ip,
            )
            if result.get('success'):
                return True, None
            return False, "验证失败"
        except ImportError:
            # A missing dependency is a deployment error, not a failed captcha.
            raise
        except Exception as e:
            # Best-effort: network or parsing failures become a failed check.
            return False, f"验证服务异常: {str(e)}"
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # Manual smoke test: fire five requests from one IP against a limit of
    # three per hour and print the limiter's verdict for each.
    demo_limiter = get_rate_limiter()
    demo_ip = "192.168.1.100"

    for attempt in range(1, 6):
        blocked, left, resets_at = demo_limiter.is_rate_limited(
            demo_ip, limit=3, window_minutes=60
        )

        if blocked:
            print(f"请求{attempt}: 已被限制,重置时间: {resets_at}")
            continue

        print(f"请求{attempt}: 允许,剩余次数: {left}")
        # Only requests that were allowed count against the quota.
        demo_limiter.record_request(demo_ip)
|
||||
Reference in New Issue
Block a user