import hashlib
import os
import random
import re
import time
from datetime import date, datetime
from urllib.parse import urlparse

import markdown
from flask import Flask, render_template, redirect, url_for, request, flash, jsonify
from flask_login import LoginManager, login_user, logout_user, login_required, current_user
from flask_admin import Admin, AdminIndexView, expose
from flask_admin.contrib.sqla import ModelView

from config import config
from models import db, Site, Tag, Admin as AdminModel, News, site_tags, PromptTemplate
from utils.website_fetcher import WebsiteFetcher
from utils.tag_generator import TagGenerator
from utils.news_searcher import NewsSearcher


def _json_payload():
    """Return the request's JSON body as a dict, or ``{}`` when unusable.

    A missing body, a non-JSON content type, malformed JSON, or a JSON value
    that is not an object all yield an empty dict, so callers can run their
    own field validation and answer with a meaningful 400 instead of
    crashing on ``None.get``.
    """
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


def _slugify(text):
    """Convert *text* (possibly Chinese) into a lowercase, hyphenated slug.

    Chinese characters are transliterated with pypinyin, characters other
    than word characters, whitespace and hyphens are dropped, and runs of
    whitespace/hyphens collapse to a single hyphen. The result may be an
    empty string; callers decide on the fallback.
    """
    # Imported lazily, as the original code did, so the module can be
    # imported even where pypinyin is only needed for admin operations.
    from pypinyin import lazy_pinyin

    slug = ''.join(lazy_pinyin(text)).lower()
    slug = re.sub(r'[^\w\s-]', '', slug)
    return re.sub(r'[-\s]+', '-', slug).strip('-')


def _save_news_items(site_id, news_items):
    """Add search results to the session as ``News`` rows, skipping duplicates.

    A result is a duplicate when a ``News`` row with the same ``site_id`` and
    ``url`` already exists. Nothing is committed here; the caller owns the
    transaction.

    Args:
        site_id: Primary key of the site the news belongs to.
        news_items: Iterable of dicts with at least ``url`` and ``title``.

    Returns:
        The number of rows added to the session.
    """
    saved = 0
    for item in news_items:
        duplicate = News.query.filter_by(site_id=site_id, url=item['url']).first()
        if duplicate:
            continue
        db.session.add(News(
            site_id=site_id,
            title=item['title'],
            content=item.get('summary') or item.get('snippet', ''),
            url=item['url'],
            source_name=item.get('site_name', ''),
            source_icon=item.get('site_icon', ''),
            published_at=item.get('published_at'),
            news_type='Search Result',
            is_active=True,
        ))
        saved += 1
    return saved


def create_app(config_name='default'):
    """Application factory: build and configure the Flask app.

    Args:
        config_name: Key into the project's ``config`` mapping.

    Returns:
        The configured ``Flask`` application with the database, login
        manager, public routes, JSON APIs and Flask-Admin views registered.
    """
    app = Flask(__name__)

    # Load configuration
    app.config.from_object(config[config_name])

    # Initialise the database
    db.init_app(app)

    # Markdown template filter
    @app.template_filter('markdown')
    def markdown_filter(text):
        """Render Markdown text to HTML (empty string for falsy input)."""
        # NOTE(review): the output is not sanitised here; confirm templates
        # only apply this filter to trusted content.
        if not text:
            return ''
        return markdown.markdown(text, extensions=['nl2br', 'fenced_code'])

    # Login management
    login_manager = LoginManager()
    login_manager.init_app(app)
    login_manager.login_view = 'admin_login'
    login_manager.login_message = '请先登录'

    @login_manager.user_loader
    def load_user(user_id):
        """Load the admin for a session id; ``None`` for an invalid id."""
        try:
            admin_id = int(user_id)
        except (TypeError, ValueError):
            # A malformed id means "no such user", not a server error.
            return None
        return AdminModel.query.get(admin_id)

    # ========== Public routes ==========

    @app.route('/')
    def index():
        """Home page: tag list, optional tag filter / search, paginated sites."""
        tags = Tag.query.order_by(Tag.sort_order.desc(), Tag.id).all()

        # Count active sites per tag with a single grouped query.
        tag_counts = {}
        if tags:
            from sqlalchemy import func
            counts_query = db.session.query(
                site_tags.c.tag_id,
                func.count(site_tags.c.site_id).label('count')
            ).join(
                Site, site_tags.c.site_id == Site.id
            ).filter(
                Site.is_active == True  # noqa: E712 - SQL expression, not a Python bool test
            ).group_by(site_tags.c.tag_id).all()
            tag_counts = {tag_id: count for tag_id, count in counts_query}

        # Filter parameters
        tag_slug = request.args.get('tag')
        search_query = request.args.get('q', '').strip()
        page = request.args.get('page', 1, type=int)
        per_page = 100  # sites per page

        selected_tag = None

        # Base query
        query = Site.query.filter_by(is_active=True)

        # Tag filter
        if tag_slug:
            selected_tag = Tag.query.filter_by(slug=tag_slug).first()
            if selected_tag:
                query = query.filter(Site.tags.contains(selected_tag))
            else:
                # Unknown tag: render an empty result instead of all sites.
                return render_template('index_new.html',
                                       sites=[],
                                       tags=tags,
                                       selected_tag=selected_tag,
                                       search_query=search_query,
                                       pagination=None,
                                       tag_counts=tag_counts)

        # Search: OR across name, URL and both description fields
        if search_query:
            search_pattern = f'%{search_query}%'
            query = query.filter(
                db.or_(
                    Site.name.like(search_pattern),
                    Site.url.like(search_pattern),
                    Site.description.like(search_pattern),
                    Site.short_desc.like(search_pattern)
                )
            )

        # Sort and paginate
        query = query.order_by(Site.sort_order.desc(), Site.id.desc())
        pagination = query.paginate(page=page, per_page=per_page, error_out=False)

        return render_template('index_new.html',
                               sites=pagination.items,
                               tags=tags,
                               selected_tag=selected_tag,
                               search_query=search_query,
                               pagination=pagination,
                               tag_counts=tag_counts)

    # The rule must declare <code>; without it Flask calls the view with no
    # arguments and every request fails with a TypeError.
    @app.route('/site/<code>')
    def site_detail(code):
        """Site detail page; refreshes the site's news at most once per day."""
        site = Site.query.filter_by(code=code, is_active=True).first_or_404()

        # Bump the view counter
        site.view_count += 1
        db.session.commit()

        # Decide whether news needs refreshing: refresh when the site has no
        # news at all, or when the newest stored item was not created today.
        today = date.today()
        latest_news = News.query.filter_by(
            site_id=site.id
        ).order_by(News.created_at.desc()).first()
        need_update = (not latest_news) or latest_news.created_at.date() < today

        if need_update:
            api_key = app.config.get('BOCHA_API_KEY')
            if api_key:
                try:
                    searcher = NewsSearcher(api_key)
                    # Up to 3 items from the last week
                    news_items = searcher.search_site_news(
                        site_name=site.name,
                        site_url=site.url,
                        news_keywords=site.news_keywords,  # dedicated keywords (v2.3)
                        count=3,
                        freshness='oneWeek'
                    )
                    if news_items:
                        _save_news_items(site.id, news_items)
                    db.session.commit()
                except Exception as e:
                    # Best effort: a failed refresh must not break the page.
                    print(f"自动获取新闻失败:{e}")
                    db.session.rollback()

        # Related news (at most 5)
        news_list = News.query.filter_by(
            site_id=site.id,
            is_active=True
        ).order_by(News.published_at.desc()).limit(5).all()

        # Recommendations: other active sites sharing a tag (at most 4)
        recommended_sites = []
        if site.tags:
            recommended_sites = Site.query.filter(
                Site.id != site.id,
                Site.is_active == True,  # noqa: E712 - SQL expression
                Site.tags.any(Tag.id.in_([tag.id for tag in site.tags]))
            ).order_by(Site.view_count.desc()).limit(4).all()

        return render_template('detail_new.html',
                               site=site,
                               news_list=news_list,
                               recommended_sites=recommended_sites)

    # ========== Admin authentication routes ==========

    @app.route('/admin/login', methods=['GET', 'POST'])
    def admin_login():
        """Admin login form and handler."""
        if current_user.is_authenticated:
            return redirect(url_for('admin.index'))

        if request.method == 'POST':
            username = request.form.get('username')
            password = request.form.get('password')

            admin = AdminModel.query.filter_by(username=username).first()

            if admin and admin.check_password(password) and admin.is_active:
                login_user(admin)
                admin.last_login = datetime.now()
                db.session.commit()
                return redirect(url_for('admin.index'))

            flash('用户名或密码错误', 'error')

        return render_template('admin_login.html')

    @app.route('/admin/logout')
    @login_required
    def admin_logout():
        """Log the admin out and return to the home page."""
        logout_user()
        return redirect(url_for('index'))

    @app.route('/admin/change-password', methods=['GET', 'POST'])
    @login_required
    def change_password():
        """Change the current admin's password, then force a fresh login."""
        template = 'admin/change_password.html'

        if request.method != 'POST':
            return render_template(template)

        old_password = request.form.get('old_password', '').strip()
        new_password = request.form.get('new_password', '').strip()
        confirm_password = request.form.get('confirm_password', '').strip()

        # Validate, reporting the first failing rule (same order as before).
        if not current_user.check_password(old_password):
            error = '旧密码错误'
        elif not new_password:
            error = '新密码不能为空'
        elif len(new_password) < 6:
            error = '新密码长度至少6位'
        elif new_password != confirm_password:
            error = '两次输入的新密码不一致'
        elif old_password == new_password:
            error = '新密码不能与旧密码相同'
        else:
            error = None

        if error:
            flash(error, 'error')
            return render_template(template)

        try:
            current_user.set_password(new_password)
            db.session.commit()
            flash('密码修改成功,请重新登录', 'success')
            logout_user()
            return redirect(url_for('admin_login'))
        except Exception as e:
            db.session.rollback()
            flash(f'密码修改失败:{e}', 'error')
            return render_template(template)

    # ========== API routes ==========

    @app.route('/api/fetch-website-info', methods=['POST'])
    @login_required
    def fetch_website_info():
        """API: scrape name/description/logo for a URL given in the JSON body."""
        try:
            data = _json_payload()
            url = data.get('url', '').strip()

            if not url:
                return jsonify({
                    'success': False,
                    'message': '请提供网站URL'
                }), 400

            fetcher = WebsiteFetcher(timeout=15)
            info = fetcher.fetch_website_info(url)

            if not info:
                return jsonify({
                    'success': False,
                    'message': '无法获取网站信息,请检查URL是否正确或手动填写'
                })

            # Download the logo locally when one was found. On failure the
            # remote URL is deliberately not returned, so the user uploads
            # one manually.
            logo_path = None
            if info.get('logo_url'):
                logo_path = fetcher.download_logo(info['logo_url'], save_dir='static/logos')

            return jsonify({
                'success': True,
                'data': {
                    'name': info.get('name', ''),
                    'description': info.get('description', ''),
                    'logo': logo_path if logo_path else ''
                }
            })

        except Exception as e:
            return jsonify({
                'success': False,
                'message': f'抓取失败: {e}'
            }), 500

    @app.route('/api/upload-logo', methods=['POST'])
    @login_required
    def upload_logo():
        """API: store an uploaded logo image under ``static/logos``."""
        try:
            if 'logo' not in request.files:
                return jsonify({
                    'success': False,
                    'message': '请选择要上传的图片'
                }), 400

            file = request.files['logo']

            # ``filename`` may be empty or None when no file was chosen.
            if not file.filename:
                return jsonify({
                    'success': False,
                    'message': '未选择文件'
                }), 400

            # File type check (by extension only)
            allowed_extensions = {'png', 'jpg', 'jpeg', 'gif', 'svg', 'ico', 'webp'}
            filename = file.filename.lower()
            if not filename.endswith(tuple(f'.{ext}' for ext in allowed_extensions)):
                return jsonify({
                    'success': False,
                    'message': '不支持的文件格式,请上传图片文件(png, jpg, gif, svg, ico, webp)'
                }), 400

            save_dir = 'static/logos'
            os.makedirs(save_dir, exist_ok=True)

            # Never reuse the client-supplied name on disk: derive a name from
            # a hash of the original name plus a millisecond timestamp. MD5 is
            # used only to build a file name, not for security.
            ext = os.path.splitext(filename)[1]
            timestamp = str(int(time.time() * 1000))
            hash_name = hashlib.md5(f"{filename}{timestamp}".encode()).hexdigest()[:16]
            filepath = os.path.join(save_dir, f"logo_{hash_name}{ext}")

            file.save(filepath)

            # Return a URL-style path relative to the site root
            return jsonify({
                'success': True,
                'path': f'/{filepath.replace(os.sep, "/")}'
            })

        except Exception as e:
            return jsonify({
                'success': False,
                'message': f'上传失败: {e}'
            }), 500

    @app.route('/api/generate-features', methods=['POST'])
    @login_required
    def generate_features():
        """API: generate a site's main features with DeepSeek."""
        try:
            data = _json_payload()
            name = data.get('name', '').strip()
            description = data.get('description', '').strip()
            url = data.get('url', '').strip()

            if not name or not description:
                return jsonify({
                    'success': False,
                    'message': '请提供网站名称和描述'
                }), 400

            features = TagGenerator().generate_features(name, description, url)

            if not features:
                return jsonify({
                    'success': False,
                    'message': 'DeepSeek功能生成失败,请检查API配置'
                }), 500

            return jsonify({
                'success': True,
                'features': features
            })

        except ValueError as e:
            return jsonify({
                'success': False,
                'message': str(e)
            }), 400
        except Exception as e:
            return jsonify({
                'success': False,
                'message': f'生成失败: {e}'
            }), 500

    @app.route('/api/generate-description', methods=['POST'])
    @login_required
    def generate_description():
        """API: generate a site's long description with DeepSeek."""
        try:
            data = _json_payload()
            name = data.get('name', '').strip()
            short_desc = data.get('short_desc', '').strip()
            url = data.get('url', '').strip()

            if not name:
                return jsonify({
                    'success': False,
                    'message': '请提供网站名称'
                }), 400

            description = TagGenerator().generate_description(name, short_desc, url)

            if not description:
                return jsonify({
                    'success': False,
                    'message': 'DeepSeek详细介绍生成失败,请检查API配置'
                }), 500

            return jsonify({
                'success': True,
                'description': description
            })

        except ValueError as e:
            return jsonify({
                'success': False,
                'message': str(e)
            }), 400
        except Exception as e:
            return jsonify({
                'success': False,
                'message': f'生成失败: {e}'
            }), 500

    @app.route('/api/generate-tags', methods=['POST'])
    @login_required
    def generate_tags():
        """API: suggest tags for a site with DeepSeek."""
        try:
            data = _json_payload()
            name = data.get('name', '').strip()
            description = data.get('description', '').strip()

            if not name or not description:
                return jsonify({
                    'success': False,
                    'message': '请提供网站名称和描述'
                }), 400

            # Existing tag names are passed to the generator as reference
            existing_tags = [tag.name for tag in Tag.query.all()]

            suggested_tags = TagGenerator().generate_tags(name, description, existing_tags)

            if not suggested_tags:
                return jsonify({
                    'success': False,
                    'message': 'DeepSeek标签生成失败,请检查API配置'
                }), 500

            return jsonify({
                'success': True,
                'tags': suggested_tags
            })

        except ValueError as e:
            return jsonify({
                'success': False,
                'message': str(e)
            }), 400
        except Exception as e:
            return jsonify({
                'success': False,
                'message': f'生成失败: {e}'
            }), 500

    # ========== News routes ==========

    @app.route('/api/fetch-site-news', methods=['POST'])
    @login_required
    def fetch_site_news():
        """API: fetch and store the latest news for one site."""
        try:
            data = _json_payload()
            site_id = data.get('site_id')
            count = data.get('count', app.config.get('NEWS_SEARCH_COUNT', 10))
            freshness = data.get('freshness', app.config.get('NEWS_SEARCH_FRESHNESS', 'oneMonth'))

            if not site_id:
                return jsonify({
                    'success': False,
                    'message': '请提供网站ID'
                }), 400

            site = Site.query.get(site_id)
            if not site:
                return jsonify({
                    'success': False,
                    'message': '网站不存在'
                }), 404

            # The Bocha API key must be configured
            api_key = app.config.get('BOCHA_API_KEY')
            if not api_key:
                return jsonify({
                    'success': False,
                    'message': '博查API未配置,请在.env文件中设置BOCHA_API_KEY'
                }), 500

            searcher = NewsSearcher(api_key)
            news_items = searcher.search_site_news(
                site_name=site.name,
                site_url=site.url,
                news_keywords=site.news_keywords,  # dedicated keywords (v2.3)
                count=count,
                freshness=freshness
            )

            if not news_items:
                return jsonify({
                    'success': False,
                    'message': '未找到相关新闻'
                }), 404

            # Use the loaded row's key rather than the raw request value.
            saved_count = _save_news_items(site.id, news_items)
            db.session.commit()

            return jsonify({
                'success': True,
                'message': f'成功获取并保存 {saved_count} 条新闻',
                'total_found': len(news_items),
                'saved': saved_count,
                'news_items': searcher.format_news_for_display(news_items)
            })

        except Exception as e:
            db.session.rollback()
            return jsonify({
                'success': False,
                'message': f'获取失败: {e}'
            }), 500

    @app.route('/api/fetch-all-news', methods=['POST'])
    @login_required
    def fetch_all_news():
        """API: fetch news for a batch of active sites."""
        try:
            data = _json_payload()
            count_per_site = data.get('count', 5)  # news items per site
            freshness = data.get('freshness', app.config.get('NEWS_SEARCH_FRESHNESS', 'oneMonth'))
            limit = data.get('limit', 10)  # max number of sites to process

            api_key = app.config.get('BOCHA_API_KEY')
            if not api_key:
                return jsonify({
                    'success': False,
                    'message': '博查API未配置,请在.env文件中设置BOCHA_API_KEY'
                }), 500

            # Active sites, least recently updated first
            sites = Site.query.filter_by(is_active=True).order_by(Site.updated_at).limit(limit).all()

            if not sites:
                return jsonify({
                    'success': False,
                    'message': '没有可用的网站'
                }), 404

            searcher = NewsSearcher(api_key)

            total_saved = 0
            total_found = 0
            processed_sites = []

            for site in sites:
                try:
                    # A falsy result (None or empty) means "nothing found"
                    # rather than a failure for this site.
                    news_items = searcher.search_site_news(
                        site_name=site.name,
                        site_url=site.url,
                        news_keywords=site.news_keywords,  # dedicated keywords (v2.3)
                        count=count_per_site,
                        freshness=freshness
                    ) or []

                    site_saved = _save_news_items(site.id, news_items)

                    total_found += len(news_items)
                    total_saved += site_saved
                    processed_sites.append({
                        'id': site.id,
                        'name': site.name,
                        'found': len(news_items),
                        'saved': site_saved
                    })

                except Exception as e:
                    # One failing site must not abort the whole batch.
                    processed_sites.append({
                        'id': site.id,
                        'name': site.name,
                        'error': str(e)
                    })
                    continue

            db.session.commit()

            return jsonify({
                'success': True,
                'message': f'批量获取完成,共处理 {len(processed_sites)} 个网站',
                'total_found': total_found,
                'total_saved': total_saved,
                'processed_sites': processed_sites
            })

        except Exception as e:
            db.session.rollback()
            return jsonify({
                'success': False,
                'message': f'批量获取失败: {e}'
            }), 500

    # The rule must declare <site_code>; without it Flask cannot supply the
    # view's required argument.
    @app.route('/api/refresh-site-news/<site_code>', methods=['POST'])
    def refresh_site_news(site_code):
        """API: manually refresh one site's news (no login required; v2.3)."""
        try:
            site = Site.query.filter_by(code=site_code).first()
            if not site:
                return jsonify({
                    'success': False,
                    'message': '网站不存在'
                }), 404

            api_key = app.config.get('BOCHA_API_KEY')
            if not api_key:
                return jsonify({
                    'success': False,
                    'message': '新闻功能未启用'
                }), 500

            searcher = NewsSearcher(api_key)

            # Latest 5 items from the last week
            news_items = searcher.search_site_news(
                site_name=site.name,
                site_url=site.url,
                news_keywords=site.news_keywords,  # dedicated keywords
                count=5,
                freshness='oneWeek'
            )

            if not news_items:
                return jsonify({
                    'success': False,
                    'message': '未找到相关新闻'
                }), 404

            saved_count = _save_news_items(site.id, news_items)
            db.session.commit()

            return jsonify({
                'success': True,
                'message': f'成功获取 {saved_count} 条新资讯',
                'total_found': len(news_items),
                'saved_count': saved_count
            })

        except Exception as e:
            db.session.rollback()
            return jsonify({
                'success': False,
                'message': f'获取失败: {e}'
            }), 500

    # ========== Batch import route ==========

    @app.route('/admin/batch-import', methods=['GET', 'POST'])
    @login_required
    def batch_import():
        """Import many sites from a pasted URL list or a browser bookmark file."""
        from utils.bookmark_parser import BookmarkParser

        results = None

        if request.method == 'POST':
            import_type = request.form.get('import_type')
            auto_activate = request.form.get('auto_activate') == 'on'

            parser = BookmarkParser()
            fetcher = WebsiteFetcher(timeout=15)

            urls_to_import = []

            try:
                # Parse the input
                if import_type == 'url_list':
                    url_list_text = request.form.get('url_list', '')
                    urls_to_import = parser.parse_url_list(url_list_text)

                elif import_type == 'bookmark_file':
                    bookmark_file = request.files.get('bookmark_file')
                    if not bookmark_file:
                        flash('请选择书签文件', 'error')
                        return render_template('admin/batch_import.html')

                    html_content = bookmark_file.read().decode('utf-8', errors='ignore')
                    all_bookmarks = parser.parse_html_file(html_content)

                    # Optional folder filter (case-insensitive substring)
                    folder_filter = request.form.get('folder_filter', '').strip()
                    if folder_filter:
                        urls_to_import = [
                            b for b in all_bookmarks
                            if folder_filter.lower() in b.get('folder', '').lower()
                        ]
                    else:
                        urls_to_import = all_bookmarks

                success_list = []
                failed_list = []

                for idx, item in enumerate(urls_to_import, 1):
                    url = item['url']
                    name = item.get('name', '')

                    # Each URL is committed on its own, so one failure does
                    # not undo sites imported earlier in the batch.
                    try:
                        # 1. Skip URLs that already exist
                        try:
                            existing = Site.query.filter_by(url=url).first()
                            if existing:
                                failed_list.append({
                                    'url': url,
                                    'name': name or existing.name,
                                    'error': f'该URL已存在(网站名称:{existing.name})'
                                })
                                continue
                        except Exception as e:
                            failed_list.append({
                                'url': url,
                                'name': name,
                                'error': f'检查URL时出错: {e}'
                            })
                            continue

                        # 2. Scrape site info; failure is not fatal because
                        #    the bookmark name or the domain can stand in.
                        info = None
                        try:
                            info = fetcher.fetch_website_info(url)
                        except Exception as e:
                            print(f"抓取 {url} 失败: {e}")

                        # 3. Fall back to the bookmark name, then the domain
                        if not info or not info.get('name'):
                            if name:
                                info = {
                                    'name': name,
                                    'description': '',
                                    'logo_url': ''
                                }
                            else:
                                try:
                                    parsed = urlparse(url)
                                    domain = parsed.netloc or parsed.path
                                    if domain:
                                        info = {
                                            'name': domain,
                                            'description': '',
                                            'logo_url': ''
                                        }
                                    else:
                                        failed_list.append({
                                            'url': url,
                                            'name': name,
                                            'error': '无法获取网站信息且没有备用名称'
                                        })
                                        continue
                                except Exception:
                                    failed_list.append({
                                        'url': url,
                                        'name': name,
                                        'error': '无法获取网站信息且URL解析失败'
                                    })
                                    continue

                        # 4. Download the logo locally (failure is not fatal)
                        logo_path = None
                        if info.get('logo_url'):
                            try:
                                logo_path = fetcher.download_logo(info['logo_url'], save_dir='static/logos')
                            except Exception as e:
                                print(f"下载Logo失败 ({url}): {e}")

                        # 5. Generate code and slug, then persist
                        try:
                            # Unique 8-digit code: up to 10 random tries, then
                            # fall back to the tail of a ms timestamp.
                            site_code = None
                            for _ in range(10):
                                candidate = str(random.randint(10000000, 99999999))
                                if not Site.query.filter_by(code=candidate).first():
                                    site_code = candidate
                                    break
                            if not site_code:
                                site_code = str(int(time.time() * 1000))[-8:]

                            site_name = info.get('name', name or 'Unknown')[:100]
                            slug = _slugify(site_name)
                            if not slug:
                                slug = f"site-{site_code}"

                            # Make the slug unique: numeric suffixes first,
                            # then the site code, so the loop stays bounded.
                            base_slug = slug[:50]
                            counter = 1
                            final_slug = slug
                            while Site.query.filter_by(slug=final_slug).first():
                                final_slug = f"{base_slug}-{counter}"
                                counter += 1
                                if counter > 100:
                                    final_slug = f"{base_slug}-{site_code}"
                                    break

                            # 6. Create the record. The local logo wins over
                            #    the remote URL; text fields are truncated.
                            remote_logo = info.get('logo_url') or ''
                            site_description = info.get('description') or ''
                            site = Site(
                                code=site_code,
                                slug=final_slug,
                                name=site_name,
                                url=url[:500],
                                logo=logo_path or remote_logo[:500],
                                short_desc=site_description[:200],
                                description=site_description[:2000],
                                is_active=auto_activate
                            )

                            db.session.add(site)
                            db.session.commit()

                            success_list.append({
                                'name': site.name,
                                'url': site.url
                            })

                            print(f"成功导入 [{idx}/{len(urls_to_import)}]: {site.name}")

                        except Exception as e:
                            db.session.rollback()
                            failed_list.append({
                                'url': url,
                                'name': name or info.get('name', 'Unknown'),
                                'error': f'数据库保存失败: {e}'
                            })
                            continue

                    except Exception as e:
                        # Catch-all so one bad entry never aborts the batch
                        db.session.rollback()
                        failed_list.append({
                            'url': url,
                            'name': name,
                            'error': f'未知错误: {e}'
                        })
                        print(f"导入 {url} 时发生未知错误: {e}")
                        continue

                results = {
                    'total_count': len(urls_to_import),
                    'success_count': len(success_list),
                    'failed_count': len(failed_list),
                    'success_list': success_list,
                    'failed_list': failed_list
                }

                if success_list:
                    flash(f'成功导入 {len(success_list)} 个网站!', 'success')

            except Exception as e:
                flash(f'导入失败: {e}', 'error')

        return render_template('admin/batch_import.html', results=results)

    # ========== Flask-Admin configuration ==========

    class SecureModelView(ModelView):
        """Model view that requires an authenticated admin."""

        can_set_page_size = True
        page_size = 20

        # Custom templates
        list_template = 'admin/model/list.html'
        create_template = 'admin/model/create.html'
        edit_template = 'admin/model/edit.html'

        named_filter_urls = True

        def is_accessible(self):
            return current_user.is_authenticated

        def inaccessible_callback(self, name, **kwargs):
            return redirect(url_for('admin_login'))

    class SecureAdminIndexView(AdminIndexView):
        """Admin dashboard that requires an authenticated admin."""

        def is_accessible(self):
            return current_user.is_authenticated

        def inaccessible_callback(self, name, **kwargs):
            return redirect(url_for('admin_login'))

        @expose('/')
        def index(self):
            """Dashboard: headline counts plus the five newest sites."""
            stats = {
                'sites_count': Site.query.filter_by(is_active=True).count(),
                'tags_count': Tag.query.count(),
                'news_count': News.query.filter_by(is_active=True).count(),
                'total_views': db.session.query(db.func.sum(Site.view_count)).scalar() or 0
            }

            recent_sites = Site.query.order_by(Site.created_at.desc()).limit(5).all()

            return self.render('admin/index.html', stats=stats, recent_sites=recent_sites)

    class SiteAdmin(SecureModelView):
        """Admin view for sites."""

        # Custom templates
        create_template = 'admin/site/create.html'
        edit_template = 'admin/site/edit.html'

        can_edit = True
        can_delete = True
        can_create = True
        can_view_details = False  # the name links to the public page instead

        # Show the row actions column
        column_display_actions = True
        action_disallowed_list = []

        column_list = ['id', 'code', 'name', 'url', 'slug', 'is_active', 'view_count', 'created_at']
        column_searchable_list = ['code', 'name', 'url', 'description']
        column_filters = ['is_active', 'tags']
        column_labels = {
            'id': 'ID',
            'code': '网站编码',
            'name': '网站名称',
            'url': 'URL',
            'slug': 'URL别名',
            'logo': 'Logo',
            'short_desc': '简短描述',
            'description': '详细介绍',
            'features': '主要功能',
            'news_keywords': '新闻关键词',
            'is_active': '是否启用',
            'view_count': '浏览次数',
            'sort_order': '排序权重',
            'tags': '标签',
            'created_at': '创建时间',
            'updated_at': '更新时间'
        }
        form_columns = ['name', 'url', 'slug', 'logo', 'short_desc', 'description', 'features', 'news_keywords', 'tags', 'is_active', 'sort_order']

        column_extra_row_actions = None

        def on_model_change(self, form, model, is_created):
            """Fill in ``code``/``slug`` when blank and attach typed-in tags.

            Args:
                form: The submitted Flask-Admin form (unused directly; extra
                    tags are read from the raw ``new_tags`` request field).
                model: The ``Site`` being created or edited.
                is_created: True when the model is new.
            """
            # no_autoflush keeps the uniqueness queries below from flushing
            # the half-populated model.
            with db.session.no_autoflush:
                # Code and slug are generated BEFORE tag handling because tag
                # creation calls flush() explicitly.
                # NOTE(review): that flush presumably also writes the pending
                # site row, so its code/slug should be set first - confirm.

                # Auto-generate a unique 8-digit code when none was given
                if not model.code or model.code.strip() == '':
                    generated = None
                    for _ in range(10):
                        candidate = str(random.randint(10000000, 99999999))
                        clash = Site.query.filter(Site.code == candidate).first()
                        if not clash or clash.id == model.id:
                            generated = candidate
                            break
                    # After 10 collisions fall back to a ms-timestamp tail
                    model.code = generated or str(int(time.time() * 1000))[-8:]

                # Auto-generate the slug from the name when none was given
                if not model.slug or model.slug.strip() == '':
                    slug = _slugify(model.name)
                    if not slug:
                        slug = f"site-{model.code}"

                    # Make it unique with bounded retries, then fall back to
                    # the code.
                    base_slug = slug[:50]
                    counter = 1
                    final_slug = slug
                    max_slug_attempts = 100

                    while counter < max_slug_attempts:
                        clash = Site.query.filter(Site.slug == final_slug).first()
                        if not clash or clash.id == model.id:
                            break
                        final_slug = f"{base_slug}-{counter}"
                        counter += 1

                    if counter >= max_slug_attempts:
                        final_slug = f"{base_slug}-{model.code}"

                    model.slug = final_slug

                # Tags typed manually as a comma-separated list
                new_tags_str = request.form.get('new_tags', '')
                if new_tags_str:
                    new_tag_names = [part.strip() for part in new_tags_str.split(',') if part.strip()]

                    for tag_name in new_tag_names:
                        tag = Tag.query.filter_by(name=tag_name).first()

                        if not tag:
                            tag_slug = _slugify(tag_name)

                            # Make the tag slug unique
                            base_tag_slug = tag_slug[:50]
                            counter = 1
                            final_tag_slug = tag_slug
                            while Tag.query.filter_by(slug=final_tag_slug).first():
                                final_tag_slug = f"{base_tag_slug}-{counter}"
                                counter += 1
                                if counter > 100:
                                    final_tag_slug = f"{base_tag_slug}-{random.randint(1000, 9999)}"
                                    break

                            tag = Tag(name=tag_name, slug=final_tag_slug)
                            db.session.add(tag)
                            # Flush so the tag gets an id and later queries in
                            # this loop can see it.
                            db.session.flush()

                        if tag not in model.tags:
                            model.tags.append(tag)

    class TagAdmin(SecureModelView):
        """Admin view for tags."""

        can_edit = True
        can_delete = True
        can_create = True
        can_view_details = False

        # Show the row actions column
        column_display_actions = True

        column_list = ['id', 'name', 'slug', 'description', 'sort_order']
        column_searchable_list = ['name', 'description']
        column_labels = {
            'id': 'ID',
            'name': '标签名称',
            'slug': 'URL别名',
            'description': '标签描述',
            'icon': '图标',
            'sort_order': '排序权重',
            'created_at': '创建时间'
        }
        form_columns = ['name', 'slug', 'description', 'icon', 'sort_order']

    class AdminAdmin(SecureModelView):
        """Admin view for administrator accounts."""

        can_edit = True
        can_delete = True
        can_create = True
        can_view_details = False

        # Show the row actions column
        column_display_actions = True

        column_list = ['id', 'username', 'email', 'is_active', 'last_login', 'created_at']
        column_searchable_list = ['username', 'email']
        column_filters = ['is_active']
        column_labels = {
            'id': 'ID',
            'username': '用户名',
            'email': '邮箱',
            'is_active': '是否启用',
            'created_at': '创建时间',
            'last_login': '最后登录'
        }
        form_columns = ['username', 'email', 'is_active']

        def on_model_change(self, form, model, is_created):
            """Give newly created admins the default password."""
            # NOTE(security): every new admin gets the same well-known
            # password; the account is exposed until its owner changes it.
            # Consider a random one-time password instead.
            if is_created:
                model.set_password('admin123')  # default password

    class NewsAdmin(SecureModelView):
        """Admin view for news items."""

        can_edit = True
        can_delete = True
        can_create = True
        can_view_details = False

        # Show the row actions column
        column_display_actions = True

        column_list = ['id', 'site', 'title', 'source_name', 'news_type', 'published_at', 'is_active']
        column_searchable_list = ['title', 'content', 'source_name']
        column_filters = ['site', 'news_type', 'source_name', 'is_active', 'published_at']
        column_labels = {
            'id': 'ID',
            'site': '关联网站',
            'title': '新闻标题',
            'content': '新闻内容',
            'news_type': '新闻类型',
            'url': '新闻链接',
            'source_name': '来源网站',
            'source_icon': '来源图标',
            'published_at': '发布时间',
            'is_active': '是否启用',
            'created_at': '创建时间',
            'updated_at': '更新时间'
        }
        form_columns = ['site', 'title', 'content', 'news_type', 'url', 'source_name', 'source_icon', 'published_at', 'is_active']

        # Selectable news types
        form_choices = {
            'news_type': [
                ('Search Result', 'Search Result'),
                ('Product Update', 'Product Update'),
                ('Industry News', 'Industry News'),
                ('Company News', 'Company News'),
                ('Other', 'Other')
            ]
        }

        # Default sort: newest published first
        column_default_sort = ('published_at', True)

    class PromptAdmin(SecureModelView):
        """Admin view for prompt templates."""

        can_edit = True
        can_delete = False  # deletion disabled so required prompts cannot be removed
        can_create = True
        can_view_details = False

        # Show the row actions column
        column_display_actions = True

        column_list = ['id', 'key', 'name', 'description', 'is_active', 'updated_at']
        column_searchable_list = ['key', 'name', 'description']
        column_filters = ['is_active', 'key']
        column_labels = {
            'id': 'ID',
            'key': '唯一标识',
            'name': '模板名称',
            'system_prompt': '系统提示词',
            'user_prompt_template': '用户提示词模板',
            'description': '模板说明',
            'is_active': '是否启用',
            'created_at': '创建时间',
            'updated_at': '更新时间'
        }
        form_columns = ['key', 'name', 'description', 'system_prompt', 'user_prompt_template', 'is_active']

        # Field help text
        column_descriptions = {
            'key': '唯一标识,如: tags, features, description',
            'system_prompt': 'AI的系统角色设定',
            'user_prompt_template': '用户提示词模板,支持变量如 {name}, {description}, {url}',
        }

        # Form widget settings
        form_widget_args = {
            'system_prompt': {
                'rows': 3,
                'style': 'font-family: monospace;'
            },
            'user_prompt_template': {
                'rows': 20,
                'style': 'font-family: monospace;'
            }
        }

    # Initialise Flask-Admin
    admin = Admin(
        app,
        name='ZJPB 焦提示词',
        template_mode='bootstrap4',
        index_view=SecureAdminIndexView(name='控制台', url='/admin'),
        base_template='admin/master.html'
    )

    admin.add_view(SiteAdmin(Site, db.session, name='网站管理'))
    admin.add_view(TagAdmin(Tag, db.session, name='标签管理'))
    admin.add_view(NewsAdmin(News, db.session, name='新闻管理'))
    admin.add_view(PromptAdmin(PromptTemplate, db.session, name='Prompt管理'))
    admin.add_view(AdminAdmin(AdminModel, db.session, name='管理员', endpoint='admin_users'))

    return app


if __name__ == '__main__':
    app = create_app(os.getenv('FLASK_ENV', 'development'))
    # NOTE(security): debug=True together with host 0.0.0.0 exposes the
    # Werkzeug interactive debugger on every interface. Use this entry point
    # for local development only.
    app.run(host='0.0.0.0', port=5000, debug=True)