import os import markdown import random import string import secrets from io import BytesIO from flask import Flask, render_template, redirect, url_for, request, flash, jsonify, session, send_file from flask_login import LoginManager, login_user, logout_user, login_required, current_user from flask_admin import Admin, AdminIndexView, expose from flask_admin.contrib.sqla import ModelView from datetime import datetime, timedelta from config import config from models import db, Site, Tag, Admin as AdminModel, News, site_tags, PromptTemplate, User, Folder, Collection from utils.website_fetcher import WebsiteFetcher from utils.tag_generator import TagGenerator from utils.news_searcher import NewsSearcher from utils.email_sender import EmailSender from PIL import Image, ImageDraw, ImageFont def create_app(config_name='default'): """应用工厂函数""" app = Flask(__name__) # 加载配置 app.config.from_object(config[config_name]) # 初始化数据库 db.init_app(app) # 添加Markdown过滤器 @app.template_filter('markdown') def markdown_filter(text): """将Markdown文本转换为HTML""" if not text: return '' return markdown.markdown(text, extensions=['nl2br', 'fenced_code']) # v2.4新增: 自动内链过滤器 @app.template_filter('auto_link') def auto_link_filter(text, current_site_id=None): """自动为内容中的工具名称添加链接""" if not text: return '' # 获取所有启用的网站(排除当前网站) sites = Site.query.filter_by(is_active=True).all() if current_site_id: sites = [s for s in sites if s.id != current_site_id] # 按名称长度降序排序,优先匹配长名称 sites = sorted(sites, key=lambda s: len(s.name), reverse=True) # 记录已经添加链接的位置,避免重复 linked_sites = set() for site in sites: if site.name in text and site.name not in linked_sites: # 只链接第一次出现的位置 link = f'{site.name}' text = text.replace(site.name, link, 1) linked_sites.add(site.name) return text # 初始化登录管理 login_manager = LoginManager() login_manager.init_app(app) login_manager.login_view = 'admin_login' login_manager.login_message = '请先登录' @login_manager.user_loader def load_user(user_id): """加载用户(支持Admin和User两种类型)""" try: user_type, uid = user_id.split(':', 1) if user_type == 'admin': return AdminModel.query.get(int(uid)) elif user_type == 'user': return User.query.get(int(uid)) except (ValueError, AttributeError): # 兼容旧格式(纯数字ID,默认为Admin) return AdminModel.query.get(int(user_id)) return None # ========== 前台路由 ========== @app.route('/') def index(): """首页""" # 获取所有启用的标签 tags = Tag.query.order_by(Tag.sort_order.desc(), Tag.id).all() # 优化:使用一次SQL查询统计所有标签的网站数量 tag_counts = {} if tags: # 使用JOIN查询一次性获取所有标签的网站数量 from sqlalchemy import func counts_query = db.session.query( site_tags.c.tag_id, func.count(site_tags.c.site_id).label('count') ).join( Site, site_tags.c.site_id == Site.id ).filter( Site.is_active == True ).group_by(site_tags.c.tag_id).all() tag_counts = {tag_id: count for tag_id, count in counts_query} # 获取筛选参数 tag_slug = request.args.get('tag') search_query = request.args.get('q', '').strip() current_tab = request.args.get('tab', 'latest') # 默认为"最新" page = request.args.get('page', 1, type=int) per_page = 100 # 每页显示100个站点 selected_tag = None # 构建基础查询 query = Site.query.filter_by(is_active=True) # 标签筛选 if tag_slug: selected_tag = Tag.query.filter_by(slug=tag_slug).first() if selected_tag: query = query.filter(Site.tags.contains(selected_tag)) else: sites = [] pagination = None return render_template('index_new.html', sites=sites, tags=tags, selected_tag=selected_tag, search_query=search_query, pagination=pagination, tag_counts=tag_counts, current_tab=current_tab) # 搜索功能 if search_query: # 使用OR条件搜索:网站名称、URL、描述 search_pattern = f'%{search_query}%' query = query.filter( db.or_( Site.name.like(search_pattern), Site.url.like(search_pattern), Site.description.like(search_pattern), Site.short_desc.like(search_pattern) ) ) # Tab筛选和排序 if current_tab == 'popular': # 热门:按浏览次数倒序 query = query.order_by(Site.view_count.desc(), Site.id.desc()) elif current_tab == 'recommended': # 推荐:只显示is_recommended=True的 query = query.filter_by(is_recommended=True).order_by(Site.sort_order.desc(), Site.id.desc()) else: # 最新:按创建时间倒序(默认) query = query.order_by(Site.created_at.desc(), Site.id.desc()) # 分页 pagination = query.paginate(page=page, per_page=per_page, error_out=False) sites = pagination.items return render_template('index_new.html', sites=sites, tags=tags, selected_tag=selected_tag, search_query=search_query, pagination=pagination, tag_counts=tag_counts, current_tab=current_tab) @app.route('/site/') def site_detail(code): """网站详情页""" site = Site.query.filter_by(code=code, is_active=True).first_or_404() # 增加浏览次数 site.view_count += 1 db.session.commit() # v2.6优化:移除自动调用博查API的逻辑,改为按需加载 # 只获取数据库中已有的新闻,不再自动调用API # 获取该网站的相关新闻(最多显示5条) news_list = News.query.filter_by( site_id=site.id, is_active=True ).order_by(News.published_at.desc()).limit(5).all() # 检查是否有新闻,如果没有则标记需要加载 has_news = len(news_list) > 0 # 获取同类工具推荐(通过标签匹配,最多显示4个) recommended_sites = [] if site.tags: # 获取有相同标签的其他网站 recommended_sites = Site.query.filter( Site.id != site.id, Site.is_active == True, Site.tags.any(Tag.id.in_([tag.id for tag in site.tags])) ).order_by(Site.view_count.desc()).limit(4).all() return render_template('detail_new.html', site=site, news_list=news_list, has_news=has_news, recommended_sites=recommended_sites) # ========== 验证码相关 (v2.6.1优化) ========== def generate_captcha_image(text): """生成验证码图片""" # 创建图片 (宽120, 高40) width, height = 120, 40 image = Image.new('RGB', (width, height), color=(255, 255, 255)) draw = ImageDraw.Draw(image) # 使用默认字体 try: font = ImageFont.truetype("arial.ttf", 28) except: font = ImageFont.load_default() # 添加随机干扰线 for i in range(3): x1 = random.randint(0, width) y1 = random.randint(0, height) x2 = random.randint(0, width) y2 = random.randint(0, height) draw.line([(x1, y1), (x2, y2)], fill=(200, 200, 200), width=1) # 绘制验证码文字 for i, char in enumerate(text): x = 10 + i * 25 y = random.randint(5, 12) color = (random.randint(0, 100), random.randint(0, 100), random.randint(0, 100)) draw.text((x, y), char, font=font, fill=color) # 添加随机噪点 for _ in range(100): x = random.randint(0, width - 1) y = random.randint(0, height - 1) draw.point((x, y), fill=(random.randint(150, 200), random.randint(150, 200), random.randint(150, 200))) return image @app.route('/api/captcha', methods=['GET']) def get_captcha(): """生成验证码图片""" # 生成4位随机验证码 captcha_text = ''.join(random.choices(string.ascii_uppercase + string.digits, k=4)) # 存储到session session['captcha'] = captcha_text session['captcha_created'] = datetime.now().timestamp() # 生成图片 image = generate_captcha_image(captcha_text) # 转换为字节流 img_io = BytesIO() image.save(img_io, 'PNG') img_io.seek(0) return send_file(img_io, mimetype='image/png') # ========== 新闻相关API (v2.6优化) ========== @app.route('/api/fetch-news/', methods=['POST']) def fetch_news_for_site(code): """ 按需获取指定网站的新闻 v2.6.1优化:所有手动请求都需要验证码,防止API滥用 """ try: # 获取请求数据 data = request.get_json() or {} captcha_input = data.get('captcha', '').strip().upper() # 验证验证码 session_captcha = session.get('captcha', '').upper() captcha_created = session.get('captcha_created', 0) # 检查验证码是否存在 if not session_captcha: return jsonify({ 'success': False, 'error': '请先获取验证码' }), 400 # 检查验证码是否过期(5分钟) if datetime.now().timestamp() - captcha_created > 300: session.pop('captcha', None) session.pop('captcha_created', None) return jsonify({ 'success': False, 'error': '验证码已过期,请刷新后重试' }), 400 # 检查验证码是否正确 if captcha_input != session_captcha: return jsonify({ 'success': False, 'error': '验证码错误,请重新输入' }), 400 # 验证通过,清除验证码 session.pop('captcha', None) session.pop('captcha_created', None) # 查找网站 site = Site.query.filter_by(code=code, is_active=True).first_or_404() # 检查博查API配置 api_key = app.config.get('BOCHA_API_KEY') if not api_key: return jsonify({'success': False, 'error': '博查API未配置'}), 500 # 创建新闻搜索器 searcher = NewsSearcher(api_key) # 获取新闻(限制5条,一周内的) news_items = searcher.search_site_news( site_name=site.name, site_url=site.url, news_keywords=site.news_keywords, count=5, freshness='oneWeek' ) # 保存新闻到数据库 new_count = 0 if news_items: for item in news_items: # 检查是否已存在(根据URL去重) existing = News.query.filter_by( site_id=site.id, url=item['url'] ).first() if not existing: news = News( site_id=site.id, title=item['title'], content=item.get('summary') or item.get('snippet', ''), url=item['url'], source_name=item.get('site_name', ''), source_icon=item.get('site_icon', ''), published_at=item.get('published_at'), news_type='Search Result', is_active=True ) db.session.add(news) new_count += 1 db.session.commit() # 返回最新的新闻列表 news_list = News.query.filter_by( site_id=site.id, is_active=True ).order_by(News.published_at.desc()).limit(5).all() # 格式化新闻数据 news_data = [] for news in news_list: news_data.append({ 'title': news.title, 'content': news.content[:200] if news.content else '', 'url': news.url, 'source_name': news.source_name, 'source_icon': news.source_icon, 'published_at': news.published_at.strftime('%Y年%m月%d日') if news.published_at else '未知日期', 'news_type': news.news_type }) return jsonify({ 'success': True, 'news': news_data, 'new_count': new_count, 'message': f'成功获取{new_count}条新资讯' if new_count > 0 else '暂无新资讯' }) except Exception as e: print(f"获取新闻失败:{str(e)}") import traceback traceback.print_exc() db.session.rollback() return jsonify({'success': False, 'error': str(e)}), 500 # ========== 社媒营销路由 (v2.5新增) ========== @app.route('/api/generate-social-share', methods=['POST']) def generate_social_share(): """一键生成社媒分享文案(前台可访问)""" try: data = request.get_json() or {} site_code = (data.get('site_code') or '').strip() platforms = data.get('platforms') or [] if not site_code: return jsonify({'success': False, 'message': '请提供网站编码'}), 400 site = Site.query.filter_by(code=site_code, is_active=True).first() if not site: return jsonify({'success': False, 'message': '网站不存在或未启用'}), 404 # 允许的平台列表(前端也会限制,这里再做一次兜底) allowed_platforms = { 'universal', 'xiaohongshu', 'douyin', 'bilibili', 'wechat', 'moments', 'x', 'linkedin' } if not isinstance(platforms, list): platforms = [] platforms = [p for p in platforms if isinstance(p, str)] platforms = [p.strip().lower() for p in platforms if p.strip()] platforms = [p for p in platforms if p in allowed_platforms] # 默认生成通用模板 if not platforms: platforms = ['universal'] # 组织站点信息 site_info = { 'name': site.name, 'url': site.url, 'short_desc': site.short_desc or '', 'description': (site.description or '')[:1200], 'features': (site.features or '')[:1200], 'tags': [t.name for t in (site.tags or [])] } # 尝试使用PromptTemplate + DeepSeek生成 from utils.tag_generator import TagGenerator generator = TagGenerator() prompt = PromptTemplate.query.filter_by(key='social_share', is_active=True).first() generated = {} if prompt and generator.client: # 构造提示词变量 tags_text = ','.join(site_info['tags']) user_prompt = prompt.user_prompt_template.format( name=site_info['name'], url=site_info['url'], short_desc=site_info['short_desc'], description=site_info['description'], features=site_info['features'], tags=tags_text, platforms=','.join(platforms) ) try: resp = generator.client.chat.completions.create( model='deepseek-chat', messages=[ {'role': 'system', 'content': prompt.system_prompt}, {'role': 'user', 'content': user_prompt} ], temperature=0.7, max_tokens=1200 ) text = (resp.choices[0].message.content or '').strip() # 约定:模型返回JSON字符串;若不是JSON则回退到纯文本放到universal import json as _json try: parsed = _json.loads(text) if isinstance(parsed, dict): for k, v in parsed.items(): if isinstance(k, str) and isinstance(v, str) and k in allowed_platforms: generated[k] = v.strip() except Exception: generated['universal'] = text except Exception as e: # AI失败不影响整体,回退模板 print(f"社媒文案AI生成失败:{str(e)}") # 回退:规则模板(根据各平台字数限制优化) def build_fallback(p): name = site_info['name'] url = site_info['url'] short_desc = site_info['short_desc'] or '' full_desc = site_info['description'] or '' features = site_info['features'] or '' tags = site_info['tags'][:6] tags_line = ' ' + ' '.join([f"#{t}" for t in tags]) if tags else '' # X (Twitter) - 280字符限制,简洁为主 if p == 'x': desc = (short_desc or full_desc)[:100] base = f"🔥 {name}\n\n{desc}\n\n🔗 {url}{tags_line}" return base[:280] # LinkedIn - 3000字限制,专业风格 if p == 'linkedin': desc = (full_desc or short_desc)[:500] content = f"💡 推荐一个实用工具:{name}\n\n📝 简介:\n{desc}\n\n" if features: features_list = features.split('\n')[:3] content += f"✨ 主要功能:\n" + '\n'.join([f"• {f.strip()}" for f in features_list if f.strip()][:3]) + "\n\n" content += f"🔗 官网:{url}\n\n" if tags: content += f"📌 适用场景:{', '.join(tags)}" return content[:3000] # 小红书 - 2000字限制,图文风格 if p == 'xiaohongshu': desc = (short_desc or full_desc)[:300] content = f"✨ {name} | 我最近在用的宝藏工具\n\n" content += f"📌 一句话介绍:\n{desc}\n\n" if features: features_list = features.split('\n')[:5] content += f"🎯 核心功能:\n" + '\n'.join([f"✓ {f.strip()}" for f in features_list if f.strip()][:5]) + "\n\n" content += f"🔗 体验入口:{url}\n\n{tags_line}" return content[:2000] # 抖音 - 2000字限制,短视频文案风格 if p == 'douyin': desc = (short_desc or full_desc)[:200] content = f"🔥 发现一个超好用的工具:{name}\n\n" content += f"💡 {desc}\n\n" if features: features_list = features.split('\n')[:3] content += "✨ 亮点功能:\n" + '\n'.join([f"👉 {f.strip()}" for f in features_list if f.strip()][:3]) + "\n\n" content += f"🔗 想试试戳:{url}\n\n{tags_line}" return content[:2000] # B站 - 20000字限制,专栏风格 if p == 'bilibili': desc = (full_desc or short_desc)[:800] content = f"【分享】{name} - 值得一试的优质工具\n\n" content += f"## 📖 工具简介\n{desc}\n\n" if features: content += f"## ✨ 主要功能\n{features}\n\n" content += f"## 🔗 体验地址\n{url}\n\n" if tags: content += f"## 🏷️ 相关标签\n{' / '.join(tags)}" return content[:20000] # 微信公众号 - 基本无限制,正式风格 if p == 'wechat': desc = (full_desc or short_desc)[:600] content = f"今天给大家分享一个实用工具:{name}\n\n" content += f"📝 工具介绍\n{desc}\n\n" if features: content += f"✨ 核心功能\n{features}\n\n" content += f"🔗 官方网站\n{url}\n\n" if tags: content += f"如果你也在关注「{' · '.join(tags)}」相关的工具,不妨收藏一下这个实用的选择。" return content # 朋友圈 - 简短为主 if p == 'moments': desc = (short_desc or full_desc)[:80] return f"💡 {name}\n{desc}\n🔗 {url}{tags_line}".strip() # 通用模板 desc = (full_desc or short_desc)[:300] return f"{name}\n\n{desc}\n\n{url}{tags_line}".strip() results = {} for p in platforms: results[p] = generated.get(p) or build_fallback(p) # 统一补充一个通用版本 if 'universal' not in results: results['universal'] = generated.get('universal') or build_fallback('universal') return jsonify({ 'success': True, 'site': { 'code': site.code, 'name': site.name, 'url': site.url }, 'platforms': platforms, 'content': results }) except Exception as e: return jsonify({'success': False, 'message': f'生成失败: {str(e)}'}), 500 # ========== 后台登录路由 ========== @app.route('/admin/login', methods=['GET', 'POST']) def admin_login(): """管理员登录""" if current_user.is_authenticated: return redirect(url_for('admin.index')) if request.method == 'POST': username = request.form.get('username') password = request.form.get('password') admin = AdminModel.query.filter_by(username=username).first() if admin and admin.check_password(password) and admin.is_active: login_user(admin) admin.last_login = datetime.now() db.session.commit() return redirect(url_for('admin.index')) else: flash('用户名或密码错误', 'error') return render_template('admin_login.html') @app.route('/admin/logout') @login_required def admin_logout(): """管理员登出""" logout_user() return redirect(url_for('index')) # ========== 用户认证路由 ========== @app.route('/register', methods=['GET', 'POST']) def user_register(): """用户注册""" if current_user.is_authenticated: # 如果已登录,跳转到首页 return redirect(url_for('index')) if request.method == 'POST': username = request.form.get('username', '').strip() password = request.form.get('password', '').strip() confirm_password = request.form.get('confirm_password', '').strip() # 验证输入 if not username or not password: flash('用户名和密码不能为空', 'error') return render_template('auth/register.html') if len(username) < 3: flash('用户名至少3个字符', 'error') return render_template('auth/register.html') if len(password) < 6: flash('密码至少6个字符', 'error') return render_template('auth/register.html') if password != confirm_password: flash('两次输入的密码不一致', 'error') return render_template('auth/register.html') # 检查用户名是否已存在 if User.query.filter_by(username=username).first(): flash('该用户名已被注册', 'error') return render_template('auth/register.html') # 创建用户 try: user = User(username=username) user.set_password(password) user.last_login = datetime.now() db.session.add(user) db.session.commit() # 自动登录 login_user(user) flash('注册成功!', 'success') return redirect(url_for('index')) except Exception as e: db.session.rollback() flash(f'注册失败:{str(e)}', 'error') return render_template('auth/register.html') return render_template('auth/register.html') @app.route('/login', methods=['GET', 'POST']) def user_login(): """用户登录""" if current_user.is_authenticated: return redirect(url_for('index')) if request.method == 'POST': username = request.form.get('username', '').strip() password = request.form.get('password', '').strip() if not username or not password: flash('请输入用户名和密码', 'error') return render_template('auth/login.html') # 查找用户 user = User.query.filter_by(username=username).first() if user and user.check_password(password) and user.is_active: login_user(user) user.last_login = datetime.now() db.session.commit() # 获取next参数,如果有则跳转,否则跳转首页 next_page = request.args.get('next') if next_page and next_page.startswith('/'): return redirect(next_page) return redirect(url_for('index')) else: flash('用户名或密码错误', 'error') return render_template('auth/login.html') @app.route('/logout') @login_required def user_logout(): """用户登出""" logout_user() return redirect(url_for('index')) # ========== 用户认证API ========== @app.route('/api/auth/status', methods=['GET']) def auth_status(): """获取登录状态""" if current_user.is_authenticated: # 判断是Admin还是User if isinstance(current_user, User): return jsonify({ 'logged_in': True, 'user_type': 'user', 'username': current_user.username, 'avatar': current_user.avatar, 'id': current_user.id }) else: return jsonify({ 'logged_in': True, 'user_type': 'admin', 'username': current_user.username, 'id': current_user.id }) return jsonify({'logged_in': False}) # ========== 收藏功能API ========== @app.route('/api/collections/toggle', methods=['POST']) @login_required def toggle_collection(): """收藏/取消收藏""" # 检查是否为普通用户 if not isinstance(current_user, User): return jsonify({'success': False, 'message': '管理员账号无法使用收藏功能'}), 403 try: data = request.get_json() or {} site_code = data.get('site_code', '').strip() folder_id = data.get('folder_id') # 可选,指定文件夹 if not site_code: return jsonify({'success': False, 'message': '请提供网站编码'}), 400 # 查找网站 site = Site.query.filter_by(code=site_code, is_active=True).first() if not site: return jsonify({'success': False, 'message': '网站不存在'}), 404 # 检查是否已收藏 existing = Collection.query.filter_by( user_id=current_user.id, site_id=site.id ).first() if existing: # 已收藏,则取消收藏 db.session.delete(existing) db.session.commit() return jsonify({ 'success': True, 'action': 'removed', 'message': '已取消收藏' }) else: # 未收藏,则添加收藏 collection = Collection( user_id=current_user.id, site_id=site.id, folder_id=folder_id ) db.session.add(collection) db.session.commit() return jsonify({ 'success': True, 'action': 'added', 'message': '收藏成功', 'collection_id': collection.id }) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': f'操作失败:{str(e)}'}), 500 @app.route('/api/collections/status/', methods=['GET']) @login_required def collection_status(site_code): """查询收藏状态""" if not isinstance(current_user, User): return jsonify({'is_collected': False}) try: site = Site.query.filter_by(code=site_code, is_active=True).first() if not site: return jsonify({'is_collected': False}) collection = Collection.query.filter_by( user_id=current_user.id, site_id=site.id ).first() return jsonify({ 'is_collected': collection is not None, 'collection_id': collection.id if collection else None, 'folder_id': collection.folder_id if collection else None }) except Exception as e: return jsonify({'is_collected': False, 'error': str(e)}) @app.route('/api/collections/list', methods=['GET']) @login_required def list_collections(): """获取收藏列表""" if not isinstance(current_user, User): return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403 try: folder_id = request.args.get('folder_id') page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 20, type=int) # 构建查询 query = Collection.query.filter_by(user_id=current_user.id) if folder_id: if folder_id == 'null' or folder_id == 'none': query = query.filter_by(folder_id=None) else: query = query.filter_by(folder_id=int(folder_id)) # 按创建时间倒序 query = query.order_by(Collection.created_at.desc()) # 分页 pagination = query.paginate(page=page, per_page=per_page, error_out=False) # 格式化数据 collections_data = [] for collection in pagination.items: site = collection.site collections_data.append({ 'id': collection.id, 'site_id': site.id, 'site_code': site.code, 'site_name': site.name, 'site_url': site.url, 'site_logo': site.logo, 'site_short_desc': site.short_desc, 'folder_id': collection.folder_id, 'note': collection.note, 'created_at': collection.created_at.strftime('%Y-%m-%d %H:%M:%S') if collection.created_at else None }) return jsonify({ 'success': True, 'collections': collections_data, 'total': pagination.total, 'page': page, 'per_page': per_page, 'has_next': pagination.has_next, 'has_prev': pagination.has_prev }) except Exception as e: return jsonify({'success': False, 'message': f'获取失败:{str(e)}'}), 500 @app.route('/api/collections//note', methods=['PUT']) @login_required def update_collection_note(collection_id): """更新收藏备注""" if not isinstance(current_user, User): return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403 try: data = request.get_json() or {} note = data.get('note', '').strip() collection = Collection.query.filter_by( id=collection_id, user_id=current_user.id ).first() if not collection: return jsonify({'success': False, 'message': '收藏不存在'}), 404 collection.note = note db.session.commit() return jsonify({ 'success': True, 'message': '备注已更新' }) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': f'更新失败:{str(e)}'}), 500 @app.route('/api/collections//move', methods=['PUT']) @login_required def move_collection(collection_id): """移动收藏到其他文件夹""" if not isinstance(current_user, User): return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403 try: data = request.get_json() or {} folder_id = data.get('folder_id') # None表示移到未分类 collection = Collection.query.filter_by( id=collection_id, user_id=current_user.id ).first() if not collection: return jsonify({'success': False, 'message': '收藏不存在'}), 404 # 如果指定了文件夹,验证文件夹是否属于当前用户 if folder_id: folder = Folder.query.filter_by( id=folder_id, user_id=current_user.id ).first() if not folder: return jsonify({'success': False, 'message': '文件夹不存在'}), 404 collection.folder_id = folder_id db.session.commit() return jsonify({ 'success': True, 'message': '已移动到指定文件夹' }) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': f'移动失败:{str(e)}'}), 500 # ========== 文件夹管理API ========== @app.route('/api/folders', methods=['GET']) @login_required def list_folders(): """获取文件夹列表""" if not isinstance(current_user, User): return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403 try: folders = Folder.query.filter_by(user_id=current_user.id).order_by( Folder.sort_order.desc(), Folder.created_at ).all() folders_data = [] for folder in folders: # 统计文件夹中的收藏数量 count = Collection.query.filter_by( user_id=current_user.id, folder_id=folder.id ).count() folders_data.append({ 'id': folder.id, 'name': folder.name, 'description': folder.description, 'icon': folder.icon, 'sort_order': folder.sort_order, 'is_public': folder.is_public, 'public_slug': folder.public_slug, 'count': count, 'created_at': folder.created_at.strftime('%Y-%m-%d %H:%M:%S') if folder.created_at else None }) return jsonify({ 'success': True, 'folders': folders_data }) except Exception as e: return jsonify({'success': False, 'message': f'获取失败:{str(e)}'}), 500 @app.route('/api/folders', methods=['POST']) @login_required def create_folder(): """创建文件夹""" if not isinstance(current_user, User): return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403 try: data = request.get_json() or {} name = data.get('name', '').strip() description = data.get('description', '').strip() icon = data.get('icon', '📁').strip() if not name: return jsonify({'success': False, 'message': '文件夹名称不能为空'}), 400 # 检查同名文件夹 existing = Folder.query.filter_by( user_id=current_user.id, name=name ).first() if existing: return jsonify({'success': False, 'message': '该文件夹名称已存在'}), 400 # 创建文件夹 folder = Folder( user_id=current_user.id, name=name, description=description, icon=icon ) db.session.add(folder) db.session.commit() return jsonify({ 'success': True, 'message': '文件夹创建成功', 'folder': { 'id': folder.id, 'name': folder.name, 'description': folder.description, 'icon': folder.icon } }) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': f'创建失败:{str(e)}'}), 500 @app.route('/api/folders/', methods=['PUT']) @login_required def update_folder(folder_id): """更新文件夹""" if not isinstance(current_user, User): return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403 try: data = request.get_json() or {} folder = Folder.query.filter_by( id=folder_id, user_id=current_user.id ).first() if not folder: return jsonify({'success': False, 'message': '文件夹不存在'}), 404 # 更新字段 if 'name' in data: name = data['name'].strip() if not name: return jsonify({'success': False, 'message': '文件夹名称不能为空'}), 400 # 检查同名(排除自己) existing = Folder.query.filter( Folder.user_id == current_user.id, Folder.name == name, Folder.id != folder_id ).first() if existing: return jsonify({'success': False, 'message': '该文件夹名称已存在'}), 400 folder.name = name if 'description' in data: folder.description = data['description'].strip() if 'icon' in data: folder.icon = data['icon'].strip() if 'sort_order' in data: folder.sort_order = int(data['sort_order']) db.session.commit() return jsonify({ 'success': True, 'message': '文件夹已更新' }) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': f'更新失败:{str(e)}'}), 500 @app.route('/api/folders/', methods=['DELETE']) @login_required def delete_folder(folder_id): """删除文件夹""" if not isinstance(current_user, User): return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403 try: folder = Folder.query.filter_by( id=folder_id, user_id=current_user.id ).first() if not folder: return jsonify({'success': False, 'message': '文件夹不存在'}), 404 # 删除文件夹(级联删除收藏记录) db.session.delete(folder) db.session.commit() return jsonify({ 'success': True, 'message': '文件夹已删除' }) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': f'删除失败:{str(e)}'}), 500 # ========== 用户中心页面路由 ========== @app.route('/user/profile') @login_required def user_profile(): """用户中心主页""" if not isinstance(current_user, User): flash('仅普通用户可访问', 'error') return redirect(url_for('index')) # 统计信息 collections_count = Collection.query.filter_by(user_id=current_user.id).count() folders_count = Folder.query.filter_by(user_id=current_user.id).count() # 最近收藏(5条) recent_collections = Collection.query.filter_by( user_id=current_user.id ).order_by(Collection.created_at.desc()).limit(5).all() return render_template('user/profile.html', collections_count=collections_count, folders_count=folders_count, recent_collections=recent_collections) @app.route('/user/change-password') @login_required def user_change_password_page(): """修改密码页面""" if not isinstance(current_user, User): flash('仅普通用户可访问', 'error') return redirect(url_for('index')) return render_template('user/change_password.html') @app.route('/user/collections') @login_required def user_collections(): """收藏列表页面""" if not isinstance(current_user, User): flash('仅普通用户可访问', 'error') return redirect(url_for('index')) # 获取所有文件夹 folders = Folder.query.filter_by(user_id=current_user.id).order_by( Folder.sort_order.desc(), Folder.created_at ).all() # 为每个文件夹添加收藏计数 for folder in folders: folder.count = Collection.query.filter_by( user_id=current_user.id, folder_id=folder.id ).count() # 获取收藏(分页) page = request.args.get('page', 1, type=int) folder_id = request.args.get('folder_id') query = Collection.query.filter_by(user_id=current_user.id) if folder_id: if folder_id == 'none': query = query.filter_by(folder_id=None) else: query = query.filter_by(folder_id=int(folder_id)) query = query.order_by(Collection.created_at.desc()) pagination = query.paginate(page=page, per_page=20, error_out=False) return render_template('user/collections.html', folders=folders, collections=pagination.items, pagination=pagination, current_folder_id=folder_id) @app.route('/api/user/profile', methods=['PUT']) @login_required def update_user_profile(): """更新用户资料""" if not isinstance(current_user, User): return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403 try: data = request.get_json() or {} if 'bio' in data: current_user.bio = data['bio'].strip() if 'avatar' in data: current_user.avatar = data['avatar'].strip() if 'is_public_profile' in data: current_user.is_public_profile = bool(data['is_public_profile']) db.session.commit() return jsonify({ 'success': True, 'message': '资料已更新' }) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': f'更新失败:{str(e)}'}), 500 @app.route('/api/user/change-password', methods=['PUT']) @login_required def user_change_password(): """普通用户修改密码""" if not isinstance(current_user, User): return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403 try: data = request.get_json() or {} old_password = data.get('old_password', '').strip() new_password = data.get('new_password', '').strip() confirm_password = data.get('confirm_password', '').strip() # 验证旧密码 if not old_password: return jsonify({'success': False, 'message': '请输入旧密码'}), 400 if not current_user.check_password(old_password): return jsonify({'success': False, 'message': '旧密码错误'}), 400 # 验证新密码 if not new_password: return jsonify({'success': False, 'message': '请输入新密码'}), 400 if len(new_password) < 6: return jsonify({'success': False, 'message': '新密码长度至少6位'}), 400 if new_password != confirm_password: return jsonify({'success': False, 'message': '两次输入的新密码不一致'}), 400 if old_password == new_password: return jsonify({'success': False, 'message': '新密码不能与旧密码相同'}), 400 # 更新密码 current_user.set_password(new_password) db.session.commit() return jsonify({ 'success': True, 'message': '密码修改成功' }) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': f'修改失败:{str(e)}'}), 500 @app.route('/api/user/email', methods=['PUT']) @login_required def update_user_email(): """更新用户邮箱""" if not isinstance(current_user, User): return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403 try: data = request.get_json() or {} email = data.get('email', '').strip() # 验证邮箱格式 if not email: return jsonify({'success': False, 'message': '请输入邮箱地址'}), 400 import re email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' if not re.match(email_pattern, email): return jsonify({'success': False, 'message': '邮箱格式不正确'}), 400 # 检查邮箱是否已被其他用户使用 existing_user = User.query.filter( User.email == email, User.id != current_user.id ).first() if existing_user: return jsonify({'success': False, 'message': '该邮箱已被其他用户使用'}), 400 # 更新邮箱 current_user.email = email # 重置验证状态 current_user.email_verified = False current_user.email_verified_at = None current_user.email_verify_token = None current_user.email_verify_token_expires = None db.session.commit() return jsonify({ 'success': True, 'message': '邮箱已更新,请验证新邮箱' }) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': f'更新失败:{str(e)}'}), 500 @app.route('/api/user/send-verify-email', methods=['POST']) @login_required def send_verify_email(): """发送邮箱验证邮件""" if not isinstance(current_user, User): return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403 try: # 检查是否已绑定邮箱 if not current_user.email: return jsonify({'success': False, 'message': '请先绑定邮箱'}), 400 # 检查是否已验证 if current_user.email_verified: return jsonify({'success': False, 'message': '邮箱已验证,无需重复验证'}), 400 # 生成验证令牌(32位随机字符串) token = secrets.token_urlsafe(32) expires = datetime.now() + timedelta(hours=24) # 保存令牌 current_user.email_verify_token = token current_user.email_verify_token_expires = expires db.session.commit() # 发送验证邮件 verify_url = url_for('verify_email', token=token, _external=True) html_content = f"""

验证您的邮箱

您好,{current_user.username}!

感谢您注册 ZJPB。请点击下面的按钮验证您的邮箱地址:

验证邮箱

或复制以下链接到浏览器:

{verify_url}

此链接将在24小时后失效。

""" text_content = f""" 验证您的邮箱 您好,{current_user.username}! 感谢您注册 ZJPB。请访问以下链接验证您的邮箱地址: {verify_url} 此链接将在24小时后失效。 如果您没有注册 ZJPB,请忽略此邮件。 """ email_sender = EmailSender() success = email_sender.send_email( to_email=current_user.email, subject='验证您的邮箱 - ZJPB', html_content=html_content, text_content=text_content ) if success: return jsonify({ 'success': True, 'message': '验证邮件已发送,请查收' }) else: return jsonify({ 'success': False, 'message': '邮件发送失败,请稍后重试' }), 500 except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': f'发送失败:{str(e)}'}), 500 @app.route('/verify-email/') def verify_email(token): """验证邮箱""" try: # 查找令牌对应的用户 user = User.query.filter_by(email_verify_token=token).first() if not user: flash('验证链接无效', 'error') return redirect(url_for('index')) # 检查令牌是否过期 if user.email_verify_token_expires < datetime.now(): flash('验证链接已过期,请重新发送', 'error') return redirect(url_for('user_profile')) # 验证成功 user.email_verified = True user.email_verified_at = datetime.now() user.email_verify_token = None user.email_verify_token_expires = None db.session.commit() flash('邮箱验证成功!', 'success') return redirect(url_for('user_profile')) except Exception as e: flash(f'验证失败:{str(e)}', 'error') return redirect(url_for('index')) @app.route('/admin/change-password', methods=['GET', 'POST']) @login_required def change_password(): """修改密码""" # 只允许管理员访问 if not isinstance(current_user, AdminModel): flash('无权访问此页面', 'error') return redirect(url_for('index')) if request.method == 'POST': old_password = request.form.get('old_password', '').strip() new_password = request.form.get('new_password', '').strip() confirm_password = request.form.get('confirm_password', '').strip() # 验证旧密码 if not current_user.check_password(old_password): flash('旧密码错误', 'error') return render_template('admin/change_password.html') # 验证新密码 if not new_password: flash('新密码不能为空', 'error') return render_template('admin/change_password.html') if len(new_password) < 6: flash('新密码长度至少6位', 'error') return render_template('admin/change_password.html') if new_password != confirm_password: flash('两次输入的新密码不一致', 'error') return render_template('admin/change_password.html') if old_password == new_password: flash('新密码不能与旧密码相同', 'error') return render_template('admin/change_password.html') # 修改密码 try: current_user.set_password(new_password) db.session.commit() flash('密码修改成功,请重新登录', 'success') logout_user() return redirect(url_for('admin_login')) except Exception as e: db.session.rollback() flash(f'密码修改失败:{str(e)}', 'error') return render_template('admin/change_password.html') return render_template('admin/change_password.html') # ========== API路由 ========== @app.route('/api/fetch-website-info', methods=['POST']) @login_required def fetch_website_info(): """抓取网站信息API""" # 只允许管理员访问 if not isinstance(current_user, AdminModel): return jsonify({'success': False, 'message': '无权访问'}), 403 try: data = request.get_json() url = data.get('url', '').strip() if not url: return jsonify({ 'success': False, 'message': '请提供网站URL' }), 400 # 创建抓取器 fetcher = WebsiteFetcher(timeout=15) # 抓取网站信息 info = fetcher.fetch_website_info(url) if not info: return jsonify({ 'success': False, 'message': '无法获取网站信息,请检查URL是否正确或手动填写' }) # 下载Logo到本地(如果有) logo_path = None if info.get('logo_url'): logo_path = fetcher.download_logo(info['logo_url'], save_dir='static/logos') # 如果下载失败,不返回远程URL,让用户手动上传 return jsonify({ 'success': True, 'data': { 'name': info.get('name', ''), 'description': info.get('description', ''), 'logo': logo_path if logo_path else '' } }) except Exception as e: return jsonify({ 'success': False, 'message': f'抓取失败: {str(e)}' }), 500 @app.route('/api/upload-logo', methods=['POST']) @login_required def upload_logo(): """上传Logo图片API""" # 只允许管理员访问 if not isinstance(current_user, AdminModel): return jsonify({'success': False, 'message': '无权访问'}), 403 try: # 检查文件是否存在 if 'logo' not in request.files: return jsonify({ 'success': False, 'message': '请选择要上传的图片' }), 400 file = request.files['logo'] # 检查文件名 if file.filename == '': return jsonify({ 'success': False, 'message': '未选择文件' }), 400 # 检查文件类型 allowed_extensions = {'png', 'jpg', 'jpeg', 'gif', 'svg', 'ico', 'webp'} filename = file.filename.lower() if not any(filename.endswith('.' + ext) for ext in allowed_extensions): return jsonify({ 'success': False, 'message': '不支持的文件格式,请上传图片文件(png, jpg, gif, svg, ico, webp)' }), 400 # 创建保存目录 save_dir = 'static/logos' os.makedirs(save_dir, exist_ok=True) # 生成安全的文件名 import time import hashlib ext = os.path.splitext(filename)[1] timestamp = str(int(time.time() * 1000)) hash_name = hashlib.md5(f"{filename}{timestamp}".encode()).hexdigest()[:16] safe_filename = f"logo_{hash_name}{ext}" filepath = os.path.join(save_dir, safe_filename) # 保存文件 file.save(filepath) # 返回相对路径 return jsonify({ 'success': True, 'path': f'/{filepath.replace(os.sep, "/")}' }) except Exception as e: return jsonify({ 'success': False, 'message': f'上传失败: {str(e)}' }), 500 @app.route('/api/generate-features', methods=['POST']) @login_required def generate_features(): """使用DeepSeek自动生成网站主要功能""" # 只允许管理员访问 if not isinstance(current_user, AdminModel): return jsonify({'success': False, 'message': '无权访问'}), 403 try: data = request.get_json() name = data.get('name', '').strip() description = data.get('description', '').strip() url = data.get('url', '').strip() if not name or not description: return jsonify({ 'success': False, 'message': '请提供网站名称和描述' }), 400 # 生成功能列表 generator = TagGenerator() features = generator.generate_features(name, description, url) if not features: return jsonify({ 'success': False, 'message': 'DeepSeek功能生成失败,请检查API配置' }), 500 return jsonify({ 'success': True, 'features': features }) except ValueError as e: return jsonify({ 'success': False, 'message': str(e) }), 400 except Exception as e: return jsonify({ 'success': False, 'message': f'生成失败: {str(e)}' }), 500 @app.route('/api/generate-description', methods=['POST']) @login_required def generate_description(): """使用DeepSeek自动生成网站详细介绍""" # 只允许管理员访问 if not isinstance(current_user, AdminModel): return jsonify({'success': False, 'message': '无权访问'}), 403 try: data = request.get_json() name = data.get('name', '').strip() short_desc = data.get('short_desc', '').strip() url = data.get('url', '').strip() if not name: return jsonify({ 'success': False, 'message': '请提供网站名称' }), 400 # 生成详细介绍 generator = TagGenerator() description = generator.generate_description(name, short_desc, url) if not description: return jsonify({ 'success': False, 'message': 'DeepSeek详细介绍生成失败,请检查API配置' }), 500 return jsonify({ 'success': True, 'description': description }) except ValueError as e: return jsonify({ 'success': False, 'message': str(e) }), 400 except Exception as e: return jsonify({ 'success': False, 'message': f'生成失败: {str(e)}' }), 500 @app.route('/api/generate-tags', methods=['POST']) @login_required def generate_tags(): """使用DeepSeek自动生成标签""" # 只允许管理员访问 if not isinstance(current_user, AdminModel): return jsonify({'success': False, 'message': '无权访问'}), 403 try: data = request.get_json() name = data.get('name', '').strip() description = data.get('description', '').strip() if not name or not description: return jsonify({ 'success': False, 'message': '请提供网站名称和描述' }), 400 # 获取现有标签作为参考 existing_tags = [tag.name for tag in Tag.query.all()] # 生成标签 generator = TagGenerator() suggested_tags = generator.generate_tags(name, description, existing_tags) if not suggested_tags: return jsonify({ 'success': False, 'message': 'DeepSeek标签生成失败,请检查API配置' }), 500 return jsonify({ 'success': True, 'tags': suggested_tags }) except ValueError as e: return jsonify({ 'success': False, 'message': str(e) }), 400 except Exception as e: return jsonify({ 'success': False, 'message': f'生成失败: {str(e)}' }), 500 # ========== 新闻获取路由 ========== @app.route('/api/fetch-site-news', methods=['POST']) @login_required def fetch_site_news(): """为指定网站获取最新新闻""" # 只允许管理员访问 if not isinstance(current_user, AdminModel): return jsonify({'success': False, 'message': '无权访问'}), 403 try: data = request.get_json() site_id = data.get('site_id') count = data.get('count', app.config.get('NEWS_SEARCH_COUNT', 10)) freshness = data.get('freshness', app.config.get('NEWS_SEARCH_FRESHNESS', 'oneMonth')) if not site_id: return jsonify({ 'success': False, 'message': '请提供网站ID' }), 400 # 获取网站信息 site = Site.query.get(site_id) if not site: return jsonify({ 'success': False, 'message': '网站不存在' }), 404 # 检查博查API配置 api_key = app.config.get('BOCHA_API_KEY') if not api_key: return jsonify({ 'success': False, 'message': '博查API未配置,请在.env文件中设置BOCHA_API_KEY' }), 500 # 创建新闻搜索器 searcher = NewsSearcher(api_key) # 搜索新闻 news_items = searcher.search_site_news( site_name=site.name, site_url=site.url, news_keywords=site.news_keywords, # v2.3新增:使用专用关键词 count=count, freshness=freshness ) if not news_items: return jsonify({ 'success': False, 'message': '未找到相关新闻' }), 404 # 保存新闻到数据库 saved_count = 0 for item in news_items: # 检查新闻是否已存在(根据URL判断) existing_news = News.query.filter_by( site_id=site_id, url=item['url'] ).first() if not existing_news: # 创建新闻记录 news = News( site_id=site_id, title=item['title'], content=item.get('summary') or item.get('snippet', ''), url=item['url'], source_name=item.get('site_name', ''), source_icon=item.get('site_icon', ''), published_at=item.get('published_at'), news_type='Search Result', is_active=True ) db.session.add(news) saved_count += 1 # 提交事务 db.session.commit() return jsonify({ 'success': True, 'message': f'成功获取并保存 {saved_count} 条新闻', 'total_found': len(news_items), 'saved': saved_count, 'news_items': searcher.format_news_for_display(news_items) }) except Exception as e: db.session.rollback() return jsonify({ 'success': False, 'message': f'获取失败: {str(e)}' }), 500 @app.route('/api/fetch-all-news', methods=['POST']) @login_required def fetch_all_news(): """批量为所有网站获取新闻""" # 只允许管理员访问 if not isinstance(current_user, AdminModel): return jsonify({'success': False, 'message': '无权访问'}), 403 try: data = request.get_json() count_per_site = data.get('count', 5) # 每个网站获取的新闻数量 freshness = data.get('freshness', app.config.get('NEWS_SEARCH_FRESHNESS', 'oneMonth')) limit = data.get('limit', 10) # 限制处理的网站数量 # 检查博查API配置 api_key = app.config.get('BOCHA_API_KEY') if not api_key: return jsonify({ 'success': False, 'message': '博查API未配置,请在.env文件中设置BOCHA_API_KEY' }), 500 # 获取启用的网站(按更新时间排序,优先处理旧的) sites = Site.query.filter_by(is_active=True).order_by(Site.updated_at).limit(limit).all() if not sites: return jsonify({ 'success': False, 'message': '没有可用的网站' }), 404 # 创建新闻搜索器 searcher = NewsSearcher(api_key) # 统计信息 total_saved = 0 total_found = 0 processed_sites = [] # 为每个网站获取新闻 for site in sites: try: # 搜索新闻 news_items = searcher.search_site_news( site_name=site.name, site_url=site.url, news_keywords=site.news_keywords, # v2.3新增:使用专用关键词 count=count_per_site, freshness=freshness ) site_saved = 0 for item in news_items: # 检查是否已存在 existing_news = News.query.filter_by( site_id=site.id, url=item['url'] ).first() if not existing_news: news = News( site_id=site.id, title=item['title'], content=item.get('summary') or item.get('snippet', ''), url=item['url'], source_name=item.get('site_name', ''), source_icon=item.get('site_icon', ''), published_at=item.get('published_at'), news_type='Search Result', is_active=True ) db.session.add(news) site_saved += 1 total_found += len(news_items) total_saved += site_saved processed_sites.append({ 'id': site.id, 'name': site.name, 'found': len(news_items), 'saved': site_saved }) except Exception as e: # 单个网站失败不影响其他网站 processed_sites.append({ 'id': site.id, 'name': site.name, 'error': str(e) }) continue # 提交事务 db.session.commit() return jsonify({ 'success': True, 'message': f'批量获取完成,共处理 {len(processed_sites)} 个网站', 'total_found': total_found, 'total_saved': total_saved, 'processed_sites': processed_sites }) except Exception as e: db.session.rollback() return jsonify({ 'success': False, 'message': f'批量获取失败: {str(e)}' }), 500 # ========== SEO路由 (v2.4新增) ========== @app.route('/sitemap.xml') def sitemap(): """动态生成sitemap.xml""" from flask import make_response # 获取所有启用的网站 sites = Site.query.filter_by(is_active=True).order_by(Site.updated_at.desc()).all() # 获取所有标签 tags = Tag.query.all() # 构建XML内容 xml_content = ''' ''' # 首页 xml_content += ''' {} daily 1.0 '''.format(request.url_root.rstrip('/')) # 工具详情页 for site in sites: xml_content += ''' {} {} weekly 0.8 '''.format( request.url_root.rstrip('/') + url_for('site_detail', code=site.code), site.updated_at.strftime('%Y-%m-%d') if site.updated_at else datetime.now().strftime('%Y-%m-%d') ) # 标签页 for tag in tags: xml_content += ''' {} weekly 0.6 '''.format(request.url_root.rstrip('/') + '/?tag=' + tag.slug) xml_content += ''' ''' response = make_response(xml_content) response.headers['Content-Type'] = 'application/xml; charset=utf-8' return response @app.route('/robots.txt') def robots(): """动态生成robots.txt""" from flask import make_response robots_content = '''User-agent: * Allow: / Disallow: /admin/ Disallow: /api/ Sitemap: {}sitemap.xml '''.format(request.url_root) response = make_response(robots_content) response.headers['Content-Type'] = 'text/plain; charset=utf-8' return response # ========== SEO工具管理路由 (v2.4新增) ========== @app.route('/admin/seo-tools') @login_required def seo_tools(): """SEO工具管理页面""" # 只允许管理员访问 if not isinstance(current_user, AdminModel): flash('无权访问此页面', 'error') return redirect(url_for('index')) # 检查static/sitemap.xml是否存在及最后更新时间 sitemap_path = 'static/sitemap.xml' sitemap_info = None if os.path.exists(sitemap_path): import time mtime = os.path.getmtime(sitemap_path) sitemap_info = { 'exists': True, 'last_updated': datetime.fromtimestamp(mtime).strftime('%Y-%m-%d %H:%M:%S'), 'size': os.path.getsize(sitemap_path) } else: sitemap_info = {'exists': False} return render_template('admin/seo_tools.html', sitemap_info=sitemap_info) @app.route('/api/generate-static-sitemap', methods=['POST']) @login_required def generate_static_sitemap(): """生成静态sitemap.xml文件""" # 只允许管理员访问 if not isinstance(current_user, AdminModel): return jsonify({'success': False, 'message': '无权访问'}), 403 try: # 获取所有启用的网站 sites = Site.query.filter_by(is_active=True).order_by(Site.updated_at.desc()).all() # 获取所有标签 tags = Tag.query.all() # 构建XML内容(使用网站配置的域名) base_url = request.url_root.rstrip('/') xml_content = ''' ''' # 首页 xml_content += f''' {base_url} daily 1.0 ''' # 工具详情页 for site in sites: xml_content += f''' {base_url}/site/{site.code} {site.updated_at.strftime('%Y-%m-%d') if site.updated_at else datetime.now().strftime('%Y-%m-%d')} weekly 0.8 ''' # 标签页 for tag in tags: xml_content += f''' {base_url}/?tag={tag.slug} weekly 0.6 ''' xml_content += ''' ''' # 保存到static目录 static_dir = 'static' os.makedirs(static_dir, exist_ok=True) sitemap_path = os.path.join(static_dir, 'sitemap.xml') with open(sitemap_path, 'w', encoding='utf-8') as f: f.write(xml_content) # 统计信息 total_urls = 1 + len(sites) + len(tags) # 首页 + 工具页 + 标签页 return jsonify({ 'success': True, 'message': f'静态sitemap.xml生成成功!共包含 {total_urls} 个URL', 'total_urls': total_urls, 'file_path': sitemap_path, 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S') }) except Exception as e: return jsonify({ 'success': False, 'message': f'生成失败: {str(e)}' }), 500 @app.route('/api/notify-search-engines', methods=['POST']) @login_required def notify_search_engines(): """通知搜索引擎sitemap更新""" # 只允许管理员访问 if not isinstance(current_user, AdminModel): return jsonify({'success': False, 'message': '无权访问'}), 403 try: import requests from urllib.parse import quote # 获取sitemap URL(使用当前请求的域名) sitemap_url = request.url_root.rstrip('/') + '/sitemap.xml' encoded_sitemap_url = quote(sitemap_url, safe='') results = [] # 1. 通知Google google_ping_url = f'http://www.google.com/ping?sitemap={encoded_sitemap_url}' try: google_response = requests.get(google_ping_url, timeout=10) results.append({ 'engine': 'Google', 'status': 'success' if google_response.status_code == 200 else 'failed', 'status_code': google_response.status_code, 'message': '提交成功' if google_response.status_code == 200 else f'HTTP {google_response.status_code}' }) except Exception as e: results.append({ 'engine': 'Google', 'status': 'error', 'message': f'请求失败: {str(e)}' }) # 2. 通知Baidu baidu_ping_url = f'http://data.zz.baidu.com/ping?sitemap={encoded_sitemap_url}' try: baidu_response = requests.get(baidu_ping_url, timeout=10) results.append({ 'engine': 'Baidu', 'status': 'success' if baidu_response.status_code == 200 else 'failed', 'status_code': baidu_response.status_code, 'message': '提交成功' if baidu_response.status_code == 200 else f'HTTP {baidu_response.status_code}' }) except Exception as e: results.append({ 'engine': 'Baidu', 'status': 'error', 'message': f'请求失败: {str(e)}' }) # 3. 通知Bing bing_ping_url = f'http://www.bing.com/ping?sitemap={encoded_sitemap_url}' try: bing_response = requests.get(bing_ping_url, timeout=10) results.append({ 'engine': 'Bing', 'status': 'success' if bing_response.status_code == 200 else 'failed', 'status_code': bing_response.status_code, 'message': '提交成功' if bing_response.status_code == 200 else f'HTTP {bing_response.status_code}' }) except Exception as e: results.append({ 'engine': 'Bing', 'status': 'error', 'message': f'请求失败: {str(e)}' }) # 统计成功数量 success_count = sum(1 for r in results if r['status'] == 'success') return jsonify({ 'success': True, 'message': f'已通知 {success_count}/{len(results)} 个搜索引擎', 'sitemap_url': sitemap_url, 'results': results }) except Exception as e: return jsonify({ 'success': False, 'message': f'通知失败: {str(e)}' }), 500 @app.route('/api/refresh-site-news/', methods=['POST']) def refresh_site_news(site_code): """手动刷新指定网站的新闻(前台用户可访问)- v2.3新增""" try: # 根据code查找网站 site = Site.query.filter_by(code=site_code).first() if not site: return jsonify({ 'success': False, 'message': '网站不存在' }), 404 # 检查博查API配置 api_key = app.config.get('BOCHA_API_KEY') if not api_key: return jsonify({ 'success': False, 'message': '新闻功能未启用' }), 500 # 创建新闻搜索器 searcher = NewsSearcher(api_key) # 搜索新闻(获取最新5条) news_items = searcher.search_site_news( site_name=site.name, site_url=site.url, news_keywords=site.news_keywords, # 使用专用关键词 count=5, freshness='oneWeek' # 一周内的新闻 ) if not news_items: return jsonify({ 'success': False, 'message': '未找到相关新闻' }), 404 # 保存新闻到数据库 saved_count = 0 for item in news_items: # 检查新闻是否已存在(根据URL判断) existing_news = News.query.filter_by( site_id=site.id, url=item['url'] ).first() if not existing_news: news = News( site_id=site.id, title=item['title'], content=item.get('summary') or item.get('snippet', ''), url=item['url'], source_name=item.get('site_name', ''), source_icon=item.get('site_icon', ''), published_at=item.get('published_at'), news_type='Search Result', is_active=True ) db.session.add(news) saved_count += 1 # 提交事务 db.session.commit() return jsonify({ 'success': True, 'message': f'成功获取 {saved_count} 条新资讯', 'total_found': len(news_items), 'saved_count': saved_count }) except Exception as e: db.session.rollback() return jsonify({ 'success': False, 'message': f'获取失败: {str(e)}' }), 500 # ========== 批量导入路由 ========== @app.route('/admin/batch-import', methods=['GET', 'POST']) @login_required def batch_import(): """批量导入网站""" # 只允许管理员访问 if not isinstance(current_user, AdminModel): flash('无权访问此页面', 'error') return redirect(url_for('index')) from utils.bookmark_parser import BookmarkParser from utils.website_fetcher import WebsiteFetcher results = None if request.method == 'POST': import_type = request.form.get('import_type') auto_activate = request.form.get('auto_activate') == 'on' parser = BookmarkParser() fetcher = WebsiteFetcher(timeout=15) urls_to_import = [] try: # 解析输入 if import_type == 'url_list': url_list_text = request.form.get('url_list', '') urls_to_import = parser.parse_url_list(url_list_text) elif import_type == 'bookmark_file': bookmark_file = request.files.get('bookmark_file') if not bookmark_file: flash('请选择书签文件', 'error') return render_template('admin/batch_import.html') html_content = bookmark_file.read().decode('utf-8', errors='ignore') all_bookmarks = parser.parse_html_file(html_content) # 筛选文件夹(如果指定) folder_filter = request.form.get('folder_filter', '').strip() if folder_filter: urls_to_import = [ b for b in all_bookmarks if folder_filter.lower() in b.get('folder', '').lower() ] else: urls_to_import = all_bookmarks # 批量导入 success_list = [] failed_list = [] for idx, item in enumerate(urls_to_import, 1): url = item['url'] name = item.get('name', '') # 为每个URL创建独立的事务 try: # 1. 检查URL是否已存在 try: existing = Site.query.filter_by(url=url).first() if existing: failed_list.append({ 'url': url, 'name': name or existing.name, 'error': f'该URL已存在(网站名称:{existing.name})' }) continue except Exception as e: failed_list.append({ 'url': url, 'name': name, 'error': f'检查URL时出错: {str(e)}' }) continue # 2. 抓取网站信息(带超时和错误处理) info = None try: info = fetcher.fetch_website_info(url) except Exception as e: print(f"抓取 {url} 失败: {str(e)}") # 抓取失败不是致命错误,继续尝试使用书签名称 # 3. 处理网站信息 if not info or not info.get('name'): # 如果有书签名称,使用书签名称 if name: info = { 'name': name, 'description': '', 'logo_url': '' } else: # 尝试从URL提取域名作为名称 from urllib.parse import urlparse try: parsed = urlparse(url) domain = parsed.netloc or parsed.path if domain: info = { 'name': domain, 'description': '', 'logo_url': '' } else: failed_list.append({ 'url': url, 'name': name, 'error': '无法获取网站信息且没有备用名称' }) continue except Exception: failed_list.append({ 'url': url, 'name': name, 'error': '无法获取网站信息且URL解析失败' }) continue # 4. 下载Logo到本地(失败不影响导入) logo_path = None if info.get('logo_url'): try: logo_path = fetcher.download_logo(info['logo_url'], save_dir='static/logos') except Exception as e: print(f"下载Logo失败 ({url}): {str(e)}") # Logo下载失败不影响网站导入 # 5. 生成code和slug try: import random from pypinyin import lazy_pinyin import re # 生成唯一的code site_code = None max_attempts = 10 for _ in range(max_attempts): code = str(random.randint(10000000, 99999999)) if not Site.query.filter_by(code=code).first(): site_code = code break if not site_code: # 如果10次都失败,使用时间戳 import time site_code = str(int(time.time() * 1000))[-8:] # 生成slug site_name = info.get('name', name or 'Unknown')[:100] slug = ''.join(lazy_pinyin(site_name)) slug = slug.lower() slug = re.sub(r'[^\w\s-]', '', slug) slug = re.sub(r'[-\s]+', '-', slug).strip('-') if not slug: slug = f"site-{site_code}" # 确保slug唯一 base_slug = slug[:50] # 限制长度 counter = 1 final_slug = slug while Site.query.filter_by(slug=final_slug).first(): final_slug = f"{base_slug}-{counter}" counter += 1 if counter > 100: # 防止无限循环 final_slug = f"{base_slug}-{site_code}" break # 6. 创建网站记录(带code和slug) site = Site( code=site_code, slug=final_slug, name=site_name, url=url[:500], # 限制URL长度 logo=logo_path or info.get('logo_url', '')[:500] if info.get('logo_url') else '', short_desc=info.get('description', '')[:200] if info.get('description') else '', description=info.get('description', '')[:2000] if info.get('description') else '', is_active=auto_activate ) # 添加到数据库并提交 db.session.add(site) db.session.commit() success_list.append({ 'name': site.name, 'url': site.url }) print(f"成功导入 [{idx}/{len(urls_to_import)}]: {site.name}") except Exception as e: db.session.rollback() failed_list.append({ 'url': url, 'name': name or info.get('name', 'Unknown'), 'error': f'数据库保存失败: {str(e)}' }) continue except Exception as e: # 捕获所有未预期的错误 db.session.rollback() failed_list.append({ 'url': url, 'name': name, 'error': f'未知错误: {str(e)}' }) print(f"导入 {url} 时发生未知错误: {str(e)}") continue results = { 'total_count': len(urls_to_import), 'success_count': len(success_list), 'failed_count': len(failed_list), 'success_list': success_list, 'failed_list': failed_list } if success_list: flash(f'成功导入 {len(success_list)} 个网站!', 'success') except Exception as e: flash(f'导入失败: {str(e)}', 'error') return render_template('admin/batch_import.html', results=results) # ========== 用户管理路由 ========== @app.route('/admin/users') @login_required def admin_users(): """用户管理列表页""" if not isinstance(current_user, AdminModel): flash('无权访问', 'error') return redirect(url_for('index')) # 获取分页参数 page = request.args.get('page', 1, type=int) per_page = 20 # 获取搜索参数 search = request.args.get('search', '').strip() # 构建查询 query = User.query if search: query = query.filter( db.or_( User.username.like(f'%{search}%'), User.email.like(f'%{search}%') ) ) # 排序:按注册时间倒序 query = query.order_by(User.created_at.desc()) # 分页 pagination = query.paginate(page=page, per_page=per_page, error_out=False) users = pagination.items # 为每个用户统计收藏数据 user_stats = {} for user in users: user_stats[user.id] = { 'collections_count': Collection.query.filter_by(user_id=user.id).count(), 'folders_count': Folder.query.filter_by(user_id=user.id).count() } # 获取真实的 admin 实例 from flask import current_app admin_instance = current_app.extensions['admin'][0] # 创建模拟的 admin_view 对象 class MockAdminView: name = '用户管理' category = None admin = admin_instance return render_template('admin/users/list.html', users=users, pagination=pagination, user_stats=user_stats, search=search, admin_view=MockAdminView()) @app.route('/admin/users/') @login_required def admin_user_detail(user_id): """用户详情页""" if not isinstance(current_user, AdminModel): flash('无权访问', 'error') return redirect(url_for('index')) user = User.query.get_or_404(user_id) # 统计数据 collections_count = Collection.query.filter_by(user_id=user.id).count() folders_count = Folder.query.filter_by(user_id=user.id).count() # 获取最近的收藏(前10条) recent_collections = Collection.query.filter_by(user_id=user.id)\ .order_by(Collection.created_at.desc())\ .limit(10).all() # 获取所有文件夹 folders = Folder.query.filter_by(user_id=user.id)\ .order_by(Folder.sort_order.desc(), Folder.created_at.desc())\ .all() # 获取真实的 admin 实例 from flask import current_app admin_instance = current_app.extensions['admin'][0] # 创建模拟的 admin_view 对象 class MockAdminView: name = '用户详情' category = None admin = admin_instance return render_template('admin/users/detail.html', user=user, collections_count=collections_count, folders_count=folders_count, recent_collections=recent_collections, folders=folders, admin_view=MockAdminView()) @app.route('/api/admin/users//reset-password', methods=['POST']) @login_required def admin_reset_user_password(user_id): """管理员重置用户密码""" if not isinstance(current_user, AdminModel): return jsonify({'success': False, 'message': '无权操作'}), 403 user = User.query.get_or_404(user_id) data = request.get_json() new_password = data.get('new_password', '').strip() # 验证新密码 if not new_password: return jsonify({'success': False, 'message': '新密码不能为空'}), 400 if len(new_password) < 6: return jsonify({'success': False, 'message': '新密码至少需要6位'}), 400 try: # 设置新密码 user.set_password(new_password) db.session.commit() return jsonify({ 'success': True, 'message': f'已成功重置用户 {user.username} 的密码' }) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': f'重置密码失败: {str(e)}'}), 500 @app.route('/api/admin/users//update-username', methods=['POST']) @login_required def admin_update_username(user_id): """管理员修改用户昵称""" if not isinstance(current_user, AdminModel): return jsonify({'success': False, 'message': '无权操作'}), 403 user = User.query.get_or_404(user_id) data = request.get_json() new_username = data.get('new_username', '').strip() # 验证新昵称 if not new_username: return jsonify({'success': False, 'message': '昵称不能为空'}), 400 if len(new_username) < 2 or len(new_username) > 50: return jsonify({'success': False, 'message': '昵称长度需要在2-50个字符之间'}), 400 # 检查昵称是否已被使用 existing_user = User.query.filter_by(username=new_username).first() if existing_user and existing_user.id != user.id: return jsonify({'success': False, 'message': '该昵称已被使用'}), 400 try: old_username = user.username user.username = new_username db.session.commit() return jsonify({ 'success': True, 'message': f'已成功将昵称从 {old_username} 修改为 {new_username}' }) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': f'修改昵称失败: {str(e)}'}), 500 # ========== Flask-Admin 配置 ========== class SecureModelView(ModelView): """需要登录的模型视图""" # 中文化配置 can_set_page_size = True page_size = 20 # 自定义文本 list_template = 'admin/model/list.html' create_template = 'admin/model/create.html' edit_template = 'admin/model/edit.html' # 覆盖英文文本 named_filter_urls = True def is_accessible(self): # 只允许Admin类型的用户访问 return current_user.is_authenticated and isinstance(current_user, AdminModel) def inaccessible_callback(self, name, **kwargs): return redirect(url_for('admin_login')) class SecureAdminIndexView(AdminIndexView): """需要登录的管理首页""" def is_accessible(self): # 只允许Admin类型的用户访问 return current_user.is_authenticated and isinstance(current_user, AdminModel) def inaccessible_callback(self, name, **kwargs): return redirect(url_for('admin_login')) @expose('/') def index(self): """控制台首页,显示统计信息""" # 优化查询:减少数据库往返次数 # 1. 获取 sites_count 和 total_views site_stats = db.session.query( db.func.count(Site.id).label('total'), db.func.sum(Site.view_count).label('total_views') ).first() # 2. 获取 active sites count sites_count = Site.query.filter_by(is_active=True).count() # 3. 获取 tags_count tags_count = Tag.query.count() # 4. 获取 news_count news_count = News.query.filter_by(is_active=True).count() # 5. 获取 users_count users_count = User.query.count() # 组装统计数据 stats = { 'sites_count': sites_count, 'tags_count': tags_count, 'news_count': news_count, 'total_views': site_stats.total_views or 0, 'users_count': users_count } # 最近添加的工具(最多5个)- 只查询必要字段 recent_sites = db.session.query( Site.id, Site.name, Site.url, Site.logo, Site.is_active, Site.view_count, Site.created_at ).order_by(Site.created_at.desc()).limit(5).all() return self.render('admin/index.html', stats=stats, recent_sites=recent_sites) # 网站管理视图 class SiteAdmin(SecureModelView): # 自定义模板 create_template = 'admin/site/create.html' edit_template = 'admin/site/edit.html' # 启用编辑和删除 can_edit = True can_delete = True can_create = True can_view_details = False # 禁用查看详情,点击名称即可查看 # 显示操作列 column_display_actions = True action_disallowed_list = [] column_list = ['id', 'code', 'name', 'url', 'slug', 'is_active', 'is_recommended', 'view_count', 'created_at'] column_default_sort = ('created_at', True) # 按创建时间倒序,最新的排在前面 column_searchable_list = ['code', 'name', 'url', 'description'] column_filters = ['is_active', 'is_recommended', 'tags'] column_labels = { 'id': 'ID', 'code': '网站编码', 'name': '网站名称', 'url': 'URL', 'slug': 'URL别名', 'logo': 'Logo', 'short_desc': '简短描述', 'description': '详细介绍', 'features': '主要功能', 'news_keywords': '新闻关键词', 'is_active': '是否启用', 'is_recommended': '是否推荐', 'view_count': '浏览次数', 'sort_order': '排序权重', 'tags': '标签', 'created_at': '创建时间', 'updated_at': '更新时间' } form_columns = ['name', 'url', 'slug', 'logo', 'short_desc', 'description', 'features', 'news_keywords', 'tags', 'is_active', 'is_recommended', 'sort_order'] # 自定义编辑/删除文字 column_extra_row_actions = None def on_model_change(self, form, model, is_created): """保存前自动生成code和slug(如果为空)""" import re import random from pypinyin import lazy_pinyin from flask import request # 使用no_autoflush防止在查询时触发提前flush with db.session.no_autoflush: # 处理手动输入的新标签 new_tags_str = request.form.get('new_tags', '') if new_tags_str: new_tag_names = [name.strip() for name in new_tags_str.split(',') if name.strip()] for tag_name in new_tag_names: # 检查标签是否已存在 existing_tag = Tag.query.filter_by(name=tag_name).first() if not existing_tag: # 创建新标签 tag_slug = ''.join(lazy_pinyin(tag_name)) tag_slug = tag_slug.lower() tag_slug = re.sub(r'[^\w\s-]', '', tag_slug) tag_slug = re.sub(r'[-\s]+', '-', tag_slug).strip('-') # 确保slug唯一 base_tag_slug = tag_slug[:50] counter = 1 final_tag_slug = tag_slug while Tag.query.filter_by(slug=final_tag_slug).first(): final_tag_slug = f"{base_tag_slug}-{counter}" counter += 1 if counter > 100: final_tag_slug = f"{base_tag_slug}-{random.randint(1000, 9999)}" break new_tag = Tag(name=tag_name, slug=final_tag_slug) db.session.add(new_tag) db.session.flush() # 确保新标签有ID # 添加到模型的标签列表 if new_tag not in model.tags: model.tags.append(new_tag) else: # 添加已存在的标签 if existing_tag not in model.tags: model.tags.append(existing_tag) # 如果code为空,自动生成唯一的8位数字编码 if not model.code or model.code.strip() == '': max_attempts = 10 for attempt in range(max_attempts): # 生成10000000-99999999之间的随机数 code = str(random.randint(10000000, 99999999)) # 检查是否已存在 existing = Site.query.filter(Site.code == code).first() if not existing or existing.id == model.id: model.code = code break # 如果10次都失败,使用时间戳 if not model.code: import time model.code = str(int(time.time() * 1000))[-8:] # 如果slug为空,从name自动生成 if not model.slug or model.slug.strip() == '': # 将中文转换为拼音 slug = ''.join(lazy_pinyin(model.name)) # 转换为小写,移除特殊字符 slug = slug.lower() slug = re.sub(r'[^\w\s-]', '', slug) slug = re.sub(r'[-\s]+', '-', slug).strip('-') # 如果转换后为空,使用code if not slug: slug = f"site-{model.code}" # 确保slug唯一(限制长度和重试次数) base_slug = slug[:50] counter = 1 final_slug = slug max_slug_attempts = 100 while counter < max_slug_attempts: existing = Site.query.filter(Site.slug == final_slug).first() if not existing or existing.id == model.id: break final_slug = f"{base_slug}-{counter}" counter += 1 # 如果100次都失败,使用code确保唯一 if counter >= max_slug_attempts: final_slug = f"{base_slug}-{model.code}" model.slug = final_slug # 标签管理视图 class TagAdmin(SecureModelView): can_edit = True can_delete = True can_create = True can_view_details = False # 显示操作列 column_display_actions = True column_list = ['id', 'name', 'slug', 'description', 'sort_order'] column_searchable_list = ['name', 'description'] column_labels = { 'id': 'ID', 'name': '标签名称', 'slug': 'URL别名', 'description': '标签描述', 'seo_title': 'SEO标题 (v2.4)', 'seo_description': 'SEO描述 (v2.4)', 'seo_keywords': 'SEO关键词 (v2.4)', 'icon': '图标', 'sort_order': '排序权重', 'created_at': '创建时间' } form_columns = ['name', 'slug', 'description', 'seo_title', 'seo_description', 'seo_keywords', 'icon', 'sort_order'] # 管理员视图 class AdminAdmin(SecureModelView): can_edit = True can_delete = True can_create = True can_view_details = False # 显示操作列 column_display_actions = True column_list = ['id', 'username', 'email', 'is_active', 'last_login', 'created_at'] column_searchable_list = ['username', 'email'] column_filters = ['is_active'] column_labels = { 'id': 'ID', 'username': '用户名', 'email': '邮箱', 'is_active': '是否启用', 'created_at': '创建时间', 'last_login': '最后登录' } form_columns = ['username', 'email', 'is_active'] def on_model_change(self, form, model, is_created): # 如果是新建管理员,设置默认密码 if is_created: model.set_password('admin123') # 默认密码 # 新闻管理视图 class NewsAdmin(SecureModelView): can_edit = True can_delete = True can_create = True can_view_details = False # 显示操作列 column_display_actions = True column_list = ['id', 'site', 'title', 'source_name', 'news_type', 'published_at', 'is_active'] column_searchable_list = ['title', 'content', 'source_name'] column_filters = ['site', 'news_type', 'source_name', 'is_active', 'published_at'] column_labels = { 'id': 'ID', 'site': '关联网站', 'title': '新闻标题', 'content': '新闻内容', 'news_type': '新闻类型', 'url': '新闻链接', 'source_name': '来源网站', 'source_icon': '来源图标', 'published_at': '发布时间', 'is_active': '是否启用', 'created_at': '创建时间', 'updated_at': '更新时间' } form_columns = ['site', 'title', 'content', 'news_type', 'url', 'source_name', 'source_icon', 'published_at', 'is_active'] # 可选的新闻类型 form_choices = { 'news_type': [ ('Search Result', 'Search Result'), ('Product Update', 'Product Update'), ('Industry News', 'Industry News'), ('Company News', 'Company News'), ('Other', 'Other') ] } # 默认排序 column_default_sort = ('published_at', True) # 按发布时间倒序排列 def get_query(self): """优化查询:使用joinedload避免N+1问题""" return super().get_query().options( db.orm.joinedload(News.site) ) # Prompt模板管理视图 class PromptAdmin(SecureModelView): can_edit = True can_delete = False # 不允许删除,避免系统必需的prompt被删除 can_create = True can_view_details = False # 显示操作列 column_display_actions = True column_list = ['id', 'key', 'name', 'description', 'is_active', 'updated_at'] column_searchable_list = ['key', 'name', 'description'] column_filters = ['is_active', 'key'] column_labels = { 'id': 'ID', 'key': '唯一标识', 'name': '模板名称', 'system_prompt': '系统提示词', 'user_prompt_template': '用户提示词模板', 'description': '模板说明', 'is_active': '是否启用', 'created_at': '创建时间', 'updated_at': '更新时间' } form_columns = ['key', 'name', 'description', 'system_prompt', 'user_prompt_template', 'is_active'] # 字段说明 column_descriptions = { 'key': '唯一标识,如: tags, features, description', 'system_prompt': 'AI的系统角色设定', 'user_prompt_template': '用户提示词模板,支持变量如 {name}, {description}, {url}', } # 表单字段配置 form_widget_args = { 'system_prompt': { 'rows': 3, 'style': 'font-family: monospace;' }, 'user_prompt_template': { 'rows': 20, 'style': 'font-family: monospace;' } } # 初始化 Flask-Admin admin = Admin( app, name='ZJPB 焦提示词', template_mode='bootstrap4', index_view=SecureAdminIndexView(name='控制台', url='/admin'), base_template='admin/master.html' ) admin.add_view(SiteAdmin(Site, db.session, name='网站管理')) admin.add_view(TagAdmin(Tag, db.session, name='标签管理')) admin.add_view(NewsAdmin(News, db.session, name='新闻管理')) admin.add_view(PromptAdmin(PromptTemplate, db.session, name='Prompt管理')) admin.add_view(AdminAdmin(AdminModel, db.session, name='管理员', endpoint='admin_users')) return app if __name__ == '__main__': app = create_app(os.getenv('FLASK_ENV', 'development')) app.run(host='0.0.0.0', port=5000, debug=True)