- 移除顶部热门工具排行榜模块 - 在标签下方添加三个tab(最新/热门/推荐) - 添加is_recommended字段到Site模型 - 创建数据库迁移脚本add_is_recommended.py - 更新后台管理界面支持推荐标记 - 更新分页链接保持tab状态 - 所有功能已本地测试验证通过 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1644 lines
62 KiB
Python
1644 lines
62 KiB
Python
import os
|
||
import markdown
|
||
from flask import Flask, render_template, redirect, url_for, request, flash, jsonify
|
||
from flask_login import LoginManager, login_user, logout_user, login_required, current_user
|
||
from flask_admin import Admin, AdminIndexView, expose
|
||
from flask_admin.contrib.sqla import ModelView
|
||
from datetime import datetime
|
||
from config import config
|
||
from models import db, Site, Tag, Admin as AdminModel, News, site_tags, PromptTemplate
|
||
from utils.website_fetcher import WebsiteFetcher
|
||
from utils.tag_generator import TagGenerator
|
||
from utils.news_searcher import NewsSearcher
|
||
|
||
def create_app(config_name='default'):
|
||
"""应用工厂函数"""
|
||
app = Flask(__name__)
|
||
|
||
# 加载配置
|
||
app.config.from_object(config[config_name])
|
||
|
||
# 初始化数据库
|
||
db.init_app(app)
|
||
|
||
# 添加Markdown过滤器
|
||
@app.template_filter('markdown')
|
||
def markdown_filter(text):
|
||
"""将Markdown文本转换为HTML"""
|
||
if not text:
|
||
return ''
|
||
return markdown.markdown(text, extensions=['nl2br', 'fenced_code'])
|
||
|
||
# v2.4新增: 自动内链过滤器
|
||
@app.template_filter('auto_link')
|
||
def auto_link_filter(text, current_site_id=None):
|
||
"""自动为内容中的工具名称添加链接"""
|
||
if not text:
|
||
return ''
|
||
|
||
# 获取所有启用的网站(排除当前网站)
|
||
sites = Site.query.filter_by(is_active=True).all()
|
||
if current_site_id:
|
||
sites = [s for s in sites if s.id != current_site_id]
|
||
|
||
# 按名称长度降序排序,优先匹配长名称
|
||
sites = sorted(sites, key=lambda s: len(s.name), reverse=True)
|
||
|
||
# 记录已经添加链接的位置,避免重复
|
||
linked_sites = set()
|
||
|
||
for site in sites:
|
||
if site.name in text and site.name not in linked_sites:
|
||
# 只链接第一次出现的位置
|
||
link = f'<a href="/site/{site.code}" title="{site.short_desc or site.name}" style="color: var(--primary-blue); text-decoration: underline; text-decoration-style: dotted;">{site.name}</a>'
|
||
text = text.replace(site.name, link, 1)
|
||
linked_sites.add(site.name)
|
||
|
||
return text
|
||
|
||
# 初始化登录管理
|
||
login_manager = LoginManager()
|
||
login_manager.init_app(app)
|
||
login_manager.login_view = 'admin_login'
|
||
login_manager.login_message = '请先登录'
|
||
|
||
@login_manager.user_loader
|
||
def load_user(user_id):
|
||
return AdminModel.query.get(int(user_id))
|
||
|
||
# ========== 前台路由 ==========
|
||
@app.route('/')
|
||
def index():
|
||
"""首页"""
|
||
# 获取所有启用的标签
|
||
tags = Tag.query.order_by(Tag.sort_order.desc(), Tag.id).all()
|
||
|
||
# 优化:使用一次SQL查询统计所有标签的网站数量
|
||
tag_counts = {}
|
||
if tags:
|
||
# 使用JOIN查询一次性获取所有标签的网站数量
|
||
from sqlalchemy import func
|
||
counts_query = db.session.query(
|
||
site_tags.c.tag_id,
|
||
func.count(site_tags.c.site_id).label('count')
|
||
).join(
|
||
Site, site_tags.c.site_id == Site.id
|
||
).filter(
|
||
Site.is_active == True
|
||
).group_by(site_tags.c.tag_id).all()
|
||
|
||
tag_counts = {tag_id: count for tag_id, count in counts_query}
|
||
|
||
# 获取筛选参数
|
||
tag_slug = request.args.get('tag')
|
||
search_query = request.args.get('q', '').strip()
|
||
current_tab = request.args.get('tab', 'latest') # 默认为"最新"
|
||
page = request.args.get('page', 1, type=int)
|
||
per_page = 100 # 每页显示100个站点
|
||
|
||
selected_tag = None
|
||
|
||
# 构建基础查询
|
||
query = Site.query.filter_by(is_active=True)
|
||
|
||
# 标签筛选
|
||
if tag_slug:
|
||
selected_tag = Tag.query.filter_by(slug=tag_slug).first()
|
||
if selected_tag:
|
||
query = query.filter(Site.tags.contains(selected_tag))
|
||
else:
|
||
sites = []
|
||
pagination = None
|
||
return render_template('index_new.html', sites=sites, tags=tags,
|
||
selected_tag=selected_tag, search_query=search_query,
|
||
pagination=pagination, tag_counts=tag_counts,
|
||
current_tab=current_tab)
|
||
|
||
# 搜索功能
|
||
if search_query:
|
||
# 使用OR条件搜索:网站名称、URL、描述
|
||
search_pattern = f'%{search_query}%'
|
||
query = query.filter(
|
||
db.or_(
|
||
Site.name.like(search_pattern),
|
||
Site.url.like(search_pattern),
|
||
Site.description.like(search_pattern),
|
||
Site.short_desc.like(search_pattern)
|
||
)
|
||
)
|
||
|
||
# Tab筛选和排序
|
||
if current_tab == 'popular':
|
||
# 热门:按浏览次数倒序
|
||
query = query.order_by(Site.view_count.desc(), Site.id.desc())
|
||
elif current_tab == 'recommended':
|
||
# 推荐:只显示is_recommended=True的
|
||
query = query.filter_by(is_recommended=True).order_by(Site.sort_order.desc(), Site.id.desc())
|
||
else:
|
||
# 最新:按创建时间倒序(默认)
|
||
query = query.order_by(Site.created_at.desc(), Site.id.desc())
|
||
|
||
# 分页
|
||
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
|
||
sites = pagination.items
|
||
|
||
return render_template('index_new.html', sites=sites, tags=tags,
|
||
selected_tag=selected_tag, search_query=search_query,
|
||
pagination=pagination, tag_counts=tag_counts,
|
||
current_tab=current_tab)
|
||
|
||
@app.route('/site/<code>')
|
||
def site_detail(code):
|
||
"""网站详情页"""
|
||
site = Site.query.filter_by(code=code, is_active=True).first_or_404()
|
||
|
||
# 增加浏览次数
|
||
site.view_count += 1
|
||
db.session.commit()
|
||
|
||
# 智能新闻更新:检查今天是否已更新过新闻
|
||
from datetime import date
|
||
today = date.today()
|
||
|
||
# 检查该网站最新一条新闻的创建时间
|
||
latest_news = News.query.filter_by(
|
||
site_id=site.id
|
||
).order_by(News.created_at.desc()).first()
|
||
|
||
# 判断是否需要更新新闻
|
||
need_update = False
|
||
if not latest_news:
|
||
# 没有任何新闻,需要获取
|
||
need_update = True
|
||
elif latest_news.created_at.date() < today:
|
||
# 最新新闻不是今天创建的,需要更新
|
||
need_update = True
|
||
|
||
# 如果需要更新,自动获取最新新闻
|
||
if need_update:
|
||
api_key = app.config.get('BOCHA_API_KEY')
|
||
if api_key:
|
||
try:
|
||
# 创建新闻搜索器
|
||
searcher = NewsSearcher(api_key)
|
||
|
||
# 获取新闻(限制3条,一周内的)
|
||
news_items = searcher.search_site_news(
|
||
site_name=site.name,
|
||
site_url=site.url,
|
||
news_keywords=site.news_keywords, # v2.3新增:使用专用关键词
|
||
count=3,
|
||
freshness='oneWeek'
|
||
)
|
||
|
||
# 保存新闻到数据库
|
||
if news_items:
|
||
for item in news_items:
|
||
# 检查是否已存在(根据URL去重)
|
||
existing = News.query.filter_by(
|
||
site_id=site.id,
|
||
url=item['url']
|
||
).first()
|
||
|
||
if not existing:
|
||
news = News(
|
||
site_id=site.id,
|
||
title=item['title'],
|
||
content=item.get('summary') or item.get('snippet', ''),
|
||
url=item['url'],
|
||
source_name=item.get('site_name', ''),
|
||
source_icon=item.get('site_icon', ''),
|
||
published_at=item.get('published_at'),
|
||
news_type='Search Result',
|
||
is_active=True
|
||
)
|
||
db.session.add(news)
|
||
|
||
db.session.commit()
|
||
|
||
except Exception as e:
|
||
# 获取新闻失败,不影响页面显示
|
||
print(f"自动获取新闻失败:{str(e)}")
|
||
db.session.rollback()
|
||
|
||
# 获取该网站的相关新闻(最多显示5条)
|
||
news_list = News.query.filter_by(
|
||
site_id=site.id,
|
||
is_active=True
|
||
).order_by(News.published_at.desc()).limit(5).all()
|
||
|
||
# 获取同类工具推荐(通过标签匹配,最多显示4个)
|
||
recommended_sites = []
|
||
if site.tags:
|
||
# 获取有相同标签的其他网站
|
||
recommended_sites = Site.query.filter(
|
||
Site.id != site.id,
|
||
Site.is_active == True,
|
||
Site.tags.any(Tag.id.in_([tag.id for tag in site.tags]))
|
||
).order_by(Site.view_count.desc()).limit(4).all()
|
||
|
||
return render_template('detail_new.html', site=site, news_list=news_list, recommended_sites=recommended_sites)
|
||
|
||
# ========== 后台登录路由 ==========
|
||
@app.route('/admin/login', methods=['GET', 'POST'])
|
||
def admin_login():
|
||
"""管理员登录"""
|
||
if current_user.is_authenticated:
|
||
return redirect(url_for('admin.index'))
|
||
|
||
if request.method == 'POST':
|
||
username = request.form.get('username')
|
||
password = request.form.get('password')
|
||
|
||
admin = AdminModel.query.filter_by(username=username).first()
|
||
|
||
if admin and admin.check_password(password) and admin.is_active:
|
||
login_user(admin)
|
||
admin.last_login = datetime.now()
|
||
db.session.commit()
|
||
return redirect(url_for('admin.index'))
|
||
else:
|
||
flash('用户名或密码错误', 'error')
|
||
|
||
return render_template('admin_login.html')
|
||
|
||
@app.route('/admin/logout')
|
||
@login_required
|
||
def admin_logout():
|
||
"""管理员登出"""
|
||
logout_user()
|
||
return redirect(url_for('index'))
|
||
|
||
@app.route('/admin/change-password', methods=['GET', 'POST'])
|
||
@login_required
|
||
def change_password():
|
||
"""修改密码"""
|
||
if request.method == 'POST':
|
||
old_password = request.form.get('old_password', '').strip()
|
||
new_password = request.form.get('new_password', '').strip()
|
||
confirm_password = request.form.get('confirm_password', '').strip()
|
||
|
||
# 验证旧密码
|
||
if not current_user.check_password(old_password):
|
||
flash('旧密码错误', 'error')
|
||
return render_template('admin/change_password.html')
|
||
|
||
# 验证新密码
|
||
if not new_password:
|
||
flash('新密码不能为空', 'error')
|
||
return render_template('admin/change_password.html')
|
||
|
||
if len(new_password) < 6:
|
||
flash('新密码长度至少6位', 'error')
|
||
return render_template('admin/change_password.html')
|
||
|
||
if new_password != confirm_password:
|
||
flash('两次输入的新密码不一致', 'error')
|
||
return render_template('admin/change_password.html')
|
||
|
||
if old_password == new_password:
|
||
flash('新密码不能与旧密码相同', 'error')
|
||
return render_template('admin/change_password.html')
|
||
|
||
# 修改密码
|
||
try:
|
||
current_user.set_password(new_password)
|
||
db.session.commit()
|
||
flash('密码修改成功,请重新登录', 'success')
|
||
logout_user()
|
||
return redirect(url_for('admin_login'))
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
flash(f'密码修改失败:{str(e)}', 'error')
|
||
return render_template('admin/change_password.html')
|
||
|
||
return render_template('admin/change_password.html')
|
||
|
||
# ========== API路由 ==========
|
||
@app.route('/api/fetch-website-info', methods=['POST'])
|
||
@login_required
|
||
def fetch_website_info():
|
||
"""抓取网站信息API"""
|
||
try:
|
||
data = request.get_json()
|
||
url = data.get('url', '').strip()
|
||
|
||
if not url:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '请提供网站URL'
|
||
}), 400
|
||
|
||
# 创建抓取器
|
||
fetcher = WebsiteFetcher(timeout=15)
|
||
|
||
# 抓取网站信息
|
||
info = fetcher.fetch_website_info(url)
|
||
|
||
if not info:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '无法获取网站信息,请检查URL是否正确或手动填写'
|
||
})
|
||
|
||
# 下载Logo到本地(如果有)
|
||
logo_path = None
|
||
if info.get('logo_url'):
|
||
logo_path = fetcher.download_logo(info['logo_url'], save_dir='static/logos')
|
||
|
||
# 如果下载失败,不返回远程URL,让用户手动上传
|
||
return jsonify({
|
||
'success': True,
|
||
'data': {
|
||
'name': info.get('name', ''),
|
||
'description': info.get('description', ''),
|
||
'logo': logo_path if logo_path else ''
|
||
}
|
||
})
|
||
|
||
except Exception as e:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': f'抓取失败: {str(e)}'
|
||
}), 500
|
||
|
||
@app.route('/api/upload-logo', methods=['POST'])
|
||
@login_required
|
||
def upload_logo():
|
||
"""上传Logo图片API"""
|
||
try:
|
||
# 检查文件是否存在
|
||
if 'logo' not in request.files:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '请选择要上传的图片'
|
||
}), 400
|
||
|
||
file = request.files['logo']
|
||
|
||
# 检查文件名
|
||
if file.filename == '':
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '未选择文件'
|
||
}), 400
|
||
|
||
# 检查文件类型
|
||
allowed_extensions = {'png', 'jpg', 'jpeg', 'gif', 'svg', 'ico', 'webp'}
|
||
filename = file.filename.lower()
|
||
if not any(filename.endswith('.' + ext) for ext in allowed_extensions):
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '不支持的文件格式,请上传图片文件(png, jpg, gif, svg, ico, webp)'
|
||
}), 400
|
||
|
||
# 创建保存目录
|
||
save_dir = 'static/logos'
|
||
os.makedirs(save_dir, exist_ok=True)
|
||
|
||
# 生成安全的文件名
|
||
import time
|
||
import hashlib
|
||
ext = os.path.splitext(filename)[1]
|
||
timestamp = str(int(time.time() * 1000))
|
||
hash_name = hashlib.md5(f"{filename}{timestamp}".encode()).hexdigest()[:16]
|
||
safe_filename = f"logo_{hash_name}{ext}"
|
||
filepath = os.path.join(save_dir, safe_filename)
|
||
|
||
# 保存文件
|
||
file.save(filepath)
|
||
|
||
# 返回相对路径
|
||
return jsonify({
|
||
'success': True,
|
||
'path': f'/{filepath.replace(os.sep, "/")}'
|
||
})
|
||
|
||
except Exception as e:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': f'上传失败: {str(e)}'
|
||
}), 500
|
||
|
||
@app.route('/api/generate-features', methods=['POST'])
|
||
@login_required
|
||
def generate_features():
|
||
"""使用DeepSeek自动生成网站主要功能"""
|
||
try:
|
||
data = request.get_json()
|
||
name = data.get('name', '').strip()
|
||
description = data.get('description', '').strip()
|
||
url = data.get('url', '').strip()
|
||
|
||
if not name or not description:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '请提供网站名称和描述'
|
||
}), 400
|
||
|
||
# 生成功能列表
|
||
generator = TagGenerator()
|
||
features = generator.generate_features(name, description, url)
|
||
|
||
if not features:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': 'DeepSeek功能生成失败,请检查API配置'
|
||
}), 500
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'features': features
|
||
})
|
||
|
||
except ValueError as e:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': str(e)
|
||
}), 400
|
||
except Exception as e:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': f'生成失败: {str(e)}'
|
||
}), 500
|
||
|
||
@app.route('/api/generate-description', methods=['POST'])
|
||
@login_required
|
||
def generate_description():
|
||
"""使用DeepSeek自动生成网站详细介绍"""
|
||
try:
|
||
data = request.get_json()
|
||
name = data.get('name', '').strip()
|
||
short_desc = data.get('short_desc', '').strip()
|
||
url = data.get('url', '').strip()
|
||
|
||
if not name:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '请提供网站名称'
|
||
}), 400
|
||
|
||
# 生成详细介绍
|
||
generator = TagGenerator()
|
||
description = generator.generate_description(name, short_desc, url)
|
||
|
||
if not description:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': 'DeepSeek详细介绍生成失败,请检查API配置'
|
||
}), 500
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'description': description
|
||
})
|
||
|
||
except ValueError as e:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': str(e)
|
||
}), 400
|
||
except Exception as e:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': f'生成失败: {str(e)}'
|
||
}), 500
|
||
|
||
@app.route('/api/generate-tags', methods=['POST'])
|
||
@login_required
|
||
def generate_tags():
|
||
"""使用DeepSeek自动生成标签"""
|
||
try:
|
||
data = request.get_json()
|
||
name = data.get('name', '').strip()
|
||
description = data.get('description', '').strip()
|
||
|
||
if not name or not description:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '请提供网站名称和描述'
|
||
}), 400
|
||
|
||
# 获取现有标签作为参考
|
||
existing_tags = [tag.name for tag in Tag.query.all()]
|
||
|
||
# 生成标签
|
||
generator = TagGenerator()
|
||
suggested_tags = generator.generate_tags(name, description, existing_tags)
|
||
|
||
if not suggested_tags:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': 'DeepSeek标签生成失败,请检查API配置'
|
||
}), 500
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'tags': suggested_tags
|
||
})
|
||
|
||
except ValueError as e:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': str(e)
|
||
}), 400
|
||
except Exception as e:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': f'生成失败: {str(e)}'
|
||
}), 500
|
||
|
||
# ========== 新闻获取路由 ==========
|
||
@app.route('/api/fetch-site-news', methods=['POST'])
|
||
@login_required
|
||
def fetch_site_news():
|
||
"""为指定网站获取最新新闻"""
|
||
try:
|
||
data = request.get_json()
|
||
site_id = data.get('site_id')
|
||
count = data.get('count', app.config.get('NEWS_SEARCH_COUNT', 10))
|
||
freshness = data.get('freshness', app.config.get('NEWS_SEARCH_FRESHNESS', 'oneMonth'))
|
||
|
||
if not site_id:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '请提供网站ID'
|
||
}), 400
|
||
|
||
# 获取网站信息
|
||
site = Site.query.get(site_id)
|
||
if not site:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '网站不存在'
|
||
}), 404
|
||
|
||
# 检查博查API配置
|
||
api_key = app.config.get('BOCHA_API_KEY')
|
||
if not api_key:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '博查API未配置,请在.env文件中设置BOCHA_API_KEY'
|
||
}), 500
|
||
|
||
# 创建新闻搜索器
|
||
searcher = NewsSearcher(api_key)
|
||
|
||
# 搜索新闻
|
||
news_items = searcher.search_site_news(
|
||
site_name=site.name,
|
||
site_url=site.url,
|
||
news_keywords=site.news_keywords, # v2.3新增:使用专用关键词
|
||
count=count,
|
||
freshness=freshness
|
||
)
|
||
|
||
if not news_items:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '未找到相关新闻'
|
||
}), 404
|
||
|
||
# 保存新闻到数据库
|
||
saved_count = 0
|
||
for item in news_items:
|
||
# 检查新闻是否已存在(根据URL判断)
|
||
existing_news = News.query.filter_by(
|
||
site_id=site_id,
|
||
url=item['url']
|
||
).first()
|
||
|
||
if not existing_news:
|
||
# 创建新闻记录
|
||
news = News(
|
||
site_id=site_id,
|
||
title=item['title'],
|
||
content=item.get('summary') or item.get('snippet', ''),
|
||
url=item['url'],
|
||
source_name=item.get('site_name', ''),
|
||
source_icon=item.get('site_icon', ''),
|
||
published_at=item.get('published_at'),
|
||
news_type='Search Result',
|
||
is_active=True
|
||
)
|
||
db.session.add(news)
|
||
saved_count += 1
|
||
|
||
# 提交事务
|
||
db.session.commit()
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'message': f'成功获取并保存 {saved_count} 条新闻',
|
||
'total_found': len(news_items),
|
||
'saved': saved_count,
|
||
'news_items': searcher.format_news_for_display(news_items)
|
||
})
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
return jsonify({
|
||
'success': False,
|
||
'message': f'获取失败: {str(e)}'
|
||
}), 500
|
||
|
||
@app.route('/api/fetch-all-news', methods=['POST'])
|
||
@login_required
|
||
def fetch_all_news():
|
||
"""批量为所有网站获取新闻"""
|
||
try:
|
||
data = request.get_json()
|
||
count_per_site = data.get('count', 5) # 每个网站获取的新闻数量
|
||
freshness = data.get('freshness', app.config.get('NEWS_SEARCH_FRESHNESS', 'oneMonth'))
|
||
limit = data.get('limit', 10) # 限制处理的网站数量
|
||
|
||
# 检查博查API配置
|
||
api_key = app.config.get('BOCHA_API_KEY')
|
||
if not api_key:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '博查API未配置,请在.env文件中设置BOCHA_API_KEY'
|
||
}), 500
|
||
|
||
# 获取启用的网站(按更新时间排序,优先处理旧的)
|
||
sites = Site.query.filter_by(is_active=True).order_by(Site.updated_at).limit(limit).all()
|
||
|
||
if not sites:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '没有可用的网站'
|
||
}), 404
|
||
|
||
# 创建新闻搜索器
|
||
searcher = NewsSearcher(api_key)
|
||
|
||
# 统计信息
|
||
total_saved = 0
|
||
total_found = 0
|
||
processed_sites = []
|
||
|
||
# 为每个网站获取新闻
|
||
for site in sites:
|
||
try:
|
||
# 搜索新闻
|
||
news_items = searcher.search_site_news(
|
||
site_name=site.name,
|
||
site_url=site.url,
|
||
news_keywords=site.news_keywords, # v2.3新增:使用专用关键词
|
||
count=count_per_site,
|
||
freshness=freshness
|
||
)
|
||
|
||
site_saved = 0
|
||
for item in news_items:
|
||
# 检查是否已存在
|
||
existing_news = News.query.filter_by(
|
||
site_id=site.id,
|
||
url=item['url']
|
||
).first()
|
||
|
||
if not existing_news:
|
||
news = News(
|
||
site_id=site.id,
|
||
title=item['title'],
|
||
content=item.get('summary') or item.get('snippet', ''),
|
||
url=item['url'],
|
||
source_name=item.get('site_name', ''),
|
||
source_icon=item.get('site_icon', ''),
|
||
published_at=item.get('published_at'),
|
||
news_type='Search Result',
|
||
is_active=True
|
||
)
|
||
db.session.add(news)
|
||
site_saved += 1
|
||
|
||
total_found += len(news_items)
|
||
total_saved += site_saved
|
||
|
||
processed_sites.append({
|
||
'id': site.id,
|
||
'name': site.name,
|
||
'found': len(news_items),
|
||
'saved': site_saved
|
||
})
|
||
|
||
except Exception as e:
|
||
# 单个网站失败不影响其他网站
|
||
processed_sites.append({
|
||
'id': site.id,
|
||
'name': site.name,
|
||
'error': str(e)
|
||
})
|
||
continue
|
||
|
||
# 提交事务
|
||
db.session.commit()
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'message': f'批量获取完成,共处理 {len(processed_sites)} 个网站',
|
||
'total_found': total_found,
|
||
'total_saved': total_saved,
|
||
'processed_sites': processed_sites
|
||
})
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
return jsonify({
|
||
'success': False,
|
||
'message': f'批量获取失败: {str(e)}'
|
||
}), 500
|
||
|
||
# ========== SEO路由 (v2.4新增) ==========
|
||
@app.route('/sitemap.xml')
|
||
def sitemap():
|
||
"""动态生成sitemap.xml"""
|
||
from flask import make_response
|
||
|
||
# 获取所有启用的网站
|
||
sites = Site.query.filter_by(is_active=True).order_by(Site.updated_at.desc()).all()
|
||
|
||
# 获取所有标签
|
||
tags = Tag.query.all()
|
||
|
||
# 构建XML内容
|
||
xml_content = '''<?xml version="1.0" encoding="UTF-8"?>
|
||
<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">'''
|
||
|
||
# 首页
|
||
xml_content += '''
|
||
<url>
|
||
<loc>{}</loc>
|
||
<changefreq>daily</changefreq>
|
||
<priority>1.0</priority>
|
||
</url>'''.format(request.url_root.rstrip('/'))
|
||
|
||
# 工具详情页
|
||
for site in sites:
|
||
xml_content += '''
|
||
<url>
|
||
<loc>{}</loc>
|
||
<lastmod>{}</lastmod>
|
||
<changefreq>weekly</changefreq>
|
||
<priority>0.8</priority>
|
||
</url>'''.format(
|
||
request.url_root.rstrip('/') + url_for('site_detail', code=site.code),
|
||
site.updated_at.strftime('%Y-%m-%d') if site.updated_at else datetime.now().strftime('%Y-%m-%d')
|
||
)
|
||
|
||
# 标签页
|
||
for tag in tags:
|
||
xml_content += '''
|
||
<url>
|
||
<loc>{}</loc>
|
||
<changefreq>weekly</changefreq>
|
||
<priority>0.6</priority>
|
||
</url>'''.format(request.url_root.rstrip('/') + '/?tag=' + tag.slug)
|
||
|
||
xml_content += '''
|
||
</urlset>'''
|
||
|
||
response = make_response(xml_content)
|
||
response.headers['Content-Type'] = 'application/xml; charset=utf-8'
|
||
return response
|
||
|
||
@app.route('/robots.txt')
|
||
def robots():
|
||
"""动态生成robots.txt"""
|
||
from flask import make_response
|
||
|
||
robots_content = '''User-agent: *
|
||
Allow: /
|
||
Disallow: /admin/
|
||
Disallow: /api/
|
||
|
||
Sitemap: {}sitemap.xml
|
||
'''.format(request.url_root)
|
||
|
||
response = make_response(robots_content)
|
||
response.headers['Content-Type'] = 'text/plain; charset=utf-8'
|
||
return response
|
||
|
||
# ========== SEO工具管理路由 (v2.4新增) ==========
|
||
@app.route('/admin/seo-tools')
|
||
@login_required
|
||
def seo_tools():
|
||
"""SEO工具管理页面"""
|
||
# 检查static/sitemap.xml是否存在及最后更新时间
|
||
sitemap_path = 'static/sitemap.xml'
|
||
sitemap_info = None
|
||
if os.path.exists(sitemap_path):
|
||
import time
|
||
mtime = os.path.getmtime(sitemap_path)
|
||
sitemap_info = {
|
||
'exists': True,
|
||
'last_updated': datetime.fromtimestamp(mtime).strftime('%Y-%m-%d %H:%M:%S'),
|
||
'size': os.path.getsize(sitemap_path)
|
||
}
|
||
else:
|
||
sitemap_info = {'exists': False}
|
||
|
||
return render_template('admin/seo_tools.html', sitemap_info=sitemap_info)
|
||
|
||
@app.route('/api/generate-static-sitemap', methods=['POST'])
|
||
@login_required
|
||
def generate_static_sitemap():
|
||
"""生成静态sitemap.xml文件"""
|
||
try:
|
||
# 获取所有启用的网站
|
||
sites = Site.query.filter_by(is_active=True).order_by(Site.updated_at.desc()).all()
|
||
|
||
# 获取所有标签
|
||
tags = Tag.query.all()
|
||
|
||
# 构建XML内容(使用网站配置的域名)
|
||
base_url = request.url_root.rstrip('/')
|
||
|
||
xml_content = '''<?xml version="1.0" encoding="UTF-8"?>
|
||
<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">'''
|
||
|
||
# 首页
|
||
xml_content += f'''
|
||
<url>
|
||
<loc>{base_url}</loc>
|
||
<changefreq>daily</changefreq>
|
||
<priority>1.0</priority>
|
||
</url>'''
|
||
|
||
# 工具详情页
|
||
for site in sites:
|
||
xml_content += f'''
|
||
<url>
|
||
<loc>{base_url}/site/{site.code}</loc>
|
||
<lastmod>{site.updated_at.strftime('%Y-%m-%d') if site.updated_at else datetime.now().strftime('%Y-%m-%d')}</lastmod>
|
||
<changefreq>weekly</changefreq>
|
||
<priority>0.8</priority>
|
||
</url>'''
|
||
|
||
# 标签页
|
||
for tag in tags:
|
||
xml_content += f'''
|
||
<url>
|
||
<loc>{base_url}/?tag={tag.slug}</loc>
|
||
<changefreq>weekly</changefreq>
|
||
<priority>0.6</priority>
|
||
</url>'''
|
||
|
||
xml_content += '''
|
||
</urlset>'''
|
||
|
||
# 保存到static目录
|
||
static_dir = 'static'
|
||
os.makedirs(static_dir, exist_ok=True)
|
||
sitemap_path = os.path.join(static_dir, 'sitemap.xml')
|
||
|
||
with open(sitemap_path, 'w', encoding='utf-8') as f:
|
||
f.write(xml_content)
|
||
|
||
# 统计信息
|
||
total_urls = 1 + len(sites) + len(tags) # 首页 + 工具页 + 标签页
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'message': f'静态sitemap.xml生成成功!共包含 {total_urls} 个URL',
|
||
'total_urls': total_urls,
|
||
'file_path': sitemap_path,
|
||
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
})
|
||
|
||
except Exception as e:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': f'生成失败: {str(e)}'
|
||
}), 500
|
||
|
||
@app.route('/api/notify-search-engines', methods=['POST'])
|
||
@login_required
|
||
def notify_search_engines():
|
||
"""通知搜索引擎sitemap更新"""
|
||
try:
|
||
import requests
|
||
from urllib.parse import quote
|
||
|
||
# 获取sitemap URL(使用当前请求的域名)
|
||
sitemap_url = request.url_root.rstrip('/') + '/sitemap.xml'
|
||
encoded_sitemap_url = quote(sitemap_url, safe='')
|
||
|
||
results = []
|
||
|
||
# 1. 通知Google
|
||
google_ping_url = f'http://www.google.com/ping?sitemap={encoded_sitemap_url}'
|
||
try:
|
||
google_response = requests.get(google_ping_url, timeout=10)
|
||
results.append({
|
||
'engine': 'Google',
|
||
'status': 'success' if google_response.status_code == 200 else 'failed',
|
||
'status_code': google_response.status_code,
|
||
'message': '提交成功' if google_response.status_code == 200 else f'HTTP {google_response.status_code}'
|
||
})
|
||
except Exception as e:
|
||
results.append({
|
||
'engine': 'Google',
|
||
'status': 'error',
|
||
'message': f'请求失败: {str(e)}'
|
||
})
|
||
|
||
# 2. 通知Baidu
|
||
baidu_ping_url = f'http://data.zz.baidu.com/ping?sitemap={encoded_sitemap_url}'
|
||
try:
|
||
baidu_response = requests.get(baidu_ping_url, timeout=10)
|
||
results.append({
|
||
'engine': 'Baidu',
|
||
'status': 'success' if baidu_response.status_code == 200 else 'failed',
|
||
'status_code': baidu_response.status_code,
|
||
'message': '提交成功' if baidu_response.status_code == 200 else f'HTTP {baidu_response.status_code}'
|
||
})
|
||
except Exception as e:
|
||
results.append({
|
||
'engine': 'Baidu',
|
||
'status': 'error',
|
||
'message': f'请求失败: {str(e)}'
|
||
})
|
||
|
||
# 3. 通知Bing
|
||
bing_ping_url = f'http://www.bing.com/ping?sitemap={encoded_sitemap_url}'
|
||
try:
|
||
bing_response = requests.get(bing_ping_url, timeout=10)
|
||
results.append({
|
||
'engine': 'Bing',
|
||
'status': 'success' if bing_response.status_code == 200 else 'failed',
|
||
'status_code': bing_response.status_code,
|
||
'message': '提交成功' if bing_response.status_code == 200 else f'HTTP {bing_response.status_code}'
|
||
})
|
||
except Exception as e:
|
||
results.append({
|
||
'engine': 'Bing',
|
||
'status': 'error',
|
||
'message': f'请求失败: {str(e)}'
|
||
})
|
||
|
||
# 统计成功数量
|
||
success_count = sum(1 for r in results if r['status'] == 'success')
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'message': f'已通知 {success_count}/{len(results)} 个搜索引擎',
|
||
'sitemap_url': sitemap_url,
|
||
'results': results
|
||
})
|
||
|
||
except Exception as e:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': f'通知失败: {str(e)}'
|
||
}), 500
|
||
|
||
@app.route('/api/refresh-site-news/<site_code>', methods=['POST'])
|
||
def refresh_site_news(site_code):
|
||
"""手动刷新指定网站的新闻(前台用户可访问)- v2.3新增"""
|
||
try:
|
||
# 根据code查找网站
|
||
site = Site.query.filter_by(code=site_code).first()
|
||
if not site:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '网站不存在'
|
||
}), 404
|
||
|
||
# 检查博查API配置
|
||
api_key = app.config.get('BOCHA_API_KEY')
|
||
if not api_key:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '新闻功能未启用'
|
||
}), 500
|
||
|
||
# 创建新闻搜索器
|
||
searcher = NewsSearcher(api_key)
|
||
|
||
# 搜索新闻(获取最新5条)
|
||
news_items = searcher.search_site_news(
|
||
site_name=site.name,
|
||
site_url=site.url,
|
||
news_keywords=site.news_keywords, # 使用专用关键词
|
||
count=5,
|
||
freshness='oneWeek' # 一周内的新闻
|
||
)
|
||
|
||
if not news_items:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '未找到相关新闻'
|
||
}), 404
|
||
|
||
# 保存新闻到数据库
|
||
saved_count = 0
|
||
for item in news_items:
|
||
# 检查新闻是否已存在(根据URL判断)
|
||
existing_news = News.query.filter_by(
|
||
site_id=site.id,
|
||
url=item['url']
|
||
).first()
|
||
|
||
if not existing_news:
|
||
news = News(
|
||
site_id=site.id,
|
||
title=item['title'],
|
||
content=item.get('summary') or item.get('snippet', ''),
|
||
url=item['url'],
|
||
source_name=item.get('site_name', ''),
|
||
source_icon=item.get('site_icon', ''),
|
||
published_at=item.get('published_at'),
|
||
news_type='Search Result',
|
||
is_active=True
|
||
)
|
||
db.session.add(news)
|
||
saved_count += 1
|
||
|
||
# 提交事务
|
||
db.session.commit()
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'message': f'成功获取 {saved_count} 条新资讯',
|
||
'total_found': len(news_items),
|
||
'saved_count': saved_count
|
||
})
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
return jsonify({
|
||
'success': False,
|
||
'message': f'获取失败: {str(e)}'
|
||
}), 500
|
||
|
||
# ========== 批量导入路由 ==========
|
||
@app.route('/admin/batch-import', methods=['GET', 'POST'])
|
||
@login_required
|
||
def batch_import():
|
||
"""批量导入网站"""
|
||
from utils.bookmark_parser import BookmarkParser
|
||
from utils.website_fetcher import WebsiteFetcher
|
||
|
||
results = None
|
||
|
||
if request.method == 'POST':
|
||
import_type = request.form.get('import_type')
|
||
auto_activate = request.form.get('auto_activate') == 'on'
|
||
|
||
parser = BookmarkParser()
|
||
fetcher = WebsiteFetcher(timeout=15)
|
||
|
||
urls_to_import = []
|
||
|
||
try:
|
||
# 解析输入
|
||
if import_type == 'url_list':
|
||
url_list_text = request.form.get('url_list', '')
|
||
urls_to_import = parser.parse_url_list(url_list_text)
|
||
|
||
elif import_type == 'bookmark_file':
|
||
bookmark_file = request.files.get('bookmark_file')
|
||
if not bookmark_file:
|
||
flash('请选择书签文件', 'error')
|
||
return render_template('admin/batch_import.html')
|
||
|
||
html_content = bookmark_file.read().decode('utf-8', errors='ignore')
|
||
all_bookmarks = parser.parse_html_file(html_content)
|
||
|
||
# 筛选文件夹(如果指定)
|
||
folder_filter = request.form.get('folder_filter', '').strip()
|
||
if folder_filter:
|
||
urls_to_import = [
|
||
b for b in all_bookmarks
|
||
if folder_filter.lower() in b.get('folder', '').lower()
|
||
]
|
||
else:
|
||
urls_to_import = all_bookmarks
|
||
|
||
# 批量导入
|
||
success_list = []
|
||
failed_list = []
|
||
|
||
for idx, item in enumerate(urls_to_import, 1):
|
||
url = item['url']
|
||
name = item.get('name', '')
|
||
|
||
# 为每个URL创建独立的事务
|
||
try:
|
||
# 1. 检查URL是否已存在
|
||
try:
|
||
existing = Site.query.filter_by(url=url).first()
|
||
if existing:
|
||
failed_list.append({
|
||
'url': url,
|
||
'name': name or existing.name,
|
||
'error': f'该URL已存在(网站名称:{existing.name})'
|
||
})
|
||
continue
|
||
except Exception as e:
|
||
failed_list.append({
|
||
'url': url,
|
||
'name': name,
|
||
'error': f'检查URL时出错: {str(e)}'
|
||
})
|
||
continue
|
||
|
||
# 2. 抓取网站信息(带超时和错误处理)
|
||
info = None
|
||
try:
|
||
info = fetcher.fetch_website_info(url)
|
||
except Exception as e:
|
||
print(f"抓取 {url} 失败: {str(e)}")
|
||
# 抓取失败不是致命错误,继续尝试使用书签名称
|
||
|
||
# 3. 处理网站信息
|
||
if not info or not info.get('name'):
|
||
# 如果有书签名称,使用书签名称
|
||
if name:
|
||
info = {
|
||
'name': name,
|
||
'description': '',
|
||
'logo_url': ''
|
||
}
|
||
else:
|
||
# 尝试从URL提取域名作为名称
|
||
from urllib.parse import urlparse
|
||
try:
|
||
parsed = urlparse(url)
|
||
domain = parsed.netloc or parsed.path
|
||
if domain:
|
||
info = {
|
||
'name': domain,
|
||
'description': '',
|
||
'logo_url': ''
|
||
}
|
||
else:
|
||
failed_list.append({
|
||
'url': url,
|
||
'name': name,
|
||
'error': '无法获取网站信息且没有备用名称'
|
||
})
|
||
continue
|
||
except Exception:
|
||
failed_list.append({
|
||
'url': url,
|
||
'name': name,
|
||
'error': '无法获取网站信息且URL解析失败'
|
||
})
|
||
continue
|
||
|
||
# 4. 下载Logo到本地(失败不影响导入)
|
||
logo_path = None
|
||
if info.get('logo_url'):
|
||
try:
|
||
logo_path = fetcher.download_logo(info['logo_url'], save_dir='static/logos')
|
||
except Exception as e:
|
||
print(f"下载Logo失败 ({url}): {str(e)}")
|
||
# Logo下载失败不影响网站导入
|
||
|
||
# 5. 生成code和slug
|
||
try:
|
||
import random
|
||
from pypinyin import lazy_pinyin
|
||
import re
|
||
|
||
# 生成唯一的code
|
||
site_code = None
|
||
max_attempts = 10
|
||
for _ in range(max_attempts):
|
||
code = str(random.randint(10000000, 99999999))
|
||
if not Site.query.filter_by(code=code).first():
|
||
site_code = code
|
||
break
|
||
|
||
if not site_code:
|
||
# 如果10次都失败,使用时间戳
|
||
import time
|
||
site_code = str(int(time.time() * 1000))[-8:]
|
||
|
||
# 生成slug
|
||
site_name = info.get('name', name or 'Unknown')[:100]
|
||
slug = ''.join(lazy_pinyin(site_name))
|
||
slug = slug.lower()
|
||
slug = re.sub(r'[^\w\s-]', '', slug)
|
||
slug = re.sub(r'[-\s]+', '-', slug).strip('-')
|
||
if not slug:
|
||
slug = f"site-{site_code}"
|
||
|
||
# 确保slug唯一
|
||
base_slug = slug[:50] # 限制长度
|
||
counter = 1
|
||
final_slug = slug
|
||
while Site.query.filter_by(slug=final_slug).first():
|
||
final_slug = f"{base_slug}-{counter}"
|
||
counter += 1
|
||
if counter > 100: # 防止无限循环
|
||
final_slug = f"{base_slug}-{site_code}"
|
||
break
|
||
|
||
# 6. 创建网站记录(带code和slug)
|
||
site = Site(
|
||
code=site_code,
|
||
slug=final_slug,
|
||
name=site_name,
|
||
url=url[:500], # 限制URL长度
|
||
logo=logo_path or info.get('logo_url', '')[:500] if info.get('logo_url') else '',
|
||
short_desc=info.get('description', '')[:200] if info.get('description') else '',
|
||
description=info.get('description', '')[:2000] if info.get('description') else '',
|
||
is_active=auto_activate
|
||
)
|
||
|
||
# 添加到数据库并提交
|
||
db.session.add(site)
|
||
db.session.commit()
|
||
|
||
success_list.append({
|
||
'name': site.name,
|
||
'url': site.url
|
||
})
|
||
|
||
print(f"成功导入 [{idx}/{len(urls_to_import)}]: {site.name}")
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
failed_list.append({
|
||
'url': url,
|
||
'name': name or info.get('name', 'Unknown'),
|
||
'error': f'数据库保存失败: {str(e)}'
|
||
})
|
||
continue
|
||
|
||
except Exception as e:
|
||
# 捕获所有未预期的错误
|
||
db.session.rollback()
|
||
failed_list.append({
|
||
'url': url,
|
||
'name': name,
|
||
'error': f'未知错误: {str(e)}'
|
||
})
|
||
print(f"导入 {url} 时发生未知错误: {str(e)}")
|
||
continue
|
||
|
||
results = {
|
||
'total_count': len(urls_to_import),
|
||
'success_count': len(success_list),
|
||
'failed_count': len(failed_list),
|
||
'success_list': success_list,
|
||
'failed_list': failed_list
|
||
}
|
||
|
||
if success_list:
|
||
flash(f'成功导入 {len(success_list)} 个网站!', 'success')
|
||
|
||
except Exception as e:
|
||
flash(f'导入失败: {str(e)}', 'error')
|
||
|
||
return render_template('admin/batch_import.html', results=results)
|
||
|
||
# ========== Flask-Admin 配置 ==========
|
||
class SecureModelView(ModelView):
|
||
"""需要登录的模型视图"""
|
||
# 中文化配置
|
||
can_set_page_size = True
|
||
page_size = 20
|
||
|
||
# 自定义文本
|
||
list_template = 'admin/model/list.html'
|
||
create_template = 'admin/model/create.html'
|
||
edit_template = 'admin/model/edit.html'
|
||
|
||
# 覆盖英文文本
|
||
named_filter_urls = True
|
||
|
||
def is_accessible(self):
|
||
return current_user.is_authenticated
|
||
|
||
def inaccessible_callback(self, name, **kwargs):
|
||
return redirect(url_for('admin_login'))
|
||
|
||
class SecureAdminIndexView(AdminIndexView):
|
||
"""需要登录的管理首页"""
|
||
def is_accessible(self):
|
||
return current_user.is_authenticated
|
||
|
||
def inaccessible_callback(self, name, **kwargs):
|
||
return redirect(url_for('admin_login'))
|
||
|
||
@expose('/')
|
||
def index(self):
|
||
"""控制台首页,显示统计信息"""
|
||
# 统计数据
|
||
stats = {
|
||
'sites_count': Site.query.filter_by(is_active=True).count(),
|
||
'tags_count': Tag.query.count(),
|
||
'news_count': News.query.filter_by(is_active=True).count(),
|
||
'total_views': db.session.query(db.func.sum(Site.view_count)).scalar() or 0
|
||
}
|
||
|
||
# 最近添加的工具(最多5个)
|
||
recent_sites = Site.query.order_by(Site.created_at.desc()).limit(5).all()
|
||
|
||
return self.render('admin/index.html', stats=stats, recent_sites=recent_sites)
|
||
|
||
# 网站管理视图
|
||
class SiteAdmin(SecureModelView):
|
||
# 自定义模板
|
||
create_template = 'admin/site/create.html'
|
||
edit_template = 'admin/site/edit.html'
|
||
|
||
# 启用编辑和删除
|
||
can_edit = True
|
||
can_delete = True
|
||
can_create = True
|
||
can_view_details = False # 禁用查看详情,点击名称即可查看
|
||
|
||
# 显示操作列
|
||
column_display_actions = True
|
||
action_disallowed_list = []
|
||
|
||
column_list = ['id', 'code', 'name', 'url', 'slug', 'is_active', 'is_recommended', 'view_count', 'created_at']
|
||
column_searchable_list = ['code', 'name', 'url', 'description']
|
||
column_filters = ['is_active', 'is_recommended', 'tags']
|
||
column_labels = {
|
||
'id': 'ID',
|
||
'code': '网站编码',
|
||
'name': '网站名称',
|
||
'url': 'URL',
|
||
'slug': 'URL别名',
|
||
'logo': 'Logo',
|
||
'short_desc': '简短描述',
|
||
'description': '详细介绍',
|
||
'features': '主要功能',
|
||
'news_keywords': '新闻关键词',
|
||
'is_active': '是否启用',
|
||
'is_recommended': '是否推荐',
|
||
'view_count': '浏览次数',
|
||
'sort_order': '排序权重',
|
||
'tags': '标签',
|
||
'created_at': '创建时间',
|
||
'updated_at': '更新时间'
|
||
}
|
||
form_columns = ['name', 'url', 'slug', 'logo', 'short_desc', 'description', 'features', 'news_keywords', 'tags', 'is_active', 'is_recommended', 'sort_order']
|
||
|
||
# 自定义编辑/删除文字
|
||
column_extra_row_actions = None
|
||
|
||
def on_model_change(self, form, model, is_created):
|
||
"""保存前自动生成code和slug(如果为空)"""
|
||
import re
|
||
import random
|
||
from pypinyin import lazy_pinyin
|
||
from flask import request
|
||
|
||
# 使用no_autoflush防止在查询时触发提前flush
|
||
with db.session.no_autoflush:
|
||
# 处理手动输入的新标签
|
||
new_tags_str = request.form.get('new_tags', '')
|
||
if new_tags_str:
|
||
new_tag_names = [name.strip() for name in new_tags_str.split(',') if name.strip()]
|
||
for tag_name in new_tag_names:
|
||
# 检查标签是否已存在
|
||
existing_tag = Tag.query.filter_by(name=tag_name).first()
|
||
if not existing_tag:
|
||
# 创建新标签
|
||
tag_slug = ''.join(lazy_pinyin(tag_name))
|
||
tag_slug = tag_slug.lower()
|
||
tag_slug = re.sub(r'[^\w\s-]', '', tag_slug)
|
||
tag_slug = re.sub(r'[-\s]+', '-', tag_slug).strip('-')
|
||
|
||
# 确保slug唯一
|
||
base_tag_slug = tag_slug[:50]
|
||
counter = 1
|
||
final_tag_slug = tag_slug
|
||
while Tag.query.filter_by(slug=final_tag_slug).first():
|
||
final_tag_slug = f"{base_tag_slug}-{counter}"
|
||
counter += 1
|
||
if counter > 100:
|
||
final_tag_slug = f"{base_tag_slug}-{random.randint(1000, 9999)}"
|
||
break
|
||
|
||
new_tag = Tag(name=tag_name, slug=final_tag_slug)
|
||
db.session.add(new_tag)
|
||
db.session.flush() # 确保新标签有ID
|
||
|
||
# 添加到模型的标签列表
|
||
if new_tag not in model.tags:
|
||
model.tags.append(new_tag)
|
||
else:
|
||
# 添加已存在的标签
|
||
if existing_tag not in model.tags:
|
||
model.tags.append(existing_tag)
|
||
|
||
# 如果code为空,自动生成唯一的8位数字编码
|
||
if not model.code or model.code.strip() == '':
|
||
max_attempts = 10
|
||
for attempt in range(max_attempts):
|
||
# 生成10000000-99999999之间的随机数
|
||
code = str(random.randint(10000000, 99999999))
|
||
# 检查是否已存在
|
||
existing = Site.query.filter(Site.code == code).first()
|
||
if not existing or existing.id == model.id:
|
||
model.code = code
|
||
break
|
||
|
||
# 如果10次都失败,使用时间戳
|
||
if not model.code:
|
||
import time
|
||
model.code = str(int(time.time() * 1000))[-8:]
|
||
|
||
# 如果slug为空,从name自动生成
|
||
if not model.slug or model.slug.strip() == '':
|
||
# 将中文转换为拼音
|
||
slug = ''.join(lazy_pinyin(model.name))
|
||
# 转换为小写,移除特殊字符
|
||
slug = slug.lower()
|
||
slug = re.sub(r'[^\w\s-]', '', slug)
|
||
slug = re.sub(r'[-\s]+', '-', slug).strip('-')
|
||
|
||
# 如果转换后为空,使用code
|
||
if not slug:
|
||
slug = f"site-{model.code}"
|
||
|
||
# 确保slug唯一(限制长度和重试次数)
|
||
base_slug = slug[:50]
|
||
counter = 1
|
||
final_slug = slug
|
||
max_slug_attempts = 100
|
||
|
||
while counter < max_slug_attempts:
|
||
existing = Site.query.filter(Site.slug == final_slug).first()
|
||
if not existing or existing.id == model.id:
|
||
break
|
||
final_slug = f"{base_slug}-{counter}"
|
||
counter += 1
|
||
|
||
# 如果100次都失败,使用code确保唯一
|
||
if counter >= max_slug_attempts:
|
||
final_slug = f"{base_slug}-{model.code}"
|
||
|
||
model.slug = final_slug
|
||
|
||
# 标签管理视图
|
||
class TagAdmin(SecureModelView):
|
||
can_edit = True
|
||
can_delete = True
|
||
can_create = True
|
||
can_view_details = False
|
||
|
||
# 显示操作列
|
||
column_display_actions = True
|
||
|
||
column_list = ['id', 'name', 'slug', 'description', 'sort_order']
|
||
column_searchable_list = ['name', 'description']
|
||
column_labels = {
|
||
'id': 'ID',
|
||
'name': '标签名称',
|
||
'slug': 'URL别名',
|
||
'description': '标签描述',
|
||
'seo_title': 'SEO标题 (v2.4)',
|
||
'seo_description': 'SEO描述 (v2.4)',
|
||
'seo_keywords': 'SEO关键词 (v2.4)',
|
||
'icon': '图标',
|
||
'sort_order': '排序权重',
|
||
'created_at': '创建时间'
|
||
}
|
||
form_columns = ['name', 'slug', 'description', 'seo_title', 'seo_description', 'seo_keywords', 'icon', 'sort_order']
|
||
|
||
# 管理员视图
|
||
class AdminAdmin(SecureModelView):
|
||
can_edit = True
|
||
can_delete = True
|
||
can_create = True
|
||
can_view_details = False
|
||
|
||
# 显示操作列
|
||
column_display_actions = True
|
||
|
||
column_list = ['id', 'username', 'email', 'is_active', 'last_login', 'created_at']
|
||
column_searchable_list = ['username', 'email']
|
||
column_filters = ['is_active']
|
||
column_labels = {
|
||
'id': 'ID',
|
||
'username': '用户名',
|
||
'email': '邮箱',
|
||
'is_active': '是否启用',
|
||
'created_at': '创建时间',
|
||
'last_login': '最后登录'
|
||
}
|
||
form_columns = ['username', 'email', 'is_active']
|
||
|
||
def on_model_change(self, form, model, is_created):
|
||
# 如果是新建管理员,设置默认密码
|
||
if is_created:
|
||
model.set_password('admin123') # 默认密码
|
||
|
||
# 新闻管理视图
|
||
class NewsAdmin(SecureModelView):
|
||
can_edit = True
|
||
can_delete = True
|
||
can_create = True
|
||
can_view_details = False
|
||
|
||
# 显示操作列
|
||
column_display_actions = True
|
||
|
||
column_list = ['id', 'site', 'title', 'source_name', 'news_type', 'published_at', 'is_active']
|
||
column_searchable_list = ['title', 'content', 'source_name']
|
||
column_filters = ['site', 'news_type', 'source_name', 'is_active', 'published_at']
|
||
column_labels = {
|
||
'id': 'ID',
|
||
'site': '关联网站',
|
||
'title': '新闻标题',
|
||
'content': '新闻内容',
|
||
'news_type': '新闻类型',
|
||
'url': '新闻链接',
|
||
'source_name': '来源网站',
|
||
'source_icon': '来源图标',
|
||
'published_at': '发布时间',
|
||
'is_active': '是否启用',
|
||
'created_at': '创建时间',
|
||
'updated_at': '更新时间'
|
||
}
|
||
form_columns = ['site', 'title', 'content', 'news_type', 'url', 'source_name', 'source_icon', 'published_at', 'is_active']
|
||
|
||
# 可选的新闻类型
|
||
form_choices = {
|
||
'news_type': [
|
||
('Search Result', 'Search Result'),
|
||
('Product Update', 'Product Update'),
|
||
('Industry News', 'Industry News'),
|
||
('Company News', 'Company News'),
|
||
('Other', 'Other')
|
||
]
|
||
}
|
||
|
||
# 默认排序
|
||
column_default_sort = ('published_at', True) # 按发布时间倒序排列
|
||
|
||
# Prompt模板管理视图
|
||
class PromptAdmin(SecureModelView):
|
||
can_edit = True
|
||
can_delete = False # 不允许删除,避免系统必需的prompt被删除
|
||
can_create = True
|
||
can_view_details = False
|
||
|
||
# 显示操作列
|
||
column_display_actions = True
|
||
|
||
column_list = ['id', 'key', 'name', 'description', 'is_active', 'updated_at']
|
||
column_searchable_list = ['key', 'name', 'description']
|
||
column_filters = ['is_active', 'key']
|
||
column_labels = {
|
||
'id': 'ID',
|
||
'key': '唯一标识',
|
||
'name': '模板名称',
|
||
'system_prompt': '系统提示词',
|
||
'user_prompt_template': '用户提示词模板',
|
||
'description': '模板说明',
|
||
'is_active': '是否启用',
|
||
'created_at': '创建时间',
|
||
'updated_at': '更新时间'
|
||
}
|
||
form_columns = ['key', 'name', 'description', 'system_prompt', 'user_prompt_template', 'is_active']
|
||
|
||
# 字段说明
|
||
column_descriptions = {
|
||
'key': '唯一标识,如: tags, features, description',
|
||
'system_prompt': 'AI的系统角色设定',
|
||
'user_prompt_template': '用户提示词模板,支持变量如 {name}, {description}, {url}',
|
||
}
|
||
|
||
# 表单字段配置
|
||
form_widget_args = {
|
||
'system_prompt': {
|
||
'rows': 3,
|
||
'style': 'font-family: monospace;'
|
||
},
|
||
'user_prompt_template': {
|
||
'rows': 20,
|
||
'style': 'font-family: monospace;'
|
||
}
|
||
}
|
||
|
||
# 初始化 Flask-Admin
|
||
admin = Admin(
|
||
app,
|
||
name='ZJPB 焦提示词',
|
||
template_mode='bootstrap4',
|
||
index_view=SecureAdminIndexView(name='控制台', url='/admin'),
|
||
base_template='admin/master.html'
|
||
)
|
||
|
||
admin.add_view(SiteAdmin(Site, db.session, name='网站管理'))
|
||
admin.add_view(TagAdmin(Tag, db.session, name='标签管理'))
|
||
admin.add_view(NewsAdmin(News, db.session, name='新闻管理'))
|
||
admin.add_view(PromptAdmin(PromptTemplate, db.session, name='Prompt管理'))
|
||
admin.add_view(AdminAdmin(AdminModel, db.session, name='管理员', endpoint='admin_users'))
|
||
|
||
return app
|
||
|
||
if __name__ == '__main__':
|
||
app = create_app(os.getenv('FLASK_ENV', 'development'))
|
||
app.run(host='0.0.0.0', port=5000, debug=True)
|