feat: v3.0 - 用户系统和收藏功能

核心功能:
- 用户注册/登录系统(用户名+密码)
- 工具收藏功能(一键收藏/取消收藏)
- 收藏分组管理(文件夹)
- 用户中心(个人资料、收藏列表)

数据库变更:
- 新增 users 表(用户信息)
- 新增 folders 表(收藏分组)
- 新增 collections 表(收藏记录)

安全增强:
- Admin 和 User 完全隔离
- 修复14个管理员路由的权限漏洞
- 所有管理功能添加用户类型检查

新增文件:
- templates/auth/register.html - 注册页面
- templates/auth/login.html - 登录页面
- templates/user/profile.html - 用户中心
- templates/user/collections.html - 收藏列表
- create_user_tables.py - 数据库迁移脚本
- USER_SYSTEM_README.md - 用户系统文档
- CHANGELOG_v3.0.md - 版本更新日志

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
Jowe
2026-02-06 19:19:05 +08:00
parent 34cd05b01c
commit 2067fb1712
11 changed files with 2542 additions and 6 deletions

663
app.py
View File

@@ -9,7 +9,7 @@ from flask_admin import Admin, AdminIndexView, expose
from flask_admin.contrib.sqla import ModelView
from datetime import datetime
from config import config
from models import db, Site, Tag, Admin as AdminModel, News, site_tags, PromptTemplate
from models import db, Site, Tag, Admin as AdminModel, News, site_tags, PromptTemplate, User, Folder, Collection
from utils.website_fetcher import WebsiteFetcher
from utils.tag_generator import TagGenerator
from utils.news_searcher import NewsSearcher
@@ -68,7 +68,17 @@ def create_app(config_name='default'):
@login_manager.user_loader
def load_user(user_id):
return AdminModel.query.get(int(user_id))
"""加载用户支持Admin和User两种类型"""
try:
user_type, uid = user_id.split(':', 1)
if user_type == 'admin':
return AdminModel.query.get(int(uid))
elif user_type == 'user':
return User.query.get(int(uid))
except (ValueError, AttributeError):
# 兼容旧格式纯数字ID默认为Admin
return AdminModel.query.get(int(user_id))
return None
# ========== 前台路由 ==========
@app.route('/')
@@ -593,10 +603,607 @@ def create_app(config_name='default'):
logout_user()
return redirect(url_for('index'))
# ========== 用户认证路由 ==========
@app.route('/register', methods=['GET', 'POST'])
def user_register():
"""用户注册"""
if current_user.is_authenticated:
# 如果已登录,跳转到首页
return redirect(url_for('index'))
if request.method == 'POST':
username = request.form.get('username', '').strip()
password = request.form.get('password', '').strip()
confirm_password = request.form.get('confirm_password', '').strip()
# 验证输入
if not username or not password:
flash('用户名和密码不能为空', 'error')
return render_template('auth/register.html')
if len(username) < 3:
flash('用户名至少3个字符', 'error')
return render_template('auth/register.html')
if len(password) < 6:
flash('密码至少6个字符', 'error')
return render_template('auth/register.html')
if password != confirm_password:
flash('两次输入的密码不一致', 'error')
return render_template('auth/register.html')
# 检查用户名是否已存在
if User.query.filter_by(username=username).first():
flash('该用户名已被注册', 'error')
return render_template('auth/register.html')
# 创建用户
try:
user = User(username=username)
user.set_password(password)
db.session.add(user)
db.session.commit()
# 自动登录
login_user(user)
user.last_login = datetime.now()
db.session.commit()
flash('注册成功!', 'success')
return redirect(url_for('index'))
except Exception as e:
db.session.rollback()
flash(f'注册失败:{str(e)}', 'error')
return render_template('auth/register.html')
return render_template('auth/register.html')
@app.route('/login', methods=['GET', 'POST'])
def user_login():
"""用户登录"""
if current_user.is_authenticated:
return redirect(url_for('index'))
if request.method == 'POST':
username = request.form.get('username', '').strip()
password = request.form.get('password', '').strip()
if not username or not password:
flash('请输入用户名和密码', 'error')
return render_template('auth/login.html')
# 查找用户
user = User.query.filter_by(username=username).first()
if user and user.check_password(password) and user.is_active:
login_user(user)
user.last_login = datetime.now()
db.session.commit()
# 获取next参数如果有则跳转否则跳转首页
next_page = request.args.get('next')
if next_page and next_page.startswith('/'):
return redirect(next_page)
return redirect(url_for('index'))
else:
flash('用户名或密码错误', 'error')
return render_template('auth/login.html')
@app.route('/logout')
@login_required
def user_logout():
"""用户登出"""
logout_user()
return redirect(url_for('index'))
# ========== 用户认证API ==========
@app.route('/api/auth/status', methods=['GET'])
def auth_status():
"""获取登录状态"""
if current_user.is_authenticated:
# 判断是Admin还是User
if isinstance(current_user, User):
return jsonify({
'logged_in': True,
'user_type': 'user',
'username': current_user.username,
'avatar': current_user.avatar,
'id': current_user.id
})
else:
return jsonify({
'logged_in': True,
'user_type': 'admin',
'username': current_user.username,
'id': current_user.id
})
return jsonify({'logged_in': False})
# ========== 收藏功能API ==========
@app.route('/api/collections/toggle', methods=['POST'])
@login_required
def toggle_collection():
"""收藏/取消收藏"""
# 检查是否为普通用户
if not isinstance(current_user, User):
return jsonify({'success': False, 'message': '管理员账号无法使用收藏功能'}), 403
try:
data = request.get_json() or {}
site_code = data.get('site_code', '').strip()
folder_id = data.get('folder_id') # 可选,指定文件夹
if not site_code:
return jsonify({'success': False, 'message': '请提供网站编码'}), 400
# 查找网站
site = Site.query.filter_by(code=site_code, is_active=True).first()
if not site:
return jsonify({'success': False, 'message': '网站不存在'}), 404
# 检查是否已收藏
existing = Collection.query.filter_by(
user_id=current_user.id,
site_id=site.id
).first()
if existing:
# 已收藏,则取消收藏
db.session.delete(existing)
db.session.commit()
return jsonify({
'success': True,
'action': 'removed',
'message': '已取消收藏'
})
else:
# 未收藏,则添加收藏
collection = Collection(
user_id=current_user.id,
site_id=site.id,
folder_id=folder_id
)
db.session.add(collection)
db.session.commit()
return jsonify({
'success': True,
'action': 'added',
'message': '收藏成功',
'collection_id': collection.id
})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': f'操作失败:{str(e)}'}), 500
@app.route('/api/collections/status/<site_code>', methods=['GET'])
@login_required
def collection_status(site_code):
"""查询收藏状态"""
if not isinstance(current_user, User):
return jsonify({'is_collected': False})
try:
site = Site.query.filter_by(code=site_code, is_active=True).first()
if not site:
return jsonify({'is_collected': False})
collection = Collection.query.filter_by(
user_id=current_user.id,
site_id=site.id
).first()
return jsonify({
'is_collected': collection is not None,
'collection_id': collection.id if collection else None,
'folder_id': collection.folder_id if collection else None
})
except Exception as e:
return jsonify({'is_collected': False, 'error': str(e)})
@app.route('/api/collections/list', methods=['GET'])
@login_required
def list_collections():
"""获取收藏列表"""
if not isinstance(current_user, User):
return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403
try:
folder_id = request.args.get('folder_id')
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 20, type=int)
# 构建查询
query = Collection.query.filter_by(user_id=current_user.id)
if folder_id:
if folder_id == 'null' or folder_id == 'none':
query = query.filter_by(folder_id=None)
else:
query = query.filter_by(folder_id=int(folder_id))
# 按创建时间倒序
query = query.order_by(Collection.created_at.desc())
# 分页
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
# 格式化数据
collections_data = []
for collection in pagination.items:
site = collection.site
collections_data.append({
'id': collection.id,
'site_id': site.id,
'site_code': site.code,
'site_name': site.name,
'site_url': site.url,
'site_logo': site.logo,
'site_short_desc': site.short_desc,
'folder_id': collection.folder_id,
'note': collection.note,
'created_at': collection.created_at.strftime('%Y-%m-%d %H:%M:%S') if collection.created_at else None
})
return jsonify({
'success': True,
'collections': collections_data,
'total': pagination.total,
'page': page,
'per_page': per_page,
'has_next': pagination.has_next,
'has_prev': pagination.has_prev
})
except Exception as e:
return jsonify({'success': False, 'message': f'获取失败:{str(e)}'}), 500
@app.route('/api/collections/<int:collection_id>/note', methods=['PUT'])
@login_required
def update_collection_note(collection_id):
"""更新收藏备注"""
if not isinstance(current_user, User):
return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403
try:
data = request.get_json() or {}
note = data.get('note', '').strip()
collection = Collection.query.filter_by(
id=collection_id,
user_id=current_user.id
).first()
if not collection:
return jsonify({'success': False, 'message': '收藏不存在'}), 404
collection.note = note
db.session.commit()
return jsonify({
'success': True,
'message': '备注已更新'
})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': f'更新失败:{str(e)}'}), 500
@app.route('/api/collections/<int:collection_id>/move', methods=['PUT'])
@login_required
def move_collection(collection_id):
"""移动收藏到其他文件夹"""
if not isinstance(current_user, User):
return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403
try:
data = request.get_json() or {}
folder_id = data.get('folder_id') # None表示移到未分类
collection = Collection.query.filter_by(
id=collection_id,
user_id=current_user.id
).first()
if not collection:
return jsonify({'success': False, 'message': '收藏不存在'}), 404
# 如果指定了文件夹,验证文件夹是否属于当前用户
if folder_id:
folder = Folder.query.filter_by(
id=folder_id,
user_id=current_user.id
).first()
if not folder:
return jsonify({'success': False, 'message': '文件夹不存在'}), 404
collection.folder_id = folder_id
db.session.commit()
return jsonify({
'success': True,
'message': '已移动到指定文件夹'
})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': f'移动失败:{str(e)}'}), 500
# ========== 文件夹管理API ==========
@app.route('/api/folders', methods=['GET'])
@login_required
def list_folders():
"""获取文件夹列表"""
if not isinstance(current_user, User):
return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403
try:
folders = Folder.query.filter_by(user_id=current_user.id).order_by(
Folder.sort_order.desc(), Folder.created_at
).all()
folders_data = []
for folder in folders:
# 统计文件夹中的收藏数量
count = Collection.query.filter_by(
user_id=current_user.id,
folder_id=folder.id
).count()
folders_data.append({
'id': folder.id,
'name': folder.name,
'description': folder.description,
'icon': folder.icon,
'sort_order': folder.sort_order,
'is_public': folder.is_public,
'public_slug': folder.public_slug,
'count': count,
'created_at': folder.created_at.strftime('%Y-%m-%d %H:%M:%S') if folder.created_at else None
})
return jsonify({
'success': True,
'folders': folders_data
})
except Exception as e:
return jsonify({'success': False, 'message': f'获取失败:{str(e)}'}), 500
@app.route('/api/folders', methods=['POST'])
@login_required
def create_folder():
"""创建文件夹"""
if not isinstance(current_user, User):
return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403
try:
data = request.get_json() or {}
name = data.get('name', '').strip()
description = data.get('description', '').strip()
icon = data.get('icon', '📁').strip()
if not name:
return jsonify({'success': False, 'message': '文件夹名称不能为空'}), 400
# 检查同名文件夹
existing = Folder.query.filter_by(
user_id=current_user.id,
name=name
).first()
if existing:
return jsonify({'success': False, 'message': '该文件夹名称已存在'}), 400
# 创建文件夹
folder = Folder(
user_id=current_user.id,
name=name,
description=description,
icon=icon
)
db.session.add(folder)
db.session.commit()
return jsonify({
'success': True,
'message': '文件夹创建成功',
'folder': {
'id': folder.id,
'name': folder.name,
'description': folder.description,
'icon': folder.icon
}
})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': f'创建失败:{str(e)}'}), 500
@app.route('/api/folders/<int:folder_id>', methods=['PUT'])
@login_required
def update_folder(folder_id):
"""更新文件夹"""
if not isinstance(current_user, User):
return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403
try:
data = request.get_json() or {}
folder = Folder.query.filter_by(
id=folder_id,
user_id=current_user.id
).first()
if not folder:
return jsonify({'success': False, 'message': '文件夹不存在'}), 404
# 更新字段
if 'name' in data:
name = data['name'].strip()
if not name:
return jsonify({'success': False, 'message': '文件夹名称不能为空'}), 400
# 检查同名(排除自己)
existing = Folder.query.filter(
Folder.user_id == current_user.id,
Folder.name == name,
Folder.id != folder_id
).first()
if existing:
return jsonify({'success': False, 'message': '该文件夹名称已存在'}), 400
folder.name = name
if 'description' in data:
folder.description = data['description'].strip()
if 'icon' in data:
folder.icon = data['icon'].strip()
if 'sort_order' in data:
folder.sort_order = int(data['sort_order'])
db.session.commit()
return jsonify({
'success': True,
'message': '文件夹已更新'
})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': f'更新失败:{str(e)}'}), 500
@app.route('/api/folders/<int:folder_id>', methods=['DELETE'])
@login_required
def delete_folder(folder_id):
"""删除文件夹"""
if not isinstance(current_user, User):
return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403
try:
folder = Folder.query.filter_by(
id=folder_id,
user_id=current_user.id
).first()
if not folder:
return jsonify({'success': False, 'message': '文件夹不存在'}), 404
# 删除文件夹(级联删除收藏记录)
db.session.delete(folder)
db.session.commit()
return jsonify({
'success': True,
'message': '文件夹已删除'
})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': f'删除失败:{str(e)}'}), 500
# ========== 用户中心页面路由 ==========
@app.route('/user/profile')
@login_required
def user_profile():
"""用户中心主页"""
if not isinstance(current_user, User):
flash('仅普通用户可访问', 'error')
return redirect(url_for('index'))
# 统计信息
collections_count = Collection.query.filter_by(user_id=current_user.id).count()
folders_count = Folder.query.filter_by(user_id=current_user.id).count()
# 最近收藏5条
recent_collections = Collection.query.filter_by(
user_id=current_user.id
).order_by(Collection.created_at.desc()).limit(5).all()
return render_template('user/profile.html',
collections_count=collections_count,
folders_count=folders_count,
recent_collections=recent_collections)
@app.route('/user/collections')
@login_required
def user_collections():
"""收藏列表页面"""
if not isinstance(current_user, User):
flash('仅普通用户可访问', 'error')
return redirect(url_for('index'))
# 获取所有文件夹
folders = Folder.query.filter_by(user_id=current_user.id).order_by(
Folder.sort_order.desc(), Folder.created_at
).all()
# 获取收藏(分页)
page = request.args.get('page', 1, type=int)
folder_id = request.args.get('folder_id')
query = Collection.query.filter_by(user_id=current_user.id)
if folder_id:
if folder_id == 'none':
query = query.filter_by(folder_id=None)
else:
query = query.filter_by(folder_id=int(folder_id))
query = query.order_by(Collection.created_at.desc())
pagination = query.paginate(page=page, per_page=20, error_out=False)
return render_template('user/collections.html',
folders=folders,
collections=pagination.items,
pagination=pagination,
current_folder_id=folder_id)
@app.route('/api/user/profile', methods=['PUT'])
@login_required
def update_user_profile():
"""更新用户资料"""
if not isinstance(current_user, User):
return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403
try:
data = request.get_json() or {}
if 'bio' in data:
current_user.bio = data['bio'].strip()
if 'avatar' in data:
current_user.avatar = data['avatar'].strip()
if 'is_public_profile' in data:
current_user.is_public_profile = bool(data['is_public_profile'])
db.session.commit()
return jsonify({
'success': True,
'message': '资料已更新'
})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': f'更新失败:{str(e)}'}), 500
@app.route('/admin/change-password', methods=['GET', 'POST'])
@login_required
def change_password():
"""修改密码"""
# 只允许管理员访问
if not isinstance(current_user, AdminModel):
flash('无权访问此页面', 'error')
return redirect(url_for('index'))
if request.method == 'POST':
old_password = request.form.get('old_password', '').strip()
new_password = request.form.get('new_password', '').strip()
@@ -643,6 +1250,10 @@ def create_app(config_name='default'):
@login_required
def fetch_website_info():
"""抓取网站信息API"""
# 只允许管理员访问
if not isinstance(current_user, AdminModel):
return jsonify({'success': False, 'message': '无权访问'}), 403
try:
data = request.get_json()
url = data.get('url', '').strip()
@@ -690,6 +1301,10 @@ def create_app(config_name='default'):
@login_required
def upload_logo():
"""上传Logo图片API"""
# 只允许管理员访问
if not isinstance(current_user, AdminModel):
return jsonify({'success': False, 'message': '无权访问'}), 403
try:
# 检查文件是否存在
if 'logo' not in request.files:
@@ -748,6 +1363,10 @@ def create_app(config_name='default'):
@login_required
def generate_features():
"""使用DeepSeek自动生成网站主要功能"""
# 只允许管理员访问
if not isinstance(current_user, AdminModel):
return jsonify({'success': False, 'message': '无权访问'}), 403
try:
data = request.get_json()
name = data.get('name', '').strip()
@@ -790,6 +1409,10 @@ def create_app(config_name='default'):
@login_required
def generate_description():
"""使用DeepSeek自动生成网站详细介绍"""
# 只允许管理员访问
if not isinstance(current_user, AdminModel):
return jsonify({'success': False, 'message': '无权访问'}), 403
try:
data = request.get_json()
name = data.get('name', '').strip()
@@ -832,6 +1455,10 @@ def create_app(config_name='default'):
@login_required
def generate_tags():
"""使用DeepSeek自动生成标签"""
# 只允许管理员访问
if not isinstance(current_user, AdminModel):
return jsonify({'success': False, 'message': '无权访问'}), 403
try:
data = request.get_json()
name = data.get('name', '').strip()
@@ -877,6 +1504,10 @@ def create_app(config_name='default'):
@login_required
def fetch_site_news():
"""为指定网站获取最新新闻"""
# 只允许管理员访问
if not isinstance(current_user, AdminModel):
return jsonify({'success': False, 'message': '无权访问'}), 403
try:
data = request.get_json()
site_id = data.get('site_id')
@@ -970,6 +1601,10 @@ def create_app(config_name='default'):
@login_required
def fetch_all_news():
"""批量为所有网站获取新闻"""
# 只允许管理员访问
if not isinstance(current_user, AdminModel):
return jsonify({'success': False, 'message': '无权访问'}), 403
try:
data = request.get_json()
count_per_site = data.get('count', 5) # 每个网站获取的新闻数量
@@ -1148,6 +1783,11 @@ Sitemap: {}sitemap.xml
@login_required
def seo_tools():
"""SEO工具管理页面"""
# 只允许管理员访问
if not isinstance(current_user, AdminModel):
flash('无权访问此页面', 'error')
return redirect(url_for('index'))
# 检查static/sitemap.xml是否存在及最后更新时间
sitemap_path = 'static/sitemap.xml'
sitemap_info = None
@@ -1168,6 +1808,10 @@ Sitemap: {}sitemap.xml
@login_required
def generate_static_sitemap():
"""生成静态sitemap.xml文件"""
# 只允许管理员访问
if not isinstance(current_user, AdminModel):
return jsonify({'success': False, 'message': '无权访问'}), 403
try:
# 获取所有启用的网站
sites = Site.query.filter_by(is_active=True).order_by(Site.updated_at.desc()).all()
@@ -1240,6 +1884,10 @@ Sitemap: {}sitemap.xml
@login_required
def notify_search_engines():
"""通知搜索引擎sitemap更新"""
# 只允许管理员访问
if not isinstance(current_user, AdminModel):
return jsonify({'success': False, 'message': '无权访问'}), 403
try:
import requests
from urllib.parse import quote
@@ -1401,6 +2049,11 @@ Sitemap: {}sitemap.xml
@login_required
def batch_import():
"""批量导入网站"""
# 只允许管理员访问
if not isinstance(current_user, AdminModel):
flash('无权访问此页面', 'error')
return redirect(url_for('index'))
from utils.bookmark_parser import BookmarkParser
from utils.website_fetcher import WebsiteFetcher
@@ -1636,7 +2289,8 @@ Sitemap: {}sitemap.xml
named_filter_urls = True
def is_accessible(self):
return current_user.is_authenticated
# 只允许Admin类型的用户访问
return current_user.is_authenticated and isinstance(current_user, AdminModel)
def inaccessible_callback(self, name, **kwargs):
return redirect(url_for('admin_login'))
@@ -1644,7 +2298,8 @@ Sitemap: {}sitemap.xml
class SecureAdminIndexView(AdminIndexView):
"""需要登录的管理首页"""
def is_accessible(self):
return current_user.is_authenticated
# 只允许Admin类型的用户访问
return current_user.is_authenticated and isinstance(current_user, AdminModel)
def inaccessible_callback(self, name, **kwargs):
return redirect(url_for('admin_login'))