release: v2.1.0 - Prompt管理系统、页脚优化、图标修复
This commit is contained in:
313
app.py
313
app.py
@@ -1,11 +1,12 @@
|
||||
import os
|
||||
import markdown
|
||||
from flask import Flask, render_template, redirect, url_for, request, flash, jsonify
|
||||
from flask_login import LoginManager, login_user, logout_user, login_required, current_user
|
||||
from flask_admin import Admin, AdminIndexView, expose
|
||||
from flask_admin.contrib.sqla import ModelView
|
||||
from datetime import datetime
|
||||
from config import config
|
||||
from models import db, Site, Tag, Admin as AdminModel, News
|
||||
from models import db, Site, Tag, Admin as AdminModel, News, site_tags, PromptTemplate
|
||||
from utils.website_fetcher import WebsiteFetcher
|
||||
from utils.tag_generator import TagGenerator
|
||||
|
||||
@@ -19,6 +20,14 @@ def create_app(config_name='default'):
|
||||
# 初始化数据库
|
||||
db.init_app(app)
|
||||
|
||||
# 添加Markdown过滤器
|
||||
@app.template_filter('markdown')
|
||||
def markdown_filter(text):
|
||||
"""将Markdown文本转换为HTML"""
|
||||
if not text:
|
||||
return ''
|
||||
return markdown.markdown(text, extensions=['nl2br', 'fenced_code'])
|
||||
|
||||
# 初始化登录管理
|
||||
login_manager = LoginManager()
|
||||
login_manager.init_app(app)
|
||||
@@ -36,6 +45,22 @@ def create_app(config_name='default'):
|
||||
# 获取所有启用的标签
|
||||
tags = Tag.query.order_by(Tag.sort_order.desc(), Tag.id).all()
|
||||
|
||||
# 优化:使用一次SQL查询统计所有标签的网站数量
|
||||
tag_counts = {}
|
||||
if tags:
|
||||
# 使用JOIN查询一次性获取所有标签的网站数量
|
||||
from sqlalchemy import func
|
||||
counts_query = db.session.query(
|
||||
site_tags.c.tag_id,
|
||||
func.count(site_tags.c.site_id).label('count')
|
||||
).join(
|
||||
Site, site_tags.c.site_id == Site.id
|
||||
).filter(
|
||||
Site.is_active == True
|
||||
).group_by(site_tags.c.tag_id).all()
|
||||
|
||||
tag_counts = {tag_id: count for tag_id, count in counts_query}
|
||||
|
||||
# 获取筛选参数
|
||||
tag_slug = request.args.get('tag')
|
||||
search_query = request.args.get('q', '').strip()
|
||||
@@ -57,7 +82,7 @@ def create_app(config_name='default'):
|
||||
pagination = None
|
||||
return render_template('index_new.html', sites=sites, tags=tags,
|
||||
selected_tag=selected_tag, search_query=search_query,
|
||||
pagination=pagination)
|
||||
pagination=pagination, tag_counts=tag_counts)
|
||||
|
||||
# 搜索功能
|
||||
if search_query:
|
||||
@@ -79,7 +104,7 @@ def create_app(config_name='default'):
|
||||
|
||||
return render_template('index_new.html', sites=sites, tags=tags,
|
||||
selected_tag=selected_tag, search_query=search_query,
|
||||
pagination=pagination)
|
||||
pagination=pagination, tag_counts=tag_counts)
|
||||
|
||||
@app.route('/site/<code>')
|
||||
def site_detail(code):
|
||||
@@ -138,6 +163,51 @@ def create_app(config_name='default'):
|
||||
logout_user()
|
||||
return redirect(url_for('index'))
|
||||
|
||||
@app.route('/admin/change-password', methods=['GET', 'POST'])
|
||||
@login_required
|
||||
def change_password():
|
||||
"""修改密码"""
|
||||
if request.method == 'POST':
|
||||
old_password = request.form.get('old_password', '').strip()
|
||||
new_password = request.form.get('new_password', '').strip()
|
||||
confirm_password = request.form.get('confirm_password', '').strip()
|
||||
|
||||
# 验证旧密码
|
||||
if not current_user.check_password(old_password):
|
||||
flash('旧密码错误', 'error')
|
||||
return render_template('admin/change_password.html')
|
||||
|
||||
# 验证新密码
|
||||
if not new_password:
|
||||
flash('新密码不能为空', 'error')
|
||||
return render_template('admin/change_password.html')
|
||||
|
||||
if len(new_password) < 6:
|
||||
flash('新密码长度至少6位', 'error')
|
||||
return render_template('admin/change_password.html')
|
||||
|
||||
if new_password != confirm_password:
|
||||
flash('两次输入的新密码不一致', 'error')
|
||||
return render_template('admin/change_password.html')
|
||||
|
||||
if old_password == new_password:
|
||||
flash('新密码不能与旧密码相同', 'error')
|
||||
return render_template('admin/change_password.html')
|
||||
|
||||
# 修改密码
|
||||
try:
|
||||
current_user.set_password(new_password)
|
||||
db.session.commit()
|
||||
flash('密码修改成功,请重新登录', 'success')
|
||||
logout_user()
|
||||
return redirect(url_for('admin_login'))
|
||||
except Exception as e:
|
||||
db.session.rollback()
|
||||
flash(f'密码修改失败:{str(e)}', 'error')
|
||||
return render_template('admin/change_password.html')
|
||||
|
||||
return render_template('admin/change_password.html')
|
||||
|
||||
# ========== API路由 ==========
|
||||
@app.route('/api/fetch-website-info', methods=['POST'])
|
||||
@login_required
|
||||
@@ -165,17 +235,18 @@ def create_app(config_name='default'):
|
||||
'message': '无法获取网站信息,请检查URL是否正确或手动填写'
|
||||
})
|
||||
|
||||
# 下载Logo(如果有)
|
||||
# 下载Logo到本地(如果有)
|
||||
logo_path = None
|
||||
if info.get('logo_url'):
|
||||
logo_path = fetcher.download_logo(info['logo_url'])
|
||||
logo_path = fetcher.download_logo(info['logo_url'], save_dir='static/logos')
|
||||
|
||||
# 如果下载失败,不返回远程URL,让用户手动上传
|
||||
return jsonify({
|
||||
'success': True,
|
||||
'data': {
|
||||
'name': info.get('name', ''),
|
||||
'description': info.get('description', ''),
|
||||
'logo': logo_path or info.get('logo_url', '')
|
||||
'logo': logo_path if logo_path else ''
|
||||
}
|
||||
})
|
||||
|
||||
@@ -185,6 +256,148 @@ def create_app(config_name='default'):
|
||||
'message': f'抓取失败: {str(e)}'
|
||||
}), 500
|
||||
|
||||
@app.route('/api/upload-logo', methods=['POST'])
|
||||
@login_required
|
||||
def upload_logo():
|
||||
"""上传Logo图片API"""
|
||||
try:
|
||||
# 检查文件是否存在
|
||||
if 'logo' not in request.files:
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': '请选择要上传的图片'
|
||||
}), 400
|
||||
|
||||
file = request.files['logo']
|
||||
|
||||
# 检查文件名
|
||||
if file.filename == '':
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': '未选择文件'
|
||||
}), 400
|
||||
|
||||
# 检查文件类型
|
||||
allowed_extensions = {'png', 'jpg', 'jpeg', 'gif', 'svg', 'ico', 'webp'}
|
||||
filename = file.filename.lower()
|
||||
if not any(filename.endswith('.' + ext) for ext in allowed_extensions):
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': '不支持的文件格式,请上传图片文件(png, jpg, gif, svg, ico, webp)'
|
||||
}), 400
|
||||
|
||||
# 创建保存目录
|
||||
save_dir = 'static/logos'
|
||||
os.makedirs(save_dir, exist_ok=True)
|
||||
|
||||
# 生成安全的文件名
|
||||
import time
|
||||
import hashlib
|
||||
ext = os.path.splitext(filename)[1]
|
||||
timestamp = str(int(time.time() * 1000))
|
||||
hash_name = hashlib.md5(f"{filename}{timestamp}".encode()).hexdigest()[:16]
|
||||
safe_filename = f"logo_{hash_name}{ext}"
|
||||
filepath = os.path.join(save_dir, safe_filename)
|
||||
|
||||
# 保存文件
|
||||
file.save(filepath)
|
||||
|
||||
# 返回相对路径
|
||||
return jsonify({
|
||||
'success': True,
|
||||
'path': f'/{filepath.replace(os.sep, "/")}'
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': f'上传失败: {str(e)}'
|
||||
}), 500
|
||||
|
||||
@app.route('/api/generate-features', methods=['POST'])
|
||||
@login_required
|
||||
def generate_features():
|
||||
"""使用DeepSeek自动生成网站主要功能"""
|
||||
try:
|
||||
data = request.get_json()
|
||||
name = data.get('name', '').strip()
|
||||
description = data.get('description', '').strip()
|
||||
url = data.get('url', '').strip()
|
||||
|
||||
if not name or not description:
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': '请提供网站名称和描述'
|
||||
}), 400
|
||||
|
||||
# 生成功能列表
|
||||
generator = TagGenerator()
|
||||
features = generator.generate_features(name, description, url)
|
||||
|
||||
if not features:
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': 'DeepSeek功能生成失败,请检查API配置'
|
||||
}), 500
|
||||
|
||||
return jsonify({
|
||||
'success': True,
|
||||
'features': features
|
||||
})
|
||||
|
||||
except ValueError as e:
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': str(e)
|
||||
}), 400
|
||||
except Exception as e:
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': f'生成失败: {str(e)}'
|
||||
}), 500
|
||||
|
||||
@app.route('/api/generate-description', methods=['POST'])
|
||||
@login_required
|
||||
def generate_description():
|
||||
"""使用DeepSeek自动生成网站详细介绍"""
|
||||
try:
|
||||
data = request.get_json()
|
||||
name = data.get('name', '').strip()
|
||||
short_desc = data.get('short_desc', '').strip()
|
||||
url = data.get('url', '').strip()
|
||||
|
||||
if not name:
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': '请提供网站名称'
|
||||
}), 400
|
||||
|
||||
# 生成详细介绍
|
||||
generator = TagGenerator()
|
||||
description = generator.generate_description(name, short_desc, url)
|
||||
|
||||
if not description:
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': 'DeepSeek详细介绍生成失败,请检查API配置'
|
||||
}), 500
|
||||
|
||||
return jsonify({
|
||||
'success': True,
|
||||
'description': description
|
||||
})
|
||||
|
||||
except ValueError as e:
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': str(e)
|
||||
}), 400
|
||||
except Exception as e:
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': f'生成失败: {str(e)}'
|
||||
}), 500
|
||||
|
||||
@app.route('/api/generate-tags', methods=['POST'])
|
||||
@login_required
|
||||
def generate_tags():
|
||||
@@ -345,11 +558,11 @@ def create_app(config_name='default'):
|
||||
})
|
||||
continue
|
||||
|
||||
# 4. 下载Logo(失败不影响导入)
|
||||
# 4. 下载Logo到本地(失败不影响导入)
|
||||
logo_path = None
|
||||
if info.get('logo_url'):
|
||||
try:
|
||||
logo_path = fetcher.download_logo(info['logo_url'])
|
||||
logo_path = fetcher.download_logo(info['logo_url'], save_dir='static/logos')
|
||||
except Exception as e:
|
||||
print(f"下载Logo失败 ({url}): {str(e)}")
|
||||
# Logo下载失败不影响网站导入
|
||||
@@ -544,9 +757,47 @@ def create_app(config_name='default'):
|
||||
import re
|
||||
import random
|
||||
from pypinyin import lazy_pinyin
|
||||
from flask import request
|
||||
|
||||
# 使用no_autoflush防止在查询时触发提前flush
|
||||
with db.session.no_autoflush:
|
||||
# 处理手动输入的新标签
|
||||
new_tags_str = request.form.get('new_tags', '')
|
||||
if new_tags_str:
|
||||
new_tag_names = [name.strip() for name in new_tags_str.split(',') if name.strip()]
|
||||
for tag_name in new_tag_names:
|
||||
# 检查标签是否已存在
|
||||
existing_tag = Tag.query.filter_by(name=tag_name).first()
|
||||
if not existing_tag:
|
||||
# 创建新标签
|
||||
tag_slug = ''.join(lazy_pinyin(tag_name))
|
||||
tag_slug = tag_slug.lower()
|
||||
tag_slug = re.sub(r'[^\w\s-]', '', tag_slug)
|
||||
tag_slug = re.sub(r'[-\s]+', '-', tag_slug).strip('-')
|
||||
|
||||
# 确保slug唯一
|
||||
base_tag_slug = tag_slug[:50]
|
||||
counter = 1
|
||||
final_tag_slug = tag_slug
|
||||
while Tag.query.filter_by(slug=final_tag_slug).first():
|
||||
final_tag_slug = f"{base_tag_slug}-{counter}"
|
||||
counter += 1
|
||||
if counter > 100:
|
||||
final_tag_slug = f"{base_tag_slug}-{random.randint(1000, 9999)}"
|
||||
break
|
||||
|
||||
new_tag = Tag(name=tag_name, slug=final_tag_slug)
|
||||
db.session.add(new_tag)
|
||||
db.session.flush() # 确保新标签有ID
|
||||
|
||||
# 添加到模型的标签列表
|
||||
if new_tag not in model.tags:
|
||||
model.tags.append(new_tag)
|
||||
else:
|
||||
# 添加已存在的标签
|
||||
if existing_tag not in model.tags:
|
||||
model.tags.append(existing_tag)
|
||||
|
||||
# 如果code为空,自动生成唯一的8位数字编码
|
||||
if not model.code or model.code.strip() == '':
|
||||
max_attempts = 10
|
||||
@@ -684,6 +935,51 @@ def create_app(config_name='default'):
|
||||
]
|
||||
}
|
||||
|
||||
# Prompt模板管理视图
|
||||
class PromptAdmin(SecureModelView):
|
||||
can_edit = True
|
||||
can_delete = False # 不允许删除,避免系统必需的prompt被删除
|
||||
can_create = True
|
||||
can_view_details = False
|
||||
|
||||
# 显示操作列
|
||||
column_display_actions = True
|
||||
|
||||
column_list = ['id', 'key', 'name', 'description', 'is_active', 'updated_at']
|
||||
column_searchable_list = ['key', 'name', 'description']
|
||||
column_filters = ['is_active', 'key']
|
||||
column_labels = {
|
||||
'id': 'ID',
|
||||
'key': '唯一标识',
|
||||
'name': '模板名称',
|
||||
'system_prompt': '系统提示词',
|
||||
'user_prompt_template': '用户提示词模板',
|
||||
'description': '模板说明',
|
||||
'is_active': '是否启用',
|
||||
'created_at': '创建时间',
|
||||
'updated_at': '更新时间'
|
||||
}
|
||||
form_columns = ['key', 'name', 'description', 'system_prompt', 'user_prompt_template', 'is_active']
|
||||
|
||||
# 字段说明
|
||||
column_descriptions = {
|
||||
'key': '唯一标识,如: tags, features, description',
|
||||
'system_prompt': 'AI的系统角色设定',
|
||||
'user_prompt_template': '用户提示词模板,支持变量如 {name}, {description}, {url}',
|
||||
}
|
||||
|
||||
# 表单字段配置
|
||||
form_widget_args = {
|
||||
'system_prompt': {
|
||||
'rows': 3,
|
||||
'style': 'font-family: monospace;'
|
||||
},
|
||||
'user_prompt_template': {
|
||||
'rows': 20,
|
||||
'style': 'font-family: monospace;'
|
||||
}
|
||||
}
|
||||
|
||||
# 初始化 Flask-Admin
|
||||
admin = Admin(
|
||||
app,
|
||||
@@ -696,6 +992,7 @@ def create_app(config_name='default'):
|
||||
admin.add_view(SiteAdmin(Site, db.session, name='网站管理'))
|
||||
admin.add_view(TagAdmin(Tag, db.session, name='标签管理'))
|
||||
admin.add_view(NewsAdmin(News, db.session, name='新闻管理'))
|
||||
admin.add_view(PromptAdmin(PromptTemplate, db.session, name='Prompt管理'))
|
||||
admin.add_view(AdminAdmin(AdminModel, db.session, name='管理员', endpoint='admin_users'))
|
||||
|
||||
return app
|
||||
|
||||
Reference in New Issue
Block a user