3066 lines
115 KiB
Python
3066 lines
115 KiB
Python
import os
|
||
import markdown
|
||
import random
|
||
import string
|
||
import secrets
|
||
from io import BytesIO
|
||
from flask import Flask, render_template, redirect, url_for, request, flash, jsonify, session, send_file
|
||
from flask_login import LoginManager, login_user, logout_user, login_required, current_user
|
||
from flask_admin import Admin, AdminIndexView, expose
|
||
from flask_admin.contrib.sqla import ModelView
|
||
from datetime import datetime, timedelta
|
||
from config import config
|
||
from models import db, Site, Tag, Admin as AdminModel, News, site_tags, PromptTemplate, User, Folder, Collection
|
||
from utils.website_fetcher import WebsiteFetcher
|
||
from utils.tag_generator import TagGenerator
|
||
from utils.news_searcher import NewsSearcher
|
||
from utils.email_sender import EmailSender
|
||
from PIL import Image, ImageDraw, ImageFont
|
||
|
||
def create_app(config_name='default'):
|
||
"""应用工厂函数"""
|
||
app = Flask(__name__)
|
||
|
||
# 加载配置
|
||
app.config.from_object(config[config_name])
|
||
|
||
# 初始化数据库
|
||
db.init_app(app)
|
||
|
||
# 添加Markdown过滤器
|
||
@app.template_filter('markdown')
|
||
def markdown_filter(text):
|
||
"""将Markdown文本转换为HTML"""
|
||
if not text:
|
||
return ''
|
||
return markdown.markdown(text, extensions=['nl2br', 'fenced_code'])
|
||
|
||
# v2.4新增: 自动内链过滤器
|
||
@app.template_filter('auto_link')
|
||
def auto_link_filter(text, current_site_id=None):
|
||
"""自动为内容中的工具名称添加链接"""
|
||
if not text:
|
||
return ''
|
||
|
||
# 获取所有启用的网站(排除当前网站)
|
||
sites = Site.query.filter_by(is_active=True).all()
|
||
if current_site_id:
|
||
sites = [s for s in sites if s.id != current_site_id]
|
||
|
||
# 按名称长度降序排序,优先匹配长名称
|
||
sites = sorted(sites, key=lambda s: len(s.name), reverse=True)
|
||
|
||
# 记录已经添加链接的位置,避免重复
|
||
linked_sites = set()
|
||
|
||
for site in sites:
|
||
if site.name in text and site.name not in linked_sites:
|
||
# 只链接第一次出现的位置
|
||
link = f'<a href="/site/{site.code}" title="{site.short_desc or site.name}" style="color: var(--primary-blue); text-decoration: underline; text-decoration-style: dotted;">{site.name}</a>'
|
||
text = text.replace(site.name, link, 1)
|
||
linked_sites.add(site.name)
|
||
|
||
return text
|
||
|
||
# 初始化登录管理
|
||
login_manager = LoginManager()
|
||
login_manager.init_app(app)
|
||
login_manager.login_view = 'admin_login'
|
||
login_manager.login_message = '请先登录'
|
||
|
||
@login_manager.user_loader
|
||
def load_user(user_id):
|
||
"""加载用户(支持Admin和User两种类型)"""
|
||
try:
|
||
user_type, uid = user_id.split(':', 1)
|
||
if user_type == 'admin':
|
||
return AdminModel.query.get(int(uid))
|
||
elif user_type == 'user':
|
||
return User.query.get(int(uid))
|
||
except (ValueError, AttributeError):
|
||
# 兼容旧格式(纯数字ID,默认为Admin)
|
||
return AdminModel.query.get(int(user_id))
|
||
return None
|
||
|
||
# ========== 前台路由 ==========
|
||
@app.route('/')
|
||
def index():
|
||
"""首页"""
|
||
# 获取所有启用的标签
|
||
tags = Tag.query.order_by(Tag.sort_order.desc(), Tag.id).all()
|
||
|
||
# 优化:使用一次SQL查询统计所有标签的网站数量
|
||
tag_counts = {}
|
||
if tags:
|
||
# 使用JOIN查询一次性获取所有标签的网站数量
|
||
from sqlalchemy import func
|
||
counts_query = db.session.query(
|
||
site_tags.c.tag_id,
|
||
func.count(site_tags.c.site_id).label('count')
|
||
).join(
|
||
Site, site_tags.c.site_id == Site.id
|
||
).filter(
|
||
Site.is_active == True
|
||
).group_by(site_tags.c.tag_id).all()
|
||
|
||
tag_counts = {tag_id: count for tag_id, count in counts_query}
|
||
|
||
# 获取筛选参数
|
||
tag_slug = request.args.get('tag')
|
||
search_query = request.args.get('q', '').strip()
|
||
current_tab = request.args.get('tab', 'latest') # 默认为"最新"
|
||
page = request.args.get('page', 1, type=int)
|
||
per_page = 100 # 每页显示100个站点
|
||
|
||
selected_tag = None
|
||
|
||
# 构建基础查询
|
||
query = Site.query.filter_by(is_active=True)
|
||
|
||
# 标签筛选
|
||
if tag_slug:
|
||
selected_tag = Tag.query.filter_by(slug=tag_slug).first()
|
||
if selected_tag:
|
||
query = query.filter(Site.tags.contains(selected_tag))
|
||
else:
|
||
sites = []
|
||
pagination = None
|
||
return render_template('index_new.html', sites=sites, tags=tags,
|
||
selected_tag=selected_tag, search_query=search_query,
|
||
pagination=pagination, tag_counts=tag_counts,
|
||
current_tab=current_tab)
|
||
|
||
# 搜索功能
|
||
if search_query:
|
||
# 使用OR条件搜索:网站名称、URL、描述
|
||
search_pattern = f'%{search_query}%'
|
||
query = query.filter(
|
||
db.or_(
|
||
Site.name.like(search_pattern),
|
||
Site.url.like(search_pattern),
|
||
Site.description.like(search_pattern),
|
||
Site.short_desc.like(search_pattern)
|
||
)
|
||
)
|
||
|
||
# Tab筛选和排序
|
||
if current_tab == 'popular':
|
||
# 热门:按浏览次数倒序
|
||
query = query.order_by(Site.view_count.desc(), Site.id.desc())
|
||
elif current_tab == 'recommended':
|
||
# 推荐:只显示is_recommended=True的
|
||
query = query.filter_by(is_recommended=True).order_by(Site.sort_order.desc(), Site.id.desc())
|
||
else:
|
||
# 最新:按创建时间倒序(默认)
|
||
query = query.order_by(Site.created_at.desc(), Site.id.desc())
|
||
|
||
# 分页
|
||
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
|
||
sites = pagination.items
|
||
|
||
return render_template('index_new.html', sites=sites, tags=tags,
|
||
selected_tag=selected_tag, search_query=search_query,
|
||
pagination=pagination, tag_counts=tag_counts,
|
||
current_tab=current_tab)
|
||
|
||
@app.route('/site/<code>')
|
||
def site_detail(code):
|
||
"""网站详情页"""
|
||
site = Site.query.filter_by(code=code, is_active=True).first_or_404()
|
||
|
||
# 增加浏览次数
|
||
site.view_count += 1
|
||
db.session.commit()
|
||
|
||
# v2.6优化:移除自动调用博查API的逻辑,改为按需加载
|
||
# 只获取数据库中已有的新闻,不再自动调用API
|
||
|
||
# 获取该网站的相关新闻(最多显示5条)
|
||
news_list = News.query.filter_by(
|
||
site_id=site.id,
|
||
is_active=True
|
||
).order_by(News.published_at.desc()).limit(5).all()
|
||
|
||
# 检查是否有新闻,如果没有则标记需要加载
|
||
has_news = len(news_list) > 0
|
||
|
||
# 获取同类工具推荐(通过标签匹配,最多显示4个)
|
||
recommended_sites = []
|
||
if site.tags:
|
||
# 获取有相同标签的其他网站
|
||
recommended_sites = Site.query.filter(
|
||
Site.id != site.id,
|
||
Site.is_active == True,
|
||
Site.tags.any(Tag.id.in_([tag.id for tag in site.tags]))
|
||
).order_by(Site.view_count.desc()).limit(4).all()
|
||
|
||
return render_template('detail_new.html', site=site, news_list=news_list, has_news=has_news, recommended_sites=recommended_sites)
|
||
|
||
# ========== 验证码相关 (v2.6.1优化) ==========
|
||
def generate_captcha_image(text):
|
||
"""生成验证码图片"""
|
||
# 创建图片 (宽120, 高40)
|
||
width, height = 120, 40
|
||
image = Image.new('RGB', (width, height), color=(255, 255, 255))
|
||
draw = ImageDraw.Draw(image)
|
||
|
||
# 使用默认字体
|
||
try:
|
||
font = ImageFont.truetype("arial.ttf", 28)
|
||
except:
|
||
font = ImageFont.load_default()
|
||
|
||
# 添加随机干扰线
|
||
for i in range(3):
|
||
x1 = random.randint(0, width)
|
||
y1 = random.randint(0, height)
|
||
x2 = random.randint(0, width)
|
||
y2 = random.randint(0, height)
|
||
draw.line([(x1, y1), (x2, y2)], fill=(200, 200, 200), width=1)
|
||
|
||
# 绘制验证码文字
|
||
for i, char in enumerate(text):
|
||
x = 10 + i * 25
|
||
y = random.randint(5, 12)
|
||
color = (random.randint(0, 100), random.randint(0, 100), random.randint(0, 100))
|
||
draw.text((x, y), char, font=font, fill=color)
|
||
|
||
# 添加随机噪点
|
||
for _ in range(100):
|
||
x = random.randint(0, width - 1)
|
||
y = random.randint(0, height - 1)
|
||
draw.point((x, y), fill=(random.randint(150, 200), random.randint(150, 200), random.randint(150, 200)))
|
||
|
||
return image
|
||
|
||
@app.route('/api/captcha', methods=['GET'])
|
||
def get_captcha():
|
||
"""生成验证码图片"""
|
||
# 生成4位随机验证码
|
||
captcha_text = ''.join(random.choices(string.ascii_uppercase + string.digits, k=4))
|
||
|
||
# 存储到session
|
||
session['captcha'] = captcha_text
|
||
session['captcha_created'] = datetime.now().timestamp()
|
||
|
||
# 生成图片
|
||
image = generate_captcha_image(captcha_text)
|
||
|
||
# 转换为字节流
|
||
img_io = BytesIO()
|
||
image.save(img_io, 'PNG')
|
||
img_io.seek(0)
|
||
|
||
return send_file(img_io, mimetype='image/png')
|
||
|
||
# ========== 新闻相关API (v2.6优化) ==========
|
||
@app.route('/api/fetch-news/<code>', methods=['POST'])
|
||
def fetch_news_for_site(code):
|
||
"""
|
||
按需获取指定网站的新闻
|
||
v2.6.1优化:所有手动请求都需要验证码,防止API滥用
|
||
"""
|
||
try:
|
||
# 获取请求数据
|
||
data = request.get_json() or {}
|
||
captcha_input = data.get('captcha', '').strip().upper()
|
||
|
||
# 验证验证码
|
||
session_captcha = session.get('captcha', '').upper()
|
||
captcha_created = session.get('captcha_created', 0)
|
||
|
||
# 检查验证码是否存在
|
||
if not session_captcha:
|
||
return jsonify({
|
||
'success': False,
|
||
'error': '请先获取验证码'
|
||
}), 400
|
||
|
||
# 检查验证码是否过期(5分钟)
|
||
if datetime.now().timestamp() - captcha_created > 300:
|
||
session.pop('captcha', None)
|
||
session.pop('captcha_created', None)
|
||
return jsonify({
|
||
'success': False,
|
||
'error': '验证码已过期,请刷新后重试'
|
||
}), 400
|
||
|
||
# 检查验证码是否正确
|
||
if captcha_input != session_captcha:
|
||
return jsonify({
|
||
'success': False,
|
||
'error': '验证码错误,请重新输入'
|
||
}), 400
|
||
|
||
# 验证通过,清除验证码
|
||
session.pop('captcha', None)
|
||
session.pop('captcha_created', None)
|
||
|
||
# 查找网站
|
||
site = Site.query.filter_by(code=code, is_active=True).first_or_404()
|
||
|
||
# 检查博查API配置
|
||
api_key = app.config.get('BOCHA_API_KEY')
|
||
if not api_key:
|
||
return jsonify({'success': False, 'error': '博查API未配置'}), 500
|
||
|
||
# 创建新闻搜索器
|
||
searcher = NewsSearcher(api_key)
|
||
|
||
# 获取新闻(限制5条,一周内的)
|
||
news_items = searcher.search_site_news(
|
||
site_name=site.name,
|
||
site_url=site.url,
|
||
news_keywords=site.news_keywords,
|
||
count=5,
|
||
freshness='oneWeek'
|
||
)
|
||
|
||
# 保存新闻到数据库
|
||
new_count = 0
|
||
if news_items:
|
||
for item in news_items:
|
||
# 检查是否已存在(根据URL去重)
|
||
existing = News.query.filter_by(
|
||
site_id=site.id,
|
||
url=item['url']
|
||
).first()
|
||
|
||
if not existing:
|
||
news = News(
|
||
site_id=site.id,
|
||
title=item['title'],
|
||
content=item.get('summary') or item.get('snippet', ''),
|
||
url=item['url'],
|
||
source_name=item.get('site_name', ''),
|
||
source_icon=item.get('site_icon', ''),
|
||
published_at=item.get('published_at'),
|
||
news_type='Search Result',
|
||
is_active=True
|
||
)
|
||
db.session.add(news)
|
||
new_count += 1
|
||
|
||
db.session.commit()
|
||
|
||
# 返回最新的新闻列表
|
||
news_list = News.query.filter_by(
|
||
site_id=site.id,
|
||
is_active=True
|
||
).order_by(News.published_at.desc()).limit(5).all()
|
||
|
||
# 格式化新闻数据
|
||
news_data = []
|
||
for news in news_list:
|
||
news_data.append({
|
||
'title': news.title,
|
||
'content': news.content[:200] if news.content else '',
|
||
'url': news.url,
|
||
'source_name': news.source_name,
|
||
'source_icon': news.source_icon,
|
||
'published_at': news.published_at.strftime('%Y年%m月%d日') if news.published_at else '未知日期',
|
||
'news_type': news.news_type
|
||
})
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'news': news_data,
|
||
'new_count': new_count,
|
||
'message': f'成功获取{new_count}条新资讯' if new_count > 0 else '暂无新资讯'
|
||
})
|
||
|
||
except Exception as e:
|
||
print(f"获取新闻失败:{str(e)}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
db.session.rollback()
|
||
return jsonify({'success': False, 'error': str(e)}), 500
|
||
|
||
|
||
# ========== 社媒营销路由 (v2.5新增) ==========
|
||
@app.route('/api/generate-social-share', methods=['POST'])
|
||
def generate_social_share():
|
||
"""一键生成社媒分享文案(前台可访问)"""
|
||
try:
|
||
data = request.get_json() or {}
|
||
site_code = (data.get('site_code') or '').strip()
|
||
platforms = data.get('platforms') or []
|
||
|
||
if not site_code:
|
||
return jsonify({'success': False, 'message': '请提供网站编码'}), 400
|
||
|
||
site = Site.query.filter_by(code=site_code, is_active=True).first()
|
||
if not site:
|
||
return jsonify({'success': False, 'message': '网站不存在或未启用'}), 404
|
||
|
||
# 允许的平台列表(前端也会限制,这里再做一次兜底)
|
||
allowed_platforms = {
|
||
'universal',
|
||
'xiaohongshu',
|
||
'douyin',
|
||
'bilibili',
|
||
'wechat',
|
||
'moments',
|
||
'x',
|
||
'linkedin'
|
||
}
|
||
|
||
if not isinstance(platforms, list):
|
||
platforms = []
|
||
platforms = [p for p in platforms if isinstance(p, str)]
|
||
platforms = [p.strip().lower() for p in platforms if p.strip()]
|
||
platforms = [p for p in platforms if p in allowed_platforms]
|
||
|
||
# 默认生成通用模板
|
||
if not platforms:
|
||
platforms = ['universal']
|
||
|
||
# 组织站点信息
|
||
site_info = {
|
||
'name': site.name,
|
||
'url': site.url,
|
||
'short_desc': site.short_desc or '',
|
||
'description': (site.description or '')[:1200],
|
||
'features': (site.features or '')[:1200],
|
||
'tags': [t.name for t in (site.tags or [])]
|
||
}
|
||
|
||
# 尝试使用PromptTemplate + DeepSeek生成
|
||
from utils.tag_generator import TagGenerator
|
||
generator = TagGenerator()
|
||
prompt = PromptTemplate.query.filter_by(key='social_share', is_active=True).first()
|
||
|
||
generated = {}
|
||
|
||
if prompt and generator.client:
|
||
# 构造提示词变量
|
||
tags_text = ','.join(site_info['tags'])
|
||
user_prompt = prompt.user_prompt_template.format(
|
||
name=site_info['name'],
|
||
url=site_info['url'],
|
||
short_desc=site_info['short_desc'],
|
||
description=site_info['description'],
|
||
features=site_info['features'],
|
||
tags=tags_text,
|
||
platforms=','.join(platforms)
|
||
)
|
||
|
||
try:
|
||
resp = generator.client.chat.completions.create(
|
||
model='deepseek-chat',
|
||
messages=[
|
||
{'role': 'system', 'content': prompt.system_prompt},
|
||
{'role': 'user', 'content': user_prompt}
|
||
],
|
||
temperature=0.7,
|
||
max_tokens=1200
|
||
)
|
||
text = (resp.choices[0].message.content or '').strip()
|
||
# 约定:模型返回JSON字符串;若不是JSON则回退到纯文本放到universal
|
||
import json as _json
|
||
try:
|
||
parsed = _json.loads(text)
|
||
if isinstance(parsed, dict):
|
||
for k, v in parsed.items():
|
||
if isinstance(k, str) and isinstance(v, str) and k in allowed_platforms:
|
||
generated[k] = v.strip()
|
||
except Exception:
|
||
generated['universal'] = text
|
||
except Exception as e:
|
||
# AI失败不影响整体,回退模板
|
||
print(f"社媒文案AI生成失败:{str(e)}")
|
||
|
||
# 回退:规则模板(根据各平台字数限制优化)
|
||
def build_fallback(p):
|
||
name = site_info['name']
|
||
url = site_info['url']
|
||
short_desc = site_info['short_desc'] or ''
|
||
full_desc = site_info['description'] or ''
|
||
features = site_info['features'] or ''
|
||
tags = site_info['tags'][:6]
|
||
tags_line = ' ' + ' '.join([f"#{t}" for t in tags]) if tags else ''
|
||
|
||
# X (Twitter) - 280字符限制,简洁为主
|
||
if p == 'x':
|
||
desc = (short_desc or full_desc)[:100]
|
||
base = f"🔥 {name}\n\n{desc}\n\n🔗 {url}{tags_line}"
|
||
return base[:280]
|
||
|
||
# LinkedIn - 3000字限制,专业风格
|
||
if p == 'linkedin':
|
||
desc = (full_desc or short_desc)[:500]
|
||
content = f"💡 推荐一个实用工具:{name}\n\n📝 简介:\n{desc}\n\n"
|
||
if features:
|
||
features_list = features.split('\n')[:3]
|
||
content += f"✨ 主要功能:\n" + '\n'.join([f"• {f.strip()}" for f in features_list if f.strip()][:3]) + "\n\n"
|
||
content += f"🔗 官网:{url}\n\n"
|
||
if tags:
|
||
content += f"📌 适用场景:{', '.join(tags)}"
|
||
return content[:3000]
|
||
|
||
# 小红书 - 2000字限制,图文风格
|
||
if p == 'xiaohongshu':
|
||
desc = (short_desc or full_desc)[:300]
|
||
content = f"✨ {name} | 我最近在用的宝藏工具\n\n"
|
||
content += f"📌 一句话介绍:\n{desc}\n\n"
|
||
if features:
|
||
features_list = features.split('\n')[:5]
|
||
content += f"🎯 核心功能:\n" + '\n'.join([f"✓ {f.strip()}" for f in features_list if f.strip()][:5]) + "\n\n"
|
||
content += f"🔗 体验入口:{url}\n\n{tags_line}"
|
||
return content[:2000]
|
||
|
||
# 抖音 - 2000字限制,短视频文案风格
|
||
if p == 'douyin':
|
||
desc = (short_desc or full_desc)[:200]
|
||
content = f"🔥 发现一个超好用的工具:{name}\n\n"
|
||
content += f"💡 {desc}\n\n"
|
||
if features:
|
||
features_list = features.split('\n')[:3]
|
||
content += "✨ 亮点功能:\n" + '\n'.join([f"👉 {f.strip()}" for f in features_list if f.strip()][:3]) + "\n\n"
|
||
content += f"🔗 想试试戳:{url}\n\n{tags_line}"
|
||
return content[:2000]
|
||
|
||
# B站 - 20000字限制,专栏风格
|
||
if p == 'bilibili':
|
||
desc = (full_desc or short_desc)[:800]
|
||
content = f"【分享】{name} - 值得一试的优质工具\n\n"
|
||
content += f"## 📖 工具简介\n{desc}\n\n"
|
||
if features:
|
||
content += f"## ✨ 主要功能\n{features}\n\n"
|
||
content += f"## 🔗 体验地址\n{url}\n\n"
|
||
if tags:
|
||
content += f"## 🏷️ 相关标签\n{' / '.join(tags)}"
|
||
return content[:20000]
|
||
|
||
# 微信公众号 - 基本无限制,正式风格
|
||
if p == 'wechat':
|
||
desc = (full_desc or short_desc)[:600]
|
||
content = f"今天给大家分享一个实用工具:{name}\n\n"
|
||
content += f"📝 工具介绍\n{desc}\n\n"
|
||
if features:
|
||
content += f"✨ 核心功能\n{features}\n\n"
|
||
content += f"🔗 官方网站\n{url}\n\n"
|
||
if tags:
|
||
content += f"如果你也在关注「{' · '.join(tags)}」相关的工具,不妨收藏一下这个实用的选择。"
|
||
return content
|
||
|
||
# 朋友圈 - 简短为主
|
||
if p == 'moments':
|
||
desc = (short_desc or full_desc)[:80]
|
||
return f"💡 {name}\n{desc}\n🔗 {url}{tags_line}".strip()
|
||
|
||
# 通用模板
|
||
desc = (full_desc or short_desc)[:300]
|
||
return f"{name}\n\n{desc}\n\n{url}{tags_line}".strip()
|
||
|
||
results = {}
|
||
for p in platforms:
|
||
results[p] = generated.get(p) or build_fallback(p)
|
||
|
||
# 统一补充一个通用版本
|
||
if 'universal' not in results:
|
||
results['universal'] = generated.get('universal') or build_fallback('universal')
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'site': {
|
||
'code': site.code,
|
||
'name': site.name,
|
||
'url': site.url
|
||
},
|
||
'platforms': platforms,
|
||
'content': results
|
||
})
|
||
|
||
except Exception as e:
|
||
return jsonify({'success': False, 'message': f'生成失败: {str(e)}'}), 500
|
||
|
||
# ========== 后台登录路由 ==========
|
||
@app.route('/admin/login', methods=['GET', 'POST'])
|
||
def admin_login():
|
||
"""管理员登录"""
|
||
if current_user.is_authenticated:
|
||
return redirect(url_for('admin.index'))
|
||
|
||
if request.method == 'POST':
|
||
username = request.form.get('username')
|
||
password = request.form.get('password')
|
||
|
||
admin = AdminModel.query.filter_by(username=username).first()
|
||
|
||
if admin and admin.check_password(password) and admin.is_active:
|
||
login_user(admin)
|
||
admin.last_login = datetime.now()
|
||
db.session.commit()
|
||
return redirect(url_for('admin.index'))
|
||
else:
|
||
flash('用户名或密码错误', 'error')
|
||
|
||
return render_template('admin_login.html')
|
||
|
||
@app.route('/admin/logout')
|
||
@login_required
|
||
def admin_logout():
|
||
"""管理员登出"""
|
||
logout_user()
|
||
return redirect(url_for('index'))
|
||
|
||
# ========== 用户认证路由 ==========
|
||
@app.route('/register', methods=['GET', 'POST'])
|
||
def user_register():
|
||
"""用户注册"""
|
||
if current_user.is_authenticated:
|
||
# 如果已登录,跳转到首页
|
||
return redirect(url_for('index'))
|
||
|
||
if request.method == 'POST':
|
||
username = request.form.get('username', '').strip()
|
||
password = request.form.get('password', '').strip()
|
||
confirm_password = request.form.get('confirm_password', '').strip()
|
||
|
||
# 验证输入
|
||
if not username or not password:
|
||
flash('用户名和密码不能为空', 'error')
|
||
return render_template('auth/register.html')
|
||
|
||
if len(username) < 3:
|
||
flash('用户名至少3个字符', 'error')
|
||
return render_template('auth/register.html')
|
||
|
||
if len(password) < 6:
|
||
flash('密码至少6个字符', 'error')
|
||
return render_template('auth/register.html')
|
||
|
||
if password != confirm_password:
|
||
flash('两次输入的密码不一致', 'error')
|
||
return render_template('auth/register.html')
|
||
|
||
# 检查用户名是否已存在
|
||
if User.query.filter_by(username=username).first():
|
||
flash('该用户名已被注册', 'error')
|
||
return render_template('auth/register.html')
|
||
|
||
# 创建用户
|
||
try:
|
||
user = User(username=username)
|
||
user.set_password(password)
|
||
user.last_login = datetime.now()
|
||
db.session.add(user)
|
||
db.session.commit()
|
||
|
||
# 自动登录
|
||
login_user(user)
|
||
|
||
flash('注册成功!', 'success')
|
||
return redirect(url_for('index'))
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
flash(f'注册失败:{str(e)}', 'error')
|
||
return render_template('auth/register.html')
|
||
|
||
return render_template('auth/register.html')
|
||
|
||
@app.route('/login', methods=['GET', 'POST'])
|
||
def user_login():
|
||
"""用户登录"""
|
||
if current_user.is_authenticated:
|
||
return redirect(url_for('index'))
|
||
|
||
if request.method == 'POST':
|
||
username = request.form.get('username', '').strip()
|
||
password = request.form.get('password', '').strip()
|
||
|
||
if not username or not password:
|
||
flash('请输入用户名和密码', 'error')
|
||
return render_template('auth/login.html')
|
||
|
||
# 查找用户
|
||
user = User.query.filter_by(username=username).first()
|
||
|
||
if user and user.check_password(password) and user.is_active:
|
||
login_user(user)
|
||
user.last_login = datetime.now()
|
||
db.session.commit()
|
||
|
||
# 获取next参数,如果有则跳转,否则跳转首页
|
||
next_page = request.args.get('next')
|
||
if next_page and next_page.startswith('/'):
|
||
return redirect(next_page)
|
||
return redirect(url_for('index'))
|
||
else:
|
||
flash('用户名或密码错误', 'error')
|
||
|
||
return render_template('auth/login.html')
|
||
|
||
@app.route('/logout')
|
||
@login_required
|
||
def user_logout():
|
||
"""用户登出"""
|
||
logout_user()
|
||
return redirect(url_for('index'))
|
||
|
||
# ========== 用户认证API ==========
|
||
@app.route('/api/auth/status', methods=['GET'])
|
||
def auth_status():
|
||
"""获取登录状态"""
|
||
if current_user.is_authenticated:
|
||
# 判断是Admin还是User
|
||
if isinstance(current_user, User):
|
||
return jsonify({
|
||
'logged_in': True,
|
||
'user_type': 'user',
|
||
'username': current_user.username,
|
||
'avatar': current_user.avatar,
|
||
'id': current_user.id
|
||
})
|
||
else:
|
||
return jsonify({
|
||
'logged_in': True,
|
||
'user_type': 'admin',
|
||
'username': current_user.username,
|
||
'id': current_user.id
|
||
})
|
||
return jsonify({'logged_in': False})
|
||
|
||
# ========== 收藏功能API ==========
|
||
@app.route('/api/collections/toggle', methods=['POST'])
|
||
@login_required
|
||
def toggle_collection():
|
||
"""收藏/取消收藏"""
|
||
# 检查是否为普通用户
|
||
if not isinstance(current_user, User):
|
||
return jsonify({'success': False, 'message': '管理员账号无法使用收藏功能'}), 403
|
||
|
||
try:
|
||
data = request.get_json() or {}
|
||
site_code = data.get('site_code', '').strip()
|
||
folder_id = data.get('folder_id') # 可选,指定文件夹
|
||
|
||
if not site_code:
|
||
return jsonify({'success': False, 'message': '请提供网站编码'}), 400
|
||
|
||
# 查找网站
|
||
site = Site.query.filter_by(code=site_code, is_active=True).first()
|
||
if not site:
|
||
return jsonify({'success': False, 'message': '网站不存在'}), 404
|
||
|
||
# 检查是否已收藏
|
||
existing = Collection.query.filter_by(
|
||
user_id=current_user.id,
|
||
site_id=site.id
|
||
).first()
|
||
|
||
if existing:
|
||
# 已收藏,则取消收藏
|
||
db.session.delete(existing)
|
||
db.session.commit()
|
||
return jsonify({
|
||
'success': True,
|
||
'action': 'removed',
|
||
'message': '已取消收藏'
|
||
})
|
||
else:
|
||
# 未收藏,则添加收藏
|
||
collection = Collection(
|
||
user_id=current_user.id,
|
||
site_id=site.id,
|
||
folder_id=folder_id
|
||
)
|
||
db.session.add(collection)
|
||
db.session.commit()
|
||
return jsonify({
|
||
'success': True,
|
||
'action': 'added',
|
||
'message': '收藏成功',
|
||
'collection_id': collection.id
|
||
})
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
return jsonify({'success': False, 'message': f'操作失败:{str(e)}'}), 500
|
||
|
||
@app.route('/api/collections/status/<site_code>', methods=['GET'])
|
||
@login_required
|
||
def collection_status(site_code):
|
||
"""查询收藏状态"""
|
||
if not isinstance(current_user, User):
|
||
return jsonify({'is_collected': False})
|
||
|
||
try:
|
||
site = Site.query.filter_by(code=site_code, is_active=True).first()
|
||
if not site:
|
||
return jsonify({'is_collected': False})
|
||
|
||
collection = Collection.query.filter_by(
|
||
user_id=current_user.id,
|
||
site_id=site.id
|
||
).first()
|
||
|
||
return jsonify({
|
||
'is_collected': collection is not None,
|
||
'collection_id': collection.id if collection else None,
|
||
'folder_id': collection.folder_id if collection else None
|
||
})
|
||
|
||
except Exception as e:
|
||
return jsonify({'is_collected': False, 'error': str(e)})
|
||
|
||
@app.route('/api/collections/list', methods=['GET'])
|
||
@login_required
|
||
def list_collections():
|
||
"""获取收藏列表"""
|
||
if not isinstance(current_user, User):
|
||
return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403
|
||
|
||
try:
|
||
folder_id = request.args.get('folder_id')
|
||
page = request.args.get('page', 1, type=int)
|
||
per_page = request.args.get('per_page', 20, type=int)
|
||
|
||
# 构建查询
|
||
query = Collection.query.filter_by(user_id=current_user.id)
|
||
|
||
if folder_id:
|
||
if folder_id == 'null' or folder_id == 'none':
|
||
query = query.filter_by(folder_id=None)
|
||
else:
|
||
query = query.filter_by(folder_id=int(folder_id))
|
||
|
||
# 按创建时间倒序
|
||
query = query.order_by(Collection.created_at.desc())
|
||
|
||
# 分页
|
||
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
|
||
|
||
# 格式化数据
|
||
collections_data = []
|
||
for collection in pagination.items:
|
||
site = collection.site
|
||
collections_data.append({
|
||
'id': collection.id,
|
||
'site_id': site.id,
|
||
'site_code': site.code,
|
||
'site_name': site.name,
|
||
'site_url': site.url,
|
||
'site_logo': site.logo,
|
||
'site_short_desc': site.short_desc,
|
||
'folder_id': collection.folder_id,
|
||
'note': collection.note,
|
||
'created_at': collection.created_at.strftime('%Y-%m-%d %H:%M:%S') if collection.created_at else None
|
||
})
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'collections': collections_data,
|
||
'total': pagination.total,
|
||
'page': page,
|
||
'per_page': per_page,
|
||
'has_next': pagination.has_next,
|
||
'has_prev': pagination.has_prev
|
||
})
|
||
|
||
except Exception as e:
|
||
return jsonify({'success': False, 'message': f'获取失败:{str(e)}'}), 500
|
||
|
||
@app.route('/api/collections/<int:collection_id>/note', methods=['PUT'])
|
||
@login_required
|
||
def update_collection_note(collection_id):
|
||
"""更新收藏备注"""
|
||
if not isinstance(current_user, User):
|
||
return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403
|
||
|
||
try:
|
||
data = request.get_json() or {}
|
||
note = data.get('note', '').strip()
|
||
|
||
collection = Collection.query.filter_by(
|
||
id=collection_id,
|
||
user_id=current_user.id
|
||
).first()
|
||
|
||
if not collection:
|
||
return jsonify({'success': False, 'message': '收藏不存在'}), 404
|
||
|
||
collection.note = note
|
||
db.session.commit()
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'message': '备注已更新'
|
||
})
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
return jsonify({'success': False, 'message': f'更新失败:{str(e)}'}), 500
|
||
|
||
@app.route('/api/collections/<int:collection_id>/move', methods=['PUT'])
|
||
@login_required
|
||
def move_collection(collection_id):
|
||
"""移动收藏到其他文件夹"""
|
||
if not isinstance(current_user, User):
|
||
return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403
|
||
|
||
try:
|
||
data = request.get_json() or {}
|
||
folder_id = data.get('folder_id') # None表示移到未分类
|
||
|
||
collection = Collection.query.filter_by(
|
||
id=collection_id,
|
||
user_id=current_user.id
|
||
).first()
|
||
|
||
if not collection:
|
||
return jsonify({'success': False, 'message': '收藏不存在'}), 404
|
||
|
||
# 如果指定了文件夹,验证文件夹是否属于当前用户
|
||
if folder_id:
|
||
folder = Folder.query.filter_by(
|
||
id=folder_id,
|
||
user_id=current_user.id
|
||
).first()
|
||
if not folder:
|
||
return jsonify({'success': False, 'message': '文件夹不存在'}), 404
|
||
|
||
collection.folder_id = folder_id
|
||
db.session.commit()
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'message': '已移动到指定文件夹'
|
||
})
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
return jsonify({'success': False, 'message': f'移动失败:{str(e)}'}), 500
|
||
|
||
# ========== 文件夹管理API ==========
|
||
@app.route('/api/folders', methods=['GET'])
|
||
@login_required
|
||
def list_folders():
|
||
"""获取文件夹列表"""
|
||
if not isinstance(current_user, User):
|
||
return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403
|
||
|
||
try:
|
||
folders = Folder.query.filter_by(user_id=current_user.id).order_by(
|
||
Folder.sort_order.desc(), Folder.created_at
|
||
).all()
|
||
|
||
folders_data = []
|
||
for folder in folders:
|
||
# 统计文件夹中的收藏数量
|
||
count = Collection.query.filter_by(
|
||
user_id=current_user.id,
|
||
folder_id=folder.id
|
||
).count()
|
||
|
||
folders_data.append({
|
||
'id': folder.id,
|
||
'name': folder.name,
|
||
'description': folder.description,
|
||
'icon': folder.icon,
|
||
'sort_order': folder.sort_order,
|
||
'is_public': folder.is_public,
|
||
'public_slug': folder.public_slug,
|
||
'count': count,
|
||
'created_at': folder.created_at.strftime('%Y-%m-%d %H:%M:%S') if folder.created_at else None
|
||
})
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'folders': folders_data
|
||
})
|
||
|
||
except Exception as e:
|
||
return jsonify({'success': False, 'message': f'获取失败:{str(e)}'}), 500
|
||
|
||
@app.route('/api/folders', methods=['POST'])
|
||
@login_required
|
||
def create_folder():
|
||
"""创建文件夹"""
|
||
if not isinstance(current_user, User):
|
||
return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403
|
||
|
||
try:
|
||
data = request.get_json() or {}
|
||
name = data.get('name', '').strip()
|
||
description = data.get('description', '').strip()
|
||
icon = data.get('icon', '📁').strip()
|
||
|
||
if not name:
|
||
return jsonify({'success': False, 'message': '文件夹名称不能为空'}), 400
|
||
|
||
# 检查同名文件夹
|
||
existing = Folder.query.filter_by(
|
||
user_id=current_user.id,
|
||
name=name
|
||
).first()
|
||
|
||
if existing:
|
||
return jsonify({'success': False, 'message': '该文件夹名称已存在'}), 400
|
||
|
||
# 创建文件夹
|
||
folder = Folder(
|
||
user_id=current_user.id,
|
||
name=name,
|
||
description=description,
|
||
icon=icon
|
||
)
|
||
db.session.add(folder)
|
||
db.session.commit()
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'message': '文件夹创建成功',
|
||
'folder': {
|
||
'id': folder.id,
|
||
'name': folder.name,
|
||
'description': folder.description,
|
||
'icon': folder.icon
|
||
}
|
||
})
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
return jsonify({'success': False, 'message': f'创建失败:{str(e)}'}), 500
|
||
|
||
@app.route('/api/folders/<int:folder_id>', methods=['PUT'])
|
||
@login_required
|
||
def update_folder(folder_id):
|
||
"""更新文件夹"""
|
||
if not isinstance(current_user, User):
|
||
return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403
|
||
|
||
try:
|
||
data = request.get_json() or {}
|
||
|
||
folder = Folder.query.filter_by(
|
||
id=folder_id,
|
||
user_id=current_user.id
|
||
).first()
|
||
|
||
if not folder:
|
||
return jsonify({'success': False, 'message': '文件夹不存在'}), 404
|
||
|
||
# 更新字段
|
||
if 'name' in data:
|
||
name = data['name'].strip()
|
||
if not name:
|
||
return jsonify({'success': False, 'message': '文件夹名称不能为空'}), 400
|
||
|
||
# 检查同名(排除自己)
|
||
existing = Folder.query.filter(
|
||
Folder.user_id == current_user.id,
|
||
Folder.name == name,
|
||
Folder.id != folder_id
|
||
).first()
|
||
|
||
if existing:
|
||
return jsonify({'success': False, 'message': '该文件夹名称已存在'}), 400
|
||
|
||
folder.name = name
|
||
|
||
if 'description' in data:
|
||
folder.description = data['description'].strip()
|
||
|
||
if 'icon' in data:
|
||
folder.icon = data['icon'].strip()
|
||
|
||
if 'sort_order' in data:
|
||
folder.sort_order = int(data['sort_order'])
|
||
|
||
db.session.commit()
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'message': '文件夹已更新'
|
||
})
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
return jsonify({'success': False, 'message': f'更新失败:{str(e)}'}), 500
|
||
|
||
@app.route('/api/folders/<int:folder_id>', methods=['DELETE'])
|
||
@login_required
|
||
def delete_folder(folder_id):
|
||
"""删除文件夹"""
|
||
if not isinstance(current_user, User):
|
||
return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403
|
||
|
||
try:
|
||
folder = Folder.query.filter_by(
|
||
id=folder_id,
|
||
user_id=current_user.id
|
||
).first()
|
||
|
||
if not folder:
|
||
return jsonify({'success': False, 'message': '文件夹不存在'}), 404
|
||
|
||
# 删除文件夹(级联删除收藏记录)
|
||
db.session.delete(folder)
|
||
db.session.commit()
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'message': '文件夹已删除'
|
||
})
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
return jsonify({'success': False, 'message': f'删除失败:{str(e)}'}), 500
|
||
|
||
# ========== 用户中心页面路由 ==========
|
||
@app.route('/user/profile')
|
||
@login_required
|
||
def user_profile():
|
||
"""用户中心主页"""
|
||
if not isinstance(current_user, User):
|
||
flash('仅普通用户可访问', 'error')
|
||
return redirect(url_for('index'))
|
||
|
||
# 统计信息
|
||
collections_count = Collection.query.filter_by(user_id=current_user.id).count()
|
||
folders_count = Folder.query.filter_by(user_id=current_user.id).count()
|
||
|
||
# 最近收藏(5条)
|
||
recent_collections = Collection.query.filter_by(
|
||
user_id=current_user.id
|
||
).order_by(Collection.created_at.desc()).limit(5).all()
|
||
|
||
return render_template('user/profile.html',
|
||
collections_count=collections_count,
|
||
folders_count=folders_count,
|
||
recent_collections=recent_collections)
|
||
|
||
@app.route('/user/change-password')
|
||
@login_required
|
||
def user_change_password_page():
|
||
"""修改密码页面"""
|
||
if not isinstance(current_user, User):
|
||
flash('仅普通用户可访问', 'error')
|
||
return redirect(url_for('index'))
|
||
|
||
return render_template('user/change_password.html')
|
||
|
||
@app.route('/user/collections')
|
||
@login_required
|
||
def user_collections():
|
||
"""收藏列表页面"""
|
||
if not isinstance(current_user, User):
|
||
flash('仅普通用户可访问', 'error')
|
||
return redirect(url_for('index'))
|
||
|
||
# 获取所有文件夹
|
||
folders = Folder.query.filter_by(user_id=current_user.id).order_by(
|
||
Folder.sort_order.desc(), Folder.created_at
|
||
).all()
|
||
|
||
# 为每个文件夹添加收藏计数
|
||
for folder in folders:
|
||
folder.count = Collection.query.filter_by(
|
||
user_id=current_user.id,
|
||
folder_id=folder.id
|
||
).count()
|
||
|
||
# 获取收藏(分页)
|
||
page = request.args.get('page', 1, type=int)
|
||
folder_id = request.args.get('folder_id')
|
||
|
||
query = Collection.query.filter_by(user_id=current_user.id)
|
||
|
||
if folder_id:
|
||
if folder_id == 'none':
|
||
query = query.filter_by(folder_id=None)
|
||
else:
|
||
query = query.filter_by(folder_id=int(folder_id))
|
||
|
||
query = query.order_by(Collection.created_at.desc())
|
||
pagination = query.paginate(page=page, per_page=20, error_out=False)
|
||
|
||
return render_template('user/collections.html',
|
||
folders=folders,
|
||
collections=pagination.items,
|
||
pagination=pagination,
|
||
current_folder_id=folder_id)
|
||
|
||
@app.route('/api/user/profile', methods=['PUT'])
|
||
@login_required
|
||
def update_user_profile():
|
||
"""更新用户资料"""
|
||
if not isinstance(current_user, User):
|
||
return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403
|
||
|
||
try:
|
||
data = request.get_json() or {}
|
||
|
||
if 'bio' in data:
|
||
current_user.bio = data['bio'].strip()
|
||
|
||
if 'avatar' in data:
|
||
current_user.avatar = data['avatar'].strip()
|
||
|
||
if 'is_public_profile' in data:
|
||
current_user.is_public_profile = bool(data['is_public_profile'])
|
||
|
||
db.session.commit()
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'message': '资料已更新'
|
||
})
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
return jsonify({'success': False, 'message': f'更新失败:{str(e)}'}), 500
|
||
|
||
@app.route('/api/user/change-password', methods=['PUT'])
|
||
@login_required
|
||
def user_change_password():
|
||
"""普通用户修改密码"""
|
||
if not isinstance(current_user, User):
|
||
return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403
|
||
|
||
try:
|
||
data = request.get_json() or {}
|
||
old_password = data.get('old_password', '').strip()
|
||
new_password = data.get('new_password', '').strip()
|
||
confirm_password = data.get('confirm_password', '').strip()
|
||
|
||
# 验证旧密码
|
||
if not old_password:
|
||
return jsonify({'success': False, 'message': '请输入旧密码'}), 400
|
||
|
||
if not current_user.check_password(old_password):
|
||
return jsonify({'success': False, 'message': '旧密码错误'}), 400
|
||
|
||
# 验证新密码
|
||
if not new_password:
|
||
return jsonify({'success': False, 'message': '请输入新密码'}), 400
|
||
|
||
if len(new_password) < 6:
|
||
return jsonify({'success': False, 'message': '新密码长度至少6位'}), 400
|
||
|
||
if new_password != confirm_password:
|
||
return jsonify({'success': False, 'message': '两次输入的新密码不一致'}), 400
|
||
|
||
if old_password == new_password:
|
||
return jsonify({'success': False, 'message': '新密码不能与旧密码相同'}), 400
|
||
|
||
# 更新密码
|
||
current_user.set_password(new_password)
|
||
db.session.commit()
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'message': '密码修改成功'
|
||
})
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
return jsonify({'success': False, 'message': f'修改失败:{str(e)}'}), 500
|
||
|
||
@app.route('/api/user/email', methods=['PUT'])
|
||
@login_required
|
||
def update_user_email():
|
||
"""更新用户邮箱"""
|
||
if not isinstance(current_user, User):
|
||
return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403
|
||
|
||
try:
|
||
data = request.get_json() or {}
|
||
email = data.get('email', '').strip()
|
||
|
||
# 验证邮箱格式
|
||
if not email:
|
||
return jsonify({'success': False, 'message': '请输入邮箱地址'}), 400
|
||
|
||
import re
|
||
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
|
||
if not re.match(email_pattern, email):
|
||
return jsonify({'success': False, 'message': '邮箱格式不正确'}), 400
|
||
|
||
# 检查邮箱是否已被其他用户使用
|
||
existing_user = User.query.filter(
|
||
User.email == email,
|
||
User.id != current_user.id
|
||
).first()
|
||
|
||
if existing_user:
|
||
return jsonify({'success': False, 'message': '该邮箱已被其他用户使用'}), 400
|
||
|
||
# 更新邮箱
|
||
current_user.email = email
|
||
# 重置验证状态
|
||
current_user.email_verified = False
|
||
current_user.email_verified_at = None
|
||
current_user.email_verify_token = None
|
||
current_user.email_verify_token_expires = None
|
||
|
||
db.session.commit()
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'message': '邮箱已更新,请验证新邮箱'
|
||
})
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
return jsonify({'success': False, 'message': f'更新失败:{str(e)}'}), 500
|
||
|
||
@app.route('/api/user/send-verify-email', methods=['POST'])
|
||
@login_required
|
||
def send_verify_email():
|
||
"""发送邮箱验证邮件"""
|
||
if not isinstance(current_user, User):
|
||
return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403
|
||
|
||
try:
|
||
# 检查是否已绑定邮箱
|
||
if not current_user.email:
|
||
return jsonify({'success': False, 'message': '请先绑定邮箱'}), 400
|
||
|
||
# 检查是否已验证
|
||
if current_user.email_verified:
|
||
return jsonify({'success': False, 'message': '邮箱已验证,无需重复验证'}), 400
|
||
|
||
# 生成验证令牌(32位随机字符串)
|
||
token = secrets.token_urlsafe(32)
|
||
expires = datetime.now() + timedelta(hours=24)
|
||
|
||
# 保存令牌
|
||
current_user.email_verify_token = token
|
||
current_user.email_verify_token_expires = expires
|
||
db.session.commit()
|
||
|
||
# 发送验证邮件
|
||
verify_url = url_for('verify_email', token=token, _external=True)
|
||
|
||
html_content = f"""
|
||
<!DOCTYPE html>
|
||
<html>
|
||
<head>
|
||
<meta charset="UTF-8">
|
||
<style>
|
||
body {{ font-family: Arial, sans-serif; line-height: 1.6; color: #333; }}
|
||
.container {{ max-width: 600px; margin: 0 auto; padding: 20px; }}
|
||
.header {{ background: linear-gradient(135deg, #0ea5e9 0%, #0284c7 100%); color: white; padding: 30px; text-align: center; border-radius: 8px 8px 0 0; }}
|
||
.content {{ background: #f8fafc; padding: 30px; border-radius: 0 0 8px 8px; }}
|
||
.button {{ display: inline-block; padding: 12px 30px; background: #0ea5e9; color: white; text-decoration: none; border-radius: 6px; margin: 20px 0; }}
|
||
.footer {{ text-align: center; margin-top: 20px; color: #64748b; font-size: 12px; }}
|
||
</style>
|
||
</head>
|
||
<body>
|
||
<div class="container">
|
||
<div class="header">
|
||
<h1>验证您的邮箱</h1>
|
||
</div>
|
||
<div class="content">
|
||
<p>您好,{current_user.username}!</p>
|
||
<p>感谢您注册 ZJPB。请点击下面的按钮验证您的邮箱地址:</p>
|
||
<p style="text-align: center;">
|
||
<a href="{verify_url}" class="button">验证邮箱</a>
|
||
</p>
|
||
<p>或复制以下链接到浏览器:</p>
|
||
<p style="word-break: break-all; background: white; padding: 10px; border-radius: 4px;">{verify_url}</p>
|
||
<p style="color: #64748b; font-size: 14px;">此链接将在24小时后失效。</p>
|
||
</div>
|
||
<div class="footer">
|
||
<p>如果您没有注册 ZJPB,请忽略此邮件。</p>
|
||
<p>© 2025 ZJPB - 自己品吧</p>
|
||
</div>
|
||
</div>
|
||
</body>
|
||
</html>
|
||
"""
|
||
|
||
text_content = f"""
|
||
验证您的邮箱
|
||
|
||
您好,{current_user.username}!
|
||
|
||
感谢您注册 ZJPB。请访问以下链接验证您的邮箱地址:
|
||
|
||
{verify_url}
|
||
|
||
此链接将在24小时后失效。
|
||
|
||
如果您没有注册 ZJPB,请忽略此邮件。
|
||
"""
|
||
|
||
email_sender = EmailSender()
|
||
success = email_sender.send_email(
|
||
to_email=current_user.email,
|
||
subject='验证您的邮箱 - ZJPB',
|
||
html_content=html_content,
|
||
text_content=text_content
|
||
)
|
||
|
||
if success:
|
||
return jsonify({
|
||
'success': True,
|
||
'message': '验证邮件已发送,请查收'
|
||
})
|
||
else:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '邮件发送失败,请稍后重试'
|
||
}), 500
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
return jsonify({'success': False, 'message': f'发送失败:{str(e)}'}), 500
|
||
|
||
@app.route('/verify-email/<token>')
|
||
def verify_email(token):
|
||
"""验证邮箱"""
|
||
try:
|
||
# 查找令牌对应的用户
|
||
user = User.query.filter_by(email_verify_token=token).first()
|
||
|
||
if not user:
|
||
flash('验证链接无效', 'error')
|
||
return redirect(url_for('index'))
|
||
|
||
# 检查令牌是否过期
|
||
if user.email_verify_token_expires < datetime.now():
|
||
flash('验证链接已过期,请重新发送', 'error')
|
||
return redirect(url_for('user_profile'))
|
||
|
||
# 验证成功
|
||
user.email_verified = True
|
||
user.email_verified_at = datetime.now()
|
||
user.email_verify_token = None
|
||
user.email_verify_token_expires = None
|
||
db.session.commit()
|
||
|
||
flash('邮箱验证成功!', 'success')
|
||
return redirect(url_for('user_profile'))
|
||
|
||
except Exception as e:
|
||
flash(f'验证失败:{str(e)}', 'error')
|
||
return redirect(url_for('index'))
|
||
|
||
@app.route('/admin/change-password', methods=['GET', 'POST'])
|
||
@login_required
|
||
def change_password():
|
||
"""修改密码"""
|
||
# 只允许管理员访问
|
||
if not isinstance(current_user, AdminModel):
|
||
flash('无权访问此页面', 'error')
|
||
return redirect(url_for('index'))
|
||
|
||
if request.method == 'POST':
|
||
old_password = request.form.get('old_password', '').strip()
|
||
new_password = request.form.get('new_password', '').strip()
|
||
confirm_password = request.form.get('confirm_password', '').strip()
|
||
|
||
# 验证旧密码
|
||
if not current_user.check_password(old_password):
|
||
flash('旧密码错误', 'error')
|
||
return render_template('admin/change_password.html')
|
||
|
||
# 验证新密码
|
||
if not new_password:
|
||
flash('新密码不能为空', 'error')
|
||
return render_template('admin/change_password.html')
|
||
|
||
if len(new_password) < 6:
|
||
flash('新密码长度至少6位', 'error')
|
||
return render_template('admin/change_password.html')
|
||
|
||
if new_password != confirm_password:
|
||
flash('两次输入的新密码不一致', 'error')
|
||
return render_template('admin/change_password.html')
|
||
|
||
if old_password == new_password:
|
||
flash('新密码不能与旧密码相同', 'error')
|
||
return render_template('admin/change_password.html')
|
||
|
||
# 修改密码
|
||
try:
|
||
current_user.set_password(new_password)
|
||
db.session.commit()
|
||
flash('密码修改成功,请重新登录', 'success')
|
||
logout_user()
|
||
return redirect(url_for('admin_login'))
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
flash(f'密码修改失败:{str(e)}', 'error')
|
||
return render_template('admin/change_password.html')
|
||
|
||
return render_template('admin/change_password.html')
|
||
|
||
# ========== API路由 ==========
|
||
@app.route('/api/fetch-website-info', methods=['POST'])
|
||
@login_required
|
||
def fetch_website_info():
|
||
"""抓取网站信息API"""
|
||
# 只允许管理员访问
|
||
if not isinstance(current_user, AdminModel):
|
||
return jsonify({'success': False, 'message': '无权访问'}), 403
|
||
|
||
try:
|
||
data = request.get_json()
|
||
url = data.get('url', '').strip()
|
||
|
||
if not url:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '请提供网站URL'
|
||
}), 400
|
||
|
||
# 创建抓取器
|
||
fetcher = WebsiteFetcher(timeout=15)
|
||
|
||
# 抓取网站信息
|
||
info = fetcher.fetch_website_info(url)
|
||
|
||
if not info:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '无法获取网站信息,请检查URL是否正确或手动填写'
|
||
})
|
||
|
||
# 下载Logo到本地(如果有)
|
||
logo_path = None
|
||
if info.get('logo_url'):
|
||
logo_path = fetcher.download_logo(info['logo_url'], save_dir='static/logos')
|
||
|
||
# 如果下载失败,不返回远程URL,让用户手动上传
|
||
return jsonify({
|
||
'success': True,
|
||
'data': {
|
||
'name': info.get('name', ''),
|
||
'description': info.get('description', ''),
|
||
'logo': logo_path if logo_path else ''
|
||
}
|
||
})
|
||
|
||
except Exception as e:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': f'抓取失败: {str(e)}'
|
||
}), 500
|
||
|
||
@app.route('/api/upload-logo', methods=['POST'])
|
||
@login_required
|
||
def upload_logo():
|
||
"""上传Logo图片API"""
|
||
# 只允许管理员访问
|
||
if not isinstance(current_user, AdminModel):
|
||
return jsonify({'success': False, 'message': '无权访问'}), 403
|
||
|
||
try:
|
||
# 检查文件是否存在
|
||
if 'logo' not in request.files:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '请选择要上传的图片'
|
||
}), 400
|
||
|
||
file = request.files['logo']
|
||
|
||
# 检查文件名
|
||
if file.filename == '':
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '未选择文件'
|
||
}), 400
|
||
|
||
# 检查文件类型
|
||
allowed_extensions = {'png', 'jpg', 'jpeg', 'gif', 'svg', 'ico', 'webp'}
|
||
filename = file.filename.lower()
|
||
if not any(filename.endswith('.' + ext) for ext in allowed_extensions):
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '不支持的文件格式,请上传图片文件(png, jpg, gif, svg, ico, webp)'
|
||
}), 400
|
||
|
||
# 创建保存目录
|
||
save_dir = 'static/logos'
|
||
os.makedirs(save_dir, exist_ok=True)
|
||
|
||
# 生成安全的文件名
|
||
import time
|
||
import hashlib
|
||
ext = os.path.splitext(filename)[1]
|
||
timestamp = str(int(time.time() * 1000))
|
||
hash_name = hashlib.md5(f"{filename}{timestamp}".encode()).hexdigest()[:16]
|
||
safe_filename = f"logo_{hash_name}{ext}"
|
||
filepath = os.path.join(save_dir, safe_filename)
|
||
|
||
# 保存文件
|
||
file.save(filepath)
|
||
|
||
# 返回相对路径
|
||
return jsonify({
|
||
'success': True,
|
||
'path': f'/{filepath.replace(os.sep, "/")}'
|
||
})
|
||
|
||
except Exception as e:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': f'上传失败: {str(e)}'
|
||
}), 500
|
||
|
||
@app.route('/api/generate-features', methods=['POST'])
|
||
@login_required
|
||
def generate_features():
|
||
"""使用DeepSeek自动生成网站主要功能"""
|
||
# 只允许管理员访问
|
||
if not isinstance(current_user, AdminModel):
|
||
return jsonify({'success': False, 'message': '无权访问'}), 403
|
||
|
||
try:
|
||
data = request.get_json()
|
||
name = data.get('name', '').strip()
|
||
description = data.get('description', '').strip()
|
||
url = data.get('url', '').strip()
|
||
|
||
if not name or not description:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '请提供网站名称和描述'
|
||
}), 400
|
||
|
||
# 生成功能列表
|
||
generator = TagGenerator()
|
||
features = generator.generate_features(name, description, url)
|
||
|
||
if not features:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': 'DeepSeek功能生成失败,请检查API配置'
|
||
}), 500
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'features': features
|
||
})
|
||
|
||
except ValueError as e:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': str(e)
|
||
}), 400
|
||
except Exception as e:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': f'生成失败: {str(e)}'
|
||
}), 500
|
||
|
||
@app.route('/api/generate-description', methods=['POST'])
|
||
@login_required
|
||
def generate_description():
|
||
"""使用DeepSeek自动生成网站详细介绍"""
|
||
# 只允许管理员访问
|
||
if not isinstance(current_user, AdminModel):
|
||
return jsonify({'success': False, 'message': '无权访问'}), 403
|
||
|
||
try:
|
||
data = request.get_json()
|
||
name = data.get('name', '').strip()
|
||
short_desc = data.get('short_desc', '').strip()
|
||
url = data.get('url', '').strip()
|
||
|
||
if not name:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '请提供网站名称'
|
||
}), 400
|
||
|
||
# 生成详细介绍
|
||
generator = TagGenerator()
|
||
description = generator.generate_description(name, short_desc, url)
|
||
|
||
if not description:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': 'DeepSeek详细介绍生成失败,请检查API配置'
|
||
}), 500
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'description': description
|
||
})
|
||
|
||
except ValueError as e:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': str(e)
|
||
}), 400
|
||
except Exception as e:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': f'生成失败: {str(e)}'
|
||
}), 500
|
||
|
||
@app.route('/api/generate-tags', methods=['POST'])
|
||
@login_required
|
||
def generate_tags():
|
||
"""使用DeepSeek自动生成标签"""
|
||
# 只允许管理员访问
|
||
if not isinstance(current_user, AdminModel):
|
||
return jsonify({'success': False, 'message': '无权访问'}), 403
|
||
|
||
try:
|
||
data = request.get_json()
|
||
name = data.get('name', '').strip()
|
||
description = data.get('description', '').strip()
|
||
|
||
if not name or not description:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '请提供网站名称和描述'
|
||
}), 400
|
||
|
||
# 获取现有标签作为参考
|
||
existing_tags = [tag.name for tag in Tag.query.all()]
|
||
|
||
# 生成标签
|
||
generator = TagGenerator()
|
||
suggested_tags = generator.generate_tags(name, description, existing_tags)
|
||
|
||
if not suggested_tags:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': 'DeepSeek标签生成失败,请检查API配置'
|
||
}), 500
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'tags': suggested_tags
|
||
})
|
||
|
||
except ValueError as e:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': str(e)
|
||
}), 400
|
||
except Exception as e:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': f'生成失败: {str(e)}'
|
||
}), 500
|
||
|
||
# ========== 新闻获取路由 ==========
|
||
@app.route('/api/fetch-site-news', methods=['POST'])
|
||
@login_required
|
||
def fetch_site_news():
|
||
"""为指定网站获取最新新闻"""
|
||
# 只允许管理员访问
|
||
if not isinstance(current_user, AdminModel):
|
||
return jsonify({'success': False, 'message': '无权访问'}), 403
|
||
|
||
try:
|
||
data = request.get_json()
|
||
site_id = data.get('site_id')
|
||
count = data.get('count', app.config.get('NEWS_SEARCH_COUNT', 10))
|
||
freshness = data.get('freshness', app.config.get('NEWS_SEARCH_FRESHNESS', 'oneMonth'))
|
||
|
||
if not site_id:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '请提供网站ID'
|
||
}), 400
|
||
|
||
# 获取网站信息
|
||
site = Site.query.get(site_id)
|
||
if not site:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '网站不存在'
|
||
}), 404
|
||
|
||
# 检查博查API配置
|
||
api_key = app.config.get('BOCHA_API_KEY')
|
||
if not api_key:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '博查API未配置,请在.env文件中设置BOCHA_API_KEY'
|
||
}), 500
|
||
|
||
# 创建新闻搜索器
|
||
searcher = NewsSearcher(api_key)
|
||
|
||
# 搜索新闻
|
||
news_items = searcher.search_site_news(
|
||
site_name=site.name,
|
||
site_url=site.url,
|
||
news_keywords=site.news_keywords, # v2.3新增:使用专用关键词
|
||
count=count,
|
||
freshness=freshness
|
||
)
|
||
|
||
if not news_items:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '未找到相关新闻'
|
||
}), 404
|
||
|
||
# 保存新闻到数据库
|
||
saved_count = 0
|
||
for item in news_items:
|
||
# 检查新闻是否已存在(根据URL判断)
|
||
existing_news = News.query.filter_by(
|
||
site_id=site_id,
|
||
url=item['url']
|
||
).first()
|
||
|
||
if not existing_news:
|
||
# 创建新闻记录
|
||
news = News(
|
||
site_id=site_id,
|
||
title=item['title'],
|
||
content=item.get('summary') or item.get('snippet', ''),
|
||
url=item['url'],
|
||
source_name=item.get('site_name', ''),
|
||
source_icon=item.get('site_icon', ''),
|
||
published_at=item.get('published_at'),
|
||
news_type='Search Result',
|
||
is_active=True
|
||
)
|
||
db.session.add(news)
|
||
saved_count += 1
|
||
|
||
# 提交事务
|
||
db.session.commit()
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'message': f'成功获取并保存 {saved_count} 条新闻',
|
||
'total_found': len(news_items),
|
||
'saved': saved_count,
|
||
'news_items': searcher.format_news_for_display(news_items)
|
||
})
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
return jsonify({
|
||
'success': False,
|
||
'message': f'获取失败: {str(e)}'
|
||
}), 500
|
||
|
||
@app.route('/api/fetch-all-news', methods=['POST'])
|
||
@login_required
|
||
def fetch_all_news():
|
||
"""批量为所有网站获取新闻"""
|
||
# 只允许管理员访问
|
||
if not isinstance(current_user, AdminModel):
|
||
return jsonify({'success': False, 'message': '无权访问'}), 403
|
||
|
||
try:
|
||
data = request.get_json()
|
||
count_per_site = data.get('count', 5) # 每个网站获取的新闻数量
|
||
freshness = data.get('freshness', app.config.get('NEWS_SEARCH_FRESHNESS', 'oneMonth'))
|
||
limit = data.get('limit', 10) # 限制处理的网站数量
|
||
|
||
# 检查博查API配置
|
||
api_key = app.config.get('BOCHA_API_KEY')
|
||
if not api_key:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '博查API未配置,请在.env文件中设置BOCHA_API_KEY'
|
||
}), 500
|
||
|
||
# 获取启用的网站(按更新时间排序,优先处理旧的)
|
||
sites = Site.query.filter_by(is_active=True).order_by(Site.updated_at).limit(limit).all()
|
||
|
||
if not sites:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '没有可用的网站'
|
||
}), 404
|
||
|
||
# 创建新闻搜索器
|
||
searcher = NewsSearcher(api_key)
|
||
|
||
# 统计信息
|
||
total_saved = 0
|
||
total_found = 0
|
||
processed_sites = []
|
||
|
||
# 为每个网站获取新闻
|
||
for site in sites:
|
||
try:
|
||
# 搜索新闻
|
||
news_items = searcher.search_site_news(
|
||
site_name=site.name,
|
||
site_url=site.url,
|
||
news_keywords=site.news_keywords, # v2.3新增:使用专用关键词
|
||
count=count_per_site,
|
||
freshness=freshness
|
||
)
|
||
|
||
site_saved = 0
|
||
for item in news_items:
|
||
# 检查是否已存在
|
||
existing_news = News.query.filter_by(
|
||
site_id=site.id,
|
||
url=item['url']
|
||
).first()
|
||
|
||
if not existing_news:
|
||
news = News(
|
||
site_id=site.id,
|
||
title=item['title'],
|
||
content=item.get('summary') or item.get('snippet', ''),
|
||
url=item['url'],
|
||
source_name=item.get('site_name', ''),
|
||
source_icon=item.get('site_icon', ''),
|
||
published_at=item.get('published_at'),
|
||
news_type='Search Result',
|
||
is_active=True
|
||
)
|
||
db.session.add(news)
|
||
site_saved += 1
|
||
|
||
total_found += len(news_items)
|
||
total_saved += site_saved
|
||
|
||
processed_sites.append({
|
||
'id': site.id,
|
||
'name': site.name,
|
||
'found': len(news_items),
|
||
'saved': site_saved
|
||
})
|
||
|
||
except Exception as e:
|
||
# 单个网站失败不影响其他网站
|
||
processed_sites.append({
|
||
'id': site.id,
|
||
'name': site.name,
|
||
'error': str(e)
|
||
})
|
||
continue
|
||
|
||
# 提交事务
|
||
db.session.commit()
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'message': f'批量获取完成,共处理 {len(processed_sites)} 个网站',
|
||
'total_found': total_found,
|
||
'total_saved': total_saved,
|
||
'processed_sites': processed_sites
|
||
})
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
return jsonify({
|
||
'success': False,
|
||
'message': f'批量获取失败: {str(e)}'
|
||
}), 500
|
||
|
||
# ========== SEO路由 (v2.4新增) ==========
|
||
@app.route('/sitemap.xml')
|
||
def sitemap():
|
||
"""动态生成sitemap.xml"""
|
||
from flask import make_response
|
||
|
||
# 获取所有启用的网站
|
||
sites = Site.query.filter_by(is_active=True).order_by(Site.updated_at.desc()).all()
|
||
|
||
# 获取所有标签
|
||
tags = Tag.query.all()
|
||
|
||
# 构建XML内容
|
||
xml_content = '''<?xml version="1.0" encoding="UTF-8"?>
|
||
<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">'''
|
||
|
||
# 首页
|
||
xml_content += '''
|
||
<url>
|
||
<loc>{}</loc>
|
||
<changefreq>daily</changefreq>
|
||
<priority>1.0</priority>
|
||
</url>'''.format(request.url_root.rstrip('/'))
|
||
|
||
# 工具详情页
|
||
for site in sites:
|
||
xml_content += '''
|
||
<url>
|
||
<loc>{}</loc>
|
||
<lastmod>{}</lastmod>
|
||
<changefreq>weekly</changefreq>
|
||
<priority>0.8</priority>
|
||
</url>'''.format(
|
||
request.url_root.rstrip('/') + url_for('site_detail', code=site.code),
|
||
site.updated_at.strftime('%Y-%m-%d') if site.updated_at else datetime.now().strftime('%Y-%m-%d')
|
||
)
|
||
|
||
# 标签页
|
||
for tag in tags:
|
||
xml_content += '''
|
||
<url>
|
||
<loc>{}</loc>
|
||
<changefreq>weekly</changefreq>
|
||
<priority>0.6</priority>
|
||
</url>'''.format(request.url_root.rstrip('/') + '/?tag=' + tag.slug)
|
||
|
||
xml_content += '''
|
||
</urlset>'''
|
||
|
||
response = make_response(xml_content)
|
||
response.headers['Content-Type'] = 'application/xml; charset=utf-8'
|
||
return response
|
||
|
||
@app.route('/robots.txt')
|
||
def robots():
|
||
"""动态生成robots.txt"""
|
||
from flask import make_response
|
||
|
||
robots_content = '''User-agent: *
|
||
Allow: /
|
||
Disallow: /admin/
|
||
Disallow: /api/
|
||
|
||
Sitemap: {}sitemap.xml
|
||
'''.format(request.url_root)
|
||
|
||
response = make_response(robots_content)
|
||
response.headers['Content-Type'] = 'text/plain; charset=utf-8'
|
||
return response
|
||
|
||
# ========== SEO工具管理路由 (v2.4新增) ==========
|
||
@app.route('/admin/seo-tools')
|
||
@login_required
|
||
def seo_tools():
|
||
"""SEO工具管理页面"""
|
||
# 只允许管理员访问
|
||
if not isinstance(current_user, AdminModel):
|
||
flash('无权访问此页面', 'error')
|
||
return redirect(url_for('index'))
|
||
|
||
# 检查static/sitemap.xml是否存在及最后更新时间
|
||
sitemap_path = 'static/sitemap.xml'
|
||
sitemap_info = None
|
||
if os.path.exists(sitemap_path):
|
||
import time
|
||
mtime = os.path.getmtime(sitemap_path)
|
||
sitemap_info = {
|
||
'exists': True,
|
||
'last_updated': datetime.fromtimestamp(mtime).strftime('%Y-%m-%d %H:%M:%S'),
|
||
'size': os.path.getsize(sitemap_path)
|
||
}
|
||
else:
|
||
sitemap_info = {'exists': False}
|
||
|
||
return render_template('admin/seo_tools.html', sitemap_info=sitemap_info)
|
||
|
||
@app.route('/api/generate-static-sitemap', methods=['POST'])
|
||
@login_required
|
||
def generate_static_sitemap():
|
||
"""生成静态sitemap.xml文件"""
|
||
# 只允许管理员访问
|
||
if not isinstance(current_user, AdminModel):
|
||
return jsonify({'success': False, 'message': '无权访问'}), 403
|
||
|
||
try:
|
||
# 获取所有启用的网站
|
||
sites = Site.query.filter_by(is_active=True).order_by(Site.updated_at.desc()).all()
|
||
|
||
# 获取所有标签
|
||
tags = Tag.query.all()
|
||
|
||
# 构建XML内容(使用网站配置的域名)
|
||
base_url = request.url_root.rstrip('/')
|
||
|
||
xml_content = '''<?xml version="1.0" encoding="UTF-8"?>
|
||
<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">'''
|
||
|
||
# 首页
|
||
xml_content += f'''
|
||
<url>
|
||
<loc>{base_url}</loc>
|
||
<changefreq>daily</changefreq>
|
||
<priority>1.0</priority>
|
||
</url>'''
|
||
|
||
# 工具详情页
|
||
for site in sites:
|
||
xml_content += f'''
|
||
<url>
|
||
<loc>{base_url}/site/{site.code}</loc>
|
||
<lastmod>{site.updated_at.strftime('%Y-%m-%d') if site.updated_at else datetime.now().strftime('%Y-%m-%d')}</lastmod>
|
||
<changefreq>weekly</changefreq>
|
||
<priority>0.8</priority>
|
||
</url>'''
|
||
|
||
# 标签页
|
||
for tag in tags:
|
||
xml_content += f'''
|
||
<url>
|
||
<loc>{base_url}/?tag={tag.slug}</loc>
|
||
<changefreq>weekly</changefreq>
|
||
<priority>0.6</priority>
|
||
</url>'''
|
||
|
||
xml_content += '''
|
||
</urlset>'''
|
||
|
||
# 保存到static目录
|
||
static_dir = 'static'
|
||
os.makedirs(static_dir, exist_ok=True)
|
||
sitemap_path = os.path.join(static_dir, 'sitemap.xml')
|
||
|
||
with open(sitemap_path, 'w', encoding='utf-8') as f:
|
||
f.write(xml_content)
|
||
|
||
# 统计信息
|
||
total_urls = 1 + len(sites) + len(tags) # 首页 + 工具页 + 标签页
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'message': f'静态sitemap.xml生成成功!共包含 {total_urls} 个URL',
|
||
'total_urls': total_urls,
|
||
'file_path': sitemap_path,
|
||
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
})
|
||
|
||
except Exception as e:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': f'生成失败: {str(e)}'
|
||
}), 500
|
||
|
||
@app.route('/api/notify-search-engines', methods=['POST'])
|
||
@login_required
|
||
def notify_search_engines():
|
||
"""通知搜索引擎sitemap更新"""
|
||
# 只允许管理员访问
|
||
if not isinstance(current_user, AdminModel):
|
||
return jsonify({'success': False, 'message': '无权访问'}), 403
|
||
|
||
try:
|
||
import requests
|
||
from urllib.parse import quote
|
||
|
||
# 获取sitemap URL(使用当前请求的域名)
|
||
sitemap_url = request.url_root.rstrip('/') + '/sitemap.xml'
|
||
encoded_sitemap_url = quote(sitemap_url, safe='')
|
||
|
||
results = []
|
||
|
||
# 1. 通知Google
|
||
google_ping_url = f'http://www.google.com/ping?sitemap={encoded_sitemap_url}'
|
||
try:
|
||
google_response = requests.get(google_ping_url, timeout=10)
|
||
results.append({
|
||
'engine': 'Google',
|
||
'status': 'success' if google_response.status_code == 200 else 'failed',
|
||
'status_code': google_response.status_code,
|
||
'message': '提交成功' if google_response.status_code == 200 else f'HTTP {google_response.status_code}'
|
||
})
|
||
except Exception as e:
|
||
results.append({
|
||
'engine': 'Google',
|
||
'status': 'error',
|
||
'message': f'请求失败: {str(e)}'
|
||
})
|
||
|
||
# 2. 通知Baidu
|
||
baidu_ping_url = f'http://data.zz.baidu.com/ping?sitemap={encoded_sitemap_url}'
|
||
try:
|
||
baidu_response = requests.get(baidu_ping_url, timeout=10)
|
||
results.append({
|
||
'engine': 'Baidu',
|
||
'status': 'success' if baidu_response.status_code == 200 else 'failed',
|
||
'status_code': baidu_response.status_code,
|
||
'message': '提交成功' if baidu_response.status_code == 200 else f'HTTP {baidu_response.status_code}'
|
||
})
|
||
except Exception as e:
|
||
results.append({
|
||
'engine': 'Baidu',
|
||
'status': 'error',
|
||
'message': f'请求失败: {str(e)}'
|
||
})
|
||
|
||
# 3. 通知Bing
|
||
bing_ping_url = f'http://www.bing.com/ping?sitemap={encoded_sitemap_url}'
|
||
try:
|
||
bing_response = requests.get(bing_ping_url, timeout=10)
|
||
results.append({
|
||
'engine': 'Bing',
|
||
'status': 'success' if bing_response.status_code == 200 else 'failed',
|
||
'status_code': bing_response.status_code,
|
||
'message': '提交成功' if bing_response.status_code == 200 else f'HTTP {bing_response.status_code}'
|
||
})
|
||
except Exception as e:
|
||
results.append({
|
||
'engine': 'Bing',
|
||
'status': 'error',
|
||
'message': f'请求失败: {str(e)}'
|
||
})
|
||
|
||
# 统计成功数量
|
||
success_count = sum(1 for r in results if r['status'] == 'success')
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'message': f'已通知 {success_count}/{len(results)} 个搜索引擎',
|
||
'sitemap_url': sitemap_url,
|
||
'results': results
|
||
})
|
||
|
||
except Exception as e:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': f'通知失败: {str(e)}'
|
||
}), 500
|
||
|
||
@app.route('/api/refresh-site-news/<site_code>', methods=['POST'])
|
||
def refresh_site_news(site_code):
|
||
"""手动刷新指定网站的新闻(前台用户可访问)- v2.3新增"""
|
||
try:
|
||
# 根据code查找网站
|
||
site = Site.query.filter_by(code=site_code).first()
|
||
if not site:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '网站不存在'
|
||
}), 404
|
||
|
||
# 检查博查API配置
|
||
api_key = app.config.get('BOCHA_API_KEY')
|
||
if not api_key:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '新闻功能未启用'
|
||
}), 500
|
||
|
||
# 创建新闻搜索器
|
||
searcher = NewsSearcher(api_key)
|
||
|
||
# 搜索新闻(获取最新5条)
|
||
news_items = searcher.search_site_news(
|
||
site_name=site.name,
|
||
site_url=site.url,
|
||
news_keywords=site.news_keywords, # 使用专用关键词
|
||
count=5,
|
||
freshness='oneWeek' # 一周内的新闻
|
||
)
|
||
|
||
if not news_items:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '未找到相关新闻'
|
||
}), 404
|
||
|
||
# 保存新闻到数据库
|
||
saved_count = 0
|
||
for item in news_items:
|
||
# 检查新闻是否已存在(根据URL判断)
|
||
existing_news = News.query.filter_by(
|
||
site_id=site.id,
|
||
url=item['url']
|
||
).first()
|
||
|
||
if not existing_news:
|
||
news = News(
|
||
site_id=site.id,
|
||
title=item['title'],
|
||
content=item.get('summary') or item.get('snippet', ''),
|
||
url=item['url'],
|
||
source_name=item.get('site_name', ''),
|
||
source_icon=item.get('site_icon', ''),
|
||
published_at=item.get('published_at'),
|
||
news_type='Search Result',
|
||
is_active=True
|
||
)
|
||
db.session.add(news)
|
||
saved_count += 1
|
||
|
||
# 提交事务
|
||
db.session.commit()
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'message': f'成功获取 {saved_count} 条新资讯',
|
||
'total_found': len(news_items),
|
||
'saved_count': saved_count
|
||
})
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
return jsonify({
|
||
'success': False,
|
||
'message': f'获取失败: {str(e)}'
|
||
}), 500
|
||
|
||
# ========== 批量导入路由 ==========
|
||
@app.route('/admin/batch-import', methods=['GET', 'POST'])
|
||
@login_required
|
||
def batch_import():
|
||
"""批量导入网站"""
|
||
# 只允许管理员访问
|
||
if not isinstance(current_user, AdminModel):
|
||
flash('无权访问此页面', 'error')
|
||
return redirect(url_for('index'))
|
||
|
||
from utils.bookmark_parser import BookmarkParser
|
||
from utils.website_fetcher import WebsiteFetcher
|
||
|
||
results = None
|
||
|
||
if request.method == 'POST':
|
||
import_type = request.form.get('import_type')
|
||
auto_activate = request.form.get('auto_activate') == 'on'
|
||
|
||
parser = BookmarkParser()
|
||
fetcher = WebsiteFetcher(timeout=15)
|
||
|
||
urls_to_import = []
|
||
|
||
try:
|
||
# 解析输入
|
||
if import_type == 'url_list':
|
||
url_list_text = request.form.get('url_list', '')
|
||
urls_to_import = parser.parse_url_list(url_list_text)
|
||
|
||
elif import_type == 'bookmark_file':
|
||
bookmark_file = request.files.get('bookmark_file')
|
||
if not bookmark_file:
|
||
flash('请选择书签文件', 'error')
|
||
return render_template('admin/batch_import.html')
|
||
|
||
html_content = bookmark_file.read().decode('utf-8', errors='ignore')
|
||
all_bookmarks = parser.parse_html_file(html_content)
|
||
|
||
# 筛选文件夹(如果指定)
|
||
folder_filter = request.form.get('folder_filter', '').strip()
|
||
if folder_filter:
|
||
urls_to_import = [
|
||
b for b in all_bookmarks
|
||
if folder_filter.lower() in b.get('folder', '').lower()
|
||
]
|
||
else:
|
||
urls_to_import = all_bookmarks
|
||
|
||
# 批量导入
|
||
success_list = []
|
||
failed_list = []
|
||
|
||
for idx, item in enumerate(urls_to_import, 1):
|
||
url = item['url']
|
||
name = item.get('name', '')
|
||
|
||
# 为每个URL创建独立的事务
|
||
try:
|
||
# 1. 检查URL是否已存在
|
||
try:
|
||
existing = Site.query.filter_by(url=url).first()
|
||
if existing:
|
||
failed_list.append({
|
||
'url': url,
|
||
'name': name or existing.name,
|
||
'error': f'该URL已存在(网站名称:{existing.name})'
|
||
})
|
||
continue
|
||
except Exception as e:
|
||
failed_list.append({
|
||
'url': url,
|
||
'name': name,
|
||
'error': f'检查URL时出错: {str(e)}'
|
||
})
|
||
continue
|
||
|
||
# 2. 抓取网站信息(带超时和错误处理)
|
||
info = None
|
||
try:
|
||
info = fetcher.fetch_website_info(url)
|
||
except Exception as e:
|
||
print(f"抓取 {url} 失败: {str(e)}")
|
||
# 抓取失败不是致命错误,继续尝试使用书签名称
|
||
|
||
# 3. 处理网站信息
|
||
if not info or not info.get('name'):
|
||
# 如果有书签名称,使用书签名称
|
||
if name:
|
||
info = {
|
||
'name': name,
|
||
'description': '',
|
||
'logo_url': ''
|
||
}
|
||
else:
|
||
# 尝试从URL提取域名作为名称
|
||
from urllib.parse import urlparse
|
||
try:
|
||
parsed = urlparse(url)
|
||
domain = parsed.netloc or parsed.path
|
||
if domain:
|
||
info = {
|
||
'name': domain,
|
||
'description': '',
|
||
'logo_url': ''
|
||
}
|
||
else:
|
||
failed_list.append({
|
||
'url': url,
|
||
'name': name,
|
||
'error': '无法获取网站信息且没有备用名称'
|
||
})
|
||
continue
|
||
except Exception:
|
||
failed_list.append({
|
||
'url': url,
|
||
'name': name,
|
||
'error': '无法获取网站信息且URL解析失败'
|
||
})
|
||
continue
|
||
|
||
# 4. 下载Logo到本地(失败不影响导入)
|
||
logo_path = None
|
||
if info.get('logo_url'):
|
||
try:
|
||
logo_path = fetcher.download_logo(info['logo_url'], save_dir='static/logos')
|
||
except Exception as e:
|
||
print(f"下载Logo失败 ({url}): {str(e)}")
|
||
# Logo下载失败不影响网站导入
|
||
|
||
# 5. 生成code和slug
|
||
try:
|
||
import random
|
||
from pypinyin import lazy_pinyin
|
||
import re
|
||
|
||
# 生成唯一的code
|
||
site_code = None
|
||
max_attempts = 10
|
||
for _ in range(max_attempts):
|
||
code = str(random.randint(10000000, 99999999))
|
||
if not Site.query.filter_by(code=code).first():
|
||
site_code = code
|
||
break
|
||
|
||
if not site_code:
|
||
# 如果10次都失败,使用时间戳
|
||
import time
|
||
site_code = str(int(time.time() * 1000))[-8:]
|
||
|
||
# 生成slug
|
||
site_name = info.get('name', name or 'Unknown')[:100]
|
||
slug = ''.join(lazy_pinyin(site_name))
|
||
slug = slug.lower()
|
||
slug = re.sub(r'[^\w\s-]', '', slug)
|
||
slug = re.sub(r'[-\s]+', '-', slug).strip('-')
|
||
if not slug:
|
||
slug = f"site-{site_code}"
|
||
|
||
# 确保slug唯一
|
||
base_slug = slug[:50] # 限制长度
|
||
counter = 1
|
||
final_slug = slug
|
||
while Site.query.filter_by(slug=final_slug).first():
|
||
final_slug = f"{base_slug}-{counter}"
|
||
counter += 1
|
||
if counter > 100: # 防止无限循环
|
||
final_slug = f"{base_slug}-{site_code}"
|
||
break
|
||
|
||
# 6. 创建网站记录(带code和slug)
|
||
site = Site(
|
||
code=site_code,
|
||
slug=final_slug,
|
||
name=site_name,
|
||
url=url[:500], # 限制URL长度
|
||
logo=logo_path or info.get('logo_url', '')[:500] if info.get('logo_url') else '',
|
||
short_desc=info.get('description', '')[:200] if info.get('description') else '',
|
||
description=info.get('description', '')[:2000] if info.get('description') else '',
|
||
is_active=auto_activate
|
||
)
|
||
|
||
# 添加到数据库并提交
|
||
db.session.add(site)
|
||
db.session.commit()
|
||
|
||
success_list.append({
|
||
'name': site.name,
|
||
'url': site.url
|
||
})
|
||
|
||
print(f"成功导入 [{idx}/{len(urls_to_import)}]: {site.name}")
|
||
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
failed_list.append({
|
||
'url': url,
|
||
'name': name or info.get('name', 'Unknown'),
|
||
'error': f'数据库保存失败: {str(e)}'
|
||
})
|
||
continue
|
||
|
||
except Exception as e:
|
||
# 捕获所有未预期的错误
|
||
db.session.rollback()
|
||
failed_list.append({
|
||
'url': url,
|
||
'name': name,
|
||
'error': f'未知错误: {str(e)}'
|
||
})
|
||
print(f"导入 {url} 时发生未知错误: {str(e)}")
|
||
continue
|
||
|
||
results = {
|
||
'total_count': len(urls_to_import),
|
||
'success_count': len(success_list),
|
||
'failed_count': len(failed_list),
|
||
'success_list': success_list,
|
||
'failed_list': failed_list
|
||
}
|
||
|
||
if success_list:
|
||
flash(f'成功导入 {len(success_list)} 个网站!', 'success')
|
||
|
||
except Exception as e:
|
||
flash(f'导入失败: {str(e)}', 'error')
|
||
|
||
return render_template('admin/batch_import.html', results=results)
|
||
|
||
# ========== 用户管理路由 ==========
|
||
@app.route('/admin/users')
|
||
@login_required
|
||
def admin_users():
|
||
"""用户管理列表页"""
|
||
if not isinstance(current_user, AdminModel):
|
||
flash('无权访问', 'error')
|
||
return redirect(url_for('index'))
|
||
|
||
# 获取分页参数
|
||
page = request.args.get('page', 1, type=int)
|
||
per_page = 20
|
||
|
||
# 获取搜索参数
|
||
search = request.args.get('search', '').strip()
|
||
|
||
# 构建查询
|
||
query = User.query
|
||
|
||
if search:
|
||
query = query.filter(
|
||
db.or_(
|
||
User.username.like(f'%{search}%'),
|
||
User.email.like(f'%{search}%')
|
||
)
|
||
)
|
||
|
||
# 排序:按注册时间倒序
|
||
query = query.order_by(User.created_at.desc())
|
||
|
||
# 分页
|
||
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
|
||
users = pagination.items
|
||
|
||
# 为每个用户统计收藏数据
|
||
user_stats = {}
|
||
for user in users:
|
||
user_stats[user.id] = {
|
||
'collections_count': Collection.query.filter_by(user_id=user.id).count(),
|
||
'folders_count': Folder.query.filter_by(user_id=user.id).count()
|
||
}
|
||
|
||
# 获取真实的 admin 实例
|
||
from flask import current_app
|
||
admin_instance = current_app.extensions['admin'][0]
|
||
|
||
# 创建模拟的 admin_view 对象
|
||
class MockAdminView:
|
||
name = '用户管理'
|
||
category = None
|
||
admin = admin_instance
|
||
|
||
return render_template('admin/users/list.html',
|
||
users=users,
|
||
pagination=pagination,
|
||
user_stats=user_stats,
|
||
search=search,
|
||
admin_view=MockAdminView())
|
||
|
||
@app.route('/admin/users/<int:user_id>')
|
||
@login_required
|
||
def admin_user_detail(user_id):
|
||
"""用户详情页"""
|
||
if not isinstance(current_user, AdminModel):
|
||
flash('无权访问', 'error')
|
||
return redirect(url_for('index'))
|
||
|
||
user = User.query.get_or_404(user_id)
|
||
|
||
# 统计数据
|
||
collections_count = Collection.query.filter_by(user_id=user.id).count()
|
||
folders_count = Folder.query.filter_by(user_id=user.id).count()
|
||
|
||
# 获取最近的收藏(前10条)
|
||
recent_collections = Collection.query.filter_by(user_id=user.id)\
|
||
.order_by(Collection.created_at.desc())\
|
||
.limit(10).all()
|
||
|
||
# 获取所有文件夹
|
||
folders = Folder.query.filter_by(user_id=user.id)\
|
||
.order_by(Folder.sort_order.desc(), Folder.created_at.desc())\
|
||
.all()
|
||
|
||
# 获取真实的 admin 实例
|
||
from flask import current_app
|
||
admin_instance = current_app.extensions['admin'][0]
|
||
|
||
# 创建模拟的 admin_view 对象
|
||
class MockAdminView:
|
||
name = '用户详情'
|
||
category = None
|
||
admin = admin_instance
|
||
|
||
return render_template('admin/users/detail.html',
|
||
user=user,
|
||
collections_count=collections_count,
|
||
folders_count=folders_count,
|
||
recent_collections=recent_collections,
|
||
folders=folders,
|
||
admin_view=MockAdminView())
|
||
|
||
@app.route('/api/admin/users/<int:user_id>/reset-password', methods=['POST'])
|
||
@login_required
|
||
def admin_reset_user_password(user_id):
|
||
"""管理员重置用户密码"""
|
||
if not isinstance(current_user, AdminModel):
|
||
return jsonify({'success': False, 'message': '无权操作'}), 403
|
||
|
||
user = User.query.get_or_404(user_id)
|
||
data = request.get_json()
|
||
new_password = data.get('new_password', '').strip()
|
||
|
||
# 验证新密码
|
||
if not new_password:
|
||
return jsonify({'success': False, 'message': '新密码不能为空'}), 400
|
||
|
||
if len(new_password) < 6:
|
||
return jsonify({'success': False, 'message': '新密码至少需要6位'}), 400
|
||
|
||
try:
|
||
# 设置新密码
|
||
user.set_password(new_password)
|
||
db.session.commit()
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'message': f'已成功重置用户 {user.username} 的密码'
|
||
})
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
return jsonify({'success': False, 'message': f'重置密码失败: {str(e)}'}), 500
|
||
|
||
@app.route('/api/admin/users/<int:user_id>/update-username', methods=['POST'])
|
||
@login_required
|
||
def admin_update_username(user_id):
|
||
"""管理员修改用户昵称"""
|
||
if not isinstance(current_user, AdminModel):
|
||
return jsonify({'success': False, 'message': '无权操作'}), 403
|
||
|
||
user = User.query.get_or_404(user_id)
|
||
data = request.get_json()
|
||
new_username = data.get('new_username', '').strip()
|
||
|
||
# 验证新昵称
|
||
if not new_username:
|
||
return jsonify({'success': False, 'message': '昵称不能为空'}), 400
|
||
|
||
if len(new_username) < 2 or len(new_username) > 50:
|
||
return jsonify({'success': False, 'message': '昵称长度需要在2-50个字符之间'}), 400
|
||
|
||
# 检查昵称是否已被使用
|
||
existing_user = User.query.filter_by(username=new_username).first()
|
||
if existing_user and existing_user.id != user.id:
|
||
return jsonify({'success': False, 'message': '该昵称已被使用'}), 400
|
||
|
||
try:
|
||
old_username = user.username
|
||
user.username = new_username
|
||
db.session.commit()
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'message': f'已成功将昵称从 {old_username} 修改为 {new_username}'
|
||
})
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
return jsonify({'success': False, 'message': f'修改昵称失败: {str(e)}'}), 500
|
||
|
||
# ========== Flask-Admin 配置 ==========
|
||
class SecureModelView(ModelView):
|
||
"""需要登录的模型视图"""
|
||
# 中文化配置
|
||
can_set_page_size = True
|
||
page_size = 20
|
||
|
||
# 自定义文本
|
||
list_template = 'admin/model/list.html'
|
||
create_template = 'admin/model/create.html'
|
||
edit_template = 'admin/model/edit.html'
|
||
|
||
# 覆盖英文文本
|
||
named_filter_urls = True
|
||
|
||
def is_accessible(self):
|
||
# 只允许Admin类型的用户访问
|
||
return current_user.is_authenticated and isinstance(current_user, AdminModel)
|
||
|
||
def inaccessible_callback(self, name, **kwargs):
|
||
return redirect(url_for('admin_login'))
|
||
|
||
class SecureAdminIndexView(AdminIndexView):
|
||
"""需要登录的管理首页"""
|
||
def is_accessible(self):
|
||
# 只允许Admin类型的用户访问
|
||
return current_user.is_authenticated and isinstance(current_user, AdminModel)
|
||
|
||
def inaccessible_callback(self, name, **kwargs):
|
||
return redirect(url_for('admin_login'))
|
||
|
||
@expose('/')
|
||
def index(self):
|
||
"""控制台首页,显示统计信息"""
|
||
|
||
# 优化查询:减少数据库往返次数
|
||
# 1. 获取 sites_count 和 total_views
|
||
site_stats = db.session.query(
|
||
db.func.count(Site.id).label('total'),
|
||
db.func.sum(Site.view_count).label('total_views')
|
||
).first()
|
||
|
||
# 2. 获取 active sites count
|
||
sites_count = Site.query.filter_by(is_active=True).count()
|
||
|
||
# 3. 获取 tags_count
|
||
tags_count = Tag.query.count()
|
||
|
||
# 4. 获取 news_count
|
||
news_count = News.query.filter_by(is_active=True).count()
|
||
|
||
# 5. 获取 users_count
|
||
users_count = User.query.count()
|
||
|
||
# 组装统计数据
|
||
stats = {
|
||
'sites_count': sites_count,
|
||
'tags_count': tags_count,
|
||
'news_count': news_count,
|
||
'total_views': site_stats.total_views or 0,
|
||
'users_count': users_count
|
||
}
|
||
|
||
# 最近添加的工具(最多5个)- 只查询必要字段
|
||
recent_sites = db.session.query(
|
||
Site.id, Site.name, Site.url, Site.logo,
|
||
Site.is_active, Site.view_count, Site.created_at
|
||
).order_by(Site.created_at.desc()).limit(5).all()
|
||
|
||
return self.render('admin/index.html', stats=stats, recent_sites=recent_sites)
|
||
|
||
# 网站管理视图
|
||
class SiteAdmin(SecureModelView):
|
||
# 自定义模板
|
||
create_template = 'admin/site/create.html'
|
||
edit_template = 'admin/site/edit.html'
|
||
|
||
# 启用编辑和删除
|
||
can_edit = True
|
||
can_delete = True
|
||
can_create = True
|
||
can_view_details = False # 禁用查看详情,点击名称即可查看
|
||
|
||
# 显示操作列
|
||
column_display_actions = True
|
||
action_disallowed_list = []
|
||
|
||
column_list = ['id', 'code', 'name', 'url', 'slug', 'is_active', 'is_recommended', 'view_count', 'created_at']
|
||
column_searchable_list = ['code', 'name', 'url', 'description']
|
||
column_filters = ['is_active', 'is_recommended', 'tags']
|
||
column_labels = {
|
||
'id': 'ID',
|
||
'code': '网站编码',
|
||
'name': '网站名称',
|
||
'url': 'URL',
|
||
'slug': 'URL别名',
|
||
'logo': 'Logo',
|
||
'short_desc': '简短描述',
|
||
'description': '详细介绍',
|
||
'features': '主要功能',
|
||
'news_keywords': '新闻关键词',
|
||
'is_active': '是否启用',
|
||
'is_recommended': '是否推荐',
|
||
'view_count': '浏览次数',
|
||
'sort_order': '排序权重',
|
||
'tags': '标签',
|
||
'created_at': '创建时间',
|
||
'updated_at': '更新时间'
|
||
}
|
||
form_columns = ['name', 'url', 'slug', 'logo', 'short_desc', 'description', 'features', 'news_keywords', 'tags', 'is_active', 'is_recommended', 'sort_order']
|
||
|
||
# 自定义编辑/删除文字
|
||
column_extra_row_actions = None
|
||
|
||
def on_model_change(self, form, model, is_created):
|
||
"""保存前自动生成code和slug(如果为空)"""
|
||
import re
|
||
import random
|
||
from pypinyin import lazy_pinyin
|
||
from flask import request
|
||
|
||
# 使用no_autoflush防止在查询时触发提前flush
|
||
with db.session.no_autoflush:
|
||
# 处理手动输入的新标签
|
||
new_tags_str = request.form.get('new_tags', '')
|
||
if new_tags_str:
|
||
new_tag_names = [name.strip() for name in new_tags_str.split(',') if name.strip()]
|
||
for tag_name in new_tag_names:
|
||
# 检查标签是否已存在
|
||
existing_tag = Tag.query.filter_by(name=tag_name).first()
|
||
if not existing_tag:
|
||
# 创建新标签
|
||
tag_slug = ''.join(lazy_pinyin(tag_name))
|
||
tag_slug = tag_slug.lower()
|
||
tag_slug = re.sub(r'[^\w\s-]', '', tag_slug)
|
||
tag_slug = re.sub(r'[-\s]+', '-', tag_slug).strip('-')
|
||
|
||
# 确保slug唯一
|
||
base_tag_slug = tag_slug[:50]
|
||
counter = 1
|
||
final_tag_slug = tag_slug
|
||
while Tag.query.filter_by(slug=final_tag_slug).first():
|
||
final_tag_slug = f"{base_tag_slug}-{counter}"
|
||
counter += 1
|
||
if counter > 100:
|
||
final_tag_slug = f"{base_tag_slug}-{random.randint(1000, 9999)}"
|
||
break
|
||
|
||
new_tag = Tag(name=tag_name, slug=final_tag_slug)
|
||
db.session.add(new_tag)
|
||
db.session.flush() # 确保新标签有ID
|
||
|
||
# 添加到模型的标签列表
|
||
if new_tag not in model.tags:
|
||
model.tags.append(new_tag)
|
||
else:
|
||
# 添加已存在的标签
|
||
if existing_tag not in model.tags:
|
||
model.tags.append(existing_tag)
|
||
|
||
# 如果code为空,自动生成唯一的8位数字编码
|
||
if not model.code or model.code.strip() == '':
|
||
max_attempts = 10
|
||
for attempt in range(max_attempts):
|
||
# 生成10000000-99999999之间的随机数
|
||
code = str(random.randint(10000000, 99999999))
|
||
# 检查是否已存在
|
||
existing = Site.query.filter(Site.code == code).first()
|
||
if not existing or existing.id == model.id:
|
||
model.code = code
|
||
break
|
||
|
||
# 如果10次都失败,使用时间戳
|
||
if not model.code:
|
||
import time
|
||
model.code = str(int(time.time() * 1000))[-8:]
|
||
|
||
# 如果slug为空,从name自动生成
|
||
if not model.slug or model.slug.strip() == '':
|
||
# 将中文转换为拼音
|
||
slug = ''.join(lazy_pinyin(model.name))
|
||
# 转换为小写,移除特殊字符
|
||
slug = slug.lower()
|
||
slug = re.sub(r'[^\w\s-]', '', slug)
|
||
slug = re.sub(r'[-\s]+', '-', slug).strip('-')
|
||
|
||
# 如果转换后为空,使用code
|
||
if not slug:
|
||
slug = f"site-{model.code}"
|
||
|
||
# 确保slug唯一(限制长度和重试次数)
|
||
base_slug = slug[:50]
|
||
counter = 1
|
||
final_slug = slug
|
||
max_slug_attempts = 100
|
||
|
||
while counter < max_slug_attempts:
|
||
existing = Site.query.filter(Site.slug == final_slug).first()
|
||
if not existing or existing.id == model.id:
|
||
break
|
||
final_slug = f"{base_slug}-{counter}"
|
||
counter += 1
|
||
|
||
# 如果100次都失败,使用code确保唯一
|
||
if counter >= max_slug_attempts:
|
||
final_slug = f"{base_slug}-{model.code}"
|
||
|
||
model.slug = final_slug
|
||
|
||
# 标签管理视图
|
||
class TagAdmin(SecureModelView):
|
||
can_edit = True
|
||
can_delete = True
|
||
can_create = True
|
||
can_view_details = False
|
||
|
||
# 显示操作列
|
||
column_display_actions = True
|
||
|
||
column_list = ['id', 'name', 'slug', 'description', 'sort_order']
|
||
column_searchable_list = ['name', 'description']
|
||
column_labels = {
|
||
'id': 'ID',
|
||
'name': '标签名称',
|
||
'slug': 'URL别名',
|
||
'description': '标签描述',
|
||
'seo_title': 'SEO标题 (v2.4)',
|
||
'seo_description': 'SEO描述 (v2.4)',
|
||
'seo_keywords': 'SEO关键词 (v2.4)',
|
||
'icon': '图标',
|
||
'sort_order': '排序权重',
|
||
'created_at': '创建时间'
|
||
}
|
||
form_columns = ['name', 'slug', 'description', 'seo_title', 'seo_description', 'seo_keywords', 'icon', 'sort_order']
|
||
|
||
# 管理员视图
|
||
class AdminAdmin(SecureModelView):
|
||
can_edit = True
|
||
can_delete = True
|
||
can_create = True
|
||
can_view_details = False
|
||
|
||
# 显示操作列
|
||
column_display_actions = True
|
||
|
||
column_list = ['id', 'username', 'email', 'is_active', 'last_login', 'created_at']
|
||
column_searchable_list = ['username', 'email']
|
||
column_filters = ['is_active']
|
||
column_labels = {
|
||
'id': 'ID',
|
||
'username': '用户名',
|
||
'email': '邮箱',
|
||
'is_active': '是否启用',
|
||
'created_at': '创建时间',
|
||
'last_login': '最后登录'
|
||
}
|
||
form_columns = ['username', 'email', 'is_active']
|
||
|
||
def on_model_change(self, form, model, is_created):
|
||
# 如果是新建管理员,设置默认密码
|
||
if is_created:
|
||
model.set_password('admin123') # 默认密码
|
||
|
||
# 新闻管理视图
|
||
class NewsAdmin(SecureModelView):
|
||
can_edit = True
|
||
can_delete = True
|
||
can_create = True
|
||
can_view_details = False
|
||
|
||
# 显示操作列
|
||
column_display_actions = True
|
||
|
||
column_list = ['id', 'site', 'title', 'source_name', 'news_type', 'published_at', 'is_active']
|
||
column_searchable_list = ['title', 'content', 'source_name']
|
||
column_filters = ['site', 'news_type', 'source_name', 'is_active', 'published_at']
|
||
column_labels = {
|
||
'id': 'ID',
|
||
'site': '关联网站',
|
||
'title': '新闻标题',
|
||
'content': '新闻内容',
|
||
'news_type': '新闻类型',
|
||
'url': '新闻链接',
|
||
'source_name': '来源网站',
|
||
'source_icon': '来源图标',
|
||
'published_at': '发布时间',
|
||
'is_active': '是否启用',
|
||
'created_at': '创建时间',
|
||
'updated_at': '更新时间'
|
||
}
|
||
form_columns = ['site', 'title', 'content', 'news_type', 'url', 'source_name', 'source_icon', 'published_at', 'is_active']
|
||
|
||
# 可选的新闻类型
|
||
form_choices = {
|
||
'news_type': [
|
||
('Search Result', 'Search Result'),
|
||
('Product Update', 'Product Update'),
|
||
('Industry News', 'Industry News'),
|
||
('Company News', 'Company News'),
|
||
('Other', 'Other')
|
||
]
|
||
}
|
||
|
||
# 默认排序
|
||
column_default_sort = ('published_at', True) # 按发布时间倒序排列
|
||
|
||
def get_query(self):
|
||
"""优化查询:使用joinedload避免N+1问题"""
|
||
return super().get_query().options(
|
||
db.orm.joinedload(News.site)
|
||
)
|
||
|
||
# Prompt模板管理视图
|
||
class PromptAdmin(SecureModelView):
|
||
can_edit = True
|
||
can_delete = False # 不允许删除,避免系统必需的prompt被删除
|
||
can_create = True
|
||
can_view_details = False
|
||
|
||
# 显示操作列
|
||
column_display_actions = True
|
||
|
||
column_list = ['id', 'key', 'name', 'description', 'is_active', 'updated_at']
|
||
column_searchable_list = ['key', 'name', 'description']
|
||
column_filters = ['is_active', 'key']
|
||
column_labels = {
|
||
'id': 'ID',
|
||
'key': '唯一标识',
|
||
'name': '模板名称',
|
||
'system_prompt': '系统提示词',
|
||
'user_prompt_template': '用户提示词模板',
|
||
'description': '模板说明',
|
||
'is_active': '是否启用',
|
||
'created_at': '创建时间',
|
||
'updated_at': '更新时间'
|
||
}
|
||
form_columns = ['key', 'name', 'description', 'system_prompt', 'user_prompt_template', 'is_active']
|
||
|
||
# 字段说明
|
||
column_descriptions = {
|
||
'key': '唯一标识,如: tags, features, description',
|
||
'system_prompt': 'AI的系统角色设定',
|
||
'user_prompt_template': '用户提示词模板,支持变量如 {name}, {description}, {url}',
|
||
}
|
||
|
||
# 表单字段配置
|
||
form_widget_args = {
|
||
'system_prompt': {
|
||
'rows': 3,
|
||
'style': 'font-family: monospace;'
|
||
},
|
||
'user_prompt_template': {
|
||
'rows': 20,
|
||
'style': 'font-family: monospace;'
|
||
}
|
||
}
|
||
|
||
# 初始化 Flask-Admin
|
||
admin = Admin(
|
||
app,
|
||
name='ZJPB 焦提示词',
|
||
template_mode='bootstrap4',
|
||
index_view=SecureAdminIndexView(name='控制台', url='/admin'),
|
||
base_template='admin/master.html'
|
||
)
|
||
|
||
admin.add_view(SiteAdmin(Site, db.session, name='网站管理'))
|
||
admin.add_view(TagAdmin(Tag, db.session, name='标签管理'))
|
||
admin.add_view(NewsAdmin(News, db.session, name='新闻管理'))
|
||
admin.add_view(PromptAdmin(PromptTemplate, db.session, name='Prompt管理'))
|
||
admin.add_view(AdminAdmin(AdminModel, db.session, name='管理员', endpoint='admin_users'))
|
||
|
||
return app
|
||
|
||
if __name__ == '__main__':
|
||
app = create_app(os.getenv('FLASK_ENV', 'development'))
|
||
app.run(host='0.0.0.0', port=5000, debug=True)
|