Files
zjpb.net/app.py
Jowe 30b1ef75d6 release: v2.1 - Prompt管理系统、页脚优化、图标修复
新增功能:
- Prompt管理:后台新增Prompt模板管理功能
- 数据库迁移:新增prompt_templates表及默认数据
- 页脚优化:添加ICP备案号(浙ICP备2025154782号-1)和Microsoft Clarity统计
- 图标修复:详情页Material Icons替换为Emoji
- 标签显示:修复编辑页标签名称无法显示的问题

技术改进:
- 添加正则表达式提取标签名称
- 优化页脚样式和链接
- 完善增量部署文档

🚀 Generated with Claude Code
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2025-12-30 00:45:39 +08:00

1003 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import markdown
from flask import Flask, render_template, redirect, url_for, request, flash, jsonify
from flask_login import LoginManager, login_user, logout_user, login_required, current_user
from flask_admin import Admin, AdminIndexView, expose
from flask_admin.contrib.sqla import ModelView
from datetime import datetime
from config import config
from models import db, Site, Tag, Admin as AdminModel, News, site_tags, PromptTemplate
from utils.website_fetcher import WebsiteFetcher
from utils.tag_generator import TagGenerator
def create_app(config_name='default'):
"""应用工厂函数"""
app = Flask(__name__)
# 加载配置
app.config.from_object(config[config_name])
# 初始化数据库
db.init_app(app)
# 添加Markdown过滤器
@app.template_filter('markdown')
def markdown_filter(text):
"""将Markdown文本转换为HTML"""
if not text:
return ''
return markdown.markdown(text, extensions=['nl2br', 'fenced_code'])
# 初始化登录管理
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'admin_login'
login_manager.login_message = '请先登录'
@login_manager.user_loader
def load_user(user_id):
return AdminModel.query.get(int(user_id))
# ========== 前台路由 ==========
@app.route('/')
def index():
"""首页"""
# 获取所有启用的标签
tags = Tag.query.order_by(Tag.sort_order.desc(), Tag.id).all()
# 优化使用一次SQL查询统计所有标签的网站数量
tag_counts = {}
if tags:
# 使用JOIN查询一次性获取所有标签的网站数量
from sqlalchemy import func
counts_query = db.session.query(
site_tags.c.tag_id,
func.count(site_tags.c.site_id).label('count')
).join(
Site, site_tags.c.site_id == Site.id
).filter(
Site.is_active == True
).group_by(site_tags.c.tag_id).all()
tag_counts = {tag_id: count for tag_id, count in counts_query}
# 获取筛选参数
tag_slug = request.args.get('tag')
search_query = request.args.get('q', '').strip()
page = request.args.get('page', 1, type=int)
per_page = 100 # 每页显示100个站点
selected_tag = None
# 构建基础查询
query = Site.query.filter_by(is_active=True)
# 标签筛选
if tag_slug:
selected_tag = Tag.query.filter_by(slug=tag_slug).first()
if selected_tag:
query = query.filter(Site.tags.contains(selected_tag))
else:
sites = []
pagination = None
return render_template('index_new.html', sites=sites, tags=tags,
selected_tag=selected_tag, search_query=search_query,
pagination=pagination, tag_counts=tag_counts)
# 搜索功能
if search_query:
# 使用OR条件搜索网站名称、URL、描述
search_pattern = f'%{search_query}%'
query = query.filter(
db.or_(
Site.name.like(search_pattern),
Site.url.like(search_pattern),
Site.description.like(search_pattern),
Site.short_desc.like(search_pattern)
)
)
# 排序并分页
query = query.order_by(Site.sort_order.desc(), Site.id.desc())
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
sites = pagination.items
return render_template('index_new.html', sites=sites, tags=tags,
selected_tag=selected_tag, search_query=search_query,
pagination=pagination, tag_counts=tag_counts)
@app.route('/site/<code>')
def site_detail(code):
"""网站详情页"""
site = Site.query.filter_by(code=code, is_active=True).first_or_404()
# 增加浏览次数
site.view_count += 1
db.session.commit()
# 获取该网站的相关新闻最多显示5条
news_list = News.query.filter_by(
site_id=site.id,
is_active=True
).order_by(News.published_at.desc()).limit(5).all()
# 获取同类工具推荐通过标签匹配最多显示4个
recommended_sites = []
if site.tags:
# 获取有相同标签的其他网站
recommended_sites = Site.query.filter(
Site.id != site.id,
Site.is_active == True,
Site.tags.any(Tag.id.in_([tag.id for tag in site.tags]))
).order_by(Site.view_count.desc()).limit(4).all()
return render_template('detail_new.html', site=site, news_list=news_list, recommended_sites=recommended_sites)
# ========== 后台登录路由 ==========
@app.route('/admin/login', methods=['GET', 'POST'])
def admin_login():
"""管理员登录"""
if current_user.is_authenticated:
return redirect(url_for('admin.index'))
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
admin = AdminModel.query.filter_by(username=username).first()
if admin and admin.check_password(password) and admin.is_active:
login_user(admin)
admin.last_login = datetime.now()
db.session.commit()
return redirect(url_for('admin.index'))
else:
flash('用户名或密码错误', 'error')
return render_template('admin_login.html')
@app.route('/admin/logout')
@login_required
def admin_logout():
"""管理员登出"""
logout_user()
return redirect(url_for('index'))
@app.route('/admin/change-password', methods=['GET', 'POST'])
@login_required
def change_password():
"""修改密码"""
if request.method == 'POST':
old_password = request.form.get('old_password', '').strip()
new_password = request.form.get('new_password', '').strip()
confirm_password = request.form.get('confirm_password', '').strip()
# 验证旧密码
if not current_user.check_password(old_password):
flash('旧密码错误', 'error')
return render_template('admin/change_password.html')
# 验证新密码
if not new_password:
flash('新密码不能为空', 'error')
return render_template('admin/change_password.html')
if len(new_password) < 6:
flash('新密码长度至少6位', 'error')
return render_template('admin/change_password.html')
if new_password != confirm_password:
flash('两次输入的新密码不一致', 'error')
return render_template('admin/change_password.html')
if old_password == new_password:
flash('新密码不能与旧密码相同', 'error')
return render_template('admin/change_password.html')
# 修改密码
try:
current_user.set_password(new_password)
db.session.commit()
flash('密码修改成功,请重新登录', 'success')
logout_user()
return redirect(url_for('admin_login'))
except Exception as e:
db.session.rollback()
flash(f'密码修改失败:{str(e)}', 'error')
return render_template('admin/change_password.html')
return render_template('admin/change_password.html')
# ========== API路由 ==========
@app.route('/api/fetch-website-info', methods=['POST'])
@login_required
def fetch_website_info():
"""抓取网站信息API"""
try:
data = request.get_json()
url = data.get('url', '').strip()
if not url:
return jsonify({
'success': False,
'message': '请提供网站URL'
}), 400
# 创建抓取器
fetcher = WebsiteFetcher(timeout=15)
# 抓取网站信息
info = fetcher.fetch_website_info(url)
if not info:
return jsonify({
'success': False,
'message': '无法获取网站信息请检查URL是否正确或手动填写'
})
# 下载Logo到本地如果有
logo_path = None
if info.get('logo_url'):
logo_path = fetcher.download_logo(info['logo_url'], save_dir='static/logos')
# 如果下载失败不返回远程URL让用户手动上传
return jsonify({
'success': True,
'data': {
'name': info.get('name', ''),
'description': info.get('description', ''),
'logo': logo_path if logo_path else ''
}
})
except Exception as e:
return jsonify({
'success': False,
'message': f'抓取失败: {str(e)}'
}), 500
@app.route('/api/upload-logo', methods=['POST'])
@login_required
def upload_logo():
"""上传Logo图片API"""
try:
# 检查文件是否存在
if 'logo' not in request.files:
return jsonify({
'success': False,
'message': '请选择要上传的图片'
}), 400
file = request.files['logo']
# 检查文件名
if file.filename == '':
return jsonify({
'success': False,
'message': '未选择文件'
}), 400
# 检查文件类型
allowed_extensions = {'png', 'jpg', 'jpeg', 'gif', 'svg', 'ico', 'webp'}
filename = file.filename.lower()
if not any(filename.endswith('.' + ext) for ext in allowed_extensions):
return jsonify({
'success': False,
'message': '不支持的文件格式,请上传图片文件(png, jpg, gif, svg, ico, webp)'
}), 400
# 创建保存目录
save_dir = 'static/logos'
os.makedirs(save_dir, exist_ok=True)
# 生成安全的文件名
import time
import hashlib
ext = os.path.splitext(filename)[1]
timestamp = str(int(time.time() * 1000))
hash_name = hashlib.md5(f"{filename}{timestamp}".encode()).hexdigest()[:16]
safe_filename = f"logo_{hash_name}{ext}"
filepath = os.path.join(save_dir, safe_filename)
# 保存文件
file.save(filepath)
# 返回相对路径
return jsonify({
'success': True,
'path': f'/{filepath.replace(os.sep, "/")}'
})
except Exception as e:
return jsonify({
'success': False,
'message': f'上传失败: {str(e)}'
}), 500
@app.route('/api/generate-features', methods=['POST'])
@login_required
def generate_features():
"""使用DeepSeek自动生成网站主要功能"""
try:
data = request.get_json()
name = data.get('name', '').strip()
description = data.get('description', '').strip()
url = data.get('url', '').strip()
if not name or not description:
return jsonify({
'success': False,
'message': '请提供网站名称和描述'
}), 400
# 生成功能列表
generator = TagGenerator()
features = generator.generate_features(name, description, url)
if not features:
return jsonify({
'success': False,
'message': 'DeepSeek功能生成失败请检查API配置'
}), 500
return jsonify({
'success': True,
'features': features
})
except ValueError as e:
return jsonify({
'success': False,
'message': str(e)
}), 400
except Exception as e:
return jsonify({
'success': False,
'message': f'生成失败: {str(e)}'
}), 500
@app.route('/api/generate-description', methods=['POST'])
@login_required
def generate_description():
"""使用DeepSeek自动生成网站详细介绍"""
try:
data = request.get_json()
name = data.get('name', '').strip()
short_desc = data.get('short_desc', '').strip()
url = data.get('url', '').strip()
if not name:
return jsonify({
'success': False,
'message': '请提供网站名称'
}), 400
# 生成详细介绍
generator = TagGenerator()
description = generator.generate_description(name, short_desc, url)
if not description:
return jsonify({
'success': False,
'message': 'DeepSeek详细介绍生成失败请检查API配置'
}), 500
return jsonify({
'success': True,
'description': description
})
except ValueError as e:
return jsonify({
'success': False,
'message': str(e)
}), 400
except Exception as e:
return jsonify({
'success': False,
'message': f'生成失败: {str(e)}'
}), 500
@app.route('/api/generate-tags', methods=['POST'])
@login_required
def generate_tags():
"""使用DeepSeek自动生成标签"""
try:
data = request.get_json()
name = data.get('name', '').strip()
description = data.get('description', '').strip()
if not name or not description:
return jsonify({
'success': False,
'message': '请提供网站名称和描述'
}), 400
# 获取现有标签作为参考
existing_tags = [tag.name for tag in Tag.query.all()]
# 生成标签
generator = TagGenerator()
suggested_tags = generator.generate_tags(name, description, existing_tags)
if not suggested_tags:
return jsonify({
'success': False,
'message': 'DeepSeek标签生成失败请检查API配置'
}), 500
return jsonify({
'success': True,
'tags': suggested_tags
})
except ValueError as e:
return jsonify({
'success': False,
'message': str(e)
}), 400
except Exception as e:
return jsonify({
'success': False,
'message': f'生成失败: {str(e)}'
}), 500
# ========== 批量导入路由 ==========
@app.route('/admin/batch-import', methods=['GET', 'POST'])
@login_required
def batch_import():
"""批量导入网站"""
from utils.bookmark_parser import BookmarkParser
from utils.website_fetcher import WebsiteFetcher
results = None
if request.method == 'POST':
import_type = request.form.get('import_type')
auto_activate = request.form.get('auto_activate') == 'on'
parser = BookmarkParser()
fetcher = WebsiteFetcher(timeout=15)
urls_to_import = []
try:
# 解析输入
if import_type == 'url_list':
url_list_text = request.form.get('url_list', '')
urls_to_import = parser.parse_url_list(url_list_text)
elif import_type == 'bookmark_file':
bookmark_file = request.files.get('bookmark_file')
if not bookmark_file:
flash('请选择书签文件', 'error')
return render_template('admin/batch_import.html')
html_content = bookmark_file.read().decode('utf-8', errors='ignore')
all_bookmarks = parser.parse_html_file(html_content)
# 筛选文件夹(如果指定)
folder_filter = request.form.get('folder_filter', '').strip()
if folder_filter:
urls_to_import = [
b for b in all_bookmarks
if folder_filter.lower() in b.get('folder', '').lower()
]
else:
urls_to_import = all_bookmarks
# 批量导入
success_list = []
failed_list = []
for idx, item in enumerate(urls_to_import, 1):
url = item['url']
name = item.get('name', '')
# 为每个URL创建独立的事务
try:
# 1. 检查URL是否已存在
try:
existing = Site.query.filter_by(url=url).first()
if existing:
failed_list.append({
'url': url,
'name': name or existing.name,
'error': f'该URL已存在网站名称{existing.name}'
})
continue
except Exception as e:
failed_list.append({
'url': url,
'name': name,
'error': f'检查URL时出错: {str(e)}'
})
continue
# 2. 抓取网站信息(带超时和错误处理)
info = None
try:
info = fetcher.fetch_website_info(url)
except Exception as e:
print(f"抓取 {url} 失败: {str(e)}")
# 抓取失败不是致命错误,继续尝试使用书签名称
# 3. 处理网站信息
if not info or not info.get('name'):
# 如果有书签名称,使用书签名称
if name:
info = {
'name': name,
'description': '',
'logo_url': ''
}
else:
# 尝试从URL提取域名作为名称
from urllib.parse import urlparse
try:
parsed = urlparse(url)
domain = parsed.netloc or parsed.path
if domain:
info = {
'name': domain,
'description': '',
'logo_url': ''
}
else:
failed_list.append({
'url': url,
'name': name,
'error': '无法获取网站信息且没有备用名称'
})
continue
except Exception:
failed_list.append({
'url': url,
'name': name,
'error': '无法获取网站信息且URL解析失败'
})
continue
# 4. 下载Logo到本地失败不影响导入
logo_path = None
if info.get('logo_url'):
try:
logo_path = fetcher.download_logo(info['logo_url'], save_dir='static/logos')
except Exception as e:
print(f"下载Logo失败 ({url}): {str(e)}")
# Logo下载失败不影响网站导入
# 5. 生成code和slug
try:
import random
from pypinyin import lazy_pinyin
import re
# 生成唯一的code
site_code = None
max_attempts = 10
for _ in range(max_attempts):
code = str(random.randint(10000000, 99999999))
if not Site.query.filter_by(code=code).first():
site_code = code
break
if not site_code:
# 如果10次都失败使用时间戳
import time
site_code = str(int(time.time() * 1000))[-8:]
# 生成slug
site_name = info.get('name', name or 'Unknown')[:100]
slug = ''.join(lazy_pinyin(site_name))
slug = slug.lower()
slug = re.sub(r'[^\w\s-]', '', slug)
slug = re.sub(r'[-\s]+', '-', slug).strip('-')
if not slug:
slug = f"site-{site_code}"
# 确保slug唯一
base_slug = slug[:50] # 限制长度
counter = 1
final_slug = slug
while Site.query.filter_by(slug=final_slug).first():
final_slug = f"{base_slug}-{counter}"
counter += 1
if counter > 100: # 防止无限循环
final_slug = f"{base_slug}-{site_code}"
break
# 6. 创建网站记录带code和slug
site = Site(
code=site_code,
slug=final_slug,
name=site_name,
url=url[:500], # 限制URL长度
logo=logo_path or info.get('logo_url', '')[:500] if info.get('logo_url') else '',
short_desc=info.get('description', '')[:200] if info.get('description') else '',
description=info.get('description', '')[:2000] if info.get('description') else '',
is_active=auto_activate
)
# 添加到数据库并提交
db.session.add(site)
db.session.commit()
success_list.append({
'name': site.name,
'url': site.url
})
print(f"成功导入 [{idx}/{len(urls_to_import)}]: {site.name}")
except Exception as e:
db.session.rollback()
failed_list.append({
'url': url,
'name': name or info.get('name', 'Unknown'),
'error': f'数据库保存失败: {str(e)}'
})
continue
except Exception as e:
# 捕获所有未预期的错误
db.session.rollback()
failed_list.append({
'url': url,
'name': name,
'error': f'未知错误: {str(e)}'
})
print(f"导入 {url} 时发生未知错误: {str(e)}")
continue
results = {
'total_count': len(urls_to_import),
'success_count': len(success_list),
'failed_count': len(failed_list),
'success_list': success_list,
'failed_list': failed_list
}
if success_list:
flash(f'成功导入 {len(success_list)} 个网站!', 'success')
except Exception as e:
flash(f'导入失败: {str(e)}', 'error')
return render_template('admin/batch_import.html', results=results)
# ========== Flask-Admin 配置 ==========
class SecureModelView(ModelView):
"""需要登录的模型视图"""
# 中文化配置
can_set_page_size = True
page_size = 20
# 自定义文本
list_template = 'admin/model/list.html'
create_template = 'admin/model/create.html'
edit_template = 'admin/model/edit.html'
# 覆盖英文文本
named_filter_urls = True
def is_accessible(self):
return current_user.is_authenticated
def inaccessible_callback(self, name, **kwargs):
return redirect(url_for('admin_login'))
class SecureAdminIndexView(AdminIndexView):
"""需要登录的管理首页"""
def is_accessible(self):
return current_user.is_authenticated
def inaccessible_callback(self, name, **kwargs):
return redirect(url_for('admin_login'))
@expose('/')
def index(self):
"""控制台首页,显示统计信息"""
# 统计数据
stats = {
'sites_count': Site.query.filter_by(is_active=True).count(),
'tags_count': Tag.query.count(),
'news_count': News.query.filter_by(is_active=True).count(),
'total_views': db.session.query(db.func.sum(Site.view_count)).scalar() or 0
}
# 最近添加的工具最多5个
recent_sites = Site.query.order_by(Site.created_at.desc()).limit(5).all()
return self.render('admin/index.html', stats=stats, recent_sites=recent_sites)
# 网站管理视图
class SiteAdmin(SecureModelView):
# 自定义模板
create_template = 'admin/site/create.html'
edit_template = 'admin/site/edit.html'
# 启用编辑和删除
can_edit = True
can_delete = True
can_create = True
can_view_details = False # 禁用查看详情,点击名称即可查看
# 显示操作列
column_display_actions = True
action_disallowed_list = []
column_list = ['id', 'code', 'name', 'url', 'slug', 'is_active', 'view_count', 'created_at']
column_searchable_list = ['code', 'name', 'url', 'description']
column_filters = ['is_active', 'tags']
column_labels = {
'id': 'ID',
'code': '网站编码',
'name': '网站名称',
'url': 'URL',
'slug': 'URL别名',
'logo': 'Logo',
'short_desc': '简短描述',
'description': '详细介绍',
'features': '主要功能',
'is_active': '是否启用',
'view_count': '浏览次数',
'sort_order': '排序权重',
'tags': '标签',
'created_at': '创建时间',
'updated_at': '更新时间'
}
form_columns = ['name', 'url', 'slug', 'logo', 'short_desc', 'description', 'features', 'tags', 'is_active', 'sort_order']
# 自定义编辑/删除文字
column_extra_row_actions = None
def on_model_change(self, form, model, is_created):
"""保存前自动生成code和slug如果为空"""
import re
import random
from pypinyin import lazy_pinyin
from flask import request
# 使用no_autoflush防止在查询时触发提前flush
with db.session.no_autoflush:
# 处理手动输入的新标签
new_tags_str = request.form.get('new_tags', '')
if new_tags_str:
new_tag_names = [name.strip() for name in new_tags_str.split(',') if name.strip()]
for tag_name in new_tag_names:
# 检查标签是否已存在
existing_tag = Tag.query.filter_by(name=tag_name).first()
if not existing_tag:
# 创建新标签
tag_slug = ''.join(lazy_pinyin(tag_name))
tag_slug = tag_slug.lower()
tag_slug = re.sub(r'[^\w\s-]', '', tag_slug)
tag_slug = re.sub(r'[-\s]+', '-', tag_slug).strip('-')
# 确保slug唯一
base_tag_slug = tag_slug[:50]
counter = 1
final_tag_slug = tag_slug
while Tag.query.filter_by(slug=final_tag_slug).first():
final_tag_slug = f"{base_tag_slug}-{counter}"
counter += 1
if counter > 100:
final_tag_slug = f"{base_tag_slug}-{random.randint(1000, 9999)}"
break
new_tag = Tag(name=tag_name, slug=final_tag_slug)
db.session.add(new_tag)
db.session.flush() # 确保新标签有ID
# 添加到模型的标签列表
if new_tag not in model.tags:
model.tags.append(new_tag)
else:
# 添加已存在的标签
if existing_tag not in model.tags:
model.tags.append(existing_tag)
# 如果code为空自动生成唯一的8位数字编码
if not model.code or model.code.strip() == '':
max_attempts = 10
for attempt in range(max_attempts):
# 生成10000000-99999999之间的随机数
code = str(random.randint(10000000, 99999999))
# 检查是否已存在
existing = Site.query.filter(Site.code == code).first()
if not existing or existing.id == model.id:
model.code = code
break
# 如果10次都失败使用时间戳
if not model.code:
import time
model.code = str(int(time.time() * 1000))[-8:]
# 如果slug为空从name自动生成
if not model.slug or model.slug.strip() == '':
# 将中文转换为拼音
slug = ''.join(lazy_pinyin(model.name))
# 转换为小写,移除特殊字符
slug = slug.lower()
slug = re.sub(r'[^\w\s-]', '', slug)
slug = re.sub(r'[-\s]+', '-', slug).strip('-')
# 如果转换后为空使用code
if not slug:
slug = f"site-{model.code}"
# 确保slug唯一限制长度和重试次数
base_slug = slug[:50]
counter = 1
final_slug = slug
max_slug_attempts = 100
while counter < max_slug_attempts:
existing = Site.query.filter(Site.slug == final_slug).first()
if not existing or existing.id == model.id:
break
final_slug = f"{base_slug}-{counter}"
counter += 1
# 如果100次都失败使用code确保唯一
if counter >= max_slug_attempts:
final_slug = f"{base_slug}-{model.code}"
model.slug = final_slug
# 标签管理视图
class TagAdmin(SecureModelView):
can_edit = True
can_delete = True
can_create = True
can_view_details = False
# 显示操作列
column_display_actions = True
column_list = ['id', 'name', 'slug', 'description', 'sort_order']
column_searchable_list = ['name', 'description']
column_labels = {
'id': 'ID',
'name': '标签名称',
'slug': 'URL别名',
'description': '标签描述',
'icon': '图标',
'sort_order': '排序权重',
'created_at': '创建时间'
}
form_columns = ['name', 'slug', 'description', 'icon', 'sort_order']
# 管理员视图
class AdminAdmin(SecureModelView):
can_edit = True
can_delete = True
can_create = True
can_view_details = False
# 显示操作列
column_display_actions = True
column_list = ['id', 'username', 'email', 'is_active', 'last_login', 'created_at']
column_searchable_list = ['username', 'email']
column_filters = ['is_active']
column_labels = {
'id': 'ID',
'username': '用户名',
'email': '邮箱',
'is_active': '是否启用',
'created_at': '创建时间',
'last_login': '最后登录'
}
form_columns = ['username', 'email', 'is_active']
def on_model_change(self, form, model, is_created):
# 如果是新建管理员,设置默认密码
if is_created:
model.set_password('admin123') # 默认密码
# 新闻管理视图
class NewsAdmin(SecureModelView):
can_edit = True
can_delete = True
can_create = True
can_view_details = False
# 显示操作列
column_display_actions = True
column_list = ['id', 'site', 'title', 'news_type', 'published_at', 'is_active']
column_searchable_list = ['title', 'content']
column_filters = ['site', 'news_type', 'is_active', 'published_at']
column_labels = {
'id': 'ID',
'site': '关联网站',
'title': '新闻标题',
'content': '新闻内容',
'news_type': '新闻类型',
'url': '新闻链接',
'published_at': '发布时间',
'is_active': '是否启用',
'created_at': '创建时间',
'updated_at': '更新时间'
}
form_columns = ['site', 'title', 'content', 'news_type', 'url', 'published_at', 'is_active']
# 可选的新闻类型
form_choices = {
'news_type': [
('Product Update', 'Product Update'),
('Industry News', 'Industry News'),
('Company News', 'Company News'),
('Other', 'Other')
]
}
# Prompt模板管理视图
class PromptAdmin(SecureModelView):
can_edit = True
can_delete = False # 不允许删除避免系统必需的prompt被删除
can_create = True
can_view_details = False
# 显示操作列
column_display_actions = True
column_list = ['id', 'key', 'name', 'description', 'is_active', 'updated_at']
column_searchable_list = ['key', 'name', 'description']
column_filters = ['is_active', 'key']
column_labels = {
'id': 'ID',
'key': '唯一标识',
'name': '模板名称',
'system_prompt': '系统提示词',
'user_prompt_template': '用户提示词模板',
'description': '模板说明',
'is_active': '是否启用',
'created_at': '创建时间',
'updated_at': '更新时间'
}
form_columns = ['key', 'name', 'description', 'system_prompt', 'user_prompt_template', 'is_active']
# 字段说明
column_descriptions = {
'key': '唯一标识,如: tags, features, description',
'system_prompt': 'AI的系统角色设定',
'user_prompt_template': '用户提示词模板,支持变量如 {name}, {description}, {url}',
}
# 表单字段配置
form_widget_args = {
'system_prompt': {
'rows': 3,
'style': 'font-family: monospace;'
},
'user_prompt_template': {
'rows': 20,
'style': 'font-family: monospace;'
}
}
# 初始化 Flask-Admin
admin = Admin(
app,
name='ZJPB 焦提示词',
template_mode='bootstrap4',
index_view=SecureAdminIndexView(name='控制台', url='/admin'),
base_template='admin/master.html'
)
admin.add_view(SiteAdmin(Site, db.session, name='网站管理'))
admin.add_view(TagAdmin(Tag, db.session, name='标签管理'))
admin.add_view(NewsAdmin(News, db.session, name='新闻管理'))
admin.add_view(PromptAdmin(PromptTemplate, db.session, name='Prompt管理'))
admin.add_view(AdminAdmin(AdminModel, db.session, name='管理员', endpoint='admin_users'))
return app
if __name__ == '__main__':
app = create_app(os.getenv('FLASK_ENV', 'development'))
app.run(host='0.0.0.0', port=5000, debug=True)