import os import markdown import random import string from io import BytesIO from flask import Flask, render_template, redirect, url_for, request, flash, jsonify, session, send_file from flask_login import LoginManager, login_user, logout_user, login_required, current_user from flask_admin import Admin, AdminIndexView, expose from flask_admin.contrib.sqla import ModelView from datetime import datetime from config import config from models import db, Site, Tag, Admin as AdminModel, News, site_tags, PromptTemplate from utils.website_fetcher import WebsiteFetcher from utils.tag_generator import TagGenerator from utils.news_searcher import NewsSearcher from PIL import Image, ImageDraw, ImageFont def create_app(config_name='default'): """应用工厂函数""" app = Flask(__name__) # 加载配置 app.config.from_object(config[config_name]) # 初始化数据库 db.init_app(app) # 添加Markdown过滤器 @app.template_filter('markdown') def markdown_filter(text): """将Markdown文本转换为HTML""" if not text: return '' return markdown.markdown(text, extensions=['nl2br', 'fenced_code']) # v2.4新增: 自动内链过滤器 @app.template_filter('auto_link') def auto_link_filter(text, current_site_id=None): """自动为内容中的工具名称添加链接""" if not text: return '' # 获取所有启用的网站(排除当前网站) sites = Site.query.filter_by(is_active=True).all() if current_site_id: sites = [s for s in sites if s.id != current_site_id] # 按名称长度降序排序,优先匹配长名称 sites = sorted(sites, key=lambda s: len(s.name), reverse=True) # 记录已经添加链接的位置,避免重复 linked_sites = set() for site in sites: if site.name in text and site.name not in linked_sites: # 只链接第一次出现的位置 link = f'{site.name}' text = text.replace(site.name, link, 1) linked_sites.add(site.name) return text # 初始化登录管理 login_manager = LoginManager() login_manager.init_app(app) login_manager.login_view = 'admin_login' login_manager.login_message = '请先登录' @login_manager.user_loader def load_user(user_id): return AdminModel.query.get(int(user_id)) # ========== 前台路由 ========== @app.route('/') def index(): """首页""" # 获取所有启用的标签 tags = Tag.query.order_by(Tag.sort_order.desc(), Tag.id).all() # 优化:使用一次SQL查询统计所有标签的网站数量 tag_counts = {} if tags: # 使用JOIN查询一次性获取所有标签的网站数量 from sqlalchemy import func counts_query = db.session.query( site_tags.c.tag_id, func.count(site_tags.c.site_id).label('count') ).join( Site, site_tags.c.site_id == Site.id ).filter( Site.is_active == True ).group_by(site_tags.c.tag_id).all() tag_counts = {tag_id: count for tag_id, count in counts_query} # 获取筛选参数 tag_slug = request.args.get('tag') search_query = request.args.get('q', '').strip() current_tab = request.args.get('tab', 'latest') # 默认为"最新" page = request.args.get('page', 1, type=int) per_page = 100 # 每页显示100个站点 selected_tag = None # 构建基础查询 query = Site.query.filter_by(is_active=True) # 标签筛选 if tag_slug: selected_tag = Tag.query.filter_by(slug=tag_slug).first() if selected_tag: query = query.filter(Site.tags.contains(selected_tag)) else: sites = [] pagination = None return render_template('index_new.html', sites=sites, tags=tags, selected_tag=selected_tag, search_query=search_query, pagination=pagination, tag_counts=tag_counts, current_tab=current_tab) # 搜索功能 if search_query: # 使用OR条件搜索:网站名称、URL、描述 search_pattern = f'%{search_query}%' query = query.filter( db.or_( Site.name.like(search_pattern), Site.url.like(search_pattern), Site.description.like(search_pattern), Site.short_desc.like(search_pattern) ) ) # Tab筛选和排序 if current_tab == 'popular': # 热门:按浏览次数倒序 query = query.order_by(Site.view_count.desc(), Site.id.desc()) elif current_tab == 'recommended': # 推荐:只显示is_recommended=True的 query = query.filter_by(is_recommended=True).order_by(Site.sort_order.desc(), Site.id.desc()) else: # 最新:按创建时间倒序(默认) query = query.order_by(Site.created_at.desc(), Site.id.desc()) # 分页 pagination = query.paginate(page=page, per_page=per_page, error_out=False) sites = pagination.items return render_template('index_new.html', sites=sites, tags=tags, selected_tag=selected_tag, search_query=search_query, pagination=pagination, tag_counts=tag_counts, current_tab=current_tab) @app.route('/site/') def site_detail(code): """网站详情页""" site = Site.query.filter_by(code=code, is_active=True).first_or_404() # 增加浏览次数 site.view_count += 1 db.session.commit() # v2.6优化:移除自动调用博查API的逻辑,改为按需加载 # 只获取数据库中已有的新闻,不再自动调用API # 获取该网站的相关新闻(最多显示5条) news_list = News.query.filter_by( site_id=site.id, is_active=True ).order_by(News.published_at.desc()).limit(5).all() # 检查是否有新闻,如果没有则标记需要加载 has_news = len(news_list) > 0 # 获取同类工具推荐(通过标签匹配,最多显示4个) recommended_sites = [] if site.tags: # 获取有相同标签的其他网站 recommended_sites = Site.query.filter( Site.id != site.id, Site.is_active == True, Site.tags.any(Tag.id.in_([tag.id for tag in site.tags])) ).order_by(Site.view_count.desc()).limit(4).all() return render_template('detail_new.html', site=site, news_list=news_list, has_news=has_news, recommended_sites=recommended_sites) # ========== 验证码相关 (v2.6.1优化) ========== def generate_captcha_image(text): """生成验证码图片""" # 创建图片 (宽120, 高40) width, height = 120, 40 image = Image.new('RGB', (width, height), color=(255, 255, 255)) draw = ImageDraw.Draw(image) # 使用默认字体 try: font = ImageFont.truetype("arial.ttf", 28) except: font = ImageFont.load_default() # 添加随机干扰线 for i in range(3): x1 = random.randint(0, width) y1 = random.randint(0, height) x2 = random.randint(0, width) y2 = random.randint(0, height) draw.line([(x1, y1), (x2, y2)], fill=(200, 200, 200), width=1) # 绘制验证码文字 for i, char in enumerate(text): x = 10 + i * 25 y = random.randint(5, 12) color = (random.randint(0, 100), random.randint(0, 100), random.randint(0, 100)) draw.text((x, y), char, font=font, fill=color) # 添加随机噪点 for _ in range(100): x = random.randint(0, width - 1) y = random.randint(0, height - 1) draw.point((x, y), fill=(random.randint(150, 200), random.randint(150, 200), random.randint(150, 200))) return image @app.route('/api/captcha', methods=['GET']) def get_captcha(): """生成验证码图片""" # 生成4位随机验证码 captcha_text = ''.join(random.choices(string.ascii_uppercase + string.digits, k=4)) # 存储到session session['captcha'] = captcha_text session['captcha_created'] = datetime.now().timestamp() # 生成图片 image = generate_captcha_image(captcha_text) # 转换为字节流 img_io = BytesIO() image.save(img_io, 'PNG') img_io.seek(0) return send_file(img_io, mimetype='image/png') # ========== 新闻相关API (v2.6优化) ========== @app.route('/api/fetch-news/', methods=['POST']) def fetch_news_for_site(code): """ 按需获取指定网站的新闻 v2.6.1优化:所有手动请求都需要验证码,防止API滥用 """ try: # 获取请求数据 data = request.get_json() or {} captcha_input = data.get('captcha', '').strip().upper() # 验证验证码 session_captcha = session.get('captcha', '').upper() captcha_created = session.get('captcha_created', 0) # 检查验证码是否存在 if not session_captcha: return jsonify({ 'success': False, 'error': '请先获取验证码' }), 400 # 检查验证码是否过期(5分钟) if datetime.now().timestamp() - captcha_created > 300: session.pop('captcha', None) session.pop('captcha_created', None) return jsonify({ 'success': False, 'error': '验证码已过期,请刷新后重试' }), 400 # 检查验证码是否正确 if captcha_input != session_captcha: return jsonify({ 'success': False, 'error': '验证码错误,请重新输入' }), 400 # 验证通过,清除验证码 session.pop('captcha', None) session.pop('captcha_created', None) # 查找网站 site = Site.query.filter_by(code=code, is_active=True).first_or_404() # 检查博查API配置 api_key = app.config.get('BOCHA_API_KEY') if not api_key: return jsonify({'success': False, 'error': '博查API未配置'}), 500 # 创建新闻搜索器 searcher = NewsSearcher(api_key) # 获取新闻(限制5条,一周内的) news_items = searcher.search_site_news( site_name=site.name, site_url=site.url, news_keywords=site.news_keywords, count=5, freshness='oneWeek' ) # 保存新闻到数据库 new_count = 0 if news_items: for item in news_items: # 检查是否已存在(根据URL去重) existing = News.query.filter_by( site_id=site.id, url=item['url'] ).first() if not existing: news = News( site_id=site.id, title=item['title'], content=item.get('summary') or item.get('snippet', ''), url=item['url'], source_name=item.get('site_name', ''), source_icon=item.get('site_icon', ''), published_at=item.get('published_at'), news_type='Search Result', is_active=True ) db.session.add(news) new_count += 1 db.session.commit() # 返回最新的新闻列表 news_list = News.query.filter_by( site_id=site.id, is_active=True ).order_by(News.published_at.desc()).limit(5).all() # 格式化新闻数据 news_data = [] for news in news_list: news_data.append({ 'title': news.title, 'content': news.content[:200] if news.content else '', 'url': news.url, 'source_name': news.source_name, 'source_icon': news.source_icon, 'published_at': news.published_at.strftime('%Y年%m月%d日') if news.published_at else '未知日期', 'news_type': news.news_type }) return jsonify({ 'success': True, 'news': news_data, 'new_count': new_count, 'message': f'成功获取{new_count}条新资讯' if new_count > 0 else '暂无新资讯' }) except Exception as e: print(f"获取新闻失败:{str(e)}") import traceback traceback.print_exc() db.session.rollback() return jsonify({'success': False, 'error': str(e)}), 500 # ========== 社媒营销路由 (v2.5新增) ========== @app.route('/api/generate-social-share', methods=['POST']) def generate_social_share(): """一键生成社媒分享文案(前台可访问)""" try: data = request.get_json() or {} site_code = (data.get('site_code') or '').strip() platforms = data.get('platforms') or [] if not site_code: return jsonify({'success': False, 'message': '请提供网站编码'}), 400 site = Site.query.filter_by(code=site_code, is_active=True).first() if not site: return jsonify({'success': False, 'message': '网站不存在或未启用'}), 404 # 允许的平台列表(前端也会限制,这里再做一次兜底) allowed_platforms = { 'universal', 'xiaohongshu', 'douyin', 'bilibili', 'wechat', 'moments', 'x', 'linkedin' } if not isinstance(platforms, list): platforms = [] platforms = [p for p in platforms if isinstance(p, str)] platforms = [p.strip().lower() for p in platforms if p.strip()] platforms = [p for p in platforms if p in allowed_platforms] # 默认生成通用模板 if not platforms: platforms = ['universal'] # 组织站点信息 site_info = { 'name': site.name, 'url': site.url, 'short_desc': site.short_desc or '', 'description': (site.description or '')[:1200], 'features': (site.features or '')[:1200], 'tags': [t.name for t in (site.tags or [])] } # 尝试使用PromptTemplate + DeepSeek生成 from utils.tag_generator import TagGenerator generator = TagGenerator() prompt = PromptTemplate.query.filter_by(key='social_share', is_active=True).first() generated = {} if prompt and generator.client: # 构造提示词变量 tags_text = ','.join(site_info['tags']) user_prompt = prompt.user_prompt_template.format( name=site_info['name'], url=site_info['url'], short_desc=site_info['short_desc'], description=site_info['description'], features=site_info['features'], tags=tags_text, platforms=','.join(platforms) ) try: resp = generator.client.chat.completions.create( model='deepseek-chat', messages=[ {'role': 'system', 'content': prompt.system_prompt}, {'role': 'user', 'content': user_prompt} ], temperature=0.7, max_tokens=1200 ) text = (resp.choices[0].message.content or '').strip() # 约定:模型返回JSON字符串;若不是JSON则回退到纯文本放到universal import json as _json try: parsed = _json.loads(text) if isinstance(parsed, dict): for k, v in parsed.items(): if isinstance(k, str) and isinstance(v, str) and k in allowed_platforms: generated[k] = v.strip() except Exception: generated['universal'] = text except Exception as e: # AI失败不影响整体,回退模板 print(f"社媒文案AI生成失败:{str(e)}") # 回退:规则模板(根据各平台字数限制优化) def build_fallback(p): name = site_info['name'] url = site_info['url'] short_desc = site_info['short_desc'] or '' full_desc = site_info['description'] or '' features = site_info['features'] or '' tags = site_info['tags'][:6] tags_line = ' ' + ' '.join([f"#{t}" for t in tags]) if tags else '' # X (Twitter) - 280字符限制,简洁为主 if p == 'x': desc = (short_desc or full_desc)[:100] base = f"🔥 {name}\n\n{desc}\n\n🔗 {url}{tags_line}" return base[:280] # LinkedIn - 3000字限制,专业风格 if p == 'linkedin': desc = (full_desc or short_desc)[:500] content = f"💡 推荐一个实用工具:{name}\n\n📝 简介:\n{desc}\n\n" if features: features_list = features.split('\n')[:3] content += f"✨ 主要功能:\n" + '\n'.join([f"• {f.strip()}" for f in features_list if f.strip()][:3]) + "\n\n" content += f"🔗 官网:{url}\n\n" if tags: content += f"📌 适用场景:{', '.join(tags)}" return content[:3000] # 小红书 - 2000字限制,图文风格 if p == 'xiaohongshu': desc = (short_desc or full_desc)[:300] content = f"✨ {name} | 我最近在用的宝藏工具\n\n" content += f"📌 一句话介绍:\n{desc}\n\n" if features: features_list = features.split('\n')[:5] content += f"🎯 核心功能:\n" + '\n'.join([f"✓ {f.strip()}" for f in features_list if f.strip()][:5]) + "\n\n" content += f"🔗 体验入口:{url}\n\n{tags_line}" return content[:2000] # 抖音 - 2000字限制,短视频文案风格 if p == 'douyin': desc = (short_desc or full_desc)[:200] content = f"🔥 发现一个超好用的工具:{name}\n\n" content += f"💡 {desc}\n\n" if features: features_list = features.split('\n')[:3] content += "✨ 亮点功能:\n" + '\n'.join([f"👉 {f.strip()}" for f in features_list if f.strip()][:3]) + "\n\n" content += f"🔗 想试试戳:{url}\n\n{tags_line}" return content[:2000] # B站 - 20000字限制,专栏风格 if p == 'bilibili': desc = (full_desc or short_desc)[:800] content = f"【分享】{name} - 值得一试的优质工具\n\n" content += f"## 📖 工具简介\n{desc}\n\n" if features: content += f"## ✨ 主要功能\n{features}\n\n" content += f"## 🔗 体验地址\n{url}\n\n" if tags: content += f"## 🏷️ 相关标签\n{' / '.join(tags)}" return content[:20000] # 微信公众号 - 基本无限制,正式风格 if p == 'wechat': desc = (full_desc or short_desc)[:600] content = f"今天给大家分享一个实用工具:{name}\n\n" content += f"📝 工具介绍\n{desc}\n\n" if features: content += f"✨ 核心功能\n{features}\n\n" content += f"🔗 官方网站\n{url}\n\n" if tags: content += f"如果你也在关注「{' · '.join(tags)}」相关的工具,不妨收藏一下这个实用的选择。" return content # 朋友圈 - 简短为主 if p == 'moments': desc = (short_desc or full_desc)[:80] return f"💡 {name}\n{desc}\n🔗 {url}{tags_line}".strip() # 通用模板 desc = (full_desc or short_desc)[:300] return f"{name}\n\n{desc}\n\n{url}{tags_line}".strip() results = {} for p in platforms: results[p] = generated.get(p) or build_fallback(p) # 统一补充一个通用版本 if 'universal' not in results: results['universal'] = generated.get('universal') or build_fallback('universal') return jsonify({ 'success': True, 'site': { 'code': site.code, 'name': site.name, 'url': site.url }, 'platforms': platforms, 'content': results }) except Exception as e: return jsonify({'success': False, 'message': f'生成失败: {str(e)}'}), 500 # ========== 后台登录路由 ========== @app.route('/admin/login', methods=['GET', 'POST']) def admin_login(): """管理员登录""" if current_user.is_authenticated: return redirect(url_for('admin.index')) if request.method == 'POST': username = request.form.get('username') password = request.form.get('password') admin = AdminModel.query.filter_by(username=username).first() if admin and admin.check_password(password) and admin.is_active: login_user(admin) admin.last_login = datetime.now() db.session.commit() return redirect(url_for('admin.index')) else: flash('用户名或密码错误', 'error') return render_template('admin_login.html') @app.route('/admin/logout') @login_required def admin_logout(): """管理员登出""" logout_user() return redirect(url_for('index')) @app.route('/admin/change-password', methods=['GET', 'POST']) @login_required def change_password(): """修改密码""" if request.method == 'POST': old_password = request.form.get('old_password', '').strip() new_password = request.form.get('new_password', '').strip() confirm_password = request.form.get('confirm_password', '').strip() # 验证旧密码 if not current_user.check_password(old_password): flash('旧密码错误', 'error') return render_template('admin/change_password.html') # 验证新密码 if not new_password: flash('新密码不能为空', 'error') return render_template('admin/change_password.html') if len(new_password) < 6: flash('新密码长度至少6位', 'error') return render_template('admin/change_password.html') if new_password != confirm_password: flash('两次输入的新密码不一致', 'error') return render_template('admin/change_password.html') if old_password == new_password: flash('新密码不能与旧密码相同', 'error') return render_template('admin/change_password.html') # 修改密码 try: current_user.set_password(new_password) db.session.commit() flash('密码修改成功,请重新登录', 'success') logout_user() return redirect(url_for('admin_login')) except Exception as e: db.session.rollback() flash(f'密码修改失败:{str(e)}', 'error') return render_template('admin/change_password.html') return render_template('admin/change_password.html') # ========== API路由 ========== @app.route('/api/fetch-website-info', methods=['POST']) @login_required def fetch_website_info(): """抓取网站信息API""" try: data = request.get_json() url = data.get('url', '').strip() if not url: return jsonify({ 'success': False, 'message': '请提供网站URL' }), 400 # 创建抓取器 fetcher = WebsiteFetcher(timeout=15) # 抓取网站信息 info = fetcher.fetch_website_info(url) if not info: return jsonify({ 'success': False, 'message': '无法获取网站信息,请检查URL是否正确或手动填写' }) # 下载Logo到本地(如果有) logo_path = None if info.get('logo_url'): logo_path = fetcher.download_logo(info['logo_url'], save_dir='static/logos') # 如果下载失败,不返回远程URL,让用户手动上传 return jsonify({ 'success': True, 'data': { 'name': info.get('name', ''), 'description': info.get('description', ''), 'logo': logo_path if logo_path else '' } }) except Exception as e: return jsonify({ 'success': False, 'message': f'抓取失败: {str(e)}' }), 500 @app.route('/api/upload-logo', methods=['POST']) @login_required def upload_logo(): """上传Logo图片API""" try: # 检查文件是否存在 if 'logo' not in request.files: return jsonify({ 'success': False, 'message': '请选择要上传的图片' }), 400 file = request.files['logo'] # 检查文件名 if file.filename == '': return jsonify({ 'success': False, 'message': '未选择文件' }), 400 # 检查文件类型 allowed_extensions = {'png', 'jpg', 'jpeg', 'gif', 'svg', 'ico', 'webp'} filename = file.filename.lower() if not any(filename.endswith('.' + ext) for ext in allowed_extensions): return jsonify({ 'success': False, 'message': '不支持的文件格式,请上传图片文件(png, jpg, gif, svg, ico, webp)' }), 400 # 创建保存目录 save_dir = 'static/logos' os.makedirs(save_dir, exist_ok=True) # 生成安全的文件名 import time import hashlib ext = os.path.splitext(filename)[1] timestamp = str(int(time.time() * 1000)) hash_name = hashlib.md5(f"{filename}{timestamp}".encode()).hexdigest()[:16] safe_filename = f"logo_{hash_name}{ext}" filepath = os.path.join(save_dir, safe_filename) # 保存文件 file.save(filepath) # 返回相对路径 return jsonify({ 'success': True, 'path': f'/{filepath.replace(os.sep, "/")}' }) except Exception as e: return jsonify({ 'success': False, 'message': f'上传失败: {str(e)}' }), 500 @app.route('/api/generate-features', methods=['POST']) @login_required def generate_features(): """使用DeepSeek自动生成网站主要功能""" try: data = request.get_json() name = data.get('name', '').strip() description = data.get('description', '').strip() url = data.get('url', '').strip() if not name or not description: return jsonify({ 'success': False, 'message': '请提供网站名称和描述' }), 400 # 生成功能列表 generator = TagGenerator() features = generator.generate_features(name, description, url) if not features: return jsonify({ 'success': False, 'message': 'DeepSeek功能生成失败,请检查API配置' }), 500 return jsonify({ 'success': True, 'features': features }) except ValueError as e: return jsonify({ 'success': False, 'message': str(e) }), 400 except Exception as e: return jsonify({ 'success': False, 'message': f'生成失败: {str(e)}' }), 500 @app.route('/api/generate-description', methods=['POST']) @login_required def generate_description(): """使用DeepSeek自动生成网站详细介绍""" try: data = request.get_json() name = data.get('name', '').strip() short_desc = data.get('short_desc', '').strip() url = data.get('url', '').strip() if not name: return jsonify({ 'success': False, 'message': '请提供网站名称' }), 400 # 生成详细介绍 generator = TagGenerator() description = generator.generate_description(name, short_desc, url) if not description: return jsonify({ 'success': False, 'message': 'DeepSeek详细介绍生成失败,请检查API配置' }), 500 return jsonify({ 'success': True, 'description': description }) except ValueError as e: return jsonify({ 'success': False, 'message': str(e) }), 400 except Exception as e: return jsonify({ 'success': False, 'message': f'生成失败: {str(e)}' }), 500 @app.route('/api/generate-tags', methods=['POST']) @login_required def generate_tags(): """使用DeepSeek自动生成标签""" try: data = request.get_json() name = data.get('name', '').strip() description = data.get('description', '').strip() if not name or not description: return jsonify({ 'success': False, 'message': '请提供网站名称和描述' }), 400 # 获取现有标签作为参考 existing_tags = [tag.name for tag in Tag.query.all()] # 生成标签 generator = TagGenerator() suggested_tags = generator.generate_tags(name, description, existing_tags) if not suggested_tags: return jsonify({ 'success': False, 'message': 'DeepSeek标签生成失败,请检查API配置' }), 500 return jsonify({ 'success': True, 'tags': suggested_tags }) except ValueError as e: return jsonify({ 'success': False, 'message': str(e) }), 400 except Exception as e: return jsonify({ 'success': False, 'message': f'生成失败: {str(e)}' }), 500 # ========== 新闻获取路由 ========== @app.route('/api/fetch-site-news', methods=['POST']) @login_required def fetch_site_news(): """为指定网站获取最新新闻""" try: data = request.get_json() site_id = data.get('site_id') count = data.get('count', app.config.get('NEWS_SEARCH_COUNT', 10)) freshness = data.get('freshness', app.config.get('NEWS_SEARCH_FRESHNESS', 'oneMonth')) if not site_id: return jsonify({ 'success': False, 'message': '请提供网站ID' }), 400 # 获取网站信息 site = Site.query.get(site_id) if not site: return jsonify({ 'success': False, 'message': '网站不存在' }), 404 # 检查博查API配置 api_key = app.config.get('BOCHA_API_KEY') if not api_key: return jsonify({ 'success': False, 'message': '博查API未配置,请在.env文件中设置BOCHA_API_KEY' }), 500 # 创建新闻搜索器 searcher = NewsSearcher(api_key) # 搜索新闻 news_items = searcher.search_site_news( site_name=site.name, site_url=site.url, news_keywords=site.news_keywords, # v2.3新增:使用专用关键词 count=count, freshness=freshness ) if not news_items: return jsonify({ 'success': False, 'message': '未找到相关新闻' }), 404 # 保存新闻到数据库 saved_count = 0 for item in news_items: # 检查新闻是否已存在(根据URL判断) existing_news = News.query.filter_by( site_id=site_id, url=item['url'] ).first() if not existing_news: # 创建新闻记录 news = News( site_id=site_id, title=item['title'], content=item.get('summary') or item.get('snippet', ''), url=item['url'], source_name=item.get('site_name', ''), source_icon=item.get('site_icon', ''), published_at=item.get('published_at'), news_type='Search Result', is_active=True ) db.session.add(news) saved_count += 1 # 提交事务 db.session.commit() return jsonify({ 'success': True, 'message': f'成功获取并保存 {saved_count} 条新闻', 'total_found': len(news_items), 'saved': saved_count, 'news_items': searcher.format_news_for_display(news_items) }) except Exception as e: db.session.rollback() return jsonify({ 'success': False, 'message': f'获取失败: {str(e)}' }), 500 @app.route('/api/fetch-all-news', methods=['POST']) @login_required def fetch_all_news(): """批量为所有网站获取新闻""" try: data = request.get_json() count_per_site = data.get('count', 5) # 每个网站获取的新闻数量 freshness = data.get('freshness', app.config.get('NEWS_SEARCH_FRESHNESS', 'oneMonth')) limit = data.get('limit', 10) # 限制处理的网站数量 # 检查博查API配置 api_key = app.config.get('BOCHA_API_KEY') if not api_key: return jsonify({ 'success': False, 'message': '博查API未配置,请在.env文件中设置BOCHA_API_KEY' }), 500 # 获取启用的网站(按更新时间排序,优先处理旧的) sites = Site.query.filter_by(is_active=True).order_by(Site.updated_at).limit(limit).all() if not sites: return jsonify({ 'success': False, 'message': '没有可用的网站' }), 404 # 创建新闻搜索器 searcher = NewsSearcher(api_key) # 统计信息 total_saved = 0 total_found = 0 processed_sites = [] # 为每个网站获取新闻 for site in sites: try: # 搜索新闻 news_items = searcher.search_site_news( site_name=site.name, site_url=site.url, news_keywords=site.news_keywords, # v2.3新增:使用专用关键词 count=count_per_site, freshness=freshness ) site_saved = 0 for item in news_items: # 检查是否已存在 existing_news = News.query.filter_by( site_id=site.id, url=item['url'] ).first() if not existing_news: news = News( site_id=site.id, title=item['title'], content=item.get('summary') or item.get('snippet', ''), url=item['url'], source_name=item.get('site_name', ''), source_icon=item.get('site_icon', ''), published_at=item.get('published_at'), news_type='Search Result', is_active=True ) db.session.add(news) site_saved += 1 total_found += len(news_items) total_saved += site_saved processed_sites.append({ 'id': site.id, 'name': site.name, 'found': len(news_items), 'saved': site_saved }) except Exception as e: # 单个网站失败不影响其他网站 processed_sites.append({ 'id': site.id, 'name': site.name, 'error': str(e) }) continue # 提交事务 db.session.commit() return jsonify({ 'success': True, 'message': f'批量获取完成,共处理 {len(processed_sites)} 个网站', 'total_found': total_found, 'total_saved': total_saved, 'processed_sites': processed_sites }) except Exception as e: db.session.rollback() return jsonify({ 'success': False, 'message': f'批量获取失败: {str(e)}' }), 500 # ========== SEO路由 (v2.4新增) ========== @app.route('/sitemap.xml') def sitemap(): """动态生成sitemap.xml""" from flask import make_response # 获取所有启用的网站 sites = Site.query.filter_by(is_active=True).order_by(Site.updated_at.desc()).all() # 获取所有标签 tags = Tag.query.all() # 构建XML内容 xml_content = ''' ''' # 首页 xml_content += ''' {} daily 1.0 '''.format(request.url_root.rstrip('/')) # 工具详情页 for site in sites: xml_content += ''' {} {} weekly 0.8 '''.format( request.url_root.rstrip('/') + url_for('site_detail', code=site.code), site.updated_at.strftime('%Y-%m-%d') if site.updated_at else datetime.now().strftime('%Y-%m-%d') ) # 标签页 for tag in tags: xml_content += ''' {} weekly 0.6 '''.format(request.url_root.rstrip('/') + '/?tag=' + tag.slug) xml_content += ''' ''' response = make_response(xml_content) response.headers['Content-Type'] = 'application/xml; charset=utf-8' return response @app.route('/robots.txt') def robots(): """动态生成robots.txt""" from flask import make_response robots_content = '''User-agent: * Allow: / Disallow: /admin/ Disallow: /api/ Sitemap: {}sitemap.xml '''.format(request.url_root) response = make_response(robots_content) response.headers['Content-Type'] = 'text/plain; charset=utf-8' return response # ========== SEO工具管理路由 (v2.4新增) ========== @app.route('/admin/seo-tools') @login_required def seo_tools(): """SEO工具管理页面""" # 检查static/sitemap.xml是否存在及最后更新时间 sitemap_path = 'static/sitemap.xml' sitemap_info = None if os.path.exists(sitemap_path): import time mtime = os.path.getmtime(sitemap_path) sitemap_info = { 'exists': True, 'last_updated': datetime.fromtimestamp(mtime).strftime('%Y-%m-%d %H:%M:%S'), 'size': os.path.getsize(sitemap_path) } else: sitemap_info = {'exists': False} return render_template('admin/seo_tools.html', sitemap_info=sitemap_info) @app.route('/api/generate-static-sitemap', methods=['POST']) @login_required def generate_static_sitemap(): """生成静态sitemap.xml文件""" try: # 获取所有启用的网站 sites = Site.query.filter_by(is_active=True).order_by(Site.updated_at.desc()).all() # 获取所有标签 tags = Tag.query.all() # 构建XML内容(使用网站配置的域名) base_url = request.url_root.rstrip('/') xml_content = ''' ''' # 首页 xml_content += f''' {base_url} daily 1.0 ''' # 工具详情页 for site in sites: xml_content += f''' {base_url}/site/{site.code} {site.updated_at.strftime('%Y-%m-%d') if site.updated_at else datetime.now().strftime('%Y-%m-%d')} weekly 0.8 ''' # 标签页 for tag in tags: xml_content += f''' {base_url}/?tag={tag.slug} weekly 0.6 ''' xml_content += ''' ''' # 保存到static目录 static_dir = 'static' os.makedirs(static_dir, exist_ok=True) sitemap_path = os.path.join(static_dir, 'sitemap.xml') with open(sitemap_path, 'w', encoding='utf-8') as f: f.write(xml_content) # 统计信息 total_urls = 1 + len(sites) + len(tags) # 首页 + 工具页 + 标签页 return jsonify({ 'success': True, 'message': f'静态sitemap.xml生成成功!共包含 {total_urls} 个URL', 'total_urls': total_urls, 'file_path': sitemap_path, 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S') }) except Exception as e: return jsonify({ 'success': False, 'message': f'生成失败: {str(e)}' }), 500 @app.route('/api/notify-search-engines', methods=['POST']) @login_required def notify_search_engines(): """通知搜索引擎sitemap更新""" try: import requests from urllib.parse import quote # 获取sitemap URL(使用当前请求的域名) sitemap_url = request.url_root.rstrip('/') + '/sitemap.xml' encoded_sitemap_url = quote(sitemap_url, safe='') results = [] # 1. 通知Google google_ping_url = f'http://www.google.com/ping?sitemap={encoded_sitemap_url}' try: google_response = requests.get(google_ping_url, timeout=10) results.append({ 'engine': 'Google', 'status': 'success' if google_response.status_code == 200 else 'failed', 'status_code': google_response.status_code, 'message': '提交成功' if google_response.status_code == 200 else f'HTTP {google_response.status_code}' }) except Exception as e: results.append({ 'engine': 'Google', 'status': 'error', 'message': f'请求失败: {str(e)}' }) # 2. 通知Baidu baidu_ping_url = f'http://data.zz.baidu.com/ping?sitemap={encoded_sitemap_url}' try: baidu_response = requests.get(baidu_ping_url, timeout=10) results.append({ 'engine': 'Baidu', 'status': 'success' if baidu_response.status_code == 200 else 'failed', 'status_code': baidu_response.status_code, 'message': '提交成功' if baidu_response.status_code == 200 else f'HTTP {baidu_response.status_code}' }) except Exception as e: results.append({ 'engine': 'Baidu', 'status': 'error', 'message': f'请求失败: {str(e)}' }) # 3. 通知Bing bing_ping_url = f'http://www.bing.com/ping?sitemap={encoded_sitemap_url}' try: bing_response = requests.get(bing_ping_url, timeout=10) results.append({ 'engine': 'Bing', 'status': 'success' if bing_response.status_code == 200 else 'failed', 'status_code': bing_response.status_code, 'message': '提交成功' if bing_response.status_code == 200 else f'HTTP {bing_response.status_code}' }) except Exception as e: results.append({ 'engine': 'Bing', 'status': 'error', 'message': f'请求失败: {str(e)}' }) # 统计成功数量 success_count = sum(1 for r in results if r['status'] == 'success') return jsonify({ 'success': True, 'message': f'已通知 {success_count}/{len(results)} 个搜索引擎', 'sitemap_url': sitemap_url, 'results': results }) except Exception as e: return jsonify({ 'success': False, 'message': f'通知失败: {str(e)}' }), 500 @app.route('/api/refresh-site-news/', methods=['POST']) def refresh_site_news(site_code): """手动刷新指定网站的新闻(前台用户可访问)- v2.3新增""" try: # 根据code查找网站 site = Site.query.filter_by(code=site_code).first() if not site: return jsonify({ 'success': False, 'message': '网站不存在' }), 404 # 检查博查API配置 api_key = app.config.get('BOCHA_API_KEY') if not api_key: return jsonify({ 'success': False, 'message': '新闻功能未启用' }), 500 # 创建新闻搜索器 searcher = NewsSearcher(api_key) # 搜索新闻(获取最新5条) news_items = searcher.search_site_news( site_name=site.name, site_url=site.url, news_keywords=site.news_keywords, # 使用专用关键词 count=5, freshness='oneWeek' # 一周内的新闻 ) if not news_items: return jsonify({ 'success': False, 'message': '未找到相关新闻' }), 404 # 保存新闻到数据库 saved_count = 0 for item in news_items: # 检查新闻是否已存在(根据URL判断) existing_news = News.query.filter_by( site_id=site.id, url=item['url'] ).first() if not existing_news: news = News( site_id=site.id, title=item['title'], content=item.get('summary') or item.get('snippet', ''), url=item['url'], source_name=item.get('site_name', ''), source_icon=item.get('site_icon', ''), published_at=item.get('published_at'), news_type='Search Result', is_active=True ) db.session.add(news) saved_count += 1 # 提交事务 db.session.commit() return jsonify({ 'success': True, 'message': f'成功获取 {saved_count} 条新资讯', 'total_found': len(news_items), 'saved_count': saved_count }) except Exception as e: db.session.rollback() return jsonify({ 'success': False, 'message': f'获取失败: {str(e)}' }), 500 # ========== 批量导入路由 ========== @app.route('/admin/batch-import', methods=['GET', 'POST']) @login_required def batch_import(): """批量导入网站""" from utils.bookmark_parser import BookmarkParser from utils.website_fetcher import WebsiteFetcher results = None if request.method == 'POST': import_type = request.form.get('import_type') auto_activate = request.form.get('auto_activate') == 'on' parser = BookmarkParser() fetcher = WebsiteFetcher(timeout=15) urls_to_import = [] try: # 解析输入 if import_type == 'url_list': url_list_text = request.form.get('url_list', '') urls_to_import = parser.parse_url_list(url_list_text) elif import_type == 'bookmark_file': bookmark_file = request.files.get('bookmark_file') if not bookmark_file: flash('请选择书签文件', 'error') return render_template('admin/batch_import.html') html_content = bookmark_file.read().decode('utf-8', errors='ignore') all_bookmarks = parser.parse_html_file(html_content) # 筛选文件夹(如果指定) folder_filter = request.form.get('folder_filter', '').strip() if folder_filter: urls_to_import = [ b for b in all_bookmarks if folder_filter.lower() in b.get('folder', '').lower() ] else: urls_to_import = all_bookmarks # 批量导入 success_list = [] failed_list = [] for idx, item in enumerate(urls_to_import, 1): url = item['url'] name = item.get('name', '') # 为每个URL创建独立的事务 try: # 1. 检查URL是否已存在 try: existing = Site.query.filter_by(url=url).first() if existing: failed_list.append({ 'url': url, 'name': name or existing.name, 'error': f'该URL已存在(网站名称:{existing.name})' }) continue except Exception as e: failed_list.append({ 'url': url, 'name': name, 'error': f'检查URL时出错: {str(e)}' }) continue # 2. 抓取网站信息(带超时和错误处理) info = None try: info = fetcher.fetch_website_info(url) except Exception as e: print(f"抓取 {url} 失败: {str(e)}") # 抓取失败不是致命错误,继续尝试使用书签名称 # 3. 处理网站信息 if not info or not info.get('name'): # 如果有书签名称,使用书签名称 if name: info = { 'name': name, 'description': '', 'logo_url': '' } else: # 尝试从URL提取域名作为名称 from urllib.parse import urlparse try: parsed = urlparse(url) domain = parsed.netloc or parsed.path if domain: info = { 'name': domain, 'description': '', 'logo_url': '' } else: failed_list.append({ 'url': url, 'name': name, 'error': '无法获取网站信息且没有备用名称' }) continue except Exception: failed_list.append({ 'url': url, 'name': name, 'error': '无法获取网站信息且URL解析失败' }) continue # 4. 下载Logo到本地(失败不影响导入) logo_path = None if info.get('logo_url'): try: logo_path = fetcher.download_logo(info['logo_url'], save_dir='static/logos') except Exception as e: print(f"下载Logo失败 ({url}): {str(e)}") # Logo下载失败不影响网站导入 # 5. 生成code和slug try: import random from pypinyin import lazy_pinyin import re # 生成唯一的code site_code = None max_attempts = 10 for _ in range(max_attempts): code = str(random.randint(10000000, 99999999)) if not Site.query.filter_by(code=code).first(): site_code = code break if not site_code: # 如果10次都失败,使用时间戳 import time site_code = str(int(time.time() * 1000))[-8:] # 生成slug site_name = info.get('name', name or 'Unknown')[:100] slug = ''.join(lazy_pinyin(site_name)) slug = slug.lower() slug = re.sub(r'[^\w\s-]', '', slug) slug = re.sub(r'[-\s]+', '-', slug).strip('-') if not slug: slug = f"site-{site_code}" # 确保slug唯一 base_slug = slug[:50] # 限制长度 counter = 1 final_slug = slug while Site.query.filter_by(slug=final_slug).first(): final_slug = f"{base_slug}-{counter}" counter += 1 if counter > 100: # 防止无限循环 final_slug = f"{base_slug}-{site_code}" break # 6. 创建网站记录(带code和slug) site = Site( code=site_code, slug=final_slug, name=site_name, url=url[:500], # 限制URL长度 logo=logo_path or info.get('logo_url', '')[:500] if info.get('logo_url') else '', short_desc=info.get('description', '')[:200] if info.get('description') else '', description=info.get('description', '')[:2000] if info.get('description') else '', is_active=auto_activate ) # 添加到数据库并提交 db.session.add(site) db.session.commit() success_list.append({ 'name': site.name, 'url': site.url }) print(f"成功导入 [{idx}/{len(urls_to_import)}]: {site.name}") except Exception as e: db.session.rollback() failed_list.append({ 'url': url, 'name': name or info.get('name', 'Unknown'), 'error': f'数据库保存失败: {str(e)}' }) continue except Exception as e: # 捕获所有未预期的错误 db.session.rollback() failed_list.append({ 'url': url, 'name': name, 'error': f'未知错误: {str(e)}' }) print(f"导入 {url} 时发生未知错误: {str(e)}") continue results = { 'total_count': len(urls_to_import), 'success_count': len(success_list), 'failed_count': len(failed_list), 'success_list': success_list, 'failed_list': failed_list } if success_list: flash(f'成功导入 {len(success_list)} 个网站!', 'success') except Exception as e: flash(f'导入失败: {str(e)}', 'error') return render_template('admin/batch_import.html', results=results) # ========== Flask-Admin 配置 ========== class SecureModelView(ModelView): """需要登录的模型视图""" # 中文化配置 can_set_page_size = True page_size = 20 # 自定义文本 list_template = 'admin/model/list.html' create_template = 'admin/model/create.html' edit_template = 'admin/model/edit.html' # 覆盖英文文本 named_filter_urls = True def is_accessible(self): return current_user.is_authenticated def inaccessible_callback(self, name, **kwargs): return redirect(url_for('admin_login')) class SecureAdminIndexView(AdminIndexView): """需要登录的管理首页""" def is_accessible(self): return current_user.is_authenticated def inaccessible_callback(self, name, **kwargs): return redirect(url_for('admin_login')) @expose('/') def index(self): """控制台首页,显示统计信息""" # 统计数据 stats = { 'sites_count': Site.query.filter_by(is_active=True).count(), 'tags_count': Tag.query.count(), 'news_count': News.query.filter_by(is_active=True).count(), 'total_views': db.session.query(db.func.sum(Site.view_count)).scalar() or 0 } # 最近添加的工具(最多5个) recent_sites = Site.query.order_by(Site.created_at.desc()).limit(5).all() return self.render('admin/index.html', stats=stats, recent_sites=recent_sites) # 网站管理视图 class SiteAdmin(SecureModelView): # 自定义模板 create_template = 'admin/site/create.html' edit_template = 'admin/site/edit.html' # 启用编辑和删除 can_edit = True can_delete = True can_create = True can_view_details = False # 禁用查看详情,点击名称即可查看 # 显示操作列 column_display_actions = True action_disallowed_list = [] column_list = ['id', 'code', 'name', 'url', 'slug', 'is_active', 'is_recommended', 'view_count', 'created_at'] column_searchable_list = ['code', 'name', 'url', 'description'] column_filters = ['is_active', 'is_recommended', 'tags'] column_labels = { 'id': 'ID', 'code': '网站编码', 'name': '网站名称', 'url': 'URL', 'slug': 'URL别名', 'logo': 'Logo', 'short_desc': '简短描述', 'description': '详细介绍', 'features': '主要功能', 'news_keywords': '新闻关键词', 'is_active': '是否启用', 'is_recommended': '是否推荐', 'view_count': '浏览次数', 'sort_order': '排序权重', 'tags': '标签', 'created_at': '创建时间', 'updated_at': '更新时间' } form_columns = ['name', 'url', 'slug', 'logo', 'short_desc', 'description', 'features', 'news_keywords', 'tags', 'is_active', 'is_recommended', 'sort_order'] # 自定义编辑/删除文字 column_extra_row_actions = None def on_model_change(self, form, model, is_created): """保存前自动生成code和slug(如果为空)""" import re import random from pypinyin import lazy_pinyin from flask import request # 使用no_autoflush防止在查询时触发提前flush with db.session.no_autoflush: # 处理手动输入的新标签 new_tags_str = request.form.get('new_tags', '') if new_tags_str: new_tag_names = [name.strip() for name in new_tags_str.split(',') if name.strip()] for tag_name in new_tag_names: # 检查标签是否已存在 existing_tag = Tag.query.filter_by(name=tag_name).first() if not existing_tag: # 创建新标签 tag_slug = ''.join(lazy_pinyin(tag_name)) tag_slug = tag_slug.lower() tag_slug = re.sub(r'[^\w\s-]', '', tag_slug) tag_slug = re.sub(r'[-\s]+', '-', tag_slug).strip('-') # 确保slug唯一 base_tag_slug = tag_slug[:50] counter = 1 final_tag_slug = tag_slug while Tag.query.filter_by(slug=final_tag_slug).first(): final_tag_slug = f"{base_tag_slug}-{counter}" counter += 1 if counter > 100: final_tag_slug = f"{base_tag_slug}-{random.randint(1000, 9999)}" break new_tag = Tag(name=tag_name, slug=final_tag_slug) db.session.add(new_tag) db.session.flush() # 确保新标签有ID # 添加到模型的标签列表 if new_tag not in model.tags: model.tags.append(new_tag) else: # 添加已存在的标签 if existing_tag not in model.tags: model.tags.append(existing_tag) # 如果code为空,自动生成唯一的8位数字编码 if not model.code or model.code.strip() == '': max_attempts = 10 for attempt in range(max_attempts): # 生成10000000-99999999之间的随机数 code = str(random.randint(10000000, 99999999)) # 检查是否已存在 existing = Site.query.filter(Site.code == code).first() if not existing or existing.id == model.id: model.code = code break # 如果10次都失败,使用时间戳 if not model.code: import time model.code = str(int(time.time() * 1000))[-8:] # 如果slug为空,从name自动生成 if not model.slug or model.slug.strip() == '': # 将中文转换为拼音 slug = ''.join(lazy_pinyin(model.name)) # 转换为小写,移除特殊字符 slug = slug.lower() slug = re.sub(r'[^\w\s-]', '', slug) slug = re.sub(r'[-\s]+', '-', slug).strip('-') # 如果转换后为空,使用code if not slug: slug = f"site-{model.code}" # 确保slug唯一(限制长度和重试次数) base_slug = slug[:50] counter = 1 final_slug = slug max_slug_attempts = 100 while counter < max_slug_attempts: existing = Site.query.filter(Site.slug == final_slug).first() if not existing or existing.id == model.id: break final_slug = f"{base_slug}-{counter}" counter += 1 # 如果100次都失败,使用code确保唯一 if counter >= max_slug_attempts: final_slug = f"{base_slug}-{model.code}" model.slug = final_slug # 标签管理视图 class TagAdmin(SecureModelView): can_edit = True can_delete = True can_create = True can_view_details = False # 显示操作列 column_display_actions = True column_list = ['id', 'name', 'slug', 'description', 'sort_order'] column_searchable_list = ['name', 'description'] column_labels = { 'id': 'ID', 'name': '标签名称', 'slug': 'URL别名', 'description': '标签描述', 'seo_title': 'SEO标题 (v2.4)', 'seo_description': 'SEO描述 (v2.4)', 'seo_keywords': 'SEO关键词 (v2.4)', 'icon': '图标', 'sort_order': '排序权重', 'created_at': '创建时间' } form_columns = ['name', 'slug', 'description', 'seo_title', 'seo_description', 'seo_keywords', 'icon', 'sort_order'] # 管理员视图 class AdminAdmin(SecureModelView): can_edit = True can_delete = True can_create = True can_view_details = False # 显示操作列 column_display_actions = True column_list = ['id', 'username', 'email', 'is_active', 'last_login', 'created_at'] column_searchable_list = ['username', 'email'] column_filters = ['is_active'] column_labels = { 'id': 'ID', 'username': '用户名', 'email': '邮箱', 'is_active': '是否启用', 'created_at': '创建时间', 'last_login': '最后登录' } form_columns = ['username', 'email', 'is_active'] def on_model_change(self, form, model, is_created): # 如果是新建管理员,设置默认密码 if is_created: model.set_password('admin123') # 默认密码 # 新闻管理视图 class NewsAdmin(SecureModelView): can_edit = True can_delete = True can_create = True can_view_details = False # 显示操作列 column_display_actions = True column_list = ['id', 'site', 'title', 'source_name', 'news_type', 'published_at', 'is_active'] column_searchable_list = ['title', 'content', 'source_name'] column_filters = ['site', 'news_type', 'source_name', 'is_active', 'published_at'] column_labels = { 'id': 'ID', 'site': '关联网站', 'title': '新闻标题', 'content': '新闻内容', 'news_type': '新闻类型', 'url': '新闻链接', 'source_name': '来源网站', 'source_icon': '来源图标', 'published_at': '发布时间', 'is_active': '是否启用', 'created_at': '创建时间', 'updated_at': '更新时间' } form_columns = ['site', 'title', 'content', 'news_type', 'url', 'source_name', 'source_icon', 'published_at', 'is_active'] # 可选的新闻类型 form_choices = { 'news_type': [ ('Search Result', 'Search Result'), ('Product Update', 'Product Update'), ('Industry News', 'Industry News'), ('Company News', 'Company News'), ('Other', 'Other') ] } # 默认排序 column_default_sort = ('published_at', True) # 按发布时间倒序排列 # Prompt模板管理视图 class PromptAdmin(SecureModelView): can_edit = True can_delete = False # 不允许删除,避免系统必需的prompt被删除 can_create = True can_view_details = False # 显示操作列 column_display_actions = True column_list = ['id', 'key', 'name', 'description', 'is_active', 'updated_at'] column_searchable_list = ['key', 'name', 'description'] column_filters = ['is_active', 'key'] column_labels = { 'id': 'ID', 'key': '唯一标识', 'name': '模板名称', 'system_prompt': '系统提示词', 'user_prompt_template': '用户提示词模板', 'description': '模板说明', 'is_active': '是否启用', 'created_at': '创建时间', 'updated_at': '更新时间' } form_columns = ['key', 'name', 'description', 'system_prompt', 'user_prompt_template', 'is_active'] # 字段说明 column_descriptions = { 'key': '唯一标识,如: tags, features, description', 'system_prompt': 'AI的系统角色设定', 'user_prompt_template': '用户提示词模板,支持变量如 {name}, {description}, {url}', } # 表单字段配置 form_widget_args = { 'system_prompt': { 'rows': 3, 'style': 'font-family: monospace;' }, 'user_prompt_template': { 'rows': 20, 'style': 'font-family: monospace;' } } # 初始化 Flask-Admin admin = Admin( app, name='ZJPB 焦提示词', template_mode='bootstrap4', index_view=SecureAdminIndexView(name='控制台', url='/admin'), base_template='admin/master.html' ) admin.add_view(SiteAdmin(Site, db.session, name='网站管理')) admin.add_view(TagAdmin(Tag, db.session, name='标签管理')) admin.add_view(NewsAdmin(News, db.session, name='新闻管理')) admin.add_view(PromptAdmin(PromptTemplate, db.session, name='Prompt管理')) admin.add_view(AdminAdmin(AdminModel, db.session, name='管理员', endpoint='admin_users')) return app if __name__ == '__main__': app = create_app(os.getenv('FLASK_ENV', 'development')) app.run(host='0.0.0.0', port=5000, debug=True)