Files
zjpb.net/app.py
Jowe 495248bf5f feat: v2.2.0 智能新闻更新和布局优化
- 实现每日首次访问自动更新新闻功能
- 每个网站获取3条一周内的新闻
- 新闻模块放置在左侧主栏
- 相似推荐移至右侧边栏
- 自动去重防止重复新闻
2025-12-30 22:31:51 +08:00

1273 lines
48 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import markdown
from flask import Flask, render_template, redirect, url_for, request, flash, jsonify
from flask_login import LoginManager, login_user, logout_user, login_required, current_user
from flask_admin import Admin, AdminIndexView, expose
from flask_admin.contrib.sqla import ModelView
from datetime import datetime
from config import config
from models import db, Site, Tag, Admin as AdminModel, News, site_tags, PromptTemplate
from utils.website_fetcher import WebsiteFetcher
from utils.tag_generator import TagGenerator
from utils.news_searcher import NewsSearcher
def create_app(config_name='default'):
    """Application factory: create the Flask app and wire up its extensions.

    ``config_name`` is the key used to pick a configuration object out of the
    imported ``config`` mapping (defaults to ``'default'``).
    """
    app = Flask(__name__)

    # Load the settings object registered under the requested name.
    selected_config = config[config_name]
    app.config.from_object(selected_config)

    # Attach the shared SQLAlchemy handle to this application.
    db.init_app(app)
# Register a Jinja template filter named "markdown".
@app.template_filter('markdown')
def markdown_filter(text):
    """Convert Markdown ``text`` to HTML; falsy input becomes an empty string."""
    if text:
        # nl2br keeps single line breaks, fenced_code enables ``` code blocks.
        return markdown.markdown(text, extensions=['nl2br', 'fenced_code'])
    return ''
# Configure Flask-Login: unauthenticated users are sent to the admin login
# endpoint together with the flash message below.
login_manager = LoginManager()
login_manager.login_view = 'admin_login'
login_manager.login_message = '请先登录'
login_manager.init_app(app)
@login_manager.user_loader
def load_user(user_id):
    """Resolve the user id stored in the session to an admin record.

    Returns the matching ``AdminModel`` row, or ``None`` when the id is not a
    valid integer or no such row exists. Flask-Login expects ``None`` rather
    than an exception for an invalid id, so a malformed session value must not
    turn into a server error.
    """
    try:
        admin_id = int(user_id)
    except (TypeError, ValueError):
        return None
    return AdminModel.query.get(admin_id)
# ========== Public (front-end) routes ==========
@app.route('/')
def index():
    """Home page: paginated list of active sites with optional filters.

    Query-string parameters:
      * ``tag``  - slug of a tag to filter by; an unknown slug renders an
        empty result (no sites, no pagination).
      * ``q``    - free text matched as a substring against the site name,
        URL, description and short description.
      * ``page`` - page number, defaults to 1; 100 sites are shown per page.

    Renders ``index_new.html`` with ``sites``, ``tags``, ``selected_tag``,
    ``search_query``, ``pagination`` and ``tag_counts``.
    """
    from sqlalchemy import func

    # Tags ordered by descending sort weight, id as tie-breaker.
    tags = Tag.query.order_by(Tag.sort_order.desc(), Tag.id).all()

    # One grouped query yields the number of active sites for every tag,
    # instead of issuing one COUNT per tag.
    tag_counts = {}
    if tags:
        count_rows = (
            db.session.query(
                site_tags.c.tag_id,
                func.count(site_tags.c.site_id).label('count'),
            )
            .join(Site, site_tags.c.site_id == Site.id)
            .filter(Site.is_active == True)  # noqa: E712 - builds a SQL expression
            .group_by(site_tags.c.tag_id)
            .all()
        )
        tag_counts = {tag_id: total for tag_id, total in count_rows}

    tag_slug = request.args.get('tag')
    search_query = request.args.get('q', '').strip()
    page = request.args.get('page', 1, type=int)
    per_page = 100

    selected_tag = None
    sites = []
    pagination = None

    if tag_slug:
        selected_tag = Tag.query.filter_by(slug=tag_slug).first()
    # A tag was requested but does not exist: show nothing rather than everything.
    unknown_tag = bool(tag_slug) and selected_tag is None

    if not unknown_tag:
        query = Site.query.filter_by(is_active=True)

        if selected_tag is not None:
            query = query.filter(Site.tags.contains(selected_tag))

        if search_query:
            # autoescape=True escapes "%" and "_" typed by the user so they are
            # matched literally instead of acting as LIKE wildcards.
            searchable = (Site.name, Site.url, Site.description, Site.short_desc)
            query = query.filter(db.or_(*[
                column.contains(search_query, autoescape=True)
                for column in searchable
            ]))

        query = query.order_by(Site.sort_order.desc(), Site.id.desc())
        pagination = query.paginate(page=page, per_page=per_page, error_out=False)
        sites = pagination.items

    return render_template('index_new.html', sites=sites, tags=tags,
                           selected_tag=selected_tag, search_query=search_query,
                           pagination=pagination, tag_counts=tag_counts)
@app.route('/site/<code>')
def site_detail(code):
    """Site detail page.

    Looks up the active site with the given ``code`` (404 if absent), bumps its
    view counter, opportunistically refreshes its news, and renders
    ``detail_new.html`` with up to 5 news items and up to 4 recommended sites
    that share at least one tag.

    News refresh: when the site has no news, or its most recently created news
    row was not created today, up to 3 items from the last week are fetched
    through ``NewsSearcher`` (only if ``BOCHA_API_KEY`` is configured) and
    stored, skipping URLs already saved for this site. Any failure is logged
    and rolled back so the page still renders.
    """
    from datetime import date

    site = Site.query.filter_by(code=code, is_active=True).first_or_404()

    # Increment inside the UPDATE statement instead of read-modify-write in
    # Python, so concurrent requests cannot overwrite each other's count.
    # COALESCE keeps a NULL counter from staying NULL.
    site.view_count = db.func.coalesce(Site.view_count, 0) + 1
    db.session.commit()

    # The newest stored news row tells us whether a refresh already happened today.
    latest_news = News.query.filter_by(
        site_id=site.id
    ).order_by(News.created_at.desc()).first()
    need_update = (
        latest_news is None
        or latest_news.created_at is None
        or latest_news.created_at.date() < date.today()
    )
    # NOTE(review): if a search stores nothing new, the newest created_at does
    # not advance and the refresh runs again on the next view. Fixing that
    # needs a per-site "last checked" timestamp on the model.

    api_key = app.config.get('BOCHA_API_KEY') if need_update else None
    if api_key:
        try:
            searcher = NewsSearcher(api_key)
            news_items = searcher.search_site_news(
                site_name=site.name,
                site_url=site.url,
                count=3,
                freshness='oneWeek'
            )
            if news_items:
                for item in news_items:
                    # De-duplicate on (site, url).
                    already_saved = News.query.filter_by(
                        site_id=site.id,
                        url=item['url']
                    ).first()
                    if already_saved:
                        continue
                    db.session.add(News(
                        site_id=site.id,
                        title=item['title'],
                        content=item.get('summary') or item.get('snippet', ''),
                        url=item['url'],
                        source_name=item.get('site_name', ''),
                        source_icon=item.get('site_icon', ''),
                        published_at=item.get('published_at'),
                        news_type='Search Result',
                        is_active=True
                    ))
                db.session.commit()
        except Exception:
            # Best effort only: a failed refresh must not break the page.
            db.session.rollback()
            app.logger.exception('自动获取新闻失败')

    # Up to 5 active news items, newest publication first.
    news_list = News.query.filter_by(
        site_id=site.id,
        is_active=True
    ).order_by(News.published_at.desc()).limit(5).all()

    # Up to 4 other active sites sharing a tag, most viewed first.
    recommended_sites = []
    if site.tags:
        shared_tag_ids = [tag.id for tag in site.tags]
        recommended_sites = Site.query.filter(
            Site.id != site.id,
            Site.is_active == True,  # noqa: E712 - builds a SQL expression
            Site.tags.any(Tag.id.in_(shared_tag_ids))
        ).order_by(Site.view_count.desc()).limit(4).all()

    return render_template('detail_new.html', site=site, news_list=news_list,
                           recommended_sites=recommended_sites)
# ========== Admin authentication routes ==========
@app.route('/admin/login', methods=['GET', 'POST'])
def admin_login():
    """Admin login form (GET) and credential check (POST).

    Already-authenticated users are redirected straight to the admin index.
    On success the admin is logged in and ``last_login`` is stamped; on
    failure an error is flashed and the form is shown again.
    """
    if current_user.is_authenticated:
        return redirect(url_for('admin.index'))

    login_template = 'admin_login.html'
    if request.method != 'POST':
        return render_template(login_template)

    username = request.form.get('username')
    password = request.form.get('password')
    account = AdminModel.query.filter_by(username=username).first()

    # Account must exist, the password must match, and it must be enabled.
    if not (account and account.check_password(password) and account.is_active):
        flash('用户名或密码错误', 'error')
        return render_template(login_template)

    login_user(account)
    account.last_login = datetime.now()
    db.session.commit()
    return redirect(url_for('admin.index'))
@app.route('/admin/logout')
@login_required
def admin_logout():
    """Log the current admin out and send them to the public home page."""
    logout_user()
    home_url = url_for('index')
    return redirect(home_url)
@app.route('/admin/change-password', methods=['GET', 'POST'])
@login_required
def change_password():
    """Let the logged-in admin change their own password.

    GET renders the form. POST validates, in order: the old password matches,
    the new password is non-empty, has at least 6 characters, equals its
    confirmation, and differs from the old one. The first failing rule is
    flashed and the form re-rendered. On success the password is stored, the
    user is logged out and redirected to the login page.

    Passwords are compared exactly as typed (no whitespace stripping): the
    login route does not strip either, so stripping here would store a
    different password from the one the user will type at login.
    """
    form_template = 'admin/change_password.html'
    if request.method != 'POST':
        return render_template(form_template)

    old_password = request.form.get('old_password', '')
    new_password = request.form.get('new_password', '')
    confirm_password = request.form.get('confirm_password', '')

    # Validation rules, first failure wins.
    if not current_user.check_password(old_password):
        problem = '旧密码错误'
    elif not new_password:
        problem = '新密码不能为空'
    elif len(new_password) < 6:
        problem = '新密码长度至少6位'
    elif new_password != confirm_password:
        problem = '两次输入的新密码不一致'
    elif old_password == new_password:
        problem = '新密码不能与旧密码相同'
    else:
        problem = None

    if problem is not None:
        flash(problem, 'error')
        return render_template(form_template)

    try:
        current_user.set_password(new_password)
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        flash(f'密码修改失败:{str(e)}', 'error')
        return render_template(form_template)

    # Flash before logging out so the message is shown on the login page.
    flash('密码修改成功,请重新登录', 'success')
    logout_user()
    return redirect(url_for('admin_login'))
# ========== API routes ==========
@app.route('/api/fetch-website-info', methods=['POST'])
@login_required
def fetch_website_info():
    """Fetch a website's name, description and logo for the admin form.

    Expects a JSON object with a ``url`` field. Responses:
      * 400 when no usable URL is supplied (including a missing, malformed or
        non-object JSON body);
      * 200 with ``success: False`` when the fetcher returns nothing;
      * 200 with ``success: True`` and ``data`` (name, description, logo) on
        success, where ``logo`` is the local path returned by
        ``download_logo`` or an empty string;
      * 500 on any unexpected error.
    """
    try:
        # silent=True: a bad or missing JSON body becomes None instead of an
        # HTTP exception that the broad handler below would report as a 500.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            payload = {}
        url = str(payload.get('url') or '').strip()

        if not url:
            return jsonify({
                'success': False,
                'message': '请提供网站URL'
            }), 400

        fetcher = WebsiteFetcher(timeout=15)
        info = fetcher.fetch_website_info(url)

        if not info:
            return jsonify({
                'success': False,
                'message': '无法获取网站信息请检查URL是否正确或手动填写'
            })

        # Store the logo locally when one was found. If the download fails the
        # remote URL is deliberately not returned; the user uploads manually.
        logo_path = None
        if info.get('logo_url'):
            logo_path = fetcher.download_logo(info['logo_url'], save_dir='static/logos')

        return jsonify({
            'success': True,
            'data': {
                'name': info.get('name', ''),
                'description': info.get('description', ''),
                'logo': logo_path if logo_path else ''
            }
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'抓取失败: {str(e)}'
        }), 500
@app.route('/api/upload-logo', methods=['POST'])
@login_required
def upload_logo():
    """Accept an uploaded logo image and store it under ``static/logos``.

    The file is read from the ``logo`` multipart field. Only names ending in
    png, jpg, jpeg, gif, svg, ico or webp are accepted. The stored file gets a
    generated name (``logo_<16 hex chars><ext>``). Returns JSON with the
    site-relative ``path`` on success, a 400 for invalid input, or a 500 on
    unexpected errors.
    """
    import hashlib
    import time

    def reject(message):
        """Build the 400 response used for every validation failure."""
        return jsonify({'success': False, 'message': message}), 400

    try:
        if 'logo' not in request.files:
            return reject('请选择要上传的图片')

        upload = request.files['logo']
        if upload.filename == '':
            return reject('未选择文件')

        # The extension check is case-insensitive.
        filename = upload.filename.lower()
        allowed_suffixes = tuple(
            '.' + ext
            for ext in ('png', 'jpg', 'jpeg', 'gif', 'svg', 'ico', 'webp')
        )
        if not filename.endswith(allowed_suffixes):
            return reject('不支持的文件格式,请上传图片文件(png, jpg, gif, svg, ico, webp)')

        save_dir = 'static/logos'
        os.makedirs(save_dir, exist_ok=True)

        # The stored name is derived from the original name plus a millisecond
        # timestamp, so the client-supplied name never reaches the filesystem.
        _, ext = os.path.splitext(filename)
        timestamp = str(int(time.time() * 1000))
        digest = hashlib.md5(f"{filename}{timestamp}".encode()).hexdigest()
        filepath = os.path.join(save_dir, f"logo_{digest[:16]}{ext}")

        upload.save(filepath)

        # Return a URL-style path with forward slashes regardless of the OS.
        return jsonify({
            'success': True,
            'path': f'/{filepath.replace(os.sep, "/")}'
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'上传失败: {str(e)}'
        }), 500
@app.route('/api/generate-features', methods=['POST'])
@login_required
def generate_features():
    """Generate a site's feature list through ``TagGenerator``.

    Expects a JSON object with ``name`` and ``description`` (both required)
    and optional ``url``. Returns ``features`` on success; 400 when required
    fields are missing (including a missing, malformed or non-object JSON
    body) or when a ``ValueError`` is raised; 500 when generation yields
    nothing or fails unexpectedly.
    """
    try:
        # silent=True: a bad or missing JSON body becomes None instead of an
        # HTTP exception that the broad handler below would report as a 500.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            payload = {}

        def text_field(key):
            """Read a field as stripped text; absent/null values become ''."""
            return str(payload.get(key) or '').strip()

        name = text_field('name')
        description = text_field('description')
        url = text_field('url')

        if not name or not description:
            return jsonify({
                'success': False,
                'message': '请提供网站名称和描述'
            }), 400

        generator = TagGenerator()
        features = generator.generate_features(name, description, url)

        if not features:
            return jsonify({
                'success': False,
                'message': 'DeepSeek功能生成失败请检查API配置'
            }), 500

        return jsonify({
            'success': True,
            'features': features
        })
    except ValueError as e:
        return jsonify({
            'success': False,
            'message': str(e)
        }), 400
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'生成失败: {str(e)}'
        }), 500
@app.route('/api/generate-description', methods=['POST'])
@login_required
def generate_description():
    """Generate a site's long description through ``TagGenerator``.

    Expects a JSON object with ``name`` (required) and optional ``short_desc``
    and ``url``. Returns ``description`` on success; 400 when the name is
    missing (including a missing, malformed or non-object JSON body) or when a
    ``ValueError`` is raised; 500 when generation yields nothing or fails
    unexpectedly.
    """
    try:
        # silent=True: a bad or missing JSON body becomes None instead of an
        # HTTP exception that the broad handler below would report as a 500.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            payload = {}

        def text_field(key):
            """Read a field as stripped text; absent/null values become ''."""
            return str(payload.get(key) or '').strip()

        name = text_field('name')
        short_desc = text_field('short_desc')
        url = text_field('url')

        if not name:
            return jsonify({
                'success': False,
                'message': '请提供网站名称'
            }), 400

        generator = TagGenerator()
        description = generator.generate_description(name, short_desc, url)

        if not description:
            return jsonify({
                'success': False,
                'message': 'DeepSeek详细介绍生成失败请检查API配置'
            }), 500

        return jsonify({
            'success': True,
            'description': description
        })
    except ValueError as e:
        return jsonify({
            'success': False,
            'message': str(e)
        }), 400
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'生成失败: {str(e)}'
        }), 500
@app.route('/api/generate-tags', methods=['POST'])
@login_required
def generate_tags():
    """Suggest tags for a site through ``TagGenerator``.

    Expects a JSON object with ``name`` and ``description`` (both required).
    The names of all existing tags are passed to the generator as reference.
    Returns ``tags`` on success; 400 when required fields are missing
    (including a missing, malformed or non-object JSON body) or when a
    ``ValueError`` is raised; 500 when generation yields nothing or fails
    unexpectedly.
    """
    try:
        # silent=True: a bad or missing JSON body becomes None instead of an
        # HTTP exception that the broad handler below would report as a 500.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            payload = {}

        def text_field(key):
            """Read a field as stripped text; absent/null values become ''."""
            return str(payload.get(key) or '').strip()

        name = text_field('name')
        description = text_field('description')

        if not name or not description:
            return jsonify({
                'success': False,
                'message': '请提供网站名称和描述'
            }), 400

        existing_tags = [tag.name for tag in Tag.query.all()]

        generator = TagGenerator()
        suggested_tags = generator.generate_tags(name, description, existing_tags)

        if not suggested_tags:
            return jsonify({
                'success': False,
                'message': 'DeepSeek标签生成失败请检查API配置'
            }), 500

        return jsonify({
            'success': True,
            'tags': suggested_tags
        })
    except ValueError as e:
        return jsonify({
            'success': False,
            'message': str(e)
        }), 400
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'生成失败: {str(e)}'
        }), 500
# ========== News fetching routes ==========
@app.route('/api/fetch-site-news', methods=['POST'])
@login_required
def fetch_site_news():
    """Fetch and store the latest news for one site.

    Expects a JSON object with ``site_id`` (required) and optional ``count``
    and ``freshness``; the latter two default to the ``NEWS_SEARCH_COUNT`` /
    ``NEWS_SEARCH_FRESHNESS`` config values (10 / ``'oneMonth'`` when unset).

    Responses: 400 when ``site_id`` is missing (including a missing, malformed
    or non-object JSON body); 404 when the site does not exist or no news was
    found; 500 when ``BOCHA_API_KEY`` is not configured or on unexpected
    errors (the session is rolled back); otherwise 200 with the number of
    items found and newly saved. Items whose URL is already stored for the
    site are skipped.
    """
    try:
        # silent=True: a bad or missing JSON body becomes None instead of an
        # HTTP exception that the broad handler below would report as a 500.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            payload = {}

        site_id = payload.get('site_id')
        count = payload.get('count', app.config.get('NEWS_SEARCH_COUNT', 10))
        freshness = payload.get('freshness', app.config.get('NEWS_SEARCH_FRESHNESS', 'oneMonth'))

        if not site_id:
            return jsonify({
                'success': False,
                'message': '请提供网站ID'
            }), 400

        site = Site.query.get(site_id)
        if not site:
            return jsonify({
                'success': False,
                'message': '网站不存在'
            }), 404

        api_key = app.config.get('BOCHA_API_KEY')
        if not api_key:
            return jsonify({
                'success': False,
                'message': '博查API未配置请在.env文件中设置BOCHA_API_KEY'
            }), 500

        searcher = NewsSearcher(api_key)
        news_items = searcher.search_site_news(
            site_name=site.name,
            site_url=site.url,
            count=count,
            freshness=freshness
        )

        if not news_items:
            return jsonify({
                'success': False,
                'message': '未找到相关新闻'
            }), 404

        saved_count = 0
        for item in news_items:
            # De-duplicate on (site, url).
            duplicate = News.query.filter_by(
                site_id=site_id,
                url=item['url']
            ).first()
            if duplicate:
                continue
            db.session.add(News(
                site_id=site_id,
                title=item['title'],
                content=item.get('summary') or item.get('snippet', ''),
                url=item['url'],
                source_name=item.get('site_name', ''),
                source_icon=item.get('site_icon', ''),
                published_at=item.get('published_at'),
                news_type='Search Result',
                is_active=True
            ))
            saved_count += 1

        db.session.commit()

        return jsonify({
            'success': True,
            'message': f'成功获取并保存 {saved_count} 条新闻',
            'total_found': len(news_items),
            'saved': saved_count,
            'news_items': searcher.format_news_for_display(news_items)
        })
    except Exception as e:
        db.session.rollback()
        return jsonify({
            'success': False,
            'message': f'获取失败: {str(e)}'
        }), 500
@app.route('/api/fetch-all-news', methods=['POST'])
@login_required
def fetch_all_news():
    """Fetch and store news for a batch of active sites.

    Optional JSON fields: ``count`` (items per site, default 5), ``freshness``
    (default from ``NEWS_SEARCH_FRESHNESS`` config, else ``'oneMonth'``) and
    ``limit`` (number of sites to process, default 10). Sites are taken in
    ascending ``updated_at`` order.

    Each site is handled in its own transaction: its new rows are committed
    when the site succeeds and rolled back when it fails, so one failing site
    neither leaks half-added rows into the commit nor leaves the session
    unusable for the sites after it. Failures are reported per site in
    ``processed_sites`` under ``error``.

    Responses: 500 when ``BOCHA_API_KEY`` is missing or on unexpected errors;
    404 when there are no active sites; otherwise 200 with totals and the
    per-site breakdown.
    """
    try:
        # silent=True: a bad or missing JSON body falls back to the defaults
        # instead of surfacing as a 500 through the broad handler below.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            payload = {}

        count_per_site = payload.get('count', 5)
        freshness = payload.get('freshness', app.config.get('NEWS_SEARCH_FRESHNESS', 'oneMonth'))
        limit = payload.get('limit', 10)

        api_key = app.config.get('BOCHA_API_KEY')
        if not api_key:
            return jsonify({
                'success': False,
                'message': '博查API未配置请在.env文件中设置BOCHA_API_KEY'
            }), 500

        sites = Site.query.filter_by(is_active=True).order_by(Site.updated_at).limit(limit).all()
        if not sites:
            return jsonify({
                'success': False,
                'message': '没有可用的网站'
            }), 404

        # Copy out the plain values now: commit/rollback expires ORM instances
        # and the error path must not need the database to describe a site.
        targets = [(site.id, site.name, site.url) for site in sites]

        searcher = NewsSearcher(api_key)
        total_saved = 0
        total_found = 0
        processed_sites = []

        for site_id, site_name, site_url in targets:
            try:
                news_items = searcher.search_site_news(
                    site_name=site_name,
                    site_url=site_url,
                    count=count_per_site,
                    freshness=freshness
                ) or []

                site_saved = 0
                for item in news_items:
                    # De-duplicate on (site, url).
                    duplicate = News.query.filter_by(
                        site_id=site_id,
                        url=item['url']
                    ).first()
                    if duplicate:
                        continue
                    db.session.add(News(
                        site_id=site_id,
                        title=item['title'],
                        content=item.get('summary') or item.get('snippet', ''),
                        url=item['url'],
                        source_name=item.get('site_name', ''),
                        source_icon=item.get('site_icon', ''),
                        published_at=item.get('published_at'),
                        news_type='Search Result',
                        is_active=True
                    ))
                    site_saved += 1

                # Persist this site's rows before moving on.
                db.session.commit()
            except Exception as e:
                # Discard this site's pending rows and reset the session so the
                # remaining sites can still be processed.
                db.session.rollback()
                processed_sites.append({
                    'id': site_id,
                    'name': site_name,
                    'error': str(e)
                })
                continue

            total_found += len(news_items)
            total_saved += site_saved
            processed_sites.append({
                'id': site_id,
                'name': site_name,
                'found': len(news_items),
                'saved': site_saved
            })

        return jsonify({
            'success': True,
            'message': f'批量获取完成,共处理 {len(processed_sites)} 个网站',
            'total_found': total_found,
            'total_saved': total_saved,
            'processed_sites': processed_sites
        })
    except Exception as e:
        db.session.rollback()
        return jsonify({
            'success': False,
            'message': f'批量获取失败: {str(e)}'
        }), 500
# ========== Batch import route ==========
@app.route('/admin/batch-import', methods=['GET', 'POST'])
@login_required
def batch_import():
    """Import many sites at once from a URL list or a browser bookmark file.

    GET renders the form. POST reads ``import_type`` (``url_list`` or
    ``bookmark_file``), the ``auto_activate`` checkbox and, for bookmark
    files, an optional case-insensitive ``folder_filter``.

    Every URL is processed and committed independently, so one failure does
    not undo earlier imports:
      1. skip URLs that already exist;
      2. fetch site metadata (a fetch failure is not fatal);
      3. fall back to the bookmark name, then to the URL's host, as site name;
      4. download the logo (failure is not fatal);
      5. generate a unique code and slug, create the ``Site`` row and commit.

    The template receives ``results`` with counts plus success/failure lists.
    """
    import random
    import re
    import time
    from urllib.parse import urlparse

    from utils.bookmark_parser import BookmarkParser
    from utils.website_fetcher import WebsiteFetcher

    template = 'admin/batch_import.html'
    slug_limit = 50  # maximum length of the slug base, before any numeric suffix

    def _unique_site_code():
        """Return an 8-digit code unused by any site (timestamp after 10 collisions)."""
        for _ in range(10):
            candidate = str(random.randint(10000000, 99999999))
            if not Site.query.filter_by(code=candidate).first():
                return candidate
        return str(int(time.time() * 1000))[-8:]

    def _unique_site_slug(site_name, site_code):
        """Build a slug from the site name and make it unique among sites."""
        from pypinyin import lazy_pinyin

        slug = ''.join(lazy_pinyin(site_name)).lower()
        slug = re.sub(r'[^\w\s-]', '', slug)
        slug = re.sub(r'[-\s]+', '-', slug).strip('-')
        if not slug:
            slug = f"site-{site_code}"

        # Apply the length limit to the first candidate too; previously only
        # the numbered fallbacks were built from the truncated base.
        base_slug = slug[:slug_limit].rstrip('-')
        candidate = base_slug
        counter = 1
        while Site.query.filter_by(slug=candidate).first():
            candidate = f"{base_slug}-{counter}"
            counter += 1
            if counter > 100:  # give up numbering; the code makes it unique
                candidate = f"{base_slug}-{site_code}"
                break
        return candidate

    results = None

    if request.method == 'POST':
        import_type = request.form.get('import_type')
        auto_activate = request.form.get('auto_activate') == 'on'
        parser = BookmarkParser()
        fetcher = WebsiteFetcher(timeout=15)
        urls_to_import = []

        try:
            # ---- Parse the submitted input into a list of {url, name, ...} ----
            if import_type == 'url_list':
                url_list_text = request.form.get('url_list', '')
                urls_to_import = parser.parse_url_list(url_list_text)
            elif import_type == 'bookmark_file':
                bookmark_file = request.files.get('bookmark_file')
                if not bookmark_file:
                    flash('请选择书签文件', 'error')
                    return render_template(template)

                html_content = bookmark_file.read().decode('utf-8', errors='ignore')
                all_bookmarks = parser.parse_html_file(html_content)

                folder_filter = request.form.get('folder_filter', '').strip()
                if folder_filter:
                    wanted = folder_filter.lower()
                    urls_to_import = [
                        bookmark for bookmark in all_bookmarks
                        if wanted in bookmark.get('folder', '').lower()
                    ]
                else:
                    urls_to_import = all_bookmarks

            # ---- Import each entry in its own transaction ----
            success_list = []
            failed_list = []
            total = len(urls_to_import)

            for idx, item in enumerate(urls_to_import, 1):
                url = item['url']
                name = item.get('name', '')

                try:
                    # 1. Skip URLs that are already stored.
                    try:
                        existing = Site.query.filter_by(url=url).first()
                    except Exception as e:
                        failed_list.append({
                            'url': url,
                            'name': name,
                            'error': f'检查URL时出错: {str(e)}'
                        })
                        continue
                    if existing:
                        failed_list.append({
                            'url': url,
                            'name': name or existing.name,
                            'error': f'该URL已存在网站名称{existing.name}'
                        })
                        continue

                    # 2. Fetch metadata; a failure falls through to the name fallbacks.
                    info = None
                    try:
                        info = fetcher.fetch_website_info(url)
                    except Exception as e:
                        app.logger.warning('抓取 %s 失败: %s', url, e)

                    # 3. Without a fetched name, use the bookmark name, else the host.
                    if not info or not info.get('name'):
                        if name:
                            info = {'name': name, 'description': '', 'logo_url': ''}
                        else:
                            try:
                                parsed = urlparse(url)
                                domain = parsed.netloc or parsed.path
                            except Exception:
                                failed_list.append({
                                    'url': url,
                                    'name': name,
                                    'error': '无法获取网站信息且URL解析失败'
                                })
                                continue
                            if not domain:
                                failed_list.append({
                                    'url': url,
                                    'name': name,
                                    'error': '无法获取网站信息且没有备用名称'
                                })
                                continue
                            info = {'name': domain, 'description': '', 'logo_url': ''}

                    # 4. Download the logo; failure does not block the import.
                    remote_logo = info.get('logo_url') or ''
                    logo_path = None
                    if remote_logo:
                        try:
                            logo_path = fetcher.download_logo(remote_logo, save_dir='static/logos')
                        except Exception as e:
                            app.logger.warning('下载Logo失败 (%s): %s', url, e)

                    # 5. Generate identifiers, create the row and commit.
                    try:
                        site_code = _unique_site_code()
                        site_name = info.get('name', name or 'Unknown')[:100]
                        final_slug = _unique_site_slug(site_name, site_code)

                        description = info.get('description') or ''
                        # Prefer the local copy of the logo, else keep the remote URL.
                        logo_value = (logo_path or remote_logo[:500]) if remote_logo else ''

                        site = Site(
                            code=site_code,
                            slug=final_slug,
                            name=site_name,
                            url=url[:500],  # cap the stored URL length
                            logo=logo_value,
                            short_desc=description[:200],
                            description=description[:2000],
                            is_active=auto_activate
                        )
                        db.session.add(site)
                        db.session.commit()

                        success_list.append({
                            'name': site.name,
                            'url': site.url
                        })
                        app.logger.info('成功导入 [%s/%s]: %s', idx, total, site.name)
                    except Exception as e:
                        db.session.rollback()
                        failed_list.append({
                            'url': url,
                            'name': name or info.get('name', 'Unknown'),
                            'error': f'数据库保存失败: {str(e)}'
                        })
                        continue

                except Exception as e:
                    # Anything unexpected for this entry: undo and move on.
                    db.session.rollback()
                    failed_list.append({
                        'url': url,
                        'name': name,
                        'error': f'未知错误: {str(e)}'
                    })
                    app.logger.warning('导入 %s 时发生未知错误: %s', url, e)
                    continue

            results = {
                'total_count': total,
                'success_count': len(success_list),
                'failed_count': len(failed_list),
                'success_list': success_list,
                'failed_list': failed_list
            }

            if success_list:
                flash(f'成功导入 {len(success_list)} 个网站!', 'success')

        except Exception as e:
            flash(f'导入失败: {str(e)}', 'error')

    return render_template(template, results=results)
# ========== Flask-Admin configuration ==========
class SecureModelView(ModelView):
    """Base model view that is only reachable by authenticated users."""

    # List pagination: 20 rows by default, adjustable from the UI.
    page_size = 20
    can_set_page_size = True

    # Use human-readable filter names in list URLs.
    named_filter_urls = True

    # Project templates used for the list / create / edit pages.
    list_template = 'admin/model/list.html'
    create_template = 'admin/model/create.html'
    edit_template = 'admin/model/edit.html'

    def is_accessible(self):
        """Allow access only when the current user is logged in."""
        return current_user.is_authenticated

    def inaccessible_callback(self, name, **kwargs):
        """Send users who may not access the view to the admin login page."""
        login_url = url_for('admin_login')
        return redirect(login_url)
class SecureAdminIndexView(AdminIndexView):
    """Admin dashboard, only reachable by authenticated users."""

    def is_accessible(self):
        """Allow access only when the current user is logged in."""
        return current_user.is_authenticated

    def inaccessible_callback(self, name, **kwargs):
        """Send users who may not access the dashboard to the login page."""
        login_url = url_for('admin_login')
        return redirect(login_url)

    @expose('/')
    def index(self):
        """Render the dashboard with headline statistics and the newest sites."""
        active_sites = Site.query.filter_by(is_active=True).count()
        tag_total = Tag.query.count()
        active_news = News.query.filter_by(is_active=True).count()
        # SUM() over no rows (or only NULLs) yields None; report 0 instead.
        view_sum = db.session.query(db.func.sum(Site.view_count)).scalar()

        stats = {
            'sites_count': active_sites,
            'tags_count': tag_total,
            'news_count': active_news,
            'total_views': view_sum or 0,
        }

        # The five most recently created sites.
        newest_first = Site.query.order_by(Site.created_at.desc())
        recent_sites = newest_first.limit(5).all()

        return self.render('admin/index.html', stats=stats, recent_sites=recent_sites)
# Site management view
class SiteAdmin(SecureModelView):
    """Admin CRUD view for sites.

    Besides the list/form configuration, ``on_model_change`` attaches tags
    typed into the ``new_tags`` form field and fills in ``code`` and ``slug``
    when they are empty.
    """

    # Custom templates for the create / edit forms.
    create_template = 'admin/site/create.html'
    edit_template = 'admin/site/edit.html'

    # Create, edit and delete are enabled; the separate details page is not.
    can_edit = True
    can_delete = True
    can_create = True
    can_view_details = False

    # Show the per-row action column and allow every bulk action.
    column_display_actions = True
    action_disallowed_list = []

    column_list = ['id', 'code', 'name', 'url', 'slug', 'is_active', 'view_count', 'created_at']
    column_searchable_list = ['code', 'name', 'url', 'description']
    column_filters = ['is_active', 'tags']
    column_labels = {
        'id': 'ID',
        'code': '网站编码',
        'name': '网站名称',
        'url': 'URL',
        'slug': 'URL别名',
        'logo': 'Logo',
        'short_desc': '简短描述',
        'description': '详细介绍',
        'features': '主要功能',
        'is_active': '是否启用',
        'view_count': '浏览次数',
        'sort_order': '排序权重',
        'tags': '标签',
        'created_at': '创建时间',
        'updated_at': '更新时间'
    }
    form_columns = ['name', 'url', 'slug', 'logo', 'short_desc', 'description', 'features', 'tags', 'is_active', 'sort_order']

    # No extra row actions beyond the built-in ones.
    column_extra_row_actions = None

    @staticmethod
    def _slugify(text):
        """Turn ``text`` into a lowercase dash-separated slug.

        Chinese characters are romanised with pypinyin; characters other than
        word characters, whitespace and dashes are dropped. May return ''.
        """
        import re
        from pypinyin import lazy_pinyin

        slug = ''.join(lazy_pinyin(text)).lower()
        slug = re.sub(r'[^\w\s-]', '', slug)
        return re.sub(r'[-\s]+', '-', slug).strip('-')

    def on_model_change(self, form, model, is_created):
        """Attach newly typed tags and auto-fill ``code`` / ``slug`` before saving.

        * ``new_tags`` (comma-separated, read from the submitted form): each
          name is linked to the site, creating the tag first if needed.
        * ``code``: when blank, a random 8-digit code not used by another site
          (timestamp-based after 10 collisions).
        * ``slug``: when blank, derived from the site name, limited in length
          and made unique among sites.
        """
        import random
        import time
        from flask import request

        slug_limit = 50  # maximum length of a slug base, before any suffix

        # Lookups below must not flush the half-populated model early.
        with db.session.no_autoflush:
            # ---- Tags typed by hand ----
            new_tags_str = request.form.get('new_tags', '')
            typed_names = [part.strip() for part in new_tags_str.split(',') if part.strip()]
            for tag_name in typed_names:
                tag = Tag.query.filter_by(name=tag_name).first()
                if tag is None:
                    # A name with no slug-safe characters would give an empty
                    # slug; fall back to a generated one.
                    tag_slug = self._slugify(tag_name) or f"tag-{random.randint(1000, 9999)}"
                    base_tag_slug = tag_slug[:slug_limit].rstrip('-')

                    final_tag_slug = base_tag_slug
                    counter = 1
                    while Tag.query.filter_by(slug=final_tag_slug).first():
                        final_tag_slug = f"{base_tag_slug}-{counter}"
                        counter += 1
                        if counter > 100:
                            final_tag_slug = f"{base_tag_slug}-{random.randint(1000, 9999)}"
                            break

                    tag = Tag(name=tag_name, slug=final_tag_slug)
                    db.session.add(tag)
                    db.session.flush()  # assigns the new tag its id

                if tag not in model.tags:
                    model.tags.append(tag)

            # ---- Site code ----
            if not (model.code or '').strip():
                new_code = None
                for _ in range(10):
                    candidate = str(random.randint(10000000, 99999999))
                    clash = Site.query.filter(Site.code == candidate).first()
                    if not clash or clash.id == model.id:
                        new_code = candidate
                        break
                # Tracking the result locally means a whitespace-only code is
                # also replaced when every random attempt collided.
                model.code = new_code or str(int(time.time() * 1000))[-8:]

            # ---- Site slug ----
            if not (model.slug or '').strip():
                slug = self._slugify(model.name or '') or f"site-{model.code}"

                # Limit the first candidate as well, not only the numbered ones.
                base_slug = slug[:slug_limit].rstrip('-')
                final_slug = base_slug
                counter = 1
                max_slug_attempts = 100
                while counter < max_slug_attempts:
                    clash = Site.query.filter(Site.slug == final_slug).first()
                    if not clash or clash.id == model.id:
                        break
                    final_slug = f"{base_slug}-{counter}"
                    counter += 1

                # Numbering exhausted: the site code guarantees uniqueness.
                if counter >= max_slug_attempts:
                    final_slug = f"{base_slug}-{model.code}"

                model.slug = final_slug
# Tag management view
class TagAdmin(SecureModelView):
    """Admin CRUD view for tags."""

    # Create, edit and delete are enabled; the separate details page is not.
    can_create = True
    can_edit = True
    can_delete = True
    can_view_details = False

    # Show the per-row action column.
    column_display_actions = True

    # List page: visible columns and the ones covered by the search box.
    column_list = ['id', 'name', 'slug', 'description', 'sort_order']
    column_searchable_list = ['name', 'description']

    # Display labels for model fields.
    column_labels = dict(
        id='ID',
        name='标签名称',
        slug='URL别名',
        description='标签描述',
        icon='图标',
        sort_order='排序权重',
        created_at='创建时间',
    )

    # Fields offered on the create / edit form.
    form_columns = ['name', 'slug', 'description', 'icon', 'sort_order']
# Administrator account management view
class AdminAdmin(SecureModelView):
    """Admin CRUD view for administrator accounts."""

    # Create, edit and delete are enabled; the separate details page is not.
    can_edit = True
    can_delete = True
    can_create = True
    can_view_details = False

    # Show the per-row action column.
    column_display_actions = True

    column_list = ['id', 'username', 'email', 'is_active', 'last_login', 'created_at']
    column_searchable_list = ['username', 'email']
    column_filters = ['is_active']
    column_labels = {
        'id': 'ID',
        'username': '用户名',
        'email': '邮箱',
        'is_active': '是否启用',
        'created_at': '创建时间',
        'last_login': '最后登录'
    }

    # The password is not editable through this form.
    form_columns = ['username', 'email', 'is_active']

    def on_model_change(self, form, model, is_created):
        """Give a newly created admin a random one-time initial password.

        A fixed, publicly known default password would let anyone who knows a
        new username log in until it is changed. A random password is
        generated instead and shown once, via a flash message, to the admin
        who created the account. Existing accounts are left untouched.
        """
        if not is_created:
            return

        import secrets

        initial_password = secrets.token_urlsafe(12)
        model.set_password(initial_password)
        flash(
            f'已为管理员 {model.username} 生成初始密码:{initial_password}'
            '(仅显示一次,请通知其登录后立即修改)',
            'warning'
        )
# News management view
class NewsAdmin(SecureModelView):
    """Admin CRUD view for news items."""

    # Create, edit and delete are enabled; the separate details page is not.
    can_create = True
    can_edit = True
    can_delete = True
    can_view_details = False

    # Show the per-row action column.
    column_display_actions = True

    # List page: columns, search fields, filters.
    column_list = ['id', 'site', 'title', 'source_name', 'news_type', 'published_at', 'is_active']
    column_searchable_list = ['title', 'content', 'source_name']
    column_filters = ['site', 'news_type', 'source_name', 'is_active', 'published_at']

    # Newest publication date first (True = descending).
    column_default_sort = ('published_at', True)

    # Display labels for model fields.
    column_labels = dict(
        id='ID',
        site='关联网站',
        title='新闻标题',
        content='新闻内容',
        news_type='新闻类型',
        url='新闻链接',
        source_name='来源网站',
        source_icon='来源图标',
        published_at='发布时间',
        is_active='是否启用',
        created_at='创建时间',
        updated_at='更新时间',
    )

    # Fields offered on the create / edit form.
    form_columns = ['site', 'title', 'content', 'news_type', 'url', 'source_name', 'source_icon', 'published_at', 'is_active']

    # Selectable news types; the stored value and the label are identical.
    form_choices = {
        'news_type': [
            (kind, kind)
            for kind in (
                'Search Result',
                'Product Update',
                'Industry News',
                'Company News',
                'Other',
            )
        ]
    }
# Prompt template management view
class PromptAdmin(SecureModelView):
    """Admin view for prompt templates."""

    # Deleting is disabled so that prompts the system needs cannot be removed.
    can_create = True
    can_edit = True
    can_delete = False
    can_view_details = False

    # Show the per-row action column.
    column_display_actions = True

    # List page: columns, search fields, filters.
    column_list = ['id', 'key', 'name', 'description', 'is_active', 'updated_at']
    column_searchable_list = ['key', 'name', 'description']
    column_filters = ['is_active', 'key']

    # Display labels for model fields.
    column_labels = dict(
        id='ID',
        key='唯一标识',
        name='模板名称',
        system_prompt='系统提示词',
        user_prompt_template='用户提示词模板',
        description='模板说明',
        is_active='是否启用',
        created_at='创建时间',
        updated_at='更新时间',
    )

    # Fields offered on the create / edit form.
    form_columns = ['key', 'name', 'description', 'system_prompt', 'user_prompt_template', 'is_active']

    # Help text displayed for individual fields.
    column_descriptions = dict(
        key='唯一标识,如: tags, features, description',
        system_prompt='AI的系统角色设定',
        user_prompt_template='用户提示词模板,支持变量如 {name}, {description}, {url}',
    )

    # Monospace text areas: a short one for the system prompt, a tall one for
    # the user prompt template.
    form_widget_args = dict(
        system_prompt={'rows': 3, 'style': 'font-family: monospace;'},
        user_prompt_template={'rows': 20, 'style': 'font-family: monospace;'},
    )
# 初始化 Flask-Admin
admin = Admin(
app,
name='ZJPB 焦提示词',
template_mode='bootstrap4',
index_view=SecureAdminIndexView(name='控制台', url='/admin'),
base_template='admin/master.html'
)
admin.add_view(SiteAdmin(Site, db.session, name='网站管理'))
admin.add_view(TagAdmin(Tag, db.session, name='标签管理'))
admin.add_view(NewsAdmin(News, db.session, name='新闻管理'))
admin.add_view(PromptAdmin(PromptTemplate, db.session, name='Prompt管理'))
admin.add_view(AdminAdmin(AdminModel, db.session, name='管理员', endpoint='admin_users'))
return app
if __name__ == '__main__':
    # Development entry point. The configuration name comes from FLASK_ENV.
    app = create_app(os.getenv('FLASK_ENV', 'development'))

    # Do not force debug=True: the Werkzeug debugger allows arbitrary code
    # execution, so it must never be combined with a bind on all interfaces.
    # Debug mode now follows the loaded configuration, and a debug run binds
    # to localhost unless FLASK_RUN_HOST says otherwise.
    # NOTE(review): assumes the development config enables DEBUG - confirm.
    debug_enabled = bool(app.debug)
    default_host = '127.0.0.1' if debug_enabled else '0.0.0.0'
    app.run(
        host=os.getenv('FLASK_RUN_HOST', default_host),
        port=5000,
        debug=debug_enabled
    )