Files
zjpb.net/app.py
Jowe 2067fb1712 feat: v3.0 - 用户系统和收藏功能
核心功能:
- 用户注册/登录系统(用户名+密码)
- 工具收藏功能(一键收藏/取消收藏)
- 收藏分组管理(文件夹)
- 用户中心(个人资料、收藏列表)

数据库变更:
- 新增 users 表(用户信息)
- 新增 folders 表(收藏分组)
- 新增 collections 表(收藏记录)

安全增强:
- Admin 和 User 完全隔离
- 修复14个管理员路由的权限漏洞
- 所有管理功能添加用户类型检查

新增文件:
- templates/auth/register.html - 注册页面
- templates/auth/login.html - 登录页面
- templates/user/profile.html - 用户中心
- templates/user/collections.html - 收藏列表
- create_user_tables.py - 数据库迁移脚本
- USER_SYSTEM_README.md - 用户系统文档
- CHANGELOG_v3.0.md - 版本更新日志

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-06 19:19:05 +08:00

2623 lines
99 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import markdown
import random
import string
from io import BytesIO
from flask import Flask, render_template, redirect, url_for, request, flash, jsonify, session, send_file
from flask_login import LoginManager, login_user, logout_user, login_required, current_user
from flask_admin import Admin, AdminIndexView, expose
from flask_admin.contrib.sqla import ModelView
from datetime import datetime
from config import config
from models import db, Site, Tag, Admin as AdminModel, News, site_tags, PromptTemplate, User, Folder, Collection
from utils.website_fetcher import WebsiteFetcher
from utils.tag_generator import TagGenerator
from utils.news_searcher import NewsSearcher
from PIL import Image, ImageDraw, ImageFont
def create_app(config_name='default'):
"""应用工厂函数"""
app = Flask(__name__)
# 加载配置
app.config.from_object(config[config_name])
# 初始化数据库
db.init_app(app)
# 添加Markdown过滤器
@app.template_filter('markdown')
def markdown_filter(text):
    """Jinja filter that renders Markdown source to HTML.

    Falsy input (``None`` or an empty string) yields an empty string.
    """
    if text:
        # The nl2br and fenced_code extensions are enabled for the conversion.
        return markdown.markdown(text, extensions=['nl2br', 'fenced_code'])
    return ''
# v2.4新增: 自动内链过滤器
@app.template_filter('auto_link')
def auto_link_filter(text, current_site_id=None):
    """Jinja filter that links the first mention of each active site's name.

    Args:
        text: Content to scan. Falsy input yields an empty string.
        current_site_id: Optional id of the site being displayed; that site
            is never linked to itself.

    Returns:
        ``text`` with the first occurrence of every known site name wrapped
        in an ``<a href="/site/<code>">`` anchor.
    """
    import re
    from html import escape

    if not text:
        return ''

    sites = Site.query.filter_by(is_active=True).all()
    if current_site_id:
        sites = [s for s in sites if s.id != current_site_id]

    # Map each distinct, non-empty name to the first site carrying it.
    # Empty names are dropped: they would match at every position.
    site_by_name = {}
    for site in sites:
        if site.name:
            site_by_name.setdefault(site.name, site)
    if not site_by_name:
        return text

    # Longest names first so the alternation prefers the most specific match.
    ordered_names = sorted(site_by_name, key=len, reverse=True)
    pattern = re.compile('|'.join(re.escape(name) for name in ordered_names))

    already_linked = set()

    def _to_anchor(match):
        name = match.group(0)
        if name in already_linked:
            # Only the first occurrence of each name becomes a link.
            return name
        already_linked.add(name)
        site = site_by_name[name]
        href = escape(str(site.code), quote=True)
        title = escape(site.short_desc or site.name, quote=True)
        return (
            f'<a href="/site/{href}" title="{title}" '
            'style="color: var(--primary-blue); text-decoration: underline; text-decoration-style: dotted;">'
            f'{escape(name)}</a>'
        )

    # A single pass over the original text guarantees that markup generated
    # for one site is never re-scanned and corrupted by a later, shorter name.
    return pattern.sub(_to_anchor, text)
# Initialise Flask-Login for this app. 'admin_login' is registered as the
# login view and '请先登录' as the login-required message.
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'admin_login'
login_manager.login_message = '请先登录'
@login_manager.user_loader
def load_user(user_id):
    """Resolve a session identifier to an Admin or User record.

    Identifiers have the form ``"admin:<id>"`` or ``"user:<id>"``. A bare
    numeric identifier (the legacy format) is treated as an admin id.

    Args:
        user_id: The identifier stored in the session by Flask-Login.

    Returns:
        The matching ``AdminModel`` or ``User`` instance, or ``None`` when the
        identifier is malformed, of an unknown type, or matches no row.
    """
    raw_id = str(user_id)
    user_type, separator, uid = raw_id.partition(':')
    if not separator:
        # Legacy sessions stored a plain numeric id that always meant Admin.
        user_type, uid = 'admin', raw_id

    try:
        primary_key = int(uid)
    except (TypeError, ValueError):
        # A tampered or corrupt cookie must invalidate the session, not
        # raise out of the loader and turn every request into a 500.
        return None

    if user_type == 'admin':
        return AdminModel.query.get(primary_key)
    if user_type == 'user':
        return User.query.get(primary_key)
    return None
# ========== 前台路由 ==========
@app.route('/')
def index():
    """Home page: tag sidebar plus a filtered, sorted, paginated site list.

    Query parameters read: ``tag`` (tag slug), ``q`` (search text), ``tab``
    (``latest`` by default, ``popular`` or ``recommended``) and ``page``.
    """
    from sqlalchemy import func

    tags = Tag.query.order_by(Tag.sort_order.desc(), Tag.id).all()

    # One grouped query gives the number of active sites behind every tag.
    tag_counts = {}
    if tags:
        count_rows = (
            db.session.query(
                site_tags.c.tag_id,
                func.count(site_tags.c.site_id).label('count'),
            )
            .join(Site, site_tags.c.site_id == Site.id)
            .filter(Site.is_active == True)
            .group_by(site_tags.c.tag_id)
            .all()
        )
        tag_counts = {row_tag_id: row_count for row_tag_id, row_count in count_rows}

    tag_slug = request.args.get('tag')
    search_query = request.args.get('q', '').strip()
    current_tab = request.args.get('tab', 'latest')
    page = request.args.get('page', 1, type=int)
    per_page = 100

    def _render(site_rows, page_info, active_tag):
        # Single place that feeds the template its context.
        return render_template(
            'index_new.html',
            sites=site_rows,
            tags=tags,
            selected_tag=active_tag,
            search_query=search_query,
            pagination=page_info,
            tag_counts=tag_counts,
            current_tab=current_tab,
        )

    selected_tag = None
    query = Site.query.filter_by(is_active=True)

    if tag_slug:
        selected_tag = Tag.query.filter_by(slug=tag_slug).first()
        if not selected_tag:
            # Unknown tag slug: show an empty result set without pagination.
            return _render([], None, selected_tag)
        query = query.filter(Site.tags.contains(selected_tag))

    if search_query:
        # Substring match against name, URL and both description fields.
        like_pattern = f'%{search_query}%'
        searchable_columns = (Site.name, Site.url, Site.description, Site.short_desc)
        query = query.filter(
            db.or_(*[column.like(like_pattern) for column in searchable_columns])
        )

    if current_tab == 'popular':
        # Most viewed first.
        query = query.order_by(Site.view_count.desc(), Site.id.desc())
    elif current_tab == 'recommended':
        # Only recommended sites, ordered by their manual sort order.
        query = query.filter_by(is_recommended=True).order_by(
            Site.sort_order.desc(), Site.id.desc()
        )
    else:
        # Default tab: newest first.
        query = query.order_by(Site.created_at.desc(), Site.id.desc())

    pagination = query.paginate(page=page, per_page=per_page, error_out=False)
    return _render(pagination.items, pagination, selected_tag)
@app.route('/site/<code>')
def site_detail(code):
    """Site detail page.

    Looks up the active site by ``code`` (404 when absent), bumps its view
    counter, and renders it with up to 5 stored news items and up to 4
    other active sites that share at least one tag.
    """
    from sqlalchemy import func

    site = Site.query.filter_by(code=code, is_active=True).first_or_404()

    # Increment in SQL (view_count = COALESCE(view_count, 0) + 1) rather than
    # in Python: concurrent requests then cannot overwrite each other's
    # increments, and a NULL counter no longer raises TypeError.
    site.view_count = func.coalesce(Site.view_count, 0) + 1
    db.session.commit()

    # Only news already stored in the database is shown here; no external
    # API is called while rendering this page.
    news_list = News.query.filter_by(
        site_id=site.id,
        is_active=True
    ).order_by(News.published_at.desc()).limit(5).all()

    # Lets the template decide whether to offer on-demand loading.
    has_news = bool(news_list)

    # Similar tools: other active sites sharing a tag, most viewed first.
    recommended_sites = []
    if site.tags:
        recommended_sites = Site.query.filter(
            Site.id != site.id,
            Site.is_active == True,
            Site.tags.any(Tag.id.in_([tag.id for tag in site.tags]))
        ).order_by(Site.view_count.desc()).limit(4).all()

    return render_template(
        'detail_new.html',
        site=site,
        news_list=news_list,
        has_news=has_news,
        recommended_sites=recommended_sites,
    )
# ========== 验证码相关 (v2.6.1优化) ==========
def generate_captcha_image(text):
    """Draw ``text`` as a 120x40 RGB CAPTCHA image.

    Args:
        text: Characters to draw; each one is placed 25px to the right of
            the previous one, starting at x=10.

    Returns:
        A PIL ``Image`` containing the text over light-grey interference
        lines and random noise pixels.
    """
    width, height = 120, 40
    image = Image.new('RGB', (width, height), color=(255, 255, 255))
    draw = ImageDraw.Draw(image)

    try:
        font = ImageFont.truetype("arial.ttf", 28)
    except (OSError, ImportError):
        # Font file missing/unreadable or FreeType support unavailable: fall
        # back to PIL's built-in font. A bare ``except`` would also swallow
        # KeyboardInterrupt and SystemExit.
        font = ImageFont.load_default()

    # Three random light-grey interference lines.
    for _ in range(3):
        start = (random.randint(0, width), random.randint(0, height))
        end = (random.randint(0, width), random.randint(0, height))
        draw.line([start, end], fill=(200, 200, 200), width=1)

    # Each character gets a random vertical offset and a random dark colour.
    for position, char in enumerate(text):
        x = 10 + position * 25
        y = random.randint(5, 12)
        color = (random.randint(0, 100), random.randint(0, 100), random.randint(0, 100))
        draw.text((x, y), char, font=font, fill=color)

    # 100 random grey noise pixels.
    for _ in range(100):
        x = random.randint(0, width - 1)
        y = random.randint(0, height - 1)
        draw.point((x, y), fill=(random.randint(150, 200), random.randint(150, 200), random.randint(150, 200)))

    return image
@app.route('/api/captcha', methods=['GET'])
def get_captcha():
    """Issue a new 4-character CAPTCHA and return it as a PNG image.

    The expected answer and its creation timestamp are stored in the session
    under ``captcha`` and ``captcha_created``.
    """
    import secrets

    # The CAPTCHA is a security token, so draw it from the OS CSPRNG instead
    # of the predictable Mersenne Twister behind ``random``.
    alphabet = string.ascii_uppercase + string.digits
    captcha_text = ''.join(secrets.choice(alphabet) for _ in range(4))

    session['captcha'] = captcha_text
    session['captcha_created'] = datetime.now().timestamp()

    image = generate_captcha_image(captcha_text)

    img_io = BytesIO()
    image.save(img_io, 'PNG')
    img_io.seek(0)

    response = send_file(img_io, mimetype='image/png')
    # A cached image would no longer match the answer stored in the session.
    response.headers['Cache-Control'] = 'no-store'
    return response
# ========== 新闻相关API (v2.6优化) ==========
@app.route('/api/fetch-news/<code>', methods=['POST'])
def fetch_news_for_site(code):
    """Fetch news for one site on demand, guarded by the session CAPTCHA.

    Expects a JSON body ``{"captcha": "<text>"}``. The CAPTCHA stored in the
    session is single-use: it is consumed by the first verification attempt,
    whether that attempt succeeds or not.

    Returns:
        JSON with ``success``, the latest (max 5) stored news items, the
        number of newly stored items and a message. Error responses carry
        ``success: False`` and ``error`` with status 400 (CAPTCHA problems),
        404 (unknown/inactive site) or 500 (configuration or runtime errors).
    """
    try:
        # silent=True: a missing or non-JSON body is treated as an empty
        # payload instead of raising inside this try block.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        captcha_input = str(data.get('captcha') or '').strip().upper()

        session_captcha = session.get('captcha', '').upper()
        captcha_created = session.get('captcha_created', 0)

        if not session_captcha:
            return jsonify({
                'success': False,
                'error': '请先获取验证码'
            }), 400

        # Consume the CAPTCHA before checking it so that a 4-character code
        # cannot be brute-forced with repeated guesses.
        session.pop('captcha', None)
        session.pop('captcha_created', None)

        # Codes older than 300 seconds are rejected.
        if datetime.now().timestamp() - captcha_created > 300:
            return jsonify({
                'success': False,
                'error': '验证码已过期,请刷新后重试'
            }), 400

        if captcha_input != session_captcha:
            return jsonify({
                'success': False,
                'error': '验证码错误,请重新输入'
            }), 400

        # first_or_404() would raise inside this try block and be reported
        # as a 500 by the generic handler below, so answer 404 explicitly.
        site = Site.query.filter_by(code=code, is_active=True).first()
        if site is None:
            return jsonify({'success': False, 'error': '网站不存在或未启用'}), 404

        api_key = app.config.get('BOCHA_API_KEY')
        if not api_key:
            return jsonify({'success': False, 'error': '博查API未配置'}), 500

        searcher = NewsSearcher(api_key)

        # Ask for at most 5 items from the past week.
        news_items = searcher.search_site_news(
            site_name=site.name,
            site_url=site.url,
            news_keywords=site.news_keywords,
            count=5,
            freshness='oneWeek'
        )

        new_count = 0
        if news_items:
            for item in news_items:
                item_url = item.get('url')
                item_title = item.get('title')
                if not item_url or not item_title:
                    # Skip malformed results instead of failing the request.
                    continue

                # De-duplicate by URL within this site.
                existing = News.query.filter_by(
                    site_id=site.id,
                    url=item_url
                ).first()
                if existing:
                    continue

                db.session.add(News(
                    site_id=site.id,
                    title=item_title,
                    content=item.get('summary') or item.get('snippet', ''),
                    url=item_url,
                    source_name=item.get('site_name', ''),
                    source_icon=item.get('site_icon', ''),
                    published_at=item.get('published_at'),
                    news_type='Search Result',
                    is_active=True
                ))
                new_count += 1

            db.session.commit()

        news_list = News.query.filter_by(
            site_id=site.id,
            is_active=True
        ).order_by(News.published_at.desc()).limit(5).all()

        news_data = [
            {
                'title': news.title,
                # Content is truncated to 200 characters.
                'content': news.content[:200] if news.content else '',
                'url': news.url,
                'source_name': news.source_name,
                'source_icon': news.source_icon,
                'published_at': news.published_at.strftime('%Y年%m月%d') if news.published_at else '未知日期',
                'news_type': news.news_type
            }
            for news in news_list
        ]

        return jsonify({
            'success': True,
            'news': news_data,
            'new_count': new_count,
            'message': f'成功获取{new_count}条新资讯' if new_count > 0 else '暂无新资讯'
        })

    except Exception as e:
        print(f"获取新闻失败:{str(e)}")
        import traceback
        traceback.print_exc()
        db.session.rollback()
        return jsonify({'success': False, 'error': str(e)}), 500
# ========== 社媒营销路由 (v2.5新增) ==========
@app.route('/api/generate-social-share', methods=['POST'])
def generate_social_share():
    """Generate social-media share copy for a site (public endpoint).

    Expects JSON ``{"site_code": str, "platforms": [str, ...]}``. Unknown or
    malformed platform entries are dropped; with none left, ``universal`` is
    generated. Copy comes from the ``social_share`` prompt template via the
    AI client when both are available; otherwise, or for any platform the
    model did not cover, a rule-based template is used.

    Returns:
        JSON with ``success``, basic ``site`` info, the effective
        ``platforms`` list and ``content`` (platform -> text, always
        including ``universal``). Errors: 400 (no site code), 404 (unknown
        site), 500 (unexpected failure).
    """
    try:
        # silent=True: non-JSON bodies are treated as an empty payload so the
        # request ends in the 400 below rather than the generic 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        site_code = (data.get('site_code') or '').strip()
        platforms = data.get('platforms') or []

        if not site_code:
            return jsonify({'success': False, 'message': '请提供网站编码'}), 400

        site = Site.query.filter_by(code=site_code, is_active=True).first()
        if not site:
            return jsonify({'success': False, 'message': '网站不存在或未启用'}), 404

        # Server-side whitelist of platforms.
        allowed_platforms = {
            'universal',
            'xiaohongshu',
            'douyin',
            'bilibili',
            'wechat',
            'moments',
            'x',
            'linkedin'
        }

        if not isinstance(platforms, list):
            platforms = []
        platforms = [p for p in platforms if isinstance(p, str)]
        platforms = [p.strip().lower() for p in platforms if p.strip()]
        platforms = [p for p in platforms if p in allowed_platforms]

        if not platforms:
            platforms = ['universal']

        # Long fields are capped at 1200 characters.
        site_info = {
            'name': site.name,
            'url': site.url,
            'short_desc': site.short_desc or '',
            'description': (site.description or '')[:1200],
            'features': (site.features or '')[:1200],
            'tags': [t.name for t in (site.tags or [])]
        }

        generator = TagGenerator()
        prompt = PromptTemplate.query.filter_by(key='social_share', is_active=True).first()

        generated = {}
        if prompt and generator.client:
            try:
                # Formatting the template is part of the AI path: a template
                # with stray braces or unknown placeholders must fall back to
                # the rule-based copy, not fail the whole request.
                tags_text = ','.join(site_info['tags'])
                user_prompt = prompt.user_prompt_template.format(
                    name=site_info['name'],
                    url=site_info['url'],
                    short_desc=site_info['short_desc'],
                    description=site_info['description'],
                    features=site_info['features'],
                    tags=tags_text,
                    platforms=','.join(platforms)
                )

                resp = generator.client.chat.completions.create(
                    model='deepseek-chat',
                    messages=[
                        {'role': 'system', 'content': prompt.system_prompt},
                        {'role': 'user', 'content': user_prompt}
                    ],
                    temperature=0.7,
                    max_tokens=1200
                )
                text = (resp.choices[0].message.content or '').strip()

                # The model is expected to answer with a JSON object keyed by
                # platform; anything unparsable is used as the universal copy.
                import json as _json
                try:
                    parsed = _json.loads(text)
                    if isinstance(parsed, dict):
                        for k, v in parsed.items():
                            if isinstance(k, str) and isinstance(v, str) and k in allowed_platforms:
                                generated[k] = v.strip()
                except Exception:
                    generated['universal'] = text
            except Exception as e:
                # AI failure is non-fatal; the templates below take over.
                print(f"社媒文案AI生成失败{str(e)}")

        def build_fallback(p):
            """Build rule-based copy for platform ``p`` within its length limit."""
            name = site_info['name']
            url = site_info['url']
            short_desc = site_info['short_desc'] or ''
            full_desc = site_info['description'] or ''
            features = site_info['features'] or ''
            tags = site_info['tags'][:6]
            tags_line = ' ' + ' '.join([f"#{t}" for t in tags]) if tags else ''

            # X (Twitter): capped at 280 characters.
            if p == 'x':
                desc = (short_desc or full_desc)[:100]
                base = f"🔥 {name}\n\n{desc}\n\n🔗 {url}{tags_line}"
                return base[:280]

            # LinkedIn: capped at 3000 characters.
            if p == 'linkedin':
                desc = (full_desc or short_desc)[:500]
                content = f"💡 推荐一个实用工具:{name}\n\n📝 简介:\n{desc}\n\n"
                if features:
                    features_list = features.split('\n')[:3]
                    content += f"✨ 主要功能:\n" + '\n'.join([f"{f.strip()}" for f in features_list if f.strip()][:3]) + "\n\n"
                content += f"🔗 官网:{url}\n\n"
                if tags:
                    content += f"📌 适用场景:{', '.join(tags)}"
                return content[:3000]

            # Xiaohongshu: capped at 2000 characters.
            if p == 'xiaohongshu':
                desc = (short_desc or full_desc)[:300]
                content = f"{name} | 我最近在用的宝藏工具\n\n"
                content += f"📌 一句话介绍:\n{desc}\n\n"
                if features:
                    features_list = features.split('\n')[:5]
                    content += f"🎯 核心功能:\n" + '\n'.join([f"{f.strip()}" for f in features_list if f.strip()][:5]) + "\n\n"
                content += f"🔗 体验入口:{url}\n\n{tags_line}"
                return content[:2000]

            # Douyin: capped at 2000 characters.
            if p == 'douyin':
                desc = (short_desc or full_desc)[:200]
                content = f"🔥 发现一个超好用的工具:{name}\n\n"
                content += f"💡 {desc}\n\n"
                if features:
                    features_list = features.split('\n')[:3]
                    content += "✨ 亮点功能:\n" + '\n'.join([f"👉 {f.strip()}" for f in features_list if f.strip()][:3]) + "\n\n"
                content += f"🔗 想试试戳:{url}\n\n{tags_line}"
                return content[:2000]

            # Bilibili: capped at 20000 characters.
            if p == 'bilibili':
                desc = (full_desc or short_desc)[:800]
                content = f"【分享】{name} - 值得一试的优质工具\n\n"
                content += f"## 📖 工具简介\n{desc}\n\n"
                if features:
                    content += f"## ✨ 主要功能\n{features}\n\n"
                content += f"## 🔗 体验地址\n{url}\n\n"
                if tags:
                    content += f"## 🏷️ 相关标签\n{' / '.join(tags)}"
                return content[:20000]

            # WeChat official account: no length cap applied.
            if p == 'wechat':
                desc = (full_desc or short_desc)[:600]
                content = f"今天给大家分享一个实用工具:{name}\n\n"
                content += f"📝 工具介绍\n{desc}\n\n"
                if features:
                    content += f"✨ 核心功能\n{features}\n\n"
                content += f"🔗 官方网站\n{url}\n\n"
                if tags:
                    content += f"如果你也在关注「{' · '.join(tags)}」相关的工具,不妨收藏一下这个实用的选择。"
                return content

            # WeChat Moments: short form.
            if p == 'moments':
                desc = (short_desc or full_desc)[:80]
                return f"💡 {name}\n{desc}\n🔗 {url}{tags_line}".strip()

            # Universal template.
            desc = (full_desc or short_desc)[:300]
            return f"{name}\n\n{desc}\n\n{url}{tags_line}".strip()

        results = {}
        for p in platforms:
            results[p] = generated.get(p) or build_fallback(p)

        # A universal version is always included in the response.
        if 'universal' not in results:
            results['universal'] = generated.get('universal') or build_fallback('universal')

        return jsonify({
            'success': True,
            'site': {
                'code': site.code,
                'name': site.name,
                'url': site.url
            },
            'platforms': platforms,
            'content': results
        })

    except Exception as e:
        return jsonify({'success': False, 'message': f'生成失败: {str(e)}'}), 500
# ========== 后台登录路由 ==========
@app.route('/admin/login', methods=['GET', 'POST'])
def admin_login():
    """Administrator login form and handler.

    GET renders the form. POST checks ``username``/``password`` against the
    admin table; on success the admin is logged in, ``last_login`` is
    updated and the browser is redirected to the admin index. On failure an
    error is flashed and the form is shown again.
    """
    # Only an already logged-in *admin* skips the form. A logged-in normal
    # user must still be able to reach it rather than being sent into the
    # admin area under a non-admin identity.
    if current_user.is_authenticated and isinstance(current_user, AdminModel):
        return redirect(url_for('admin.index'))

    if request.method == 'POST':
        # Default to '' so a missing form field never reaches
        # check_password() as None.
        username = request.form.get('username', '')
        password = request.form.get('password', '')

        admin = AdminModel.query.filter_by(username=username).first()

        if admin and admin.check_password(password) and admin.is_active:
            login_user(admin)
            admin.last_login = datetime.now()
            db.session.commit()
            return redirect(url_for('admin.index'))
        else:
            flash('用户名或密码错误', 'error')

    return render_template('admin_login.html')
@app.route('/admin/logout')
@login_required
def admin_logout():
    """End the current session and return to the home page."""
    logout_user()
    home_url = url_for('index')
    return redirect(home_url)
# ========== 用户认证路由 ==========
@app.route('/register', methods=['GET', 'POST'])
def user_register():
    """User registration form and handler.

    GET renders the form. POST validates the submitted username and
    password, creates the account, logs the new user in and redirects to
    the home page. Any validation or storage problem is flashed and the
    form is rendered again. Authenticated visitors are redirected home.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method != 'POST':
        return render_template('auth/register.html')

    username = request.form.get('username', '').strip()
    password = request.form.get('password', '').strip()
    confirm_password = request.form.get('confirm_password', '').strip()

    # Checks run in order; the first failing one decides the message. The
    # uniqueness lookup only runs once the cheaper checks have passed.
    problem = None
    if not username or not password:
        problem = '用户名和密码不能为空'
    elif len(username) < 3:
        problem = '用户名至少3个字符'
    elif len(password) < 6:
        problem = '密码至少6个字符'
    elif password != confirm_password:
        problem = '两次输入的密码不一致'
    elif User.query.filter_by(username=username).first():
        problem = '该用户名已被注册'

    if problem is not None:
        flash(problem, 'error')
        return render_template('auth/register.html')

    try:
        new_user = User(username=username)
        new_user.set_password(password)
        db.session.add(new_user)
        db.session.commit()

        # Log the fresh account in straight away and stamp the login time.
        login_user(new_user)
        new_user.last_login = datetime.now()
        db.session.commit()

        flash('注册成功!', 'success')
        return redirect(url_for('index'))
    except Exception as e:
        db.session.rollback()
        flash(f'注册失败:{str(e)}', 'error')
        return render_template('auth/register.html')
@app.route('/login', methods=['GET', 'POST'])
def user_login():
    """User login form and handler.

    GET renders the form. POST checks the credentials; on success the user
    is logged in, ``last_login`` is updated and the browser is redirected to
    the ``next`` query parameter when it is a safe local path, otherwise to
    the home page. Authenticated visitors are redirected home.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method == 'POST':
        username = request.form.get('username', '').strip()
        password = request.form.get('password', '').strip()

        if not username or not password:
            flash('请输入用户名和密码', 'error')
            return render_template('auth/login.html')

        user = User.query.filter_by(username=username).first()

        if user and user.check_password(password) and user.is_active:
            login_user(user)
            user.last_login = datetime.now()
            db.session.commit()

            # Only follow same-site paths. "//host" is a protocol-relative
            # URL and browsers treat "/\host" the same way, so a plain
            # startswith('/') check would allow an open redirect.
            next_page = request.args.get('next')
            if (
                next_page
                and next_page.startswith('/')
                and not next_page.startswith(('//', '/\\'))
            ):
                return redirect(next_page)
            return redirect(url_for('index'))
        else:
            flash('用户名或密码错误', 'error')

    return render_template('auth/login.html')
@app.route('/logout')
@login_required
def user_logout():
    """End the current session and return to the home page."""
    logout_user()
    home_url = url_for('index')
    return redirect(home_url)
# ========== 用户认证API ==========
@app.route('/api/auth/status', methods=['GET'])
def auth_status():
    """Report whether the caller is logged in and, if so, as what.

    Returns JSON ``{"logged_in": false}`` for anonymous callers. For
    authenticated callers it returns the account type (``user`` or
    ``admin``), username and id; normal users also get ``avatar``.
    """
    if not current_user.is_authenticated:
        return jsonify({'logged_in': False})

    if isinstance(current_user, User):
        payload = {
            'logged_in': True,
            'user_type': 'user',
            'username': current_user.username,
            'avatar': current_user.avatar,
            'id': current_user.id
        }
    else:
        # Any authenticated account that is not a User is reported as admin.
        payload = {
            'logged_in': True,
            'user_type': 'admin',
            'username': current_user.username,
            'id': current_user.id
        }
    return jsonify(payload)
# ========== 收藏功能API ==========
@app.route('/api/collections/toggle', methods=['POST'])
@login_required
def toggle_collection():
    """Add the site to the user's collections, or remove it if present.

    Expects JSON ``{"site_code": str, "folder_id": int | null}``.
    ``folder_id`` is optional and only used when a collection is created;
    it must refer to a folder owned by the current user.

    Returns:
        JSON with ``success`` and ``action`` (``added`` or ``removed``);
        ``added`` responses include ``collection_id``. Errors: 400 (no site
        code), 403 (admin account), 404 (unknown site or folder), 500.
    """
    if not isinstance(current_user, User):
        return jsonify({'success': False, 'message': '管理员账号无法使用收藏功能'}), 403

    try:
        data = request.get_json() or {}
        site_code = (data.get('site_code') or '').strip()
        # Falsy values (missing, null, 0, '') all mean "uncategorised".
        folder_id = data.get('folder_id') or None

        if not site_code:
            return jsonify({'success': False, 'message': '请提供网站编码'}), 400

        site = Site.query.filter_by(code=site_code, is_active=True).first()
        if not site:
            return jsonify({'success': False, 'message': '网站不存在'}), 404

        existing = Collection.query.filter_by(
            user_id=current_user.id,
            site_id=site.id
        ).first()

        if existing:
            db.session.delete(existing)
            db.session.commit()
            return jsonify({
                'success': True,
                'action': 'removed',
                'message': '已取消收藏'
            })

        # Same ownership check as move_collection: a user must not be able
        # to file a collection under somebody else's folder.
        if folder_id is not None:
            folder = Folder.query.filter_by(
                id=folder_id,
                user_id=current_user.id
            ).first()
            if not folder:
                return jsonify({'success': False, 'message': '文件夹不存在'}), 404

        collection = Collection(
            user_id=current_user.id,
            site_id=site.id,
            folder_id=folder_id
        )
        db.session.add(collection)
        db.session.commit()
        return jsonify({
            'success': True,
            'action': 'added',
            'message': '收藏成功',
            'collection_id': collection.id
        })

    except Exception as e:
        db.session.rollback()
        return jsonify({'success': False, 'message': f'操作失败:{str(e)}'}), 500
@app.route('/api/collections/status/<site_code>', methods=['GET'])
@login_required
def collection_status(site_code):
    """Tell whether the current user has collected the given site.

    Returns JSON with ``is_collected`` plus, when a site was found,
    ``collection_id`` and ``folder_id`` (both ``None`` if not collected).
    Admin accounts and unknown sites yield ``{"is_collected": false}``.
    Failures are reported as ``is_collected: false`` with an ``error`` text.
    """
    if not isinstance(current_user, User):
        return jsonify({'is_collected': False})

    try:
        site = Site.query.filter_by(code=site_code, is_active=True).first()
        if not site:
            return jsonify({'is_collected': False})

        record = Collection.query.filter_by(
            user_id=current_user.id,
            site_id=site.id
        ).first()

        collected = record is not None
        collection_id = record.id if record else None
        folder_id = record.folder_id if record else None
        return jsonify({
            'is_collected': collected,
            'collection_id': collection_id,
            'folder_id': folder_id
        })
    except Exception as e:
        return jsonify({'is_collected': False, 'error': str(e)})
@app.route('/api/collections/list', methods=['GET'])
@login_required
def list_collections():
    """Return one page of the current user's collections, newest first.

    Query parameters:
        folder_id: Numeric folder id, or ``null``/``none`` for uncategorised
            collections. Omitted means all folders.
        page: Page number (default 1).
        per_page: Page size (default 20, capped at 100).

    Returns:
        JSON with ``success``, ``collections``, ``total``, the effective
        ``page`` and ``per_page``, and ``has_next``/``has_prev``. Errors:
        400 (invalid ``folder_id``), 403 (admin account), 500.
    """
    if not isinstance(current_user, User):
        return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403

    try:
        folder_id = request.args.get('folder_id')
        page = request.args.get('page', 1, type=int)
        per_page = request.args.get('per_page', 20, type=int)

        query = Collection.query.filter_by(user_id=current_user.id)

        if folder_id:
            if folder_id == 'null' or folder_id == 'none':
                query = query.filter_by(folder_id=None)
            else:
                try:
                    numeric_folder_id = int(folder_id)
                except ValueError:
                    # Bad client input is a 400, not a server error.
                    return jsonify({'success': False, 'message': '文件夹ID无效'}), 400
                query = query.filter_by(folder_id=numeric_folder_id)

        query = query.order_by(Collection.created_at.desc())

        # max_per_page stops a client from requesting an unbounded page.
        pagination = query.paginate(
            page=page, per_page=per_page, max_per_page=100, error_out=False
        )

        collections_data = []
        for collection in pagination.items:
            site = collection.site
            collections_data.append({
                'id': collection.id,
                'site_id': site.id,
                'site_code': site.code,
                'site_name': site.name,
                'site_url': site.url,
                'site_logo': site.logo,
                'site_short_desc': site.short_desc,
                'folder_id': collection.folder_id,
                'note': collection.note,
                'created_at': collection.created_at.strftime('%Y-%m-%d %H:%M:%S') if collection.created_at else None
            })

        return jsonify({
            'success': True,
            'collections': collections_data,
            'total': pagination.total,
            # Report the values actually applied by paginate().
            'page': pagination.page,
            'per_page': pagination.per_page,
            'has_next': pagination.has_next,
            'has_prev': pagination.has_prev
        })

    except Exception as e:
        return jsonify({'success': False, 'message': f'获取失败:{str(e)}'}), 500
@app.route('/api/collections/<int:collection_id>/note', methods=['PUT'])
@login_required
def update_collection_note(collection_id):
    """Replace the note on one of the current user's collections.

    Expects JSON ``{"note": str | null}``; a missing, null or non-string
    note clears the stored note.

    Returns:
        JSON with ``success`` and ``message``. Errors: 403 (admin account),
        404 (collection not found for this user), 500.
    """
    if not isinstance(current_user, User):
        return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403

    try:
        data = request.get_json() or {}
        raw_note = data.get('note')
        # {"note": null} used to raise AttributeError on .strip() and come
        # back as a 500; treat it as an empty note instead.
        note = raw_note.strip() if isinstance(raw_note, str) else ''

        # Filtering on user_id keeps users from editing each other's notes.
        collection = Collection.query.filter_by(
            id=collection_id,
            user_id=current_user.id
        ).first()

        if not collection:
            return jsonify({'success': False, 'message': '收藏不存在'}), 404

        collection.note = note
        db.session.commit()

        return jsonify({
            'success': True,
            'message': '备注已更新'
        })

    except Exception as e:
        db.session.rollback()
        return jsonify({'success': False, 'message': f'更新失败:{str(e)}'}), 500
@app.route('/api/collections/<int:collection_id>/move', methods=['PUT'])
@login_required
def move_collection(collection_id):
    """Move one of the current user's collections to another folder.

    Expects JSON ``{"folder_id": int | null}``. A missing, null or otherwise
    falsy ``folder_id`` moves the collection to "uncategorised".

    Returns:
        JSON with ``success`` and ``message``. Errors: 403 (admin account),
        404 (collection or folder not found for this user), 500.
    """
    if not isinstance(current_user, User):
        return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403

    try:
        data = request.get_json() or {}
        # Normalise 0 / '' to None: they previously skipped the ownership
        # check and were then written verbatim to the foreign-key column.
        folder_id = data.get('folder_id') or None

        collection = Collection.query.filter_by(
            id=collection_id,
            user_id=current_user.id
        ).first()

        if not collection:
            return jsonify({'success': False, 'message': '收藏不存在'}), 404

        # The target folder must belong to the current user.
        if folder_id is not None:
            folder = Folder.query.filter_by(
                id=folder_id,
                user_id=current_user.id
            ).first()
            if not folder:
                return jsonify({'success': False, 'message': '文件夹不存在'}), 404

        collection.folder_id = folder_id
        db.session.commit()

        return jsonify({
            'success': True,
            'message': '已移动到指定文件夹'
        })

    except Exception as e:
        db.session.rollback()
        return jsonify({'success': False, 'message': f'移动失败:{str(e)}'}), 500
# ========== 文件夹管理API ==========
@app.route('/api/folders', methods=['GET'])
@login_required
def list_folders():
    """List the current user's folders with their collection counts.

    Folders are ordered by ``sort_order`` descending, then creation time.

    Returns:
        JSON with ``success`` and ``folders``. Errors: 403 (admin account),
        500.
    """
    if not isinstance(current_user, User):
        return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403

    try:
        from sqlalchemy import func

        folders = Folder.query.filter_by(user_id=current_user.id).order_by(
            Folder.sort_order.desc(), Folder.created_at
        ).all()

        # One grouped query for all folders instead of a COUNT per folder.
        count_rows = db.session.query(
            Collection.folder_id,
            func.count(Collection.id)
        ).filter(
            Collection.user_id == current_user.id,
            Collection.folder_id.isnot(None)
        ).group_by(Collection.folder_id).all()
        count_by_folder = {fid: total for fid, total in count_rows}

        folders_data = [
            {
                'id': folder.id,
                'name': folder.name,
                'description': folder.description,
                'icon': folder.icon,
                'sort_order': folder.sort_order,
                'is_public': folder.is_public,
                'public_slug': folder.public_slug,
                # Folders without collections have no row in the grouping.
                'count': count_by_folder.get(folder.id, 0),
                'created_at': folder.created_at.strftime('%Y-%m-%d %H:%M:%S') if folder.created_at else None
            }
            for folder in folders
        ]

        return jsonify({
            'success': True,
            'folders': folders_data
        })

    except Exception as e:
        return jsonify({'success': False, 'message': f'获取失败:{str(e)}'}), 500
@app.route('/api/folders', methods=['POST'])
@login_required
def create_folder():
    """Create a collection folder for the current user.

    Expects JSON ``{"name": str, "description": str?, "icon": str?}``.
    ``icon`` defaults to ``📁``. Folder names are unique per user.

    Returns:
        JSON with ``success``, ``message`` and the created ``folder``.
        Errors: 400 (empty or duplicate name), 403 (admin account), 500.
    """
    if not isinstance(current_user, User):
        return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403

    try:
        data = request.get_json() or {}

        def _text(key, default=''):
            # null / non-string values fall back to the default instead of
            # raising AttributeError on .strip() (previously a 500).
            value = data.get(key, default)
            return value.strip() if isinstance(value, str) else default

        name = _text('name')
        description = _text('description')
        icon = _text('icon', '📁')

        if not name:
            return jsonify({'success': False, 'message': '文件夹名称不能为空'}), 400

        existing = Folder.query.filter_by(
            user_id=current_user.id,
            name=name
        ).first()

        if existing:
            return jsonify({'success': False, 'message': '该文件夹名称已存在'}), 400

        folder = Folder(
            user_id=current_user.id,
            name=name,
            description=description,
            icon=icon
        )
        db.session.add(folder)
        db.session.commit()

        return jsonify({
            'success': True,
            'message': '文件夹创建成功',
            'folder': {
                'id': folder.id,
                'name': folder.name,
                'description': folder.description,
                'icon': folder.icon
            }
        })

    except Exception as e:
        db.session.rollback()
        return jsonify({'success': False, 'message': f'创建失败:{str(e)}'}), 500
@app.route('/api/folders/<int:folder_id>', methods=['PUT'])
@login_required
def update_folder(folder_id):
    """Update fields of one of the current user's folders.

    Expects JSON with any of ``name``, ``description``, ``icon`` and
    ``sort_order``; only the keys present are changed. Null or non-string
    text values are treated as empty strings.

    Returns:
        JSON with ``success`` and ``message``. Errors: 400 (empty or
        duplicate name, non-integer ``sort_order``), 403 (admin account),
        404 (folder not found for this user), 500.
    """
    if not isinstance(current_user, User):
        return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403

    try:
        data = request.get_json() or {}

        def _text(key):
            # Avoids AttributeError on .strip() for null / non-string input.
            value = data[key]
            return value.strip() if isinstance(value, str) else ''

        folder = Folder.query.filter_by(
            id=folder_id,
            user_id=current_user.id
        ).first()

        if not folder:
            return jsonify({'success': False, 'message': '文件夹不存在'}), 404

        if 'name' in data:
            name = _text('name')
            if not name:
                return jsonify({'success': False, 'message': '文件夹名称不能为空'}), 400

            # Name must stay unique among this user's other folders.
            existing = Folder.query.filter(
                Folder.user_id == current_user.id,
                Folder.name == name,
                Folder.id != folder_id
            ).first()

            if existing:
                return jsonify({'success': False, 'message': '该文件夹名称已存在'}), 400

            folder.name = name

        if 'description' in data:
            folder.description = _text('description')

        if 'icon' in data:
            folder.icon = _text('icon')

        if 'sort_order' in data:
            try:
                folder.sort_order = int(data['sort_order'])
            except (TypeError, ValueError):
                # Invalid client input is a 400, not a server error. Undo
                # the pending field changes made above.
                db.session.rollback()
                return jsonify({'success': False, 'message': '排序值必须为整数'}), 400

        db.session.commit()

        return jsonify({
            'success': True,
            'message': '文件夹已更新'
        })

    except Exception as e:
        db.session.rollback()
        return jsonify({'success': False, 'message': f'更新失败:{str(e)}'}), 500
@app.route('/api/folders/<int:folder_id>', methods=['DELETE'])
@login_required
def delete_folder(folder_id):
    """Delete one of the current user's folders.

    Returns JSON with ``success`` and ``message``. Errors: 403 (admin
    account), 404 (folder not found for this user), 500.
    """
    if not isinstance(current_user, User):
        return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403

    try:
        # Looking the folder up by id and owner keeps other users' folders
        # out of reach.
        owned_folders = Folder.query.filter_by(id=folder_id, user_id=current_user.id)
        target = owned_folders.first()
        if not target:
            return jsonify({'success': False, 'message': '文件夹不存在'}), 404

        # NOTE(review): the original comment says deleting the folder
        # cascades to its collections - confirm against the model definition.
        db.session.delete(target)
        db.session.commit()
        return jsonify({'success': True, 'message': '文件夹已删除'})
    except Exception as e:
        db.session.rollback()
        return jsonify({'success': False, 'message': f'删除失败:{str(e)}'}), 500
# ========== 用户中心页面路由 ==========
@app.route('/user/profile')
@login_required
def user_profile():
    """User centre landing page.

    Shows the number of collections and folders owned by the current user
    and the five most recent collections. Admin accounts are redirected to
    the home page with an error message.
    """
    if not isinstance(current_user, User):
        flash('仅普通用户可访问', 'error')
        return redirect(url_for('index'))

    owner_id = current_user.id
    my_collections = Collection.query.filter_by(user_id=owner_id)

    context = {
        'collections_count': my_collections.count(),
        'folders_count': Folder.query.filter_by(user_id=owner_id).count(),
        # Newest five collections.
        'recent_collections': my_collections.order_by(
            Collection.created_at.desc()
        ).limit(5).all(),
    }
    return render_template('user/profile.html', **context)
@app.route('/user/collections')
@login_required
def user_collections():
    """Collections page: the user's folders plus one page of collections.

    Query parameters:
        page: Page number (default 1); 20 collections per page.
        folder_id: Numeric folder id, or ``none`` for uncategorised
            collections. A value that is neither is ignored and all
            collections are shown.

    Admin accounts are redirected to the home page with an error message.
    """
    if not isinstance(current_user, User):
        flash('仅普通用户可访问', 'error')
        return redirect(url_for('index'))

    folders = Folder.query.filter_by(user_id=current_user.id).order_by(
        Folder.sort_order.desc(), Folder.created_at
    ).all()

    page = request.args.get('page', 1, type=int)
    folder_id = request.args.get('folder_id')

    query = Collection.query.filter_by(user_id=current_user.id)

    if folder_id:
        if folder_id == 'none':
            query = query.filter_by(folder_id=None)
        else:
            try:
                numeric_folder_id = int(folder_id)
            except ValueError:
                # A hand-edited URL such as ?folder_id=abc used to raise an
                # unhandled ValueError (HTTP 500); drop the filter instead.
                folder_id = None
            else:
                query = query.filter_by(folder_id=numeric_folder_id)

    query = query.order_by(Collection.created_at.desc())
    pagination = query.paginate(page=page, per_page=20, error_out=False)

    return render_template('user/collections.html',
                           folders=folders,
                           collections=pagination.items,
                           pagination=pagination,
                           current_folder_id=folder_id)
@app.route('/api/user/profile', methods=['PUT'])
@login_required
def update_user_profile():
    """Update the current user's profile fields.

    Expects JSON with any of ``bio``, ``avatar`` and ``is_public_profile``;
    only the keys present are changed. Null or non-string ``bio``/``avatar``
    values are stored as empty strings.

    Returns:
        JSON with ``success`` and ``message``. Errors: 403 (admin account),
        500.
    """
    if not isinstance(current_user, User):
        return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403

    try:
        data = request.get_json() or {}

        def _text(key):
            # {"bio": null} used to raise AttributeError on .strip() and
            # surface as a 500.
            value = data[key]
            return value.strip() if isinstance(value, str) else ''

        if 'bio' in data:
            current_user.bio = _text('bio')

        if 'avatar' in data:
            current_user.avatar = _text('avatar')

        if 'is_public_profile' in data:
            current_user.is_public_profile = bool(data['is_public_profile'])

        db.session.commit()

        return jsonify({
            'success': True,
            'message': '资料已更新'
        })

    except Exception as e:
        db.session.rollback()
        return jsonify({'success': False, 'message': f'更新失败:{str(e)}'}), 500
@app.route('/admin/change-password', methods=['GET', 'POST'])
@login_required
def change_password():
    """Admin-only password change form and handler.

    GET renders the form. POST verifies the old password, validates the new
    one, stores it, logs the admin out and redirects to the admin login
    page. Any problem is flashed and the form is rendered again. Non-admin
    accounts are redirected to the home page.
    """
    if not isinstance(current_user, AdminModel):
        flash('无权访问此页面', 'error')
        return redirect(url_for('index'))

    if request.method != 'POST':
        return render_template('admin/change_password.html')

    old_password = request.form.get('old_password', '').strip()
    new_password = request.form.get('new_password', '').strip()
    confirm_password = request.form.get('confirm_password', '').strip()

    # Checks run in order; the first failing one decides the message.
    if not current_user.check_password(old_password):
        problem = '旧密码错误'
    elif not new_password:
        problem = '新密码不能为空'
    elif len(new_password) < 6:
        problem = '新密码长度至少6位'
    elif new_password != confirm_password:
        problem = '两次输入的新密码不一致'
    elif old_password == new_password:
        problem = '新密码不能与旧密码相同'
    else:
        problem = None

    if problem is not None:
        flash(problem, 'error')
        return render_template('admin/change_password.html')

    try:
        current_user.set_password(new_password)
        db.session.commit()
        flash('密码修改成功,请重新登录', 'success')
        # Force a fresh login with the new password.
        logout_user()
        return redirect(url_for('admin_login'))
    except Exception as e:
        db.session.rollback()
        flash(f'密码修改失败:{str(e)}', 'error')
        return render_template('admin/change_password.html')
# ========== API路由 ==========
@app.route('/api/fetch-website-info', methods=['POST'])
@login_required
def fetch_website_info():
    """Admin-only API: fetch a site's name, description and logo by URL.

    Expects JSON ``{"url": str}``. A logo, when found, is downloaded into
    ``static/logos``; if the download fails an empty logo path is returned
    so the admin can upload one manually.

    Returns:
        JSON with ``success`` and ``data`` (``name``, ``description``,
        ``logo``). Errors: 400 (no URL), 403 (non-admin), 500; a fetch
        that returns nothing is reported with ``success: False``.
    """
    if not isinstance(current_user, AdminModel):
        return jsonify({'success': False, 'message': '无权访问'}), 403

    try:
        # silent=True plus the dict check: a missing or non-JSON body now
        # ends in the 400 below instead of AttributeError -> 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        url = (data.get('url') or '').strip()

        if not url:
            return jsonify({
                'success': False,
                'message': '请提供网站URL'
            }), 400

        fetcher = WebsiteFetcher(timeout=15)
        info = fetcher.fetch_website_info(url)

        if not info:
            return jsonify({
                'success': False,
                'message': '无法获取网站信息请检查URL是否正确或手动填写'
            })

        # Download the logo locally when one was detected. The remote URL is
        # deliberately not returned on failure.
        logo_path = None
        if info.get('logo_url'):
            logo_path = fetcher.download_logo(info['logo_url'], save_dir='static/logos')

        return jsonify({
            'success': True,
            'data': {
                'name': info.get('name', ''),
                'description': info.get('description', ''),
                'logo': logo_path if logo_path else ''
            }
        })

    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'抓取失败: {str(e)}'
        }), 500
@app.route('/api/upload-logo', methods=['POST'])
@login_required
def upload_logo():
    """Store an uploaded logo image under ``static/logos`` (admin-only JSON API).

    Reads the multipart field ``logo``, accepts only a fixed set of image
    extensions and saves the file under a generated ``logo_<hash><ext>`` name.

    Returns:
        JSON ``{'success': True, 'path': ...}`` with the site-relative path of
        the saved file, or ``{'success': False, 'message': ...}`` with status
        400 (bad upload), 403 (not an administrator) or 500 (unexpected error).
    """
    import hashlib
    import time

    def _reject(message, status):
        # Uniform failure payload used by every error path of this endpoint.
        return jsonify({'success': False, 'message': message}), status

    # Administrators only.
    if not isinstance(current_user, AdminModel):
        return _reject('无权访问', 403)
    try:
        # The multipart field must be present ...
        if 'logo' not in request.files:
            return _reject('请选择要上传的图片', 400)
        upload = request.files['logo']
        # ... and must actually carry a file.
        if upload.filename == '':
            return _reject('未选择文件', 400)
        # Extension whitelist, checked case-insensitively.
        lowered = upload.filename.lower()
        accepted_suffixes = tuple(
            '.' + ext for ext in ('png', 'jpg', 'jpeg', 'gif', 'svg', 'ico', 'webp')
        )
        if not lowered.endswith(accepted_suffixes):
            return _reject('不支持的文件格式,请上传图片文件(png, jpg, gif, svg, ico, webp)', 400)
        # Make sure the target directory exists.
        target_dir = 'static/logos'
        os.makedirs(target_dir, exist_ok=True)
        # Build the stored name from a hash of the original name plus a
        # millisecond timestamp; only the original extension is kept.
        extension = os.path.splitext(lowered)[1]
        stamp = str(int(time.time() * 1000))
        digest = hashlib.md5(f"{lowered}{stamp}".encode()).hexdigest()[:16]
        destination = os.path.join(target_dir, f"logo_{digest}{extension}")
        upload.save(destination)
        # Report the path with forward slashes and a leading '/'.
        return jsonify({
            'success': True,
            'path': f'/{destination.replace(os.sep, "/")}'
        })
    except Exception as e:
        return _reject(f'上传失败: {str(e)}', 500)
@app.route('/api/generate-features', methods=['POST'])
@login_required
def generate_features():
    """Generate a site's main feature list via TagGenerator (admin-only JSON API).

    Expects a JSON object with ``name`` and ``description`` (both required)
    and an optional ``url``.

    Returns:
        JSON ``{'success': True, 'features': ...}`` on success, otherwise
        ``{'success': False, 'message': ...}`` with status 400 (missing input
        or ValueError from the generator), 403 (not an administrator) or 500.
    """
    # Administrators only.
    if not isinstance(current_user, AdminModel):
        return jsonify({'success': False, 'message': '无权访问'}), 403
    try:
        # silent=True turns a missing or malformed JSON body into None instead
        # of raising, so it is reported as a 400 below rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        # "or ''" also covers explicit JSON nulls.
        name = str(data.get('name') or '').strip()
        description = str(data.get('description') or '').strip()
        url = str(data.get('url') or '').strip()
        if not name or not description:
            return jsonify({
                'success': False,
                'message': '请提供网站名称和描述'
            }), 400
        # Generate the feature list.
        generator = TagGenerator()
        features = generator.generate_features(name, description, url)
        if not features:
            return jsonify({
                'success': False,
                'message': 'DeepSeek功能生成失败请检查API配置'
            }), 500
        return jsonify({
            'success': True,
            'features': features
        })
    except ValueError as e:
        # ValueError is reported to the client as a bad request.
        return jsonify({
            'success': False,
            'message': str(e)
        }), 400
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'生成失败: {str(e)}'
        }), 500
@app.route('/api/generate-description', methods=['POST'])
@login_required
def generate_description():
    """Generate a site's detailed introduction via TagGenerator (admin-only JSON API).

    Expects a JSON object with ``name`` (required) and optional
    ``short_desc`` and ``url``.

    Returns:
        JSON ``{'success': True, 'description': ...}`` on success, otherwise
        ``{'success': False, 'message': ...}`` with status 400 (missing name
        or ValueError from the generator), 403 (not an administrator) or 500.
    """
    # Administrators only.
    if not isinstance(current_user, AdminModel):
        return jsonify({'success': False, 'message': '无权访问'}), 403
    try:
        # silent=True turns a missing or malformed JSON body into None instead
        # of raising, so it is reported as a 400 below rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        # "or ''" also covers explicit JSON nulls.
        name = str(data.get('name') or '').strip()
        short_desc = str(data.get('short_desc') or '').strip()
        url = str(data.get('url') or '').strip()
        if not name:
            return jsonify({
                'success': False,
                'message': '请提供网站名称'
            }), 400
        # Generate the detailed introduction.
        generator = TagGenerator()
        description = generator.generate_description(name, short_desc, url)
        if not description:
            return jsonify({
                'success': False,
                'message': 'DeepSeek详细介绍生成失败请检查API配置'
            }), 500
        return jsonify({
            'success': True,
            'description': description
        })
    except ValueError as e:
        # ValueError is reported to the client as a bad request.
        return jsonify({
            'success': False,
            'message': str(e)
        }), 400
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'生成失败: {str(e)}'
        }), 500
@app.route('/api/generate-tags', methods=['POST'])
@login_required
def generate_tags():
    """Suggest tags for a site via TagGenerator (admin-only JSON API).

    Expects a JSON object with ``name`` and ``description`` (both required).
    The names of all existing tags are passed to the generator as reference.

    Returns:
        JSON ``{'success': True, 'tags': ...}`` on success, otherwise
        ``{'success': False, 'message': ...}`` with status 400 (missing input
        or ValueError from the generator), 403 (not an administrator) or 500.
    """
    # Administrators only.
    if not isinstance(current_user, AdminModel):
        return jsonify({'success': False, 'message': '无权访问'}), 403
    try:
        # silent=True turns a missing or malformed JSON body into None instead
        # of raising, so it is reported as a 400 below rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        # "or ''" also covers explicit JSON nulls.
        name = str(data.get('name') or '').strip()
        description = str(data.get('description') or '').strip()
        if not name or not description:
            return jsonify({
                'success': False,
                'message': '请提供网站名称和描述'
            }), 400
        # Existing tag names serve as reference for the generator.
        existing_tags = [tag.name for tag in Tag.query.all()]
        # Generate the suggestions.
        generator = TagGenerator()
        suggested_tags = generator.generate_tags(name, description, existing_tags)
        if not suggested_tags:
            return jsonify({
                'success': False,
                'message': 'DeepSeek标签生成失败请检查API配置'
            }), 500
        return jsonify({
            'success': True,
            'tags': suggested_tags
        })
    except ValueError as e:
        # ValueError is reported to the client as a bad request.
        return jsonify({
            'success': False,
            'message': str(e)
        }), 400
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'生成失败: {str(e)}'
        }), 500
# ========== 新闻获取路由 ==========
@app.route('/api/fetch-site-news', methods=['POST'])
@login_required
def fetch_site_news():
    """Fetch and store the latest news for one site (admin-only JSON API).

    Expects a JSON object with ``site_id`` (required) and optional ``count``
    and ``freshness``; the defaults come from ``NEWS_SEARCH_COUNT`` and
    ``NEWS_SEARCH_FRESHNESS`` in the app config.

    Returns:
        JSON with ``total_found``, ``saved`` and ``news_items`` on success,
        otherwise ``{'success': False, 'message': ...}`` with status 400
        (missing site id), 403 (not an administrator), 404 (unknown site or
        no news found) or 500 (missing API key or unexpected error).
    """
    # Administrators only.
    if not isinstance(current_user, AdminModel):
        return jsonify({'success': False, 'message': '无权访问'}), 403
    try:
        # silent=True turns a missing or malformed JSON body into None instead
        # of raising, so it is reported as a 400 below rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        site_id = data.get('site_id')
        count = data.get('count', app.config.get('NEWS_SEARCH_COUNT', 10))
        freshness = data.get('freshness', app.config.get('NEWS_SEARCH_FRESHNESS', 'oneMonth'))
        if not site_id:
            return jsonify({
                'success': False,
                'message': '请提供网站ID'
            }), 400
        # Look up the site.
        site = Site.query.get(site_id)
        if not site:
            return jsonify({
                'success': False,
                'message': '网站不存在'
            }), 404
        # The Bocha search API key must be configured.
        api_key = app.config.get('BOCHA_API_KEY')
        if not api_key:
            return jsonify({
                'success': False,
                'message': '博查API未配置请在.env文件中设置BOCHA_API_KEY'
            }), 500
        # Search for news.
        searcher = NewsSearcher(api_key)
        news_items = searcher.search_site_news(
            site_name=site.name,
            site_url=site.url,
            news_keywords=site.news_keywords,  # v2.3: dedicated news keywords
            count=count,
            freshness=freshness
        )
        if not news_items:
            return jsonify({
                'success': False,
                'message': '未找到相关新闻'
            }), 404
        # Store the items that are not in the database yet.
        saved_count = 0
        for item in news_items:
            # An item counts as already stored when this site has a News row
            # with the same URL.
            existing_news = News.query.filter_by(
                site_id=site_id,
                url=item['url']
            ).first()
            if not existing_news:
                news = News(
                    site_id=site_id,
                    title=item['title'],
                    content=item.get('summary') or item.get('snippet', ''),
                    url=item['url'],
                    source_name=item.get('site_name', ''),
                    source_icon=item.get('site_icon', ''),
                    published_at=item.get('published_at'),
                    news_type='Search Result',
                    is_active=True
                )
                db.session.add(news)
                saved_count += 1
        # Commit all new rows in one transaction.
        db.session.commit()
        return jsonify({
            'success': True,
            'message': f'成功获取并保存 {saved_count} 条新闻',
            'total_found': len(news_items),
            'saved': saved_count,
            'news_items': searcher.format_news_for_display(news_items)
        })
    except Exception as e:
        db.session.rollback()
        return jsonify({
            'success': False,
            'message': f'获取失败: {str(e)}'
        }), 500
@app.route('/api/fetch-all-news', methods=['POST'])
@login_required
def fetch_all_news():
    """Fetch and store news for a batch of active sites (admin-only JSON API).

    Optional JSON fields: ``count`` (items per site, default 5),
    ``freshness`` (default from ``NEWS_SEARCH_FRESHNESS``) and ``limit``
    (number of sites to process, default 10). Sites are taken in ascending
    ``updated_at`` order.

    Returns:
        JSON with ``total_found``, ``total_saved`` and a per-site
        ``processed_sites`` list (each entry carries either ``found``/``saved``
        or ``error``). Failures return ``{'success': False, 'message': ...}``
        with status 403, 404 (no active sites) or 500.
    """
    # Administrators only.
    if not isinstance(current_user, AdminModel):
        return jsonify({'success': False, 'message': '无权访问'}), 403
    try:
        # silent=True turns a missing or malformed JSON body into None instead
        # of raising; every field has a default, so an empty body is valid.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        count_per_site = data.get('count', 5)  # news items requested per site
        freshness = data.get('freshness', app.config.get('NEWS_SEARCH_FRESHNESS', 'oneMonth'))
        limit = data.get('limit', 10)  # maximum number of sites to process
        # The Bocha search API key must be configured.
        api_key = app.config.get('BOCHA_API_KEY')
        if not api_key:
            return jsonify({
                'success': False,
                'message': '博查API未配置请在.env文件中设置BOCHA_API_KEY'
            }), 500
        # Active sites, least recently updated first.
        sites = Site.query.filter_by(is_active=True).order_by(Site.updated_at).limit(limit).all()
        if not sites:
            return jsonify({
                'success': False,
                'message': '没有可用的网站'
            }), 404
        searcher = NewsSearcher(api_key)
        # Running totals for the response.
        total_saved = 0
        total_found = 0
        processed_sites = []
        for site in sites:
            try:
                news_items = searcher.search_site_news(
                    site_name=site.name,
                    site_url=site.url,
                    news_keywords=site.news_keywords,  # v2.3: dedicated news keywords
                    count=count_per_site,
                    freshness=freshness
                ) or []
                # Build this site's rows first and hand them to the session
                # only after every item was processed. A failure halfway
                # through therefore cannot leave a partial batch that is
                # committed for a site reported as failed.
                pending = []
                seen_urls = set()
                for item in news_items:
                    item_url = item['url']
                    # Rows are not in the session yet, so duplicates within
                    # this result set have to be filtered here.
                    if item_url in seen_urls:
                        continue
                    seen_urls.add(item_url)
                    existing_news = News.query.filter_by(
                        site_id=site.id,
                        url=item_url
                    ).first()
                    if existing_news:
                        continue
                    pending.append(News(
                        site_id=site.id,
                        title=item['title'],
                        content=item.get('summary') or item.get('snippet', ''),
                        url=item_url,
                        source_name=item.get('site_name', ''),
                        source_icon=item.get('site_icon', ''),
                        published_at=item.get('published_at'),
                        news_type='Search Result',
                        is_active=True
                    ))
                db.session.add_all(pending)
                site_saved = len(pending)
                total_found += len(news_items)
                total_saved += site_saved
                processed_sites.append({
                    'id': site.id,
                    'name': site.name,
                    'found': len(news_items),
                    'saved': site_saved
                })
            except Exception as e:
                # One failing site must not abort the rest of the batch.
                processed_sites.append({
                    'id': site.id,
                    'name': site.name,
                    'error': str(e)
                })
                continue
        # Commit everything collected across all sites.
        db.session.commit()
        return jsonify({
            'success': True,
            'message': f'批量获取完成,共处理 {len(processed_sites)} 个网站',
            'total_found': total_found,
            'total_saved': total_saved,
            'processed_sites': processed_sites
        })
    except Exception as e:
        db.session.rollback()
        return jsonify({
            'success': False,
            'message': f'批量获取失败: {str(e)}'
        }), 500
# ========== SEO路由 (v2.4新增) ==========
@app.route('/sitemap.xml')
def sitemap():
    """Render sitemap.xml dynamically from active sites and tags.

    The document lists the home page, one detail page per active site
    (newest ``updated_at`` first) and one ``/?tag=<slug>`` page per tag.

    Returns:
        A response with the XML body and an ``application/xml`` content type.
    """
    from flask import make_response
    from urllib.parse import quote
    from xml.sax.saxutils import escape

    base_url = request.url_root.rstrip('/')
    today = datetime.now().strftime('%Y-%m-%d')
    # All active sites and all tags.
    sites = Site.query.filter_by(is_active=True).order_by(Site.updated_at.desc()).all()
    tags = Tag.query.all()

    # Collect the pieces and join once at the end instead of growing a string.
    chunks = [
        '<?xml version="1.0" encoding="UTF-8"?>\n'
        '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">'
    ]
    # Home page. Every <loc> value is XML-escaped so characters such as '&'
    # cannot produce an invalid document.
    chunks.append(
        '\n<url>\n'
        f'<loc>{escape(base_url)}</loc>\n'
        '<changefreq>daily</changefreq>\n'
        '<priority>1.0</priority>\n'
        '</url>'
    )
    # Site detail pages.
    for site in sites:
        loc = base_url + url_for('site_detail', code=site.code)
        # Fall back to today's date when the site has no update timestamp.
        lastmod = site.updated_at.strftime('%Y-%m-%d') if site.updated_at else today
        chunks.append(
            '\n<url>\n'
            f'<loc>{escape(loc)}</loc>\n'
            f'<lastmod>{lastmod}</lastmod>\n'
            '<changefreq>weekly</changefreq>\n'
            '<priority>0.8</priority>\n'
            '</url>'
        )
    # Tag pages.
    for tag in tags:
        # A tag without a slug has no addressable page; skip it.
        if not tag.slug:
            continue
        # Percent-encode the slug because it is placed in a query string.
        loc = f"{base_url}/?tag={quote(str(tag.slug), safe='')}"
        chunks.append(
            '\n<url>\n'
            f'<loc>{escape(loc)}</loc>\n'
            '<changefreq>weekly</changefreq>\n'
            '<priority>0.6</priority>\n'
            '</url>'
        )
    chunks.append('\n</urlset>')
    response = make_response(''.join(chunks))
    response.headers['Content-Type'] = 'application/xml; charset=utf-8'
    return response
@app.route('/robots.txt')
def robots():
    """Serve robots.txt, pointing crawlers at this host's sitemap.xml.

    All paths are allowed except ``/admin/`` and ``/api/``; the Sitemap line
    is built from the current request's root URL.
    """
    from flask import make_response
    template = '''User-agent: *
Allow: /
Disallow: /admin/
Disallow: /api/
Sitemap: {}sitemap.xml
'''
    body = template.format(request.url_root)
    reply = make_response(body)
    reply.headers['Content-Type'] = 'text/plain; charset=utf-8'
    return reply
# ========== SEO工具管理路由 (v2.4新增) ==========
@app.route('/admin/seo-tools')
@login_required
def seo_tools():
    """Render the SEO tools page with the status of the static sitemap.

    The template receives ``sitemap_info``: ``{'exists': False}`` when
    ``static/sitemap.xml`` cannot be read, otherwise ``exists``,
    ``last_updated`` (formatted modification time) and ``size`` in bytes.
    Non-administrators are redirected to the index page.
    """
    # Administrators only.
    if not isinstance(current_user, AdminModel):
        flash('无权访问此页面', 'error')
        return redirect(url_for('index'))
    sitemap_path = 'static/sitemap.xml'
    try:
        # A single stat() call supplies both mtime and size, and avoids the
        # race between an exists() check and the later reads.
        file_stat = os.stat(sitemap_path)
    except OSError:
        sitemap_info = {'exists': False}
    else:
        sitemap_info = {
            'exists': True,
            'last_updated': datetime.fromtimestamp(file_stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
            'size': file_stat.st_size
        }
    return render_template('admin/seo_tools.html', sitemap_info=sitemap_info)
@app.route('/api/generate-static-sitemap', methods=['POST'])
@login_required
def generate_static_sitemap():
    """Write a static ``static/sitemap.xml`` file (admin-only JSON API).

    The file lists the home page, ``/site/<code>`` for every active site and
    ``/?tag=<slug>`` for every tag, using the current request's root URL as
    the base.

    Returns:
        JSON with ``total_urls``, ``file_path`` and ``timestamp`` on success,
        otherwise ``{'success': False, 'message': ...}`` with status 403 or 500.
    """
    # Administrators only.
    if not isinstance(current_user, AdminModel):
        return jsonify({'success': False, 'message': '无权访问'}), 403
    try:
        from urllib.parse import quote
        from xml.sax.saxutils import escape

        # All active sites and all tags.
        sites = Site.query.filter_by(is_active=True).order_by(Site.updated_at.desc()).all()
        tags = Tag.query.all()
        base_url = request.url_root.rstrip('/')
        today = datetime.now().strftime('%Y-%m-%d')

        # Each entry is (loc, lastmod or None, changefreq, priority).
        entries = [(base_url, None, 'daily', '1.0')]
        for site in sites:
            # Fall back to today's date when the site has no update timestamp.
            lastmod = site.updated_at.strftime('%Y-%m-%d') if site.updated_at else today
            entries.append((
                f"{base_url}/site/{quote(str(site.code), safe='')}",
                lastmod,
                'weekly',
                '0.8'
            ))
        for tag in tags:
            # A tag without a slug has no addressable page; skip it.
            if not tag.slug:
                continue
            entries.append((
                f"{base_url}/?tag={quote(str(tag.slug), safe='')}",
                None,
                'weekly',
                '0.6'
            ))

        chunks = [
            '<?xml version="1.0" encoding="UTF-8"?>\n'
            '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">'
        ]
        for loc, lastmod, changefreq, priority in entries:
            # XML-escape the location so characters such as '&' cannot
            # produce an invalid document.
            chunks.append(f'\n<url>\n<loc>{escape(loc)}</loc>\n')
            if lastmod is not None:
                chunks.append(f'<lastmod>{lastmod}</lastmod>\n')
            chunks.append(
                f'<changefreq>{changefreq}</changefreq>\n'
                f'<priority>{priority}</priority>\n'
                '</url>'
            )
        chunks.append('\n</urlset>')
        xml_content = ''.join(chunks)

        # Save into the static directory.
        static_dir = 'static'
        os.makedirs(static_dir, exist_ok=True)
        sitemap_path = os.path.join(static_dir, 'sitemap.xml')
        with open(sitemap_path, 'w', encoding='utf-8') as f:
            f.write(xml_content)
        # Count the URLs actually written (home page + sites + tags).
        total_urls = len(entries)
        return jsonify({
            'success': True,
            'message': f'静态sitemap.xml生成成功共包含 {total_urls} 个URL',
            'total_urls': total_urls,
            'file_path': sitemap_path,
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'生成失败: {str(e)}'
        }), 500
@app.route('/api/notify-search-engines', methods=['POST'])
@login_required
def notify_search_engines():
    """Ping Google, Baidu and Bing with this site's sitemap URL (admin-only JSON API).

    Each engine is contacted with a GET request (10 second timeout). A
    failure for one engine is recorded in its result entry and does not stop
    the others.

    Returns:
        JSON with ``sitemap_url`` and a ``results`` list (one entry per
        engine with ``engine``, ``status``, ``message`` and, when a response
        arrived, ``status_code``). Status 403 for non-administrators, 500 on
        an unexpected error.
    """
    # Administrators only.
    if not isinstance(current_user, AdminModel):
        return jsonify({'success': False, 'message': '无权访问'}), 403
    try:
        import requests
        from urllib.parse import quote
        # The sitemap URL is derived from the host of the current request.
        sitemap_url = request.url_root.rstrip('/') + '/sitemap.xml'
        encoded_sitemap_url = quote(sitemap_url, safe='')
        # Engines are contacted in this order: Google, Baidu, Bing.
        ping_targets = (
            ('Google', f'http://www.google.com/ping?sitemap={encoded_sitemap_url}'),
            ('Baidu', f'http://data.zz.baidu.com/ping?sitemap={encoded_sitemap_url}'),
            ('Bing', f'http://www.bing.com/ping?sitemap={encoded_sitemap_url}'),
        )
        results = []
        for engine_name, ping_url in ping_targets:
            try:
                reply = requests.get(ping_url, timeout=10)
            except Exception as e:
                # No response at all: record the error and continue.
                results.append({
                    'engine': engine_name,
                    'status': 'error',
                    'message': f'请求失败: {str(e)}'
                })
                continue
            accepted = reply.status_code == 200
            results.append({
                'engine': engine_name,
                'status': 'success' if accepted else 'failed',
                'status_code': reply.status_code,
                'message': '提交成功' if accepted else f'HTTP {reply.status_code}'
            })
        # Number of engines that answered with HTTP 200.
        success_count = len([entry for entry in results if entry['status'] == 'success'])
        return jsonify({
            'success': True,
            'message': f'已通知 {success_count}/{len(results)} 个搜索引擎',
            'sitemap_url': sitemap_url,
            'results': results
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'通知失败: {str(e)}'
        }), 500
@app.route('/api/refresh-site-news/<site_code>', methods=['POST'])
def refresh_site_news(site_code):
    """Refresh the news of one site on demand (no login required) - added in v2.3.

    Looks the site up by ``site_code``, searches for up to 5 items from the
    last week and stores those whose URL is not yet recorded for the site.

    Returns:
        JSON with ``total_found`` and ``saved_count`` on success, otherwise
        ``{'success': False, 'message': ...}`` with status 404 (unknown site
        or no news found) or 500 (news feature disabled or unexpected error).
    """
    def _fail(message, status):
        # Uniform failure payload used by every error path of this endpoint.
        return jsonify({'success': False, 'message': message}), status

    try:
        # Resolve the site from its public code.
        site = Site.query.filter_by(code=site_code).first()
        if not site:
            return _fail('网站不存在', 404)
        # Without a Bocha API key the news feature is considered disabled.
        api_key = app.config.get('BOCHA_API_KEY')
        if not api_key:
            return _fail('新闻功能未启用', 500)
        # Ask for the five most recent items from the past week, using the
        # site's dedicated news keywords.
        searcher = NewsSearcher(api_key)
        search_args = {
            'site_name': site.name,
            'site_url': site.url,
            'news_keywords': site.news_keywords,
            'count': 5,
            'freshness': 'oneWeek',
        }
        news_items = searcher.search_site_news(**search_args)
        if not news_items:
            return _fail('未找到相关新闻', 404)
        # Persist only items whose URL is not yet stored for this site.
        saved_count = 0
        for entry in news_items:
            duplicate = News.query.filter_by(site_id=site.id, url=entry['url']).first()
            if duplicate:
                continue
            record = News(
                site_id=site.id,
                title=entry['title'],
                content=entry.get('summary') or entry.get('snippet', ''),
                url=entry['url'],
                source_name=entry.get('site_name', ''),
                source_icon=entry.get('site_icon', ''),
                published_at=entry.get('published_at'),
                news_type='Search Result',
                is_active=True
            )
            db.session.add(record)
            saved_count += 1
        # Commit all new rows in one transaction.
        db.session.commit()
        return jsonify({
            'success': True,
            'message': f'成功获取 {saved_count} 条新资讯',
            'total_found': len(news_items),
            'saved_count': saved_count
        })
    except Exception as e:
        db.session.rollback()
        return _fail(f'获取失败: {str(e)}', 500)
# ========== 批量导入路由 ==========
@app.route('/admin/batch-import', methods=['GET', 'POST'])
@login_required
def batch_import():
    """Batch-import sites from a pasted URL list or an uploaded bookmark file.

    GET renders the import form. POST parses the submitted input
    (``import_type`` is ``url_list`` or ``bookmark_file``), then imports the
    entries one at a time: skip URLs that already exist, fetch site metadata,
    fall back to the bookmark name or the URL's domain when fetching yields
    no name, download the logo, generate a unique code and slug, and commit
    the new Site. Every entry is committed on its own, so one failure does
    not undo earlier imports.

    The template receives ``results`` (``None`` on GET or when the outer
    handler caught an error) with total/success/failed counts and lists.
    Non-administrators are redirected to the index page.
    """
    # Administrators only
    if not isinstance(current_user, AdminModel):
        flash('无权访问此页面', 'error')
        return redirect(url_for('index'))
    from utils.bookmark_parser import BookmarkParser
    from utils.website_fetcher import WebsiteFetcher
    results = None
    if request.method == 'POST':
        import_type = request.form.get('import_type')
        # Checkbox value: imported sites are active only when it is ticked
        auto_activate = request.form.get('auto_activate') == 'on'
        parser = BookmarkParser()
        fetcher = WebsiteFetcher(timeout=15)
        urls_to_import = []
        try:
            # Parse the input
            if import_type == 'url_list':
                url_list_text = request.form.get('url_list', '')
                urls_to_import = parser.parse_url_list(url_list_text)
            elif import_type == 'bookmark_file':
                bookmark_file = request.files.get('bookmark_file')
                if not bookmark_file:
                    flash('请选择书签文件', 'error')
                    return render_template('admin/batch_import.html')
                # Undecodable bytes are dropped rather than failing the upload
                html_content = bookmark_file.read().decode('utf-8', errors='ignore')
                all_bookmarks = parser.parse_html_file(html_content)
                # Filter by folder (if one was given); case-insensitive substring match
                folder_filter = request.form.get('folder_filter', '').strip()
                if folder_filter:
                    urls_to_import = [
                        b for b in all_bookmarks
                        if folder_filter.lower() in b.get('folder', '').lower()
                    ]
                else:
                    urls_to_import = all_bookmarks
            # Batch import
            success_list = []
            failed_list = []
            for idx, item in enumerate(urls_to_import, 1):
                url = item['url']
                name = item.get('name', '')
                # Each URL gets its own transaction
                try:
                    # 1. Check whether the URL already exists
                    try:
                        existing = Site.query.filter_by(url=url).first()
                        if existing:
                            failed_list.append({
                                'url': url,
                                'name': name or existing.name,
                                'error': f'该URL已存在网站名称{existing.name}'
                            })
                            continue
                    except Exception as e:
                        failed_list.append({
                            'url': url,
                            'name': name,
                            'error': f'检查URL时出错: {str(e)}'
                        })
                        continue
                    # 2. Fetch the site metadata (with timeout and error handling)
                    info = None
                    try:
                        info = fetcher.fetch_website_info(url)
                    except Exception as e:
                        print(f"抓取 {url} 失败: {str(e)}")
                        # A fetch failure is not fatal; fall back to the bookmark name below
                    # 3. Resolve the site information
                    if not info or not info.get('name'):
                        # Use the bookmark name when there is one
                        if name:
                            info = {
                                'name': name,
                                'description': '',
                                'logo_url': ''
                            }
                        else:
                            # Otherwise try to use the URL's domain as the name
                            from urllib.parse import urlparse
                            try:
                                parsed = urlparse(url)
                                # URLs without a scheme parse with an empty netloc, hence the path fallback
                                domain = parsed.netloc or parsed.path
                                if domain:
                                    info = {
                                        'name': domain,
                                        'description': '',
                                        'logo_url': ''
                                    }
                                else:
                                    failed_list.append({
                                        'url': url,
                                        'name': name,
                                        'error': '无法获取网站信息且没有备用名称'
                                    })
                                    continue
                            except Exception:
                                failed_list.append({
                                    'url': url,
                                    'name': name,
                                    'error': '无法获取网站信息且URL解析失败'
                                })
                                continue
                    # 4. Download the logo locally (a failure does not block the import)
                    logo_path = None
                    if info.get('logo_url'):
                        try:
                            logo_path = fetcher.download_logo(info['logo_url'], save_dir='static/logos')
                        except Exception as e:
                            print(f"下载Logo失败 ({url}): {str(e)}")
                            # The site is still imported without a local logo
                    # 5. Generate code and slug
                    try:
                        import random
                        from pypinyin import lazy_pinyin
                        import re
                        # Generate a unique 8-digit code
                        site_code = None
                        max_attempts = 10
                        for _ in range(max_attempts):
                            code = str(random.randint(10000000, 99999999))
                            if not Site.query.filter_by(code=code).first():
                                site_code = code
                                break
                        if not site_code:
                            # All 10 attempts collided: use the last 8 digits of the millisecond timestamp
                            import time
                            site_code = str(int(time.time() * 1000))[-8:]
                        # Generate the slug: pinyin of the name, lower-cased,
                        # stripped of special characters, whitespace/hyphen runs collapsed to '-'
                        site_name = info.get('name', name or 'Unknown')[:100]
                        slug = ''.join(lazy_pinyin(site_name))
                        slug = slug.lower()
                        slug = re.sub(r'[^\w\s-]', '', slug)
                        slug = re.sub(r'[-\s]+', '-', slug).strip('-')
                        if not slug:
                            slug = f"site-{site_code}"
                        # Make the slug unique
                        base_slug = slug[:50]  # length limit for suffixed variants
                        counter = 1
                        final_slug = slug
                        while Site.query.filter_by(slug=final_slug).first():
                            final_slug = f"{base_slug}-{counter}"
                            counter += 1
                            if counter > 100:  # guard against an endless loop
                                final_slug = f"{base_slug}-{site_code}"
                                break
                        # 6. Create the Site record (with code and slug)
                        site = Site(
                            code=site_code,
                            slug=final_slug,
                            name=site_name,
                            url=url[:500],  # cap the URL length
                            # Parsed as (logo_path or remote URL) if a logo URL exists, else ''
                            logo=logo_path or info.get('logo_url', '')[:500] if info.get('logo_url') else '',
                            short_desc=info.get('description', '')[:200] if info.get('description') else '',
                            description=info.get('description', '')[:2000] if info.get('description') else '',
                            is_active=auto_activate
                        )
                        # Add to the session and commit this entry immediately
                        db.session.add(site)
                        db.session.commit()
                        success_list.append({
                            'name': site.name,
                            'url': site.url
                        })
                        print(f"成功导入 [{idx}/{len(urls_to_import)}]: {site.name}")
                    except Exception as e:
                        db.session.rollback()
                        failed_list.append({
                            'url': url,
                            'name': name or info.get('name', 'Unknown'),
                            'error': f'数据库保存失败: {str(e)}'
                        })
                        continue
                except Exception as e:
                    # Catch anything unexpected so the remaining entries are still processed
                    db.session.rollback()
                    failed_list.append({
                        'url': url,
                        'name': name,
                        'error': f'未知错误: {str(e)}'
                    })
                    print(f"导入 {url} 时发生未知错误: {str(e)}")
                    continue
            # Summary handed to the template
            results = {
                'total_count': len(urls_to_import),
                'success_count': len(success_list),
                'failed_count': len(failed_list),
                'success_list': success_list,
                'failed_list': failed_list
            }
            if success_list:
                flash(f'成功导入 {len(success_list)} 个网站!', 'success')
        except Exception as e:
            flash(f'导入失败: {str(e)}', 'error')
    return render_template('admin/batch_import.html', results=results)
# ========== Flask-Admin 配置 ==========
class SecureModelView(ModelView):
    """Base Flask-Admin model view that only administrators may open."""

    # Paging: 20 rows by default, adjustable by the user.
    page_size = 20
    can_set_page_size = True

    # Use named filter URLs.
    named_filter_urls = True

    # Project templates for the list / create / edit pages.
    list_template = 'admin/model/list.html'
    create_template = 'admin/model/create.html'
    edit_template = 'admin/model/edit.html'

    def is_accessible(self):
        """Allow access only to an authenticated user of the Admin model type."""
        if not current_user.is_authenticated:
            return False
        return isinstance(current_user, AdminModel)

    def inaccessible_callback(self, name, **kwargs):
        """Send anyone without access to the admin login page."""
        login_url = url_for('admin_login')
        return redirect(login_url)
class SecureAdminIndexView(AdminIndexView):
    """Admin dashboard index that only administrators may open."""

    def is_accessible(self):
        """Allow access only to an authenticated user of the Admin model type."""
        if not current_user.is_authenticated:
            return False
        return isinstance(current_user, AdminModel)

    def inaccessible_callback(self, name, **kwargs):
        """Send anyone without access to the admin login page."""
        login_url = url_for('admin_login')
        return redirect(login_url)

    @expose('/')
    def index(self):
        """Render the dashboard with headline statistics and recent sites."""
        active_sites = Site.query.filter_by(is_active=True).count()
        tag_total = Tag.query.count()
        active_news = News.query.filter_by(is_active=True).count()
        # SUM() yields NULL when there are no rows; report 0 in that case.
        view_total = db.session.query(db.func.sum(Site.view_count)).scalar() or 0
        stats = {
            'sites_count': active_sites,
            'tags_count': tag_total,
            'news_count': active_news,
            'total_views': view_total
        }
        # The five most recently created sites.
        recent_sites = Site.query.order_by(Site.created_at.desc()).limit(5).all()
        return self.render('admin/index.html', stats=stats, recent_sites=recent_sites)
# 网站管理视图
class SiteAdmin(SecureModelView):
    """Admin view for Site records.

    On save, attaches tags typed into the form's ``new_tags`` field
    (creating missing ones) and fills in ``code`` and ``slug`` when empty.
    """

    # Custom templates
    create_template = 'admin/site/create.html'
    edit_template = 'admin/site/edit.html'
    # Editing, deleting and creating are enabled
    can_edit = True
    can_delete = True
    can_create = True
    can_view_details = False  # no separate details page
    # Show the row action column
    column_display_actions = True
    action_disallowed_list = []
    column_list = ['id', 'code', 'name', 'url', 'slug', 'is_active', 'is_recommended', 'view_count', 'created_at']
    column_searchable_list = ['code', 'name', 'url', 'description']
    column_filters = ['is_active', 'is_recommended', 'tags']
    column_labels = {
        'id': 'ID',
        'code': '网站编码',
        'name': '网站名称',
        'url': 'URL',
        'slug': 'URL别名',
        'logo': 'Logo',
        'short_desc': '简短描述',
        'description': '详细介绍',
        'features': '主要功能',
        'news_keywords': '新闻关键词',
        'is_active': '是否启用',
        'is_recommended': '是否推荐',
        'view_count': '浏览次数',
        'sort_order': '排序权重',
        'tags': '标签',
        'created_at': '创建时间',
        'updated_at': '更新时间'
    }
    form_columns = ['name', 'url', 'slug', 'logo', 'short_desc', 'description', 'features', 'news_keywords', 'tags', 'is_active', 'is_recommended', 'sort_order']
    # No extra row actions
    column_extra_row_actions = None

    def on_model_change(self, form, model, is_created):
        """Attach new tags and auto-generate ``code`` / ``slug`` before saving.

        Args:
            form: The submitted form (the extra ``new_tags`` field is read
                from the raw request instead).
            model: The Site being created or edited; modified in place.
            is_created: True when the model is new. Not used here.
        """
        import re
        import random
        import time
        from pypinyin import lazy_pinyin
        from flask import request
        # no_autoflush keeps the lookups below from flushing the
        # half-populated model early.
        with db.session.no_autoflush:
            # Tags typed manually as a comma-separated list.
            new_tags_str = request.form.get('new_tags', '')
            if new_tags_str:
                new_tag_names = [name.strip() for name in new_tags_str.split(',') if name.strip()]
                for tag_name in new_tag_names:
                    existing_tag = Tag.query.filter_by(name=tag_name).first()
                    if not existing_tag:
                        # Create the tag: pinyin of the name, lower-cased,
                        # special characters removed, runs collapsed to '-'.
                        tag_slug = ''.join(lazy_pinyin(tag_name))
                        tag_slug = tag_slug.lower()
                        tag_slug = re.sub(r'[^\w\s-]', '', tag_slug)
                        tag_slug = re.sub(r'[-\s]+', '-', tag_slug).strip('-')
                        # A name made only of symbols leaves nothing behind;
                        # never store a tag with an empty slug.
                        if not tag_slug:
                            tag_slug = f"tag-{random.randint(1000, 9999)}"
                        # Make the slug unique.
                        base_tag_slug = tag_slug[:50]
                        counter = 1
                        final_tag_slug = tag_slug
                        while Tag.query.filter_by(slug=final_tag_slug).first():
                            final_tag_slug = f"{base_tag_slug}-{counter}"
                            counter += 1
                            if counter > 100:
                                final_tag_slug = f"{base_tag_slug}-{random.randint(1000, 9999)}"
                                break
                        new_tag = Tag(name=tag_name, slug=final_tag_slug)
                        db.session.add(new_tag)
                        db.session.flush()  # make sure the new tag has an ID
                        if new_tag not in model.tags:
                            model.tags.append(new_tag)
                    else:
                        # Attach the tag that already exists.
                        if existing_tag not in model.tags:
                            model.tags.append(existing_tag)
            # Empty code: generate a unique 8-digit number.
            if not model.code or model.code.strip() == '':
                max_attempts = 10
                for _ in range(max_attempts):
                    code = str(random.randint(10000000, 99999999))
                    existing = Site.query.filter(Site.code == code).first()
                    if not existing or existing.id == model.id:
                        model.code = code
                        break
                else:
                    # Every attempt collided: use the last 8 digits of the
                    # millisecond timestamp. for/else also replaces a
                    # whitespace-only code, which a plain truthiness check
                    # would have left untouched.
                    model.code = str(int(time.time() * 1000))[-8:]
            # Empty slug: derive it from the name.
            if not model.slug or model.slug.strip() == '':
                # Chinese characters become pinyin.
                slug = ''.join(lazy_pinyin(model.name))
                # Lower-case and strip special characters.
                slug = slug.lower()
                slug = re.sub(r'[^\w\s-]', '', slug)
                slug = re.sub(r'[-\s]+', '-', slug).strip('-')
                # Nothing left after cleaning: fall back to the code.
                if not slug:
                    slug = f"site-{model.code}"
                # Make the slug unique, with bounded length and retries.
                base_slug = slug[:50]
                counter = 1
                final_slug = slug
                max_slug_attempts = 100
                while counter < max_slug_attempts:
                    existing = Site.query.filter(Site.slug == final_slug).first()
                    if not existing or existing.id == model.id:
                        break
                    final_slug = f"{base_slug}-{counter}"
                    counter += 1
                # Retries exhausted: the code makes the slug unique.
                if counter >= max_slug_attempts:
                    final_slug = f"{base_slug}-{model.code}"
                model.slug = final_slug
# 标签管理视图
class TagAdmin(SecureModelView):
    """Admin view for Tag records."""

    # Permissions: full CRUD, no separate details page.
    can_create = True
    can_edit = True
    can_delete = True
    can_view_details = False

    # Show the row action column.
    column_display_actions = True

    # List page columns and search fields.
    column_list = [
        'id',
        'name',
        'slug',
        'description',
        'sort_order',
    ]
    column_searchable_list = [
        'name',
        'description',
    ]

    # Display labels for the columns.
    column_labels = dict(
        id='ID',
        name='标签名称',
        slug='URL别名',
        description='标签描述',
        seo_title='SEO标题 (v2.4)',
        seo_description='SEO描述 (v2.4)',
        seo_keywords='SEO关键词 (v2.4)',
        icon='图标',
        sort_order='排序权重',
        created_at='创建时间',
    )

    # Fields on the create / edit form.
    form_columns = [
        'name',
        'slug',
        'description',
        'seo_title',
        'seo_description',
        'seo_keywords',
        'icon',
        'sort_order',
    ]
# 管理员视图
class AdminAdmin(SecureModelView):
    """Admin view for administrator accounts.

    The form has no password field, so newly created administrators get an
    initial password assigned on save.
    """

    can_edit = True
    can_delete = True
    can_create = True
    can_view_details = False
    # Show the row action column
    column_display_actions = True
    column_list = ['id', 'username', 'email', 'is_active', 'last_login', 'created_at']
    column_searchable_list = ['username', 'email']
    column_filters = ['is_active']
    column_labels = {
        'id': 'ID',
        'username': '用户名',
        'email': '邮箱',
        'is_active': '是否启用',
        'created_at': '创建时间',
        'last_login': '最后登录'
    }
    form_columns = ['username', 'email', 'is_active']

    def on_model_change(self, form, model, is_created):
        """Assign the initial password when a new administrator is created.

        Args:
            form: The submitted form. Not used here.
            model: The administrator being saved.
            is_created: True when the record is new; existing records keep
                their current password.
        """
        if is_created:
            # SECURITY NOTE: a fixed, well-known initial password means every
            # new admin account can be taken over until its owner changes it.
            # The value can be overridden through the DEFAULT_ADMIN_PASSWORD
            # config key; the historical default is kept so existing
            # deployments behave as before.
            initial_password = app.config.get('DEFAULT_ADMIN_PASSWORD') or 'admin123'
            model.set_password(initial_password)
# 新闻管理视图
class NewsAdmin(SecureModelView):
    """Admin view for News records, newest publication date first."""

    # Permissions: full CRUD, no separate details page.
    can_create = True
    can_edit = True
    can_delete = True
    can_view_details = False

    # Show the row action column.
    column_display_actions = True

    # List page columns, search fields and filters.
    column_list = [
        'id',
        'site',
        'title',
        'source_name',
        'news_type',
        'published_at',
        'is_active',
    ]
    column_searchable_list = ['title', 'content', 'source_name']
    column_filters = [
        'site',
        'news_type',
        'source_name',
        'is_active',
        'published_at',
    ]

    # Sort by publication time, descending.
    column_default_sort = ('published_at', True)

    # Display labels for the columns.
    column_labels = dict(
        id='ID',
        site='关联网站',
        title='新闻标题',
        content='新闻内容',
        news_type='新闻类型',
        url='新闻链接',
        source_name='来源网站',
        source_icon='来源图标',
        published_at='发布时间',
        is_active='是否启用',
        created_at='创建时间',
        updated_at='更新时间',
    )

    # Fields on the create / edit form.
    form_columns = [
        'site',
        'title',
        'content',
        'news_type',
        'url',
        'source_name',
        'source_icon',
        'published_at',
        'is_active',
    ]

    # Selectable news types; stored value and display label are identical.
    form_choices = {
        'news_type': [
            (label, label)
            for label in (
                'Search Result',
                'Product Update',
                'Industry News',
                'Company News',
                'Other',
            )
        ]
    }
# Prompt模板管理视图
class PromptAdmin(SecureModelView):
    """Admin view for PromptTemplate records (deletion disabled)."""

    # Permissions: create and edit only. Deleting is disabled so that
    # prompts the system depends on cannot be removed.
    can_create = True
    can_edit = True
    can_delete = False
    can_view_details = False

    # Show the row action column.
    column_display_actions = True

    # List page columns, search fields and filters.
    column_list = [
        'id',
        'key',
        'name',
        'description',
        'is_active',
        'updated_at',
    ]
    column_searchable_list = ['key', 'name', 'description']
    column_filters = ['is_active', 'key']

    # Display labels for the columns.
    column_labels = dict(
        id='ID',
        key='唯一标识',
        name='模板名称',
        system_prompt='系统提示词',
        user_prompt_template='用户提示词模板',
        description='模板说明',
        is_active='是否启用',
        created_at='创建时间',
        updated_at='更新时间',
    )

    # Fields on the create / edit form.
    form_columns = [
        'key',
        'name',
        'description',
        'system_prompt',
        'user_prompt_template',
        'is_active',
    ]

    # Help text for individual fields.
    column_descriptions = dict(
        key='唯一标识,如: tags, features, description',
        system_prompt='AI的系统角色设定',
        user_prompt_template='用户提示词模板,支持变量如 {name}, {description}, {url}',
    )

    # Both prompt text areas use a monospace font; only the row count differs.
    form_widget_args = {
        field_name: {'rows': row_count, 'style': 'font-family: monospace;'}
        for field_name, row_count in (
            ('system_prompt', 3),
            ('user_prompt_template', 20),
        )
    }
# Initialise Flask-Admin with the secured dashboard as its index view.
dashboard_view = SecureAdminIndexView(name='控制台', url='/admin')
admin = Admin(
    app,
    name='ZJPB 焦提示词',
    template_mode='bootstrap4',
    index_view=dashboard_view,
    base_template='admin/master.html'
)
# Register the model views; registration order is preserved.
for model_view in (
    SiteAdmin(Site, db.session, name='网站管理'),
    TagAdmin(Tag, db.session, name='标签管理'),
    NewsAdmin(News, db.session, name='新闻管理'),
    PromptAdmin(PromptTemplate, db.session, name='Prompt管理'),
    AdminAdmin(AdminModel, db.session, name='管理员', endpoint='admin_users'),
):
    admin.add_view(model_view)
return app
if __name__ == '__main__':
    # Development entry point: build the app for the configured environment
    # (FLASK_ENV, default 'development') and serve it on all interfaces.
    env_name = os.getenv('FLASK_ENV', 'development')
    app = create_app(env_name)
    # Debug mode enables Werkzeug's interactive debugger, which allows code
    # execution from the browser. Because the server binds to 0.0.0.0, it is
    # on by default only for the development environment; FLASK_DEBUG can
    # override that explicitly in either direction.
    default_debug = '1' if env_name == 'development' else '0'
    debug_enabled = os.getenv('FLASK_DEBUG', default_debug).strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(host='0.0.0.0', port=5000, debug=debug_enabled)