Files
zjpb.net/app.py
Jowe 939717fa57 feat: v2.6.0 - API安全优化和文档整合
## 核心优化
- 移除详情页自动调用博查API的逻辑,改为按需加载
- 添加基于IP的频率限制(每小时3次)
- 实现验证码防护机制(超过阈值后要求验证)
- 新增频率限制工具类 utils/rate_limiter.py

## 成本控制
- API调用减少约90%+(只在用户点击时调用)
- 防止恶意滥用和攻击
- 可配置的频率限制和验证码策略

## 文档整合
- 创建 docs/ 目录结构
- 归档历史版本文档到 docs/archive/
- 移动部署文档到 docs/deployment/
- 添加文档索引 docs/README.md

## 技术变更
- 新增依赖: Flask-Limiter==3.5.0
- 修改: app.py (移除自动调用,新增API端点)
- 修改: templates/detail_new.html (按需加载UI)
- 新增: utils/rate_limiter.py (频率限制和验证码)
- 新增: docs/archive/DEVELOP_v2.6.0_API_SECURITY.md

## 部署说明
1. pip install Flask-Limiter==3.5.0
2. 重启应用
3. 无需数据库迁移

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-06 15:54:13 +08:00

1925 lines
74 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import markdown
from flask import Flask, render_template, redirect, url_for, request, flash, jsonify
from flask_login import LoginManager, login_user, logout_user, login_required, current_user
from flask_admin import Admin, AdminIndexView, expose
from flask_admin.contrib.sqla import ModelView
from datetime import datetime
from config import config
from models import db, Site, Tag, Admin as AdminModel, News, site_tags, PromptTemplate
from utils.website_fetcher import WebsiteFetcher
from utils.tag_generator import TagGenerator
from utils.news_searcher import NewsSearcher
from utils.rate_limiter import get_rate_limiter, get_client_ip, CaptchaVerifier
def create_app(config_name='default'):
"""应用工厂函数"""
app = Flask(__name__)
# 加载配置
app.config.from_object(config[config_name])
# 初始化数据库
db.init_app(app)
# 添加Markdown过滤器
@app.template_filter('markdown')
def markdown_filter(text):
    """Jinja filter: render Markdown source as HTML.

    Falsy input yields an empty string; anything else is passed to
    ``markdown.markdown`` with the ``nl2br`` and ``fenced_code`` extensions.
    """
    enabled_extensions = ['nl2br', 'fenced_code']
    return markdown.markdown(text, extensions=enabled_extensions) if text else ''
# v2.4新增: 自动内链过滤器
@app.template_filter('auto_link')
def auto_link_filter(text, current_site_id=None):
    """Jinja filter: link the first mention of each active site's name.

    Args:
        text: Content to scan.
        current_site_id: Optional id of the site being rendered; that site
            is never linked to itself.

    Returns:
        ``text`` with the first occurrence of every known site name wrapped
        in an ``<a>`` pointing at ``/site/<code>``, or ``''`` for falsy input.
    """
    import html
    import re

    if not text:
        return ''

    candidates = Site.query.filter_by(is_active=True).all()

    # Longest names first, so that the regex alternation below prefers
    # "Foo Pro" over "Foo" when both match at the same position.
    by_name = {}
    for site in sorted(candidates, key=lambda s: len(s.name or ''), reverse=True):
        if current_site_id and site.id == current_site_id:
            continue
        # Skip empty names (they would match everywhere) and duplicates.
        if site.name and site.name not in by_name:
            by_name[site.name] = site
    if not by_name:
        return text

    # A single regex pass never re-scans markup we just inserted, so a short
    # name can no longer be replaced inside the anchor text or title attribute
    # of a previously generated link.
    pattern = re.compile('|'.join(re.escape(name) for name in by_name))
    already_linked = set()

    def _link(match):
        name = match.group(0)
        if name in already_linked:
            # Only the first occurrence of each name becomes a link.
            return name
        already_linked.add(name)
        site = by_name[name]
        href = html.escape(f'/site/{site.code}', quote=True)
        title = html.escape(site.short_desc or site.name, quote=True)
        return (
            f'<a href="{href}" title="{title}" '
            'style="color: var(--primary-blue); text-decoration: underline; '
            f'text-decoration-style: dotted;">{name}</a>'
        )

    return pattern.sub(_link, text)
# 初始化登录管理
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'admin_login'
login_manager.login_message = '请先登录'
@login_manager.user_loader
def load_user(user_id):
    """Flask-Login user loader: map the session's user id to an admin row.

    Returns None (rather than raising) when the stored id is not a valid
    integer, so a malformed session is treated as "not logged in".
    """
    try:
        admin_id = int(user_id)
    except (TypeError, ValueError):
        return None
    return AdminModel.query.get(admin_id)
# ========== 前台路由 ==========
@app.route('/')
def index():
    """Home page: tag list with per-tag counts plus a filtered site listing.

    Query-string parameters read: ``tag`` (tag slug), ``q`` (search text),
    ``tab`` (``latest`` / ``popular`` / ``recommended``) and ``page``.
    """
    tags = Tag.query.order_by(Tag.sort_order.desc(), Tag.id).all()

    # One grouped query gives the number of active sites for every tag.
    tag_counts = {}
    if tags:
        from sqlalchemy import func
        count_rows = (
            db.session.query(
                site_tags.c.tag_id,
                func.count(site_tags.c.site_id).label('count')
            )
            .join(Site, site_tags.c.site_id == Site.id)
            .filter(Site.is_active == True)
            .group_by(site_tags.c.tag_id)
            .all()
        )
        tag_counts = {row_tag_id: total for row_tag_id, total in count_rows}

    args = request.args
    tag_slug = args.get('tag')
    search_query = args.get('q', '').strip()
    current_tab = args.get('tab', 'latest')  # "latest" is the default tab
    page = args.get('page', 1, type=int)
    per_page = 100  # sites per page
    selected_tag = None

    def respond(site_rows, page_obj):
        # Both exit paths render the same template with the same context.
        return render_template('index_new.html', sites=site_rows, tags=tags,
                               selected_tag=selected_tag, search_query=search_query,
                               pagination=page_obj, tag_counts=tag_counts,
                               current_tab=current_tab)

    query = Site.query.filter_by(is_active=True)

    # Tag filter: an unknown slug short-circuits to an empty listing.
    if tag_slug:
        selected_tag = Tag.query.filter_by(slug=tag_slug).first()
        if not selected_tag:
            return respond([], None)
        query = query.filter(Site.tags.contains(selected_tag))

    # Search: substring match on name, URL, description or short description.
    if search_query:
        search_pattern = f'%{search_query}%'
        searchable_columns = (Site.name, Site.url, Site.description, Site.short_desc)
        query = query.filter(
            db.or_(*[column.like(search_pattern) for column in searchable_columns])
        )

    # Tab decides the ordering (and, for "recommended", an extra filter).
    if current_tab == 'recommended':
        query = query.filter_by(is_recommended=True)
        ordering = (Site.sort_order.desc(), Site.id.desc())
    elif current_tab == 'popular':
        ordering = (Site.view_count.desc(), Site.id.desc())
    else:
        ordering = (Site.created_at.desc(), Site.id.desc())
    query = query.order_by(*ordering)

    pagination = query.paginate(page=page, per_page=per_page, error_out=False)
    return respond(pagination.items, pagination)
@app.route('/site/<code>')
def site_detail(code):
    """Site detail page.

    Increments the view counter, then renders the site together with up to
    five stored news items and up to four other active sites sharing a tag.
    Since v2.6 this page only reads news already in the database; the news
    search API is no longer called from here.

    Responds with 404 when no active site has the given ``code``.
    """
    from sqlalchemy import func

    site = Site.query.filter_by(code=code, is_active=True).first_or_404()

    # Increment inside the UPDATE statement instead of read-modify-write in
    # Python: concurrent requests no longer overwrite each other's counts,
    # and a NULL counter is treated as 0.
    site.view_count = func.coalesce(Site.view_count, 0) + 1
    db.session.commit()

    # Latest stored news for this site (max 5).
    news_list = News.query.filter_by(
        site_id=site.id,
        is_active=True
    ).order_by(News.published_at.desc()).limit(5).all()
    # Passed to the template so it knows whether any news is stored.
    has_news = bool(news_list)

    # Similar tools: other active sites sharing at least one tag (max 4).
    recommended_sites = []
    if site.tags:
        own_tag_ids = [tag.id for tag in site.tags]
        recommended_sites = Site.query.filter(
            Site.id != site.id,
            Site.is_active == True,
            Site.tags.any(Tag.id.in_(own_tag_ids))
        ).order_by(Site.view_count.desc()).limit(4).all()

    return render_template('detail_new.html', site=site, news_list=news_list,
                           has_news=has_news, recommended_sites=recommended_sites)
# ========== 新闻相关API (v2.6优化) ==========
@app.route('/api/fetch-news/<code>', methods=['POST'])
def fetch_news_for_site(code):
    """Fetch news for one site on demand (public endpoint, v2.6).

    Guarded by a per-IP rate limit (3 requests per hour) and a captcha
    requirement that is switched on once the limit is hit.

    Responses:
        200: ``{'success': True, 'news': [...], 'new_count', 'remaining_requests', 'message'}``
        400: a supplied ``captcha_token`` failed verification
        404: no active site has this ``code``
        429: captcha required or rate limit exceeded
        500: news API not configured, or an unexpected error
    """
    try:
        client_ip = get_client_ip(request)
        limiter = get_rate_limiter()

        # NOTE(review): this returns before any captcha_token in the body is
        # examined, so a client that is under a captcha requirement cannot
        # clear it through this endpoint. Left unchanged because the intended
        # captcha flow is not visible here - confirm against
        # utils/rate_limiter.py before reordering.
        captcha_required, captcha_reason = limiter.is_captcha_required(client_ip)
        if captcha_required:
            return jsonify({
                'success': False,
                'error': captcha_reason,
                'require_captcha': True
            }), 429

        # Rate limit: 3 requests per hour per client IP.
        is_limited, remaining, reset_time = limiter.is_rate_limited(
            client_ip,
            action='news_fetch',
            limit=3,
            window_minutes=60
        )
        if is_limited:
            # Hitting the limit also switches on the captcha requirement.
            limiter.require_captcha(client_ip, duration_minutes=30)
            return jsonify({
                'success': False,
                'error': f'请求过于频繁,请在 {reset_time.strftime("%H:%M")} 后重试',
                'require_captcha': True,
                'reset_time': reset_time.isoformat()
            }), 429

        # Optional captcha token. get_json(silent=True) returns None for a
        # missing or non-JSON body instead of raising, which the blanket
        # handler below used to turn into a 500.
        payload = request.get_json(silent=True)
        captcha_token = payload.get('captcha_token') if isinstance(payload, dict) else None
        if captcha_token:
            # TODO: configure a real captcha service (reCAPTCHA or hCaptcha)
            verifier = CaptchaVerifier(service='simple')
            success, error = verifier.verify(captcha_token, client_ip)
            if success:
                limiter.clear_captcha_requirement(client_ip)
            else:
                return jsonify({
                    'success': False,
                    'error': f'验证码验证失败: {error}'
                }), 400

        # Count this request against the quota.
        limiter.record_request(client_ip, action='news_fetch')

        # Explicit 404: first_or_404() raises an HTTPException, which the
        # "except Exception" below would have reported as a 500.
        site = Site.query.filter_by(code=code, is_active=True).first()
        if site is None:
            return jsonify({'success': False, 'error': '网站不存在'}), 404

        api_key = app.config.get('BOCHA_API_KEY')
        if not api_key:
            return jsonify({'success': False, 'error': '博查API未配置'}), 500

        searcher = NewsSearcher(api_key)

        # Up to 5 items from the last week.
        news_items = searcher.search_site_news(
            site_name=site.name,
            site_url=site.url,
            news_keywords=site.news_keywords,
            count=5,
            freshness='oneWeek'
        )

        # Store items whose URL is not yet recorded for this site.
        new_count = 0
        if news_items:
            for item in news_items:
                existing = News.query.filter_by(
                    site_id=site.id,
                    url=item['url']
                ).first()
                if not existing:
                    news = News(
                        site_id=site.id,
                        title=item['title'],
                        content=item.get('summary') or item.get('snippet', ''),
                        url=item['url'],
                        source_name=item.get('site_name', ''),
                        source_icon=item.get('site_icon', ''),
                        published_at=item.get('published_at'),
                        news_type='Search Result',
                        is_active=True
                    )
                    db.session.add(news)
                    new_count += 1
            db.session.commit()

        # Respond with the five most recent stored items.
        news_list = News.query.filter_by(
            site_id=site.id,
            is_active=True
        ).order_by(News.published_at.desc()).limit(5).all()

        news_data = [
            {
                'title': news.title,
                'content': news.content[:200] if news.content else '',
                'url': news.url,
                'source_name': news.source_name,
                'source_icon': news.source_icon,
                'published_at': news.published_at.strftime('%Y年%m月%d') if news.published_at else '未知日期',
                'news_type': news.news_type
            }
            for news in news_list
        ]

        return jsonify({
            'success': True,
            'news': news_data,
            'new_count': new_count,
            'remaining_requests': remaining - 1,  # this request used one
            'message': f'成功获取{new_count}条新资讯' if new_count > 0 else '暂无新资讯'
        })
    except Exception as e:
        print(f"获取新闻失败:{str(e)}")
        import traceback
        traceback.print_exc()
        db.session.rollback()
        return jsonify({'success': False, 'error': str(e)}), 500
# ========== 社媒营销路由 (v2.5新增) ==========
@app.route('/api/generate-social-share', methods=['POST'])
def generate_social_share():
    """Generate social-media share copy for a site (public endpoint).

    JSON body: ``site_code`` (required) and ``platforms`` (optional list).
    Unknown platforms are dropped; an empty list means ``['universal']``.
    Copy comes from the ``social_share`` prompt template via the AI client
    when both are available; every platform the AI did not cover falls back
    to a rule-based template.

    Responses:
        200: ``{'success': True, 'site': {...}, 'platforms': [...], 'content': {...}}``
        400: missing ``site_code`` (including a missing or non-JSON body)
        404: site not found or not active
        500: unexpected error
    """
    try:
        # silent=True: a missing or non-JSON body becomes an empty dict, so
        # the caller gets the 400 below instead of a 500 from the blanket
        # handler.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        site_code = (data.get('site_code') or '').strip()
        platforms = data.get('platforms') or []
        if not site_code:
            return jsonify({'success': False, 'message': '请提供网站编码'}), 400
        site = Site.query.filter_by(code=site_code, is_active=True).first()
        if not site:
            return jsonify({'success': False, 'message': '网站不存在或未启用'}), 404

        # Platform whitelist (enforced here as well as in the front end).
        allowed_platforms = {
            'universal',
            'xiaohongshu',
            'douyin',
            'bilibili',
            'wechat',
            'moments',
            'x',
            'linkedin'
        }
        if not isinstance(platforms, list):
            platforms = []
        platforms = [p for p in platforms if isinstance(p, str)]
        platforms = [p.strip().lower() for p in platforms if p.strip()]
        platforms = [p for p in platforms if p in allowed_platforms]
        # Default to the universal template.
        if not platforms:
            platforms = ['universal']

        # Site facts used by both the AI prompt and the fallback templates.
        site_info = {
            'name': site.name,
            'url': site.url,
            'short_desc': site.short_desc or '',
            'description': (site.description or '')[:1200],
            'features': (site.features or '')[:1200],
            'tags': [t.name for t in (site.tags or [])]
        }

        # Try PromptTemplate + AI generation first.
        generator = TagGenerator()
        prompt = PromptTemplate.query.filter_by(key='social_share', is_active=True).first()
        generated = {}
        if prompt and generator.client:
            try:
                # Formatting the template is inside the guard too: a
                # template with stray braces or unknown placeholders now
                # falls back to the rule templates like any other AI failure.
                tags_text = ','.join(site_info['tags'])
                user_prompt = prompt.user_prompt_template.format(
                    name=site_info['name'],
                    url=site_info['url'],
                    short_desc=site_info['short_desc'],
                    description=site_info['description'],
                    features=site_info['features'],
                    tags=tags_text,
                    platforms=','.join(platforms)
                )
                resp = generator.client.chat.completions.create(
                    model='deepseek-chat',
                    messages=[
                        {'role': 'system', 'content': prompt.system_prompt},
                        {'role': 'user', 'content': user_prompt}
                    ],
                    temperature=0.7,
                    max_tokens=1200
                )
                text = (resp.choices[0].message.content or '').strip()
                # The model is expected to answer with a JSON object keyed by
                # platform; anything unparseable is used as "universal" copy.
                import json as _json
                try:
                    parsed = _json.loads(text)
                    if isinstance(parsed, dict):
                        for k, v in parsed.items():
                            if isinstance(k, str) and isinstance(v, str) and k in allowed_platforms:
                                generated[k] = v.strip()
                except Exception:
                    generated['universal'] = text
            except Exception as e:
                # AI failure must not fail the request; fall back below.
                print(f"社媒文案AI生成失败{str(e)}")

        # Fallback: rule-based templates, trimmed to each platform's limit.
        def build_fallback(p):
            name = site_info['name']
            url = site_info['url']
            short_desc = site_info['short_desc'] or ''
            full_desc = site_info['description'] or ''
            features = site_info['features'] or ''
            tags = site_info['tags'][:6]
            tags_line = ' ' + ' '.join([f"#{t}" for t in tags]) if tags else ''

            # X (Twitter): 280 characters, keep it short.
            if p == 'x':
                desc = (short_desc or full_desc)[:100]
                base = f"🔥 {name}\n\n{desc}\n\n🔗 {url}{tags_line}"
                return base[:280]

            # LinkedIn: 3000 characters, professional tone.
            if p == 'linkedin':
                desc = (full_desc or short_desc)[:500]
                content = f"💡 推荐一个实用工具:{name}\n\n📝 简介:\n{desc}\n\n"
                if features:
                    features_list = features.split('\n')[:3]
                    content += f"✨ 主要功能:\n" + '\n'.join([f"{f.strip()}" for f in features_list if f.strip()][:3]) + "\n\n"
                content += f"🔗 官网:{url}\n\n"
                if tags:
                    content += f"📌 适用场景:{', '.join(tags)}"
                return content[:3000]

            # Xiaohongshu: 2000 characters, note style.
            if p == 'xiaohongshu':
                desc = (short_desc or full_desc)[:300]
                content = f"{name} | 我最近在用的宝藏工具\n\n"
                content += f"📌 一句话介绍:\n{desc}\n\n"
                if features:
                    features_list = features.split('\n')[:5]
                    content += f"🎯 核心功能:\n" + '\n'.join([f"{f.strip()}" for f in features_list if f.strip()][:5]) + "\n\n"
                content += f"🔗 体验入口:{url}\n\n{tags_line}"
                return content[:2000]

            # Douyin: 2000 characters, short-video caption style.
            if p == 'douyin':
                desc = (short_desc or full_desc)[:200]
                content = f"🔥 发现一个超好用的工具:{name}\n\n"
                content += f"💡 {desc}\n\n"
                if features:
                    features_list = features.split('\n')[:3]
                    content += "✨ 亮点功能:\n" + '\n'.join([f"👉 {f.strip()}" for f in features_list if f.strip()][:3]) + "\n\n"
                content += f"🔗 想试试戳:{url}\n\n{tags_line}"
                return content[:2000]

            # Bilibili: 20000 characters, article style.
            if p == 'bilibili':
                desc = (full_desc or short_desc)[:800]
                content = f"【分享】{name} - 值得一试的优质工具\n\n"
                content += f"## 📖 工具简介\n{desc}\n\n"
                if features:
                    content += f"## ✨ 主要功能\n{features}\n\n"
                content += f"## 🔗 体验地址\n{url}\n\n"
                if tags:
                    content += f"## 🏷️ 相关标签\n{' / '.join(tags)}"
                return content[:20000]

            # WeChat official account: effectively no limit, formal tone.
            if p == 'wechat':
                desc = (full_desc or short_desc)[:600]
                content = f"今天给大家分享一个实用工具:{name}\n\n"
                content += f"📝 工具介绍\n{desc}\n\n"
                if features:
                    content += f"✨ 核心功能\n{features}\n\n"
                content += f"🔗 官方网站\n{url}\n\n"
                if tags:
                    content += f"如果你也在关注「{' · '.join(tags)}」相关的工具,不妨收藏一下这个实用的选择。"
                return content

            # WeChat Moments: brief.
            if p == 'moments':
                desc = (short_desc or full_desc)[:80]
                return f"💡 {name}\n{desc}\n🔗 {url}{tags_line}".strip()

            # Universal template.
            desc = (full_desc or short_desc)[:300]
            return f"{name}\n\n{desc}\n\n{url}{tags_line}".strip()

        results = {}
        for p in platforms:
            results[p] = generated.get(p) or build_fallback(p)
        # Always include a universal version.
        if 'universal' not in results:
            results['universal'] = generated.get('universal') or build_fallback('universal')

        return jsonify({
            'success': True,
            'site': {
                'code': site.code,
                'name': site.name,
                'url': site.url
            },
            'platforms': platforms,
            'content': results
        })
    except Exception as e:
        return jsonify({'success': False, 'message': f'生成失败: {str(e)}'}), 500
# ========== 后台登录路由 ==========
@app.route('/admin/login', methods=['GET', 'POST'])
def admin_login():
    """Admin login page.

    GET renders the form. POST checks the submitted credentials; on success
    the admin is logged in, ``last_login`` is stamped and the browser is
    redirected to the admin index. Any failure (unknown user, wrong
    password, inactive account) flashes the same generic message.
    """
    if current_user.is_authenticated:
        return redirect(url_for('admin.index'))

    if request.method == 'POST':
        # Default to '' so a request without these fields is rejected as bad
        # credentials instead of passing None into the password check.
        username = request.form.get('username', '')
        password = request.form.get('password', '')
        admin = AdminModel.query.filter_by(username=username).first()
        if admin and admin.check_password(password) and admin.is_active:
            login_user(admin)
            admin.last_login = datetime.now()
            db.session.commit()
            return redirect(url_for('admin.index'))
        flash('用户名或密码错误', 'error')

    return render_template('admin_login.html')
@app.route('/admin/logout')
@login_required
def admin_logout():
    """End the current admin session and send the browser to the home page."""
    logout_user()
    home_url = url_for('index')
    return redirect(home_url)
@app.route('/admin/change-password', methods=['GET', 'POST'])
@login_required
def change_password():
    """Let the logged-in admin change their own password.

    GET renders the form. POST validates the three submitted fields in a
    fixed order, flashing the first problem found. On success the password
    is stored, the admin is logged out and redirected to the login page.
    """
    form_template = 'admin/change_password.html'
    if request.method != 'POST':
        return render_template(form_template)

    def submitted(field_name):
        return request.form.get(field_name, '').strip()

    old_password = submitted('old_password')
    new_password = submitted('new_password')
    confirm_password = submitted('confirm_password')

    def first_problem():
        # Checks run in this order; the first failing one is reported.
        if not current_user.check_password(old_password):
            return '旧密码错误'
        if not new_password:
            return '新密码不能为空'
        if len(new_password) < 6:
            return '新密码长度至少6位'
        if new_password != confirm_password:
            return '两次输入的新密码不一致'
        if old_password == new_password:
            return '新密码不能与旧密码相同'
        return None

    problem = first_problem()
    if problem is not None:
        flash(problem, 'error')
        return render_template(form_template)

    try:
        current_user.set_password(new_password)
        db.session.commit()
        flash('密码修改成功,请重新登录', 'success')
        logout_user()
        return redirect(url_for('admin_login'))
    except Exception as e:
        db.session.rollback()
        flash(f'密码修改失败:{str(e)}', 'error')
        return render_template(form_template)
# ========== API路由 ==========
@app.route('/api/fetch-website-info', methods=['POST'])
@login_required
def fetch_website_info():
    """Admin API: scrape name, description and logo for a URL.

    JSON body: ``url`` (required).

    Responses:
        200: ``{'success': True, 'data': {'name', 'description', 'logo'}}``,
             or ``{'success': False, 'message': ...}`` when nothing could be
             scraped
        400: ``url`` missing (including a missing or non-JSON body)
        500: unexpected error
    """
    try:
        # silent=True plus type checks: a missing body, non-JSON body or
        # non-string "url" yields the 400 below instead of a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        raw_url = data.get('url')
        url = raw_url.strip() if isinstance(raw_url, str) else ''
        if not url:
            return jsonify({
                'success': False,
                'message': '请提供网站URL'
            }), 400

        fetcher = WebsiteFetcher(timeout=15)
        info = fetcher.fetch_website_info(url)
        if not info:
            return jsonify({
                'success': False,
                'message': '无法获取网站信息请检查URL是否正确或手动填写'
            })

        # Download the logo locally when one was found. If that fails the
        # remote URL is not returned; the admin uploads a logo manually.
        logo_path = None
        if info.get('logo_url'):
            logo_path = fetcher.download_logo(info['logo_url'], save_dir='static/logos')

        return jsonify({
            'success': True,
            'data': {
                'name': info.get('name', ''),
                'description': info.get('description', ''),
                'logo': logo_path if logo_path else ''
            }
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'抓取失败: {str(e)}'
        }), 500
@app.route('/api/upload-logo', methods=['POST'])
@login_required
def upload_logo():
    """Admin API: store an uploaded logo image under ``static/logos``.

    Expects a multipart file field named ``logo`` whose filename ends in
    one of the allowed image extensions. The file is saved under a
    generated name and its site-relative path is returned.

    Responses:
        200: ``{'success': True, 'path': '/static/logos/<generated name>'}``
        400: no file, empty filename, or unsupported extension
        500: unexpected error
    """
    def reject(reason):
        return jsonify({'success': False, 'message': reason}), 400

    try:
        if 'logo' not in request.files:
            return reject('请选择要上传的图片')

        upload = request.files['logo']
        if upload.filename == '':
            return reject('未选择文件')

        # Extension whitelist, matched case-insensitively.
        # NOTE(review): "svg" is accepted and the content is not inspected -
        # confirm that serving uploaded SVGs from /static is acceptable.
        allowed_extensions = {'png', 'jpg', 'jpeg', 'gif', 'svg', 'ico', 'webp'}
        filename = upload.filename.lower()
        accepted_suffixes = tuple('.' + ext for ext in allowed_extensions)
        if not filename.endswith(accepted_suffixes):
            return reject('不支持的文件格式,请上传图片文件(png, jpg, gif, svg, ico, webp)')

        save_dir = 'static/logos'
        os.makedirs(save_dir, exist_ok=True)

        # Generated name: "logo_" + 16 hex chars of md5(filename + ms timestamp)
        # + the original extension. The client-supplied name is never used
        # as a path component.
        import time
        import hashlib
        ext = os.path.splitext(filename)[1]
        timestamp = str(int(time.time() * 1000))
        digest = hashlib.md5(f"{filename}{timestamp}".encode()).hexdigest()
        safe_filename = f"logo_{digest[:16]}{ext}"
        filepath = os.path.join(save_dir, safe_filename)

        upload.save(filepath)

        web_path = filepath.replace(os.sep, "/")
        return jsonify({
            'success': True,
            'path': f'/{web_path}'
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'上传失败: {str(e)}'
        }), 500
@app.route('/api/generate-features', methods=['POST'])
@login_required
def generate_features():
    """Admin API: generate a site's feature list with the AI generator.

    JSON body: ``name`` and ``description`` (both required), ``url``.

    Responses:
        200: ``{'success': True, 'features': ...}``
        400: required field missing (including a missing or non-JSON body),
             or the generator raised ``ValueError``
        500: generator returned nothing, or an unexpected error
    """
    try:
        # silent=True plus type checks: malformed input produces the 400
        # below rather than an AttributeError reported as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        def text_field(key):
            value = data.get(key)
            return value.strip() if isinstance(value, str) else ''

        name = text_field('name')
        description = text_field('description')
        url = text_field('url')
        if not name or not description:
            return jsonify({
                'success': False,
                'message': '请提供网站名称和描述'
            }), 400

        generator = TagGenerator()
        features = generator.generate_features(name, description, url)
        if not features:
            return jsonify({
                'success': False,
                'message': 'DeepSeek功能生成失败请检查API配置'
            }), 500

        return jsonify({
            'success': True,
            'features': features
        })
    except ValueError as e:
        return jsonify({
            'success': False,
            'message': str(e)
        }), 400
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'生成失败: {str(e)}'
        }), 500
@app.route('/api/generate-description', methods=['POST'])
@login_required
def generate_description():
    """Admin API: generate a site's long description with the AI generator.

    JSON body: ``name`` (required), ``short_desc``, ``url``.

    Responses:
        200: ``{'success': True, 'description': ...}``
        400: ``name`` missing (including a missing or non-JSON body), or the
             generator raised ``ValueError``
        500: generator returned nothing, or an unexpected error
    """
    try:
        # silent=True plus type checks: malformed input produces the 400
        # below rather than an AttributeError reported as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        def text_field(key):
            value = data.get(key)
            return value.strip() if isinstance(value, str) else ''

        name = text_field('name')
        short_desc = text_field('short_desc')
        url = text_field('url')
        if not name:
            return jsonify({
                'success': False,
                'message': '请提供网站名称'
            }), 400

        generator = TagGenerator()
        description = generator.generate_description(name, short_desc, url)
        if not description:
            return jsonify({
                'success': False,
                'message': 'DeepSeek详细介绍生成失败请检查API配置'
            }), 500

        return jsonify({
            'success': True,
            'description': description
        })
    except ValueError as e:
        return jsonify({
            'success': False,
            'message': str(e)
        }), 400
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'生成失败: {str(e)}'
        }), 500
@app.route('/api/generate-tags', methods=['POST'])
@login_required
def generate_tags():
    """Admin API: suggest tags for a site with the AI generator.

    JSON body: ``name`` and ``description`` (both required). The names of
    all existing tags are passed to the generator as reference.

    Responses:
        200: ``{'success': True, 'tags': ...}``
        400: required field missing (including a missing or non-JSON body),
             or the generator raised ``ValueError``
        500: generator returned nothing, or an unexpected error
    """
    try:
        # silent=True plus type checks: malformed input produces the 400
        # below rather than an AttributeError reported as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        def text_field(key):
            value = data.get(key)
            return value.strip() if isinstance(value, str) else ''

        name = text_field('name')
        description = text_field('description')
        if not name or not description:
            return jsonify({
                'success': False,
                'message': '请提供网站名称和描述'
            }), 400

        existing_tags = [tag.name for tag in Tag.query.all()]

        generator = TagGenerator()
        suggested_tags = generator.generate_tags(name, description, existing_tags)
        if not suggested_tags:
            return jsonify({
                'success': False,
                'message': 'DeepSeek标签生成失败请检查API配置'
            }), 500

        return jsonify({
            'success': True,
            'tags': suggested_tags
        })
    except ValueError as e:
        return jsonify({
            'success': False,
            'message': str(e)
        }), 400
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'生成失败: {str(e)}'
        }), 500
# ========== 新闻获取路由 ==========
@app.route('/api/fetch-site-news', methods=['POST'])
@login_required
def fetch_site_news():
    """Admin API: search for news about one site and store new items.

    JSON body: ``site_id`` (required), ``count`` (defaults to config
    ``NEWS_SEARCH_COUNT`` or 10), ``freshness`` (defaults to config
    ``NEWS_SEARCH_FRESHNESS`` or ``'oneMonth'``).

    Responses:
        200: ``{'success': True, 'message', 'total_found', 'saved', 'news_items'}``
        400: ``site_id`` missing (including a missing or non-JSON body)
        404: site not found, or the search returned nothing
        500: news API key not configured, or an unexpected error
    """
    try:
        # silent=True: a missing or non-JSON body is treated as {} so the
        # caller gets the 400 below instead of a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        site_id = data.get('site_id')
        count = data.get('count', app.config.get('NEWS_SEARCH_COUNT', 10))
        freshness = data.get('freshness', app.config.get('NEWS_SEARCH_FRESHNESS', 'oneMonth'))
        if not site_id:
            return jsonify({
                'success': False,
                'message': '请提供网站ID'
            }), 400

        site = Site.query.get(site_id)
        if not site:
            return jsonify({
                'success': False,
                'message': '网站不存在'
            }), 404

        api_key = app.config.get('BOCHA_API_KEY')
        if not api_key:
            return jsonify({
                'success': False,
                'message': '博查API未配置请在.env文件中设置BOCHA_API_KEY'
            }), 500

        searcher = NewsSearcher(api_key)
        news_items = searcher.search_site_news(
            site_name=site.name,
            site_url=site.url,
            news_keywords=site.news_keywords,  # v2.3: site-specific keywords
            count=count,
            freshness=freshness
        )
        if not news_items:
            return jsonify({
                'success': False,
                'message': '未找到相关新闻'
            }), 404

        # Store items whose URL is not yet recorded for this site.
        saved_count = 0
        for item in news_items:
            existing_news = News.query.filter_by(
                site_id=site_id,
                url=item['url']
            ).first()
            if not existing_news:
                news = News(
                    site_id=site_id,
                    title=item['title'],
                    content=item.get('summary') or item.get('snippet', ''),
                    url=item['url'],
                    source_name=item.get('site_name', ''),
                    source_icon=item.get('site_icon', ''),
                    published_at=item.get('published_at'),
                    news_type='Search Result',
                    is_active=True
                )
                db.session.add(news)
                saved_count += 1

        db.session.commit()

        return jsonify({
            'success': True,
            'message': f'成功获取并保存 {saved_count} 条新闻',
            'total_found': len(news_items),
            'saved': saved_count,
            'news_items': searcher.format_news_for_display(news_items)
        })
    except Exception as e:
        db.session.rollback()
        return jsonify({
            'success': False,
            'message': f'获取失败: {str(e)}'
        }), 500
@app.route('/api/fetch-all-news', methods=['POST'])
@login_required
def fetch_all_news():
    """Admin API: fetch and store news for a batch of active sites.

    JSON body (all optional): ``count`` (items per site, default 5),
    ``freshness`` (defaults to config ``NEWS_SEARCH_FRESHNESS`` or
    ``'oneMonth'``), ``limit`` (number of sites to process, default 10).
    Sites are taken in ascending ``updated_at`` order.

    Each site is committed on its own; a failing site is rolled back and
    reported in ``processed_sites`` with an ``error`` key without affecting
    the others.

    Responses:
        200: ``{'success': True, 'message', 'total_found', 'total_saved', 'processed_sites'}``
        404: no active sites
        500: news API key not configured, or an unexpected error
    """
    try:
        # silent=True: a missing or non-JSON body simply means "use defaults".
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        count_per_site = data.get('count', 5)
        freshness = data.get('freshness', app.config.get('NEWS_SEARCH_FRESHNESS', 'oneMonth'))
        limit = data.get('limit', 10)

        api_key = app.config.get('BOCHA_API_KEY')
        if not api_key:
            return jsonify({
                'success': False,
                'message': '博查API未配置请在.env文件中设置BOCHA_API_KEY'
            }), 500

        # Oldest-updated sites first.
        sites = Site.query.filter_by(is_active=True).order_by(Site.updated_at).limit(limit).all()
        if not sites:
            return jsonify({
                'success': False,
                'message': '没有可用的网站'
            }), 404

        # Snapshot the fields we need up front: the per-site commit/rollback
        # below expires ORM instances, and the error path must not depend on
        # reloading them.
        targets = [(s.id, s.name, s.url, s.news_keywords) for s in sites]

        searcher = NewsSearcher(api_key)
        total_saved = 0
        total_found = 0
        processed_sites = []

        for site_id, site_name, site_url, site_keywords in targets:
            try:
                # "or []": an empty/None search result counts as zero items
                # instead of surfacing as a per-site error.
                news_items = searcher.search_site_news(
                    site_name=site_name,
                    site_url=site_url,
                    news_keywords=site_keywords,  # v2.3: site-specific keywords
                    count=count_per_site,
                    freshness=freshness
                ) or []

                site_saved = 0
                for item in news_items:
                    existing_news = News.query.filter_by(
                        site_id=site_id,
                        url=item['url']
                    ).first()
                    if not existing_news:
                        news = News(
                            site_id=site_id,
                            title=item['title'],
                            content=item.get('summary') or item.get('snippet', ''),
                            url=item['url'],
                            source_name=item.get('site_name', ''),
                            source_icon=item.get('site_icon', ''),
                            published_at=item.get('published_at'),
                            news_type='Search Result',
                            is_active=True
                        )
                        db.session.add(news)
                        site_saved += 1

                # Commit per site so that a later failure cannot discard it.
                db.session.commit()
            except Exception as e:
                # Roll back so a database error on this site does not leave
                # the session unusable for the remaining sites.
                db.session.rollback()
                processed_sites.append({
                    'id': site_id,
                    'name': site_name,
                    'error': str(e)
                })
                continue

            total_found += len(news_items)
            total_saved += site_saved
            processed_sites.append({
                'id': site_id,
                'name': site_name,
                'found': len(news_items),
                'saved': site_saved
            })

        return jsonify({
            'success': True,
            'message': f'批量获取完成,共处理 {len(processed_sites)} 个网站',
            'total_found': total_found,
            'total_saved': total_saved,
            'processed_sites': processed_sites
        })
    except Exception as e:
        db.session.rollback()
        return jsonify({
            'success': False,
            'message': f'批量获取失败: {str(e)}'
        }), 500
# ========== SEO路由 (v2.4新增) ==========
@app.route('/sitemap.xml')
def sitemap():
    """Serve a dynamically generated ``sitemap.xml``.

    Lists the home page, every active site's detail page (with
    ``lastmod``) and one ``/?tag=<slug>`` URL per tag. URLs are built from
    the current request's root URL.
    """
    from flask import make_response
    from urllib.parse import quote
    from xml.sax.saxutils import escape

    base_url = request.url_root.rstrip('/')
    sites = Site.query.filter_by(is_active=True).order_by(Site.updated_at.desc()).all()
    tags = Tag.query.all()

    # Collect fragments and join once instead of repeated string "+=".
    parts = [
        '<?xml version="1.0" encoding="UTF-8"?>\n'
        '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">'
    ]

    # Every <loc> value is XML-escaped: a "&", "<" or ">" in a URL would
    # otherwise make the whole document invalid.
    parts.append(
        '\n<url>\n'
        '<loc>{}</loc>\n'
        '<changefreq>daily</changefreq>\n'
        '<priority>1.0</priority>\n'
        '</url>'.format(escape(base_url))
    )

    # Site detail pages.
    for site in sites:
        if site.updated_at:
            last_modified = site.updated_at.strftime('%Y-%m-%d')
        else:
            last_modified = datetime.now().strftime('%Y-%m-%d')
        parts.append(
            '\n<url>\n'
            '<loc>{}</loc>\n'
            '<lastmod>{}</lastmod>\n'
            '<changefreq>weekly</changefreq>\n'
            '<priority>0.8</priority>\n'
            '</url>'.format(
                escape(base_url + url_for('site_detail', code=site.code)),
                last_modified
            )
        )

    # Tag pages. The slug is percent-encoded because it is placed into a
    # query string.
    for tag in tags:
        tag_url = base_url + '/?tag=' + quote(str(tag.slug), safe='')
        parts.append(
            '\n<url>\n'
            '<loc>{}</loc>\n'
            '<changefreq>weekly</changefreq>\n'
            '<priority>0.6</priority>\n'
            '</url>'.format(escape(tag_url))
        )

    parts.append('\n</urlset>')

    response = make_response(''.join(parts))
    response.headers['Content-Type'] = 'application/xml; charset=utf-8'
    return response
@app.route('/robots.txt')
def robots():
    """Serve a dynamically generated ``robots.txt``.

    Allows everything except ``/admin/`` and ``/api/`` and advertises the
    sitemap at ``<request root URL>sitemap.xml``.
    """
    from flask import make_response

    directives = [
        'User-agent: *',
        'Allow: /',
        'Disallow: /admin/',
        'Disallow: /api/',
        'Sitemap: {}sitemap.xml'.format(request.url_root),
    ]
    # One directive per line, with a trailing newline.
    body = '\n'.join(directives) + '\n'

    response = make_response(body)
    response.headers['Content-Type'] = 'text/plain; charset=utf-8'
    return response
# ========== SEO工具管理路由 (v2.4新增) ==========
@app.route('/admin/seo-tools')
@login_required
def seo_tools():
    """Admin page for SEO tools.

    Passes ``sitemap_info`` to the template: ``{'exists': False}`` when
    ``static/sitemap.xml`` is absent, otherwise ``exists``, ``last_updated``
    (``YYYY-MM-DD HH:MM:SS``) and ``size`` in bytes.
    """
    sitemap_path = 'static/sitemap.xml'
    try:
        # A single stat() call instead of exists() + getmtime() + getsize():
        # the file cannot disappear between the check and the reads.
        file_stat = os.stat(sitemap_path)
    except OSError:
        sitemap_info = {'exists': False}
    else:
        sitemap_info = {
            'exists': True,
            'last_updated': datetime.fromtimestamp(file_stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
            'size': file_stat.st_size
        }
    return render_template('admin/seo_tools.html', sitemap_info=sitemap_info)
@app.route('/api/generate-static-sitemap', methods=['POST'])
@login_required
def generate_static_sitemap():
    """Admin API: write a static ``sitemap.xml`` into the ``static`` directory.

    Lists the home page, every active site's detail page and one
    ``/?tag=<slug>`` URL per tag, using the current request's root URL as
    the base.

    Responses:
        200: ``{'success': True, 'message', 'total_urls', 'file_path', 'timestamp'}``
        500: unexpected error
    """
    try:
        from urllib.parse import quote
        from xml.sax.saxutils import escape

        sites = Site.query.filter_by(is_active=True).order_by(Site.updated_at.desc()).all()
        tags = Tag.query.all()
        base_url = request.url_root.rstrip('/')

        # Collect fragments and join once instead of repeated string "+=".
        parts = [
            '<?xml version="1.0" encoding="UTF-8"?>\n'
            '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">'
        ]

        # Every <loc> value is XML-escaped, and values taken from the
        # database are percent-encoded first, so special characters in a
        # code or slug cannot produce an invalid document.
        parts.append(
            '\n<url>\n'
            f'<loc>{escape(base_url)}</loc>\n'
            '<changefreq>daily</changefreq>\n'
            '<priority>1.0</priority>\n'
            '</url>'
        )

        # Site detail pages.
        for site in sites:
            if site.updated_at:
                last_modified = site.updated_at.strftime('%Y-%m-%d')
            else:
                last_modified = datetime.now().strftime('%Y-%m-%d')
            site_url = f"{base_url}/site/{quote(str(site.code), safe='')}"
            parts.append(
                '\n<url>\n'
                f'<loc>{escape(site_url)}</loc>\n'
                f'<lastmod>{last_modified}</lastmod>\n'
                '<changefreq>weekly</changefreq>\n'
                '<priority>0.8</priority>\n'
                '</url>'
            )

        # Tag pages.
        for tag in tags:
            tag_url = f"{base_url}/?tag={quote(str(tag.slug), safe='')}"
            parts.append(
                '\n<url>\n'
                f'<loc>{escape(tag_url)}</loc>\n'
                '<changefreq>weekly</changefreq>\n'
                '<priority>0.6</priority>\n'
                '</url>'
            )

        parts.append('\n</urlset>')
        xml_content = ''.join(parts)

        static_dir = 'static'
        os.makedirs(static_dir, exist_ok=True)
        sitemap_path = os.path.join(static_dir, 'sitemap.xml')
        with open(sitemap_path, 'w', encoding='utf-8') as f:
            f.write(xml_content)

        total_urls = 1 + len(sites) + len(tags)  # home + sites + tags
        return jsonify({
            'success': True,
            'message': f'静态sitemap.xml生成成功共包含 {total_urls} 个URL',
            'total_urls': total_urls,
            'file_path': sitemap_path,
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'生成失败: {str(e)}'
        }), 500
@app.route('/api/notify-search-engines', methods=['POST'])
@login_required
def notify_search_engines():
    """Admin API: ping Google, Baidu and Bing with the sitemap URL.

    The sitemap URL is ``<request root URL>/sitemap.xml``. Each engine is
    requested independently with a 10 second timeout; one engine failing
    does not stop the others.

    Responses:
        200: ``{'success': True, 'message', 'sitemap_url', 'results': [...]}``
             where each result has ``engine``, ``status`` (``success`` /
             ``failed`` / ``error``), ``message`` and, when a response was
             received, ``status_code``
        500: unexpected error
    """
    try:
        import requests
        from urllib.parse import quote

        sitemap_url = request.url_root.rstrip('/') + '/sitemap.xml'
        encoded_sitemap_url = quote(sitemap_url, safe='')

        # (display name, ping endpoint prefix), pinged in this order.
        # NOTE(review): Google and Bing have reportedly retired their sitemap
        # ping endpoints - confirm whether these calls still do anything.
        ping_endpoints = (
            ('Google', 'http://www.google.com/ping?sitemap='),
            ('Baidu', 'http://data.zz.baidu.com/ping?sitemap='),
            ('Bing', 'http://www.bing.com/ping?sitemap='),
        )

        results = []
        for engine_name, endpoint_prefix in ping_endpoints:
            ping_url = endpoint_prefix + encoded_sitemap_url
            try:
                reply = requests.get(ping_url, timeout=10)
                accepted = reply.status_code == 200
                outcome = {
                    'engine': engine_name,
                    'status': 'success' if accepted else 'failed',
                    'status_code': reply.status_code,
                    'message': '提交成功' if accepted else f'HTTP {reply.status_code}'
                }
            except Exception as e:
                outcome = {
                    'engine': engine_name,
                    'status': 'error',
                    'message': f'请求失败: {str(e)}'
                }
            results.append(outcome)

        success_count = len([r for r in results if r['status'] == 'success'])
        return jsonify({
            'success': True,
            'message': f'已通知 {success_count}/{len(results)} 个搜索引擎',
            'sitemap_url': sitemap_url,
            'results': results
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'通知失败: {str(e)}'
        }), 500
@app.route('/api/refresh-site-news/<site_code>', methods=['POST'])
def refresh_site_news(site_code):
    """Manually refresh one site's news (public endpoint, added in v2.3).

    This endpoint calls the same paid news search as
    ``/api/fetch-news/<code>``, so it applies the same per-IP guard (3
    requests per hour, shared ``news_fetch`` counter, captcha flag once the
    limit is hit) and only serves active sites.

    Responses:
        200: ``{'success': True, 'message', 'total_found', 'saved_count'}``
        404: site not found / not active, or the search returned nothing
        429: captcha required or rate limit exceeded
        500: news API key not configured, or an unexpected error
    """
    try:
        # Same abuse protection as fetch_news_for_site; without it this
        # route is an unthrottled way to trigger the news API.
        client_ip = get_client_ip(request)
        limiter = get_rate_limiter()

        captcha_required, captcha_reason = limiter.is_captcha_required(client_ip)
        if captcha_required:
            return jsonify({
                'success': False,
                'message': captcha_reason,
                'require_captcha': True
            }), 429

        is_limited, _remaining, reset_time = limiter.is_rate_limited(
            client_ip,
            action='news_fetch',
            limit=3,
            window_minutes=60
        )
        if is_limited:
            limiter.require_captcha(client_ip, duration_minutes=30)
            return jsonify({
                'success': False,
                'message': f'请求过于频繁,请在 {reset_time.strftime("%H:%M")} 后重试',
                'require_captcha': True,
                'reset_time': reset_time.isoformat()
            }), 429

        limiter.record_request(client_ip, action='news_fetch')

        # Only active sites are reachable from the public front end.
        site = Site.query.filter_by(code=site_code, is_active=True).first()
        if not site:
            return jsonify({
                'success': False,
                'message': '网站不存在'
            }), 404

        api_key = app.config.get('BOCHA_API_KEY')
        if not api_key:
            return jsonify({
                'success': False,
                'message': '新闻功能未启用'
            }), 500

        searcher = NewsSearcher(api_key)

        # Up to 5 items from the last week.
        news_items = searcher.search_site_news(
            site_name=site.name,
            site_url=site.url,
            news_keywords=site.news_keywords,  # site-specific keywords
            count=5,
            freshness='oneWeek'
        )
        if not news_items:
            return jsonify({
                'success': False,
                'message': '未找到相关新闻'
            }), 404

        # Store items whose URL is not yet recorded for this site.
        saved_count = 0
        for item in news_items:
            existing_news = News.query.filter_by(
                site_id=site.id,
                url=item['url']
            ).first()
            if not existing_news:
                news = News(
                    site_id=site.id,
                    title=item['title'],
                    content=item.get('summary') or item.get('snippet', ''),
                    url=item['url'],
                    source_name=item.get('site_name', ''),
                    source_icon=item.get('site_icon', ''),
                    published_at=item.get('published_at'),
                    news_type='Search Result',
                    is_active=True
                )
                db.session.add(news)
                saved_count += 1

        db.session.commit()

        return jsonify({
            'success': True,
            'message': f'成功获取 {saved_count} 条新资讯',
            'total_found': len(news_items),
            'saved_count': saved_count
        })
    except Exception as e:
        db.session.rollback()
        return jsonify({
            'success': False,
            'message': f'获取失败: {str(e)}'
        }), 500
# ========== 批量导入路由 ==========
def _batch_import_site_code(max_attempts=10):
    """Return an 8-digit numeric code that no existing ``Site`` row uses.

    Tries ``max_attempts`` random draws; if every draw collides, falls back
    to the last eight digits of the current millisecond timestamp (the
    fallback value is not re-checked against the database).
    """
    import random
    import time

    for _ in range(max_attempts):
        candidate = str(random.randint(10000000, 99999999))
        if not Site.query.filter_by(code=candidate).first():
            return candidate
    return str(int(time.time() * 1000))[-8:]


def _batch_import_site_slug(site_name, site_code, max_length=50, max_attempts=100):
    """Return a slug derived from ``site_name`` that no existing ``Site`` uses.

    The name is converted to pinyin, lower-cased and reduced to word
    characters and hyphens. Every candidate, including the first one, is
    built from the slug truncated to ``max_length`` characters.
    """
    import re
    from pypinyin import lazy_pinyin

    slug = ''.join(lazy_pinyin(site_name)).lower()
    slug = re.sub(r'[^\w\s-]', '', slug)
    slug = re.sub(r'[-\s]+', '-', slug).strip('-')
    if not slug:
        # Nothing usable survived the clean-up: fall back to the site code.
        slug = f"site-{site_code}"

    base_slug = slug[:max_length].rstrip('-')
    candidate = base_slug
    counter = 1
    while Site.query.filter_by(slug=candidate).first():
        candidate = f"{base_slug}-{counter}"
        counter += 1
        if counter > max_attempts:
            # Stop probing; the site code is used as the disambiguating suffix.
            candidate = f"{base_slug}-{site_code}"
            break
    return candidate


@app.route('/admin/batch-import', methods=['GET', 'POST'])
@login_required
def batch_import():
    """Import many sites at once from a URL list or a bookmark HTML file.

    GET renders the empty form. POST reads ``import_type`` (``url_list`` or
    ``bookmark_file``), parses the candidates and processes them one by one:
    URLs that already exist are reported as failures, site metadata is
    fetched, the bookmark name or the URL's domain is used when fetching
    yields no name, the logo is downloaded, and the ``Site`` is inserted.
    Each site is committed separately so a failure never discards earlier
    successes.

    Returns:
        The rendered ``admin/batch_import.html`` page. ``results`` is
        ``None`` unless an import ran to completion, in which case it holds
        the totals plus the per-URL success and failure lists.
    """
    from urllib.parse import urlparse
    from utils.bookmark_parser import BookmarkParser
    from utils.website_fetcher import WebsiteFetcher

    results = None
    if request.method == 'POST':
        import_type = request.form.get('import_type')
        auto_activate = request.form.get('auto_activate') == 'on'
        parser = BookmarkParser()
        fetcher = WebsiteFetcher(timeout=15)
        urls_to_import = []
        try:
            # --- Parse the submitted input into a list of candidates ---
            if import_type == 'url_list':
                url_list_text = request.form.get('url_list', '')
                urls_to_import = parser.parse_url_list(url_list_text)
            elif import_type == 'bookmark_file':
                bookmark_file = request.files.get('bookmark_file')
                if not bookmark_file:
                    flash('请选择书签文件', 'error')
                    return render_template('admin/batch_import.html')
                html_content = bookmark_file.read().decode('utf-8', errors='ignore')
                all_bookmarks = parser.parse_html_file(html_content)
                # Optional case-insensitive filter on the bookmark folder.
                folder_filter = request.form.get('folder_filter', '').strip()
                if folder_filter:
                    urls_to_import = [
                        b for b in all_bookmarks
                        if folder_filter.lower() in b.get('folder', '').lower()
                    ]
                else:
                    urls_to_import = all_bookmarks

            # --- Import the candidates one at a time ---
            success_list = []
            failed_list = []
            for idx, item in enumerate(urls_to_import, 1):
                url = item['url']
                name = item.get('name', '')
                try:
                    # 1. Reject URLs that are already stored.
                    try:
                        existing = Site.query.filter_by(url=url).first()
                        if existing:
                            failed_list.append({
                                'url': url,
                                'name': name or existing.name,
                                'error': f'该URL已存在网站名称{existing.name}'
                            })
                            continue
                    except Exception as e:
                        failed_list.append({
                            'url': url,
                            'name': name,
                            'error': f'检查URL时出错: {str(e)}'
                        })
                        continue

                    # 2. Fetch site metadata. A failure here is not fatal:
                    #    the fallbacks in step 3 are tried instead.
                    info = None
                    try:
                        info = fetcher.fetch_website_info(url)
                    except Exception as e:
                        print(f"抓取 {url} 失败: {str(e)}")

                    # 3. Fall back to the bookmark name, then to the domain.
                    if not info or not info.get('name'):
                        if name:
                            info = {
                                'name': name,
                                'description': '',
                                'logo_url': ''
                            }
                        else:
                            try:
                                parsed = urlparse(url)
                                domain = parsed.netloc or parsed.path
                                if domain:
                                    info = {
                                        'name': domain,
                                        'description': '',
                                        'logo_url': ''
                                    }
                                else:
                                    failed_list.append({
                                        'url': url,
                                        'name': name,
                                        'error': '无法获取网站信息且没有备用名称'
                                    })
                                    continue
                            except Exception:
                                failed_list.append({
                                    'url': url,
                                    'name': name,
                                    'error': '无法获取网站信息且URL解析失败'
                                })
                                continue

                    # 4. Download the logo; failure does not block the import.
                    logo_path = None
                    if info.get('logo_url'):
                        try:
                            logo_path = fetcher.download_logo(info['logo_url'], save_dir='static/logos')
                        except Exception as e:
                            print(f"下载Logo失败 ({url}): {str(e)}")

                    # 5./6. Generate code and slug, then insert and commit.
                    try:
                        site_code = _batch_import_site_code()
                        site_name = info.get('name', name or 'Unknown')[:100]
                        final_slug = _batch_import_site_slug(site_name, site_code)

                        remote_logo = info.get('logo_url') or ''
                        description = info.get('description') or ''
                        site = Site(
                            code=site_code,
                            slug=final_slug,
                            name=site_name,
                            url=url[:500],  # cap the stored URL length
                            # Prefer the downloaded file, else the remote
                            # logo URL; empty when the site has no logo.
                            logo=(logo_path or remote_logo[:500]) if remote_logo else '',
                            short_desc=description[:200],
                            description=description[:2000],
                            is_active=auto_activate
                        )
                        db.session.add(site)
                        db.session.commit()
                        success_list.append({
                            'name': site.name,
                            'url': site.url
                        })
                        print(f"成功导入 [{idx}/{len(urls_to_import)}]: {site.name}")
                    except Exception as e:
                        db.session.rollback()
                        failed_list.append({
                            'url': url,
                            'name': name or info.get('name', 'Unknown'),
                            'error': f'数据库保存失败: {str(e)}'
                        })
                        continue
                except Exception as e:
                    # Anything unexpected: undo this item and keep going.
                    db.session.rollback()
                    failed_list.append({
                        'url': url,
                        'name': name,
                        'error': f'未知错误: {str(e)}'
                    })
                    print(f"导入 {url} 时发生未知错误: {str(e)}")
                    continue

            results = {
                'total_count': len(urls_to_import),
                'success_count': len(success_list),
                'failed_count': len(failed_list),
                'success_list': success_list,
                'failed_list': failed_list
            }
            if success_list:
                flash(f'成功导入 {len(success_list)} 个网站!', 'success')
        except Exception as e:
            flash(f'导入失败: {str(e)}', 'error')
    return render_template('admin/batch_import.html', results=results)
# ========== Flask-Admin 配置 ==========
class SecureModelView(ModelView):
    """Base model view that only authenticated users may open."""

    # Pagination: 20 rows per page, and the page size is user-selectable.
    page_size = 20
    can_set_page_size = True

    # Project templates for the list / create / edit pages.
    list_template = 'admin/model/list.html'
    create_template = 'admin/model/create.html'
    edit_template = 'admin/model/edit.html'

    # Flask-Admin option: use human-readable filter names in URL parameters.
    named_filter_urls = True

    def is_accessible(self):
        """Allow access only when the current user is logged in."""
        return current_user.is_authenticated

    def inaccessible_callback(self, name, **kwargs):
        """Redirect visitors who are denied access to the admin login page."""
        login_url = url_for('admin_login')
        return redirect(login_url)
class SecureAdminIndexView(AdminIndexView):
    """Admin landing page (dashboard), restricted to authenticated users."""

    def is_accessible(self):
        """Allow access only when the current user is logged in."""
        return current_user.is_authenticated

    def inaccessible_callback(self, name, **kwargs):
        """Redirect visitors who are denied access to the admin login page."""
        login_url = url_for('admin_login')
        return redirect(login_url)

    @expose('/')
    def index(self):
        """Render the dashboard with summary counters and the newest sites."""
        active_sites = Site.query.filter_by(is_active=True).count()
        tag_total = Tag.query.count()
        active_news = News.query.filter_by(is_active=True).count()
        # SUM() yields None when there are no rows; report 0 in that case.
        view_sum = db.session.query(db.func.sum(Site.view_count)).scalar()
        stats = {
            'sites_count': active_sites,
            'tags_count': tag_total,
            'news_count': active_news,
            'total_views': view_sum or 0
        }
        # The five most recently created sites, newest first.
        newest_first = Site.query.order_by(Site.created_at.desc())
        recent_sites = newest_first.limit(5).all()
        return self.render('admin/index.html', stats=stats, recent_sites=recent_sites)
# 网站管理视图
class SiteAdmin(SecureModelView):
    """Admin view for ``Site`` records.

    On save it fills in a missing ``code`` and ``slug`` and attaches any
    tags typed into the free-text ``new_tags`` form field.
    """

    # Site-specific create / edit templates.
    create_template = 'admin/site/create.html'
    edit_template = 'admin/site/edit.html'

    # CRUD permissions; there is no separate details page.
    can_edit = True
    can_delete = True
    can_create = True
    can_view_details = False

    # Show the per-row action column and allow every bulk action.
    column_display_actions = True
    action_disallowed_list = []

    column_list = ['id', 'code', 'name', 'url', 'slug', 'is_active', 'is_recommended', 'view_count', 'created_at']
    column_searchable_list = ['code', 'name', 'url', 'description']
    column_filters = ['is_active', 'is_recommended', 'tags']
    column_labels = {
        'id': 'ID',
        'code': '网站编码',
        'name': '网站名称',
        'url': 'URL',
        'slug': 'URL别名',
        'logo': 'Logo',
        'short_desc': '简短描述',
        'description': '详细介绍',
        'features': '主要功能',
        'news_keywords': '新闻关键词',
        'is_active': '是否启用',
        'is_recommended': '是否推荐',
        'view_count': '浏览次数',
        'sort_order': '排序权重',
        'tags': '标签',
        'created_at': '创建时间',
        'updated_at': '更新时间'
    }
    form_columns = ['name', 'url', 'slug', 'logo', 'short_desc', 'description', 'features', 'news_keywords', 'tags', 'is_active', 'is_recommended', 'sort_order']
    column_extra_row_actions = None

    @staticmethod
    def _slugify(text):
        """Convert ``text`` to a lower-case pinyin slug of word chars and hyphens.

        Returns an empty string when nothing usable remains.
        """
        import re
        from pypinyin import lazy_pinyin

        slug = ''.join(lazy_pinyin(text)).lower()
        slug = re.sub(r'[^\w\s-]', '', slug)
        return re.sub(r'[-\s]+', '-', slug).strip('-')

    def on_model_change(self, form, model, is_created):
        """Populate ``code``/``slug`` when blank and attach tags from ``new_tags``.

        Args:
            form: The submitted admin form (not read directly; the extra
                ``new_tags`` field is taken from ``request.form``).
            model: The ``Site`` instance being saved.
            is_created: ``True`` when the record is new (unused here).
        """
        import random
        import time
        from flask import request

        # Lookups inside this block must not trigger an implicit flush of
        # the half-populated model.
        with db.session.no_autoflush:
            # --- code: unique 8-digit number -------------------------------
            # Done before tag handling because creating a tag flushes the
            # session explicitly, which would otherwise write this Site
            # before its code and slug are set.
            if not (model.code or '').strip():
                for _ in range(10):
                    candidate = str(random.randint(10000000, 99999999))
                    existing = Site.query.filter(Site.code == candidate).first()
                    if not existing or existing.id == model.id:
                        model.code = candidate
                        break
                else:
                    # Every draw collided: use the last 8 digits of the
                    # millisecond timestamp instead.
                    model.code = str(int(time.time() * 1000))[-8:]

            # --- slug: derived from the name -------------------------------
            if not (model.slug or '').strip():
                slug = self._slugify(model.name or '') or f"site-{model.code}"
                # Every candidate, including the first, respects the
                # 50-character limit.
                base_slug = slug[:50].rstrip('-')
                final_slug = base_slug
                counter = 1
                max_slug_attempts = 100
                while counter < max_slug_attempts:
                    existing = Site.query.filter(Site.slug == final_slug).first()
                    if not existing or existing.id == model.id:
                        break
                    final_slug = f"{base_slug}-{counter}"
                    counter += 1
                if counter >= max_slug_attempts:
                    # Probing exhausted: the unique code is the suffix.
                    final_slug = f"{base_slug}-{model.code}"
                model.slug = final_slug

            # --- tags typed as a comma-separated list ----------------------
            new_tags_str = request.form.get('new_tags', '')
            tag_names = [part.strip() for part in new_tags_str.split(',') if part.strip()]
            for tag_name in tag_names:
                tag = Tag.query.filter_by(name=tag_name).first()
                if not tag:
                    # Names that slugify to nothing get a generated slug
                    # rather than an empty one.
                    tag_slug = self._slugify(tag_name) or f"tag-{random.randint(1000, 9999)}"
                    base_tag_slug = tag_slug[:50].rstrip('-')
                    final_tag_slug = base_tag_slug
                    counter = 1
                    while Tag.query.filter_by(slug=final_tag_slug).first():
                        final_tag_slug = f"{base_tag_slug}-{counter}"
                        counter += 1
                        if counter > 100:
                            final_tag_slug = f"{base_tag_slug}-{random.randint(1000, 9999)}"
                            break
                    tag = Tag(name=tag_name, slug=final_tag_slug)
                    db.session.add(tag)
                    # Flush so that the lookups for the next tag in this
                    # request can see the one just created.
                    db.session.flush()
                if tag not in model.tags:
                    model.tags.append(tag)
# 标签管理视图
class TagAdmin(SecureModelView):
    """Admin view for ``Tag`` records."""

    # CRUD permissions; there is no separate details page.
    can_create = True
    can_edit = True
    can_delete = True
    can_view_details = False

    # Show the per-row action column.
    column_display_actions = True

    column_list = [
        'id',
        'name',
        'slug',
        'description',
        'sort_order',
    ]
    column_searchable_list = [
        'name',
        'description',
    ]

    # Display labels for the model fields.
    column_labels = dict([
        ('id', 'ID'),
        ('name', '标签名称'),
        ('slug', 'URL别名'),
        ('description', '标签描述'),
        ('seo_title', 'SEO标题 (v2.4)'),
        ('seo_description', 'SEO描述 (v2.4)'),
        ('seo_keywords', 'SEO关键词 (v2.4)'),
        ('icon', '图标'),
        ('sort_order', '排序权重'),
        ('created_at', '创建时间'),
    ])

    # Fields offered on the create / edit form, in display order.
    form_columns = [
        'name',
        'slug',
        'description',
        'seo_title',
        'seo_description',
        'seo_keywords',
        'icon',
        'sort_order',
    ]
# 管理员视图
class AdminAdmin(SecureModelView):
    """Admin view for administrator accounts."""

    # CRUD permissions; there is no separate details page.
    can_edit = True
    can_delete = True
    can_create = True
    can_view_details = False

    # Show the per-row action column.
    column_display_actions = True

    column_list = ['id', 'username', 'email', 'is_active', 'last_login', 'created_at']
    column_searchable_list = ['username', 'email']
    column_filters = ['is_active']
    column_labels = {
        'id': 'ID',
        'username': '用户名',
        'email': '邮箱',
        'is_active': '是否启用',
        'created_at': '创建时间',
        'last_login': '最后登录'
    }
    # The password is deliberately not part of the form.
    form_columns = ['username', 'email', 'is_active']

    def on_model_change(self, form, model, is_created):
        """Assign a random initial password to a newly created administrator.

        A fixed, well-known default password would let anyone who knows a
        new account's username sign in, so a random one-time password is
        generated instead and shown once to the admin creating the account.

        Args:
            form: The submitted admin form (unused).
            model: The administrator instance being saved.
            is_created: ``True`` when the record is new; edits are left
                untouched.
        """
        if not is_created:
            return

        import secrets

        initial_password = secrets.token_urlsafe(12)
        model.set_password(initial_password)
        flash(
            f'管理员 {model.username} 的初始密码: {initial_password} (仅显示一次, 请立即记录并尽快修改)',
            'warning'
        )
# 新闻管理视图
class NewsAdmin(SecureModelView):
    """Admin view for ``News`` records."""

    # CRUD permissions; there is no separate details page.
    can_create = True
    can_edit = True
    can_delete = True
    can_view_details = False

    # Show the per-row action column.
    column_display_actions = True

    column_list = [
        'id',
        'site',
        'title',
        'source_name',
        'news_type',
        'published_at',
        'is_active',
    ]
    column_searchable_list = [
        'title',
        'content',
        'source_name',
    ]
    column_filters = [
        'site',
        'news_type',
        'source_name',
        'is_active',
        'published_at',
    ]

    # Display labels for the model fields.
    column_labels = dict([
        ('id', 'ID'),
        ('site', '关联网站'),
        ('title', '新闻标题'),
        ('content', '新闻内容'),
        ('news_type', '新闻类型'),
        ('url', '新闻链接'),
        ('source_name', '来源网站'),
        ('source_icon', '来源图标'),
        ('published_at', '发布时间'),
        ('is_active', '是否启用'),
        ('created_at', '创建时间'),
        ('updated_at', '更新时间'),
    ])

    # Fields offered on the create / edit form, in display order.
    form_columns = [
        'site',
        'title',
        'content',
        'news_type',
        'url',
        'source_name',
        'source_icon',
        'published_at',
        'is_active',
    ]

    # Allowed values for the news type drop-down (stored value, label).
    form_choices = {
        'news_type': [
            (news_type, news_type)
            for news_type in (
                'Search Result',
                'Product Update',
                'Industry News',
                'Company News',
                'Other',
            )
        ]
    }

    # Default ordering: publication time, descending.
    column_default_sort = ('published_at', True)
# Prompt模板管理视图
class PromptAdmin(SecureModelView):
    """Admin view for ``PromptTemplate`` records."""

    can_create = True
    can_edit = True
    # Deleting is disabled so that prompts the system relies on cannot be removed.
    can_delete = False
    can_view_details = False

    # Show the per-row action column.
    column_display_actions = True

    column_list = [
        'id',
        'key',
        'name',
        'description',
        'is_active',
        'updated_at',
    ]
    column_searchable_list = [
        'key',
        'name',
        'description',
    ]
    column_filters = [
        'is_active',
        'key',
    ]

    # Display labels for the model fields.
    column_labels = dict([
        ('id', 'ID'),
        ('key', '唯一标识'),
        ('name', '模板名称'),
        ('system_prompt', '系统提示词'),
        ('user_prompt_template', '用户提示词模板'),
        ('description', '模板说明'),
        ('is_active', '是否启用'),
        ('created_at', '创建时间'),
        ('updated_at', '更新时间'),
    ])

    # Fields offered on the create / edit form, in display order.
    form_columns = [
        'key',
        'name',
        'description',
        'system_prompt',
        'user_prompt_template',
        'is_active',
    ]

    # Help text shown for individual fields.
    column_descriptions = dict([
        ('key', '唯一标识,如: tags, features, description'),
        ('system_prompt', 'AI的系统角色设定'),
        ('user_prompt_template', '用户提示词模板,支持变量如 {name}, {description}, {url}'),
    ])

    # Widget attributes for the two prompt fields: monospace text, with a
    # short box for the system prompt and a tall one for the user template.
    form_widget_args = {
        'system_prompt': dict(rows=3, style='font-family: monospace;'),
        'user_prompt_template': dict(rows=20, style='font-family: monospace;'),
    }
# 初始化 Flask-Admin
# Create the Flask-Admin site with the login-protected dashboard as its
# index page and the project's own base template.
dashboard_view = SecureAdminIndexView(name='控制台', url='/admin')
admin = Admin(
    app,
    name='ZJPB 焦提示词',
    template_mode='bootstrap4',
    index_view=dashboard_view,
    base_template='admin/master.html',
)

# Register one model view per managed table, in menu order. Each view is
# instantiated immediately before it is added.
for view_cls, model_cls, view_options in (
    (SiteAdmin, Site, {'name': '网站管理'}),
    (TagAdmin, Tag, {'name': '标签管理'}),
    (NewsAdmin, News, {'name': '新闻管理'}),
    (PromptAdmin, PromptTemplate, {'name': 'Prompt管理'}),
    (AdminAdmin, AdminModel, {'name': '管理员', 'endpoint': 'admin_users'}),
):
    admin.add_view(view_cls(model_cls, db.session, **view_options))
return app
if __name__ == '__main__':
    # Script entry point: build the app for the environment named by
    # FLASK_ENV (default 'development') and serve it on all interfaces.
    app = create_app(os.getenv('FLASK_ENV', 'development'))
    # Debug mode follows the loaded configuration instead of being forced
    # on: the interactive debugger must never be reachable on 0.0.0.0 when
    # a non-debug configuration is selected.
    app.run(host='0.0.0.0', port=5000, debug=bool(app.config.get('DEBUG', False)))