Files
zjpb.net/utils/rate_limiter.py
Jowe 939717fa57 feat: v2.6.0 - API安全优化和文档整合
## 核心优化
- 移除详情页自动调用博查API的逻辑,改为按需加载
- 添加基于IP的频率限制(每小时3次)
- 实现验证码防护机制(超过阈值后要求验证)
- 新增频率限制工具类 utils/rate_limiter.py

## 成本控制
- API调用减少约90%+(只在用户点击时调用)
- 防止恶意滥用和攻击
- 可配置的频率限制和验证码策略

## 文档整合
- 创建 docs/ 目录结构
- 归档历史版本文档到 docs/archive/
- 移动部署文档到 docs/deployment/
- 添加文档索引 docs/README.md

## 技术变更
- 新增依赖: Flask-Limiter==3.5.0
- 修改: app.py (移除自动调用,新增API端点)
- 修改: templates/detail_new.html (按需加载UI)
- 新增: utils/rate_limiter.py (频率限制和验证码)
- 新增: docs/archive/DEVELOP_v2.6.0_API_SECURITY.md

## 部署说明
1. pip install Flask-Limiter==3.5.0
2. 重启应用
3. 无需数据库迁移

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-06 15:54:13 +08:00

274 lines
8.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
频率限制和验证码防护工具
v2.6 新增: 防止博查API被滥用
"""
from datetime import datetime, timedelta
from collections import defaultdict
import hashlib
class RateLimiter:
    """Simple in-memory, per-IP rate limiter with a CAPTCHA-required flag.

    All state is held in plain Python containers on the instance, so it is
    not shared between processes and is lost on restart. A shared store such
    as Redis is the recommended replacement for production use.
    """

    def __init__(self):
        # Layout: {ip: [(timestamp, action), ...]}
        self._requests = defaultdict(list)
        # Layout: {ip: datetime until which a CAPTCHA is required}
        self._captcha_required = {}

    def is_rate_limited(self, ip, action='news_fetch', limit=3, window_minutes=60):
        """Check whether ``ip`` has used up its quota for ``action``.

        This only inspects (and prunes) stored records; it does not record
        a request. Call ``record_request`` separately for that.

        Args:
            ip: Client IP address.
            action: Operation name, used to keep quotas of different APIs apart.
            limit: Maximum number of requests allowed inside the window.
            window_minutes: Length of the sliding window, in minutes.

        Returns:
            A ``(is_limited, remaining_count, reset_time)`` tuple. When
            limited, ``reset_time`` is the oldest in-window request plus the
            window; otherwise it is the current time plus the window.
        """
        now = datetime.now()
        window = timedelta(minutes=window_minutes)
        cutoff_time = now - window

        # Use .get() so that probing an unseen IP does not insert an empty
        # list into the defaultdict.
        records = self._requests.get(ip)
        in_window = []
        if records is not None:
            # Prune only the expired records of *this* action. Records that
            # belong to other actions must survive: they carry their own
            # quota and may be checked with a different window.
            kept = [
                (ts, act) for ts, act in records
                if act != action or ts > cutoff_time
            ]
            if kept:
                self._requests[ip] = kept
            else:
                del self._requests[ip]
            in_window = [ts for ts, act in kept if act == action]

        current_count = len(in_window)
        if current_count >= limit:
            # default=now covers limit <= 0 with no stored requests, where
            # min() over an empty sequence would otherwise raise ValueError.
            oldest_request = min(in_window, default=now)
            return True, 0, oldest_request + window
        return False, limit - current_count, now + window

    def record_request(self, ip, action='news_fetch'):
        """Record one request made by ``ip`` for ``action`` at the current time."""
        history = self._requests[ip]
        history.append((datetime.now(), action))
        # Bound memory use: keep at most the 100 newest records per IP
        # (the cap is shared by all actions of that IP).
        if len(history) > 100:
            self._requests[ip] = history[-100:]

    def require_captcha(self, ip, duration_minutes=30):
        """Mark ``ip`` as needing CAPTCHA verification.

        Args:
            ip: Client IP address.
            duration_minutes: How long the requirement stays active.
        """
        self._captcha_required[ip] = datetime.now() + timedelta(minutes=duration_minutes)

    def is_captcha_required(self, ip):
        """Check whether ``ip`` currently needs to pass a CAPTCHA.

        An expired requirement is removed as a side effect.

        Returns:
            A ``(required, reason)`` tuple; ``reason`` is a user-facing
            message when required, otherwise ``None``.
        """
        until = self._captcha_required.get(ip)
        if until is None:
            return False, None

        # Read the clock once so the comparison and the remaining time agree.
        now = datetime.now()
        if now < until:
            # total_seconds() rather than .seconds: the latter discards whole
            # days and under-reports durations longer than 24 hours.
            remaining = int((until - now).total_seconds() // 60)
            return True, f"请求过于频繁,请在{remaining}分钟后重试或完成验证码验证"

        # Expired: drop the stale entry.
        del self._captcha_required[ip]
        return False, None

    def clear_captcha_requirement(self, ip):
        """Remove the CAPTCHA requirement for ``ip`` (call after a successful verification)."""
        self._captcha_required.pop(ip, None)

    def get_request_count(self, ip, action='news_fetch', window_minutes=60):
        """Return how many ``action`` requests ``ip`` made in the last ``window_minutes``."""
        records = self._requests.get(ip)
        if not records:
            return 0
        cutoff_time = datetime.now() - timedelta(minutes=window_minutes)
        return sum(
            1 for ts, act in records
            if ts > cutoff_time and act == action
        )

    def cleanup_old_records(self, older_than_hours=24):
        """Drop stale state; intended to be called periodically.

        Removes request records older than ``older_than_hours`` (for every
        action) and CAPTCHA requirements that have already expired.
        """
        now = datetime.now()
        cutoff_time = now - timedelta(hours=older_than_hours)

        # Iterate over a snapshot of the keys because entries are deleted.
        for ip in list(self._requests):
            fresh = [(ts, act) for ts, act in self._requests[ip] if ts > cutoff_time]
            if fresh:
                self._requests[ip] = fresh
            else:
                del self._requests[ip]

        for ip in list(self._captcha_required):
            # Same expiry rule as is_captcha_required (active only while now < until).
            if self._captcha_required[ip] <= now:
                del self._captcha_required[ip]
# Module-wide limiter instance shared by every caller in this process.
# For production, a Redis-backed store is recommended instead.
_global_limiter = RateLimiter()


def get_rate_limiter() -> "RateLimiter":
    """Return the module-level rate limiter instance."""
    return _global_limiter
def get_client_ip(request, trust_proxy_headers=True):
    """Return the best-guess client IP for ``request``.

    Sources are tried in order: the first entry of ``X-Forwarded-For``,
    then ``X-Real-IP``, then ``request.remote_addr``. A source whose value
    is missing or blank is skipped, so a malformed header falls back to the
    next source instead of lumping the client into the ``'unknown'`` bucket.

    SECURITY NOTE: both headers are supplied by the client unless a trusted
    reverse proxy/CDN overwrites them. When the application is reachable
    directly, a caller can forge them to dodge per-IP limits; pass
    ``trust_proxy_headers=False`` in that deployment.

    Args:
        request: Object exposing ``headers.get(name)`` and ``remote_addr``.
        trust_proxy_headers: When False, ignore the forwarding headers and
            use only ``request.remote_addr``.

    Returns:
        The IP string, or ``'unknown'`` when no source yields a value.
    """
    candidates = []
    if trust_proxy_headers:
        forwarded = request.headers.get('X-Forwarded-For')
        if forwarded:
            # The header may list several hops; the first is the originating client.
            candidates.append(forwarded.split(',')[0])
        candidates.append(request.headers.get('X-Real-IP'))
    candidates.append(request.remote_addr)

    for candidate in candidates:
        ip = (candidate or '').strip()
        if ip:
            return ip
    return 'unknown'
class CaptchaVerifier:
    """CAPTCHA verifier that can delegate to several backends.

    NOTE(review): the 'simple' backend only checks the token length and
    performs no real challenge verification.
    """

    def __init__(self, service='simple', secret_key=None):
        """Store the backend selection.

        Args:
            service: Backend name: 'simple', 'recaptcha' or 'hcaptcha'.
            secret_key: Secret key for the remote backend, if any.
        """
        self.service = service
        self.secret_key = secret_key

    def verify(self, response_token, remote_ip=None):
        """Validate a CAPTCHA response.

        Args:
            response_token: CAPTCHA response submitted by the client.
            remote_ip: Client IP (optional), forwarded to remote backends.

        Returns:
            A ``(success, error_message)`` tuple; ``error_message`` is
            ``None`` on success.
        """
        backend = self.service

        if backend == 'simple':
            # Length check only: any token longer than 10 characters passes.
            if response_token and len(response_token) > 10:
                return True, None
            return False, "验证码无效"

        if backend == 'recaptcha':
            return self._verify_recaptcha(response_token, remote_ip)

        if backend == 'hcaptcha':
            return self._verify_hcaptcha(response_token, remote_ip)

        return False, "不支持的验证码服务"

    def _siteverify(self, endpoint, response_token, remote_ip):
        """POST the token to a siteverify ``endpoint`` and return the decoded JSON body."""
        import requests

        payload = {
            'secret': self.secret_key,
            'response': response_token,
            'remoteip': remote_ip,
        }
        reply = requests.post(endpoint, data=payload, timeout=5)
        return reply.json()

    def _verify_recaptcha(self, response_token, remote_ip):
        """Verify the token against Google reCAPTCHA."""
        # Imported before the key check so a missing dependency surfaces here.
        import requests  # noqa: F401

        if not self.secret_key:
            return False, "reCAPTCHA密钥未配置"

        try:
            outcome = self._siteverify(
                'https://www.google.com/recaptcha/api/siteverify',
                response_token,
                remote_ip,
            )
            if outcome.get('success'):
                return True, None
            codes = outcome.get('error-codes', [])
            return False, f"验证失败: {', '.join(codes)}"
        except Exception as e:
            # Any transport/decoding failure is reported as a failed check.
            return False, f"验证服务异常: {str(e)}"

    def _verify_hcaptcha(self, response_token, remote_ip):
        """Verify the token against hCaptcha."""
        # Imported before the key check so a missing dependency surfaces here.
        import requests  # noqa: F401

        if not self.secret_key:
            return False, "hCaptcha密钥未配置"

        try:
            outcome = self._siteverify(
                'https://hcaptcha.com/siteverify',
                response_token,
                remote_ip,
            )
            if outcome.get('success'):
                return True, None
            return False, "验证失败"
        except Exception as e:
            # Any transport/decoding failure is reported as a failed check.
            return False, f"验证服务异常: {str(e)}"
if __name__ == '__main__':
    # Manual smoke test: fire five requests from one IP against a 3-per-hour quota.
    demo_limiter = get_rate_limiter()
    demo_ip = "192.168.1.100"

    for attempt in range(1, 6):
        blocked, left, resets_at = demo_limiter.is_rate_limited(
            demo_ip, limit=3, window_minutes=60
        )
        if not blocked:
            print(f"请求{attempt}: 允许,剩余次数: {left}")
            # Only requests that were let through count against the quota.
            demo_limiter.record_request(demo_ip)
        else:
            print(f"请求{attempt}: 已被限制,重置时间: {resets_at}")