您好,{current_user.username}!
感谢您注册 ZJPB。请点击下面的按钮验证您的邮箱地址:
或复制以下链接到浏览器:
{verify_url}
此链接将在24小时后失效。
')
def site_detail(code):
"""网站详情页"""
site = Site.query.filter_by(code=code, is_active=True).first_or_404()
# 增加浏览次数
site.view_count += 1
db.session.commit()
# v2.6优化:移除自动调用博查API的逻辑,改为按需加载
# 只获取数据库中已有的新闻,不再自动调用API
# 获取该网站的相关新闻(最多显示5条)
news_list = News.query.filter_by(
site_id=site.id,
is_active=True
).order_by(News.published_at.desc()).limit(5).all()
# 检查是否有新闻,如果没有则标记需要加载
has_news = len(news_list) > 0
# 获取同类工具推荐(通过标签匹配,最多显示4个)
recommended_sites = []
if site.tags:
# 获取有相同标签的其他网站
recommended_sites = Site.query.filter(
Site.id != site.id,
Site.is_active == True,
Site.tags.any(Tag.id.in_([tag.id for tag in site.tags]))
).order_by(Site.view_count.desc()).limit(4).all()
return render_template('detail_new.html', site=site, news_list=news_list, has_news=has_news, recommended_sites=recommended_sites)
# ========== 验证码相关 (v2.6.1优化) ==========
def generate_captcha_image(text):
"""生成验证码图片"""
# 创建图片 (宽120, 高40)
width, height = 120, 40
image = Image.new('RGB', (width, height), color=(255, 255, 255))
draw = ImageDraw.Draw(image)
# 使用默认字体
try:
font = ImageFont.truetype("arial.ttf", 28)
except:
font = ImageFont.load_default()
# 添加随机干扰线
for i in range(3):
x1 = random.randint(0, width)
y1 = random.randint(0, height)
x2 = random.randint(0, width)
y2 = random.randint(0, height)
draw.line([(x1, y1), (x2, y2)], fill=(200, 200, 200), width=1)
# 绘制验证码文字
for i, char in enumerate(text):
x = 10 + i * 25
y = random.randint(5, 12)
color = (random.randint(0, 100), random.randint(0, 100), random.randint(0, 100))
draw.text((x, y), char, font=font, fill=color)
# 添加随机噪点
for _ in range(100):
x = random.randint(0, width - 1)
y = random.randint(0, height - 1)
draw.point((x, y), fill=(random.randint(150, 200), random.randint(150, 200), random.randint(150, 200)))
return image
@app.route('/api/captcha', methods=['GET'])
def get_captcha():
"""生成验证码图片"""
# 生成4位随机验证码
captcha_text = ''.join(random.choices(string.ascii_uppercase + string.digits, k=4))
# 存储到session
session['captcha'] = captcha_text
session['captcha_created'] = datetime.now().timestamp()
# 生成图片
image = generate_captcha_image(captcha_text)
# 转换为字节流
img_io = BytesIO()
image.save(img_io, 'PNG')
img_io.seek(0)
return send_file(img_io, mimetype='image/png')
# ========== 新闻相关API (v2.6优化) ==========
@app.route('/api/fetch-news/', methods=['POST'])
def fetch_news_for_site(code):
"""
按需获取指定网站的新闻
v2.6.1优化:所有手动请求都需要验证码,防止API滥用
"""
try:
# 获取请求数据
data = request.get_json() or {}
captcha_input = data.get('captcha', '').strip().upper()
# 验证验证码
session_captcha = session.get('captcha', '').upper()
captcha_created = session.get('captcha_created', 0)
# 检查验证码是否存在
if not session_captcha:
return jsonify({
'success': False,
'error': '请先获取验证码'
}), 400
# 检查验证码是否过期(5分钟)
if datetime.now().timestamp() - captcha_created > 300:
session.pop('captcha', None)
session.pop('captcha_created', None)
return jsonify({
'success': False,
'error': '验证码已过期,请刷新后重试'
}), 400
# 检查验证码是否正确
if captcha_input != session_captcha:
return jsonify({
'success': False,
'error': '验证码错误,请重新输入'
}), 400
# 验证通过,清除验证码
session.pop('captcha', None)
session.pop('captcha_created', None)
# 查找网站
site = Site.query.filter_by(code=code, is_active=True).first_or_404()
# 检查博查API配置
api_key = app.config.get('BOCHA_API_KEY')
if not api_key:
return jsonify({'success': False, 'error': '博查API未配置'}), 500
# 创建新闻搜索器
searcher = NewsSearcher(api_key)
# 获取新闻(限制5条,一周内的)
news_items = searcher.search_site_news(
site_name=site.name,
site_url=site.url,
news_keywords=site.news_keywords,
count=5,
freshness='oneWeek'
)
# 保存新闻到数据库
new_count = 0
if news_items:
for item in news_items:
# 检查是否已存在(根据URL去重)
existing = News.query.filter_by(
site_id=site.id,
url=item['url']
).first()
if not existing:
news = News(
site_id=site.id,
title=item['title'],
content=item.get('summary') or item.get('snippet', ''),
url=item['url'],
source_name=item.get('site_name', ''),
source_icon=item.get('site_icon', ''),
published_at=item.get('published_at'),
news_type='Search Result',
is_active=True
)
db.session.add(news)
new_count += 1
db.session.commit()
# 返回最新的新闻列表
news_list = News.query.filter_by(
site_id=site.id,
is_active=True
).order_by(News.published_at.desc()).limit(5).all()
# 格式化新闻数据
news_data = []
for news in news_list:
news_data.append({
'title': news.title,
'content': news.content[:200] if news.content else '',
'url': news.url,
'source_name': news.source_name,
'source_icon': news.source_icon,
'published_at': news.published_at.strftime('%Y年%m月%d日') if news.published_at else '未知日期',
'news_type': news.news_type
})
return jsonify({
'success': True,
'news': news_data,
'new_count': new_count,
'message': f'成功获取{new_count}条新资讯' if new_count > 0 else '暂无新资讯'
})
except Exception as e:
print(f"获取新闻失败:{str(e)}")
import traceback
traceback.print_exc()
db.session.rollback()
return jsonify({'success': False, 'error': str(e)}), 500
# ========== 社媒营销路由 (v2.5新增) ==========
@app.route('/api/generate-social-share', methods=['POST'])
def generate_social_share():
"""一键生成社媒分享文案(前台可访问)"""
try:
data = request.get_json() or {}
site_code = (data.get('site_code') or '').strip()
platforms = data.get('platforms') or []
if not site_code:
return jsonify({'success': False, 'message': '请提供网站编码'}), 400
site = Site.query.filter_by(code=site_code, is_active=True).first()
if not site:
return jsonify({'success': False, 'message': '网站不存在或未启用'}), 404
# 允许的平台列表(前端也会限制,这里再做一次兜底)
allowed_platforms = {
'universal',
'xiaohongshu',
'douyin',
'bilibili',
'wechat',
'moments',
'x',
'linkedin'
}
if not isinstance(platforms, list):
platforms = []
platforms = [p for p in platforms if isinstance(p, str)]
platforms = [p.strip().lower() for p in platforms if p.strip()]
platforms = [p for p in platforms if p in allowed_platforms]
# 默认生成通用模板
if not platforms:
platforms = ['universal']
# 组织站点信息
site_info = {
'name': site.name,
'url': site.url,
'short_desc': site.short_desc or '',
'description': (site.description or '')[:1200],
'features': (site.features or '')[:1200],
'tags': [t.name for t in (site.tags or [])]
}
# 尝试使用PromptTemplate + DeepSeek生成
from utils.tag_generator import TagGenerator
generator = TagGenerator()
prompt = PromptTemplate.query.filter_by(key='social_share', is_active=True).first()
generated = {}
if prompt and generator.client:
# 构造提示词变量
tags_text = ','.join(site_info['tags'])
user_prompt = prompt.user_prompt_template.format(
name=site_info['name'],
url=site_info['url'],
short_desc=site_info['short_desc'],
description=site_info['description'],
features=site_info['features'],
tags=tags_text,
platforms=','.join(platforms)
)
try:
resp = generator.client.chat.completions.create(
model='deepseek-chat',
messages=[
{'role': 'system', 'content': prompt.system_prompt},
{'role': 'user', 'content': user_prompt}
],
temperature=0.7,
max_tokens=1200
)
text = (resp.choices[0].message.content or '').strip()
# 约定:模型返回JSON字符串;若不是JSON则回退到纯文本放到universal
import json as _json
try:
parsed = _json.loads(text)
if isinstance(parsed, dict):
for k, v in parsed.items():
if isinstance(k, str) and isinstance(v, str) and k in allowed_platforms:
generated[k] = v.strip()
except Exception:
generated['universal'] = text
except Exception as e:
# AI失败不影响整体,回退模板
print(f"社媒文案AI生成失败:{str(e)}")
# 回退:规则模板(根据各平台字数限制优化)
def build_fallback(p):
name = site_info['name']
url = site_info['url']
short_desc = site_info['short_desc'] or ''
full_desc = site_info['description'] or ''
features = site_info['features'] or ''
tags = site_info['tags'][:6]
tags_line = ' ' + ' '.join([f"#{t}" for t in tags]) if tags else ''
# X (Twitter) - 280字符限制,简洁为主
if p == 'x':
desc = (short_desc or full_desc)[:100]
base = f"🔥 {name}\n\n{desc}\n\n🔗 {url}{tags_line}"
return base[:280]
# LinkedIn - 3000字限制,专业风格
if p == 'linkedin':
desc = (full_desc or short_desc)[:500]
content = f"💡 推荐一个实用工具:{name}\n\n📝 简介:\n{desc}\n\n"
if features:
features_list = features.split('\n')[:3]
content += f"✨ 主要功能:\n" + '\n'.join([f"• {f.strip()}" for f in features_list if f.strip()][:3]) + "\n\n"
content += f"🔗 官网:{url}\n\n"
if tags:
content += f"📌 适用场景:{', '.join(tags)}"
return content[:3000]
# 小红书 - 2000字限制,图文风格
if p == 'xiaohongshu':
desc = (short_desc or full_desc)[:300]
content = f"✨ {name} | 我最近在用的宝藏工具\n\n"
content += f"📌 一句话介绍:\n{desc}\n\n"
if features:
features_list = features.split('\n')[:5]
content += f"🎯 核心功能:\n" + '\n'.join([f"✓ {f.strip()}" for f in features_list if f.strip()][:5]) + "\n\n"
content += f"🔗 体验入口:{url}\n\n{tags_line}"
return content[:2000]
# 抖音 - 2000字限制,短视频文案风格
if p == 'douyin':
desc = (short_desc or full_desc)[:200]
content = f"🔥 发现一个超好用的工具:{name}\n\n"
content += f"💡 {desc}\n\n"
if features:
features_list = features.split('\n')[:3]
content += "✨ 亮点功能:\n" + '\n'.join([f"👉 {f.strip()}" for f in features_list if f.strip()][:3]) + "\n\n"
content += f"🔗 想试试戳:{url}\n\n{tags_line}"
return content[:2000]
# B站 - 20000字限制,专栏风格
if p == 'bilibili':
desc = (full_desc or short_desc)[:800]
content = f"【分享】{name} - 值得一试的优质工具\n\n"
content += f"## 📖 工具简介\n{desc}\n\n"
if features:
content += f"## ✨ 主要功能\n{features}\n\n"
content += f"## 🔗 体验地址\n{url}\n\n"
if tags:
content += f"## 🏷️ 相关标签\n{' / '.join(tags)}"
return content[:20000]
# 微信公众号 - 基本无限制,正式风格
if p == 'wechat':
desc = (full_desc or short_desc)[:600]
content = f"今天给大家分享一个实用工具:{name}\n\n"
content += f"📝 工具介绍\n{desc}\n\n"
if features:
content += f"✨ 核心功能\n{features}\n\n"
content += f"🔗 官方网站\n{url}\n\n"
if tags:
content += f"如果你也在关注「{' · '.join(tags)}」相关的工具,不妨收藏一下这个实用的选择。"
return content
# 朋友圈 - 简短为主
if p == 'moments':
desc = (short_desc or full_desc)[:80]
return f"💡 {name}\n{desc}\n🔗 {url}{tags_line}".strip()
# 通用模板
desc = (full_desc or short_desc)[:300]
return f"{name}\n\n{desc}\n\n{url}{tags_line}".strip()
results = {}
for p in platforms:
results[p] = generated.get(p) or build_fallback(p)
# 统一补充一个通用版本
if 'universal' not in results:
results['universal'] = generated.get('universal') or build_fallback('universal')
return jsonify({
'success': True,
'site': {
'code': site.code,
'name': site.name,
'url': site.url
},
'platforms': platforms,
'content': results
})
except Exception as e:
return jsonify({'success': False, 'message': f'生成失败: {str(e)}'}), 500
# ========== 后台登录路由 ==========
@app.route('/admin/login', methods=['GET', 'POST'])
def admin_login():
"""管理员登录"""
if current_user.is_authenticated:
return redirect(url_for('admin.index'))
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
admin = AdminModel.query.filter_by(username=username).first()
if admin and admin.check_password(password) and admin.is_active:
login_user(admin)
admin.last_login = datetime.now()
db.session.commit()
return redirect(url_for('admin.index'))
else:
flash('用户名或密码错误', 'error')
return render_template('admin_login.html')
@app.route('/admin/logout')
@login_required
def admin_logout():
"""管理员登出"""
logout_user()
return redirect(url_for('index'))
# ========== 用户认证路由 ==========
@app.route('/register', methods=['GET', 'POST'])
def user_register():
"""用户注册"""
if current_user.is_authenticated:
# 如果已登录,跳转到首页
return redirect(url_for('index'))
if request.method == 'POST':
username = request.form.get('username', '').strip()
password = request.form.get('password', '').strip()
confirm_password = request.form.get('confirm_password', '').strip()
# 验证输入
if not username or not password:
flash('用户名和密码不能为空', 'error')
return render_template('auth/register.html')
if len(username) < 3:
flash('用户名至少3个字符', 'error')
return render_template('auth/register.html')
if len(password) < 6:
flash('密码至少6个字符', 'error')
return render_template('auth/register.html')
if password != confirm_password:
flash('两次输入的密码不一致', 'error')
return render_template('auth/register.html')
# 检查用户名是否已存在
if User.query.filter_by(username=username).first():
flash('该用户名已被注册', 'error')
return render_template('auth/register.html')
# 创建用户
try:
user = User(username=username)
user.set_password(password)
user.last_login = datetime.now()
db.session.add(user)
db.session.commit()
# 自动登录
login_user(user)
flash('注册成功!', 'success')
return redirect(url_for('index'))
except Exception as e:
db.session.rollback()
flash(f'注册失败:{str(e)}', 'error')
return render_template('auth/register.html')
return render_template('auth/register.html')
@app.route('/login', methods=['GET', 'POST'])
def user_login():
"""用户登录"""
if current_user.is_authenticated:
return redirect(url_for('index'))
if request.method == 'POST':
username = request.form.get('username', '').strip()
password = request.form.get('password', '').strip()
if not username or not password:
flash('请输入用户名和密码', 'error')
return render_template('auth/login.html')
# 查找用户
user = User.query.filter_by(username=username).first()
if user and user.check_password(password) and user.is_active:
login_user(user)
user.last_login = datetime.now()
db.session.commit()
# 获取next参数,如果有则跳转,否则跳转首页
next_page = request.args.get('next')
if next_page and next_page.startswith('/'):
return redirect(next_page)
return redirect(url_for('index'))
else:
flash('用户名或密码错误', 'error')
return render_template('auth/login.html')
@app.route('/logout')
@login_required
def user_logout():
"""用户登出"""
logout_user()
return redirect(url_for('index'))
# ========== 用户认证API ==========
@app.route('/api/auth/status', methods=['GET'])
def auth_status():
"""获取登录状态"""
if current_user.is_authenticated:
# 判断是Admin还是User
if isinstance(current_user, User):
return jsonify({
'logged_in': True,
'user_type': 'user',
'username': current_user.username,
'avatar': current_user.avatar,
'id': current_user.id
})
else:
return jsonify({
'logged_in': True,
'user_type': 'admin',
'username': current_user.username,
'id': current_user.id
})
return jsonify({'logged_in': False})
# ========== 收藏功能API ==========
@app.route('/api/collections/toggle', methods=['POST'])
@login_required
def toggle_collection():
"""收藏/取消收藏"""
# 检查是否为普通用户
if not isinstance(current_user, User):
return jsonify({'success': False, 'message': '管理员账号无法使用收藏功能'}), 403
try:
data = request.get_json() or {}
site_code = data.get('site_code', '').strip()
folder_id = data.get('folder_id') # 可选,指定文件夹
if not site_code:
return jsonify({'success': False, 'message': '请提供网站编码'}), 400
# 查找网站
site = Site.query.filter_by(code=site_code, is_active=True).first()
if not site:
return jsonify({'success': False, 'message': '网站不存在'}), 404
# 检查是否已收藏
existing = Collection.query.filter_by(
user_id=current_user.id,
site_id=site.id
).first()
if existing:
# 已收藏,则取消收藏
db.session.delete(existing)
db.session.commit()
return jsonify({
'success': True,
'action': 'removed',
'message': '已取消收藏'
})
else:
# 未收藏,则添加收藏
collection = Collection(
user_id=current_user.id,
site_id=site.id,
folder_id=folder_id
)
db.session.add(collection)
db.session.commit()
return jsonify({
'success': True,
'action': 'added',
'message': '收藏成功',
'collection_id': collection.id
})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': f'操作失败:{str(e)}'}), 500
@app.route('/api/collections/status/', methods=['GET'])
@login_required
def collection_status(site_code):
"""查询收藏状态"""
if not isinstance(current_user, User):
return jsonify({'is_collected': False})
try:
site = Site.query.filter_by(code=site_code, is_active=True).first()
if not site:
return jsonify({'is_collected': False})
collection = Collection.query.filter_by(
user_id=current_user.id,
site_id=site.id
).first()
return jsonify({
'is_collected': collection is not None,
'collection_id': collection.id if collection else None,
'folder_id': collection.folder_id if collection else None
})
except Exception as e:
return jsonify({'is_collected': False, 'error': str(e)})
@app.route('/api/collections/list', methods=['GET'])
@login_required
def list_collections():
"""获取收藏列表"""
if not isinstance(current_user, User):
return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403
try:
folder_id = request.args.get('folder_id')
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 20, type=int)
# 构建查询
query = Collection.query.filter_by(user_id=current_user.id)
if folder_id:
if folder_id == 'null' or folder_id == 'none':
query = query.filter_by(folder_id=None)
else:
query = query.filter_by(folder_id=int(folder_id))
# 按创建时间倒序
query = query.order_by(Collection.created_at.desc())
# 分页
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
# 格式化数据
collections_data = []
for collection in pagination.items:
site = collection.site
collections_data.append({
'id': collection.id,
'site_id': site.id,
'site_code': site.code,
'site_name': site.name,
'site_url': site.url,
'site_logo': site.logo,
'site_short_desc': site.short_desc,
'folder_id': collection.folder_id,
'note': collection.note,
'created_at': collection.created_at.strftime('%Y-%m-%d %H:%M:%S') if collection.created_at else None
})
return jsonify({
'success': True,
'collections': collections_data,
'total': pagination.total,
'page': page,
'per_page': per_page,
'has_next': pagination.has_next,
'has_prev': pagination.has_prev
})
except Exception as e:
return jsonify({'success': False, 'message': f'获取失败:{str(e)}'}), 500
@app.route('/api/collections//note', methods=['PUT'])
@login_required
def update_collection_note(collection_id):
"""更新收藏备注"""
if not isinstance(current_user, User):
return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403
try:
data = request.get_json() or {}
note = data.get('note', '').strip()
collection = Collection.query.filter_by(
id=collection_id,
user_id=current_user.id
).first()
if not collection:
return jsonify({'success': False, 'message': '收藏不存在'}), 404
collection.note = note
db.session.commit()
return jsonify({
'success': True,
'message': '备注已更新'
})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': f'更新失败:{str(e)}'}), 500
@app.route('/api/collections//move', methods=['PUT'])
@login_required
def move_collection(collection_id):
"""移动收藏到其他文件夹"""
if not isinstance(current_user, User):
return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403
try:
data = request.get_json() or {}
folder_id = data.get('folder_id') # None表示移到未分类
collection = Collection.query.filter_by(
id=collection_id,
user_id=current_user.id
).first()
if not collection:
return jsonify({'success': False, 'message': '收藏不存在'}), 404
# 如果指定了文件夹,验证文件夹是否属于当前用户
if folder_id:
folder = Folder.query.filter_by(
id=folder_id,
user_id=current_user.id
).first()
if not folder:
return jsonify({'success': False, 'message': '文件夹不存在'}), 404
collection.folder_id = folder_id
db.session.commit()
return jsonify({
'success': True,
'message': '已移动到指定文件夹'
})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': f'移动失败:{str(e)}'}), 500
# ========== 文件夹管理API ==========
@app.route('/api/folders', methods=['GET'])
@login_required
def list_folders():
"""获取文件夹列表"""
if not isinstance(current_user, User):
return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403
try:
folders = Folder.query.filter_by(user_id=current_user.id).order_by(
Folder.sort_order.desc(), Folder.created_at
).all()
folders_data = []
for folder in folders:
# 统计文件夹中的收藏数量
count = Collection.query.filter_by(
user_id=current_user.id,
folder_id=folder.id
).count()
folders_data.append({
'id': folder.id,
'name': folder.name,
'description': folder.description,
'icon': folder.icon,
'sort_order': folder.sort_order,
'is_public': folder.is_public,
'public_slug': folder.public_slug,
'count': count,
'created_at': folder.created_at.strftime('%Y-%m-%d %H:%M:%S') if folder.created_at else None
})
return jsonify({
'success': True,
'folders': folders_data
})
except Exception as e:
return jsonify({'success': False, 'message': f'获取失败:{str(e)}'}), 500
@app.route('/api/folders', methods=['POST'])
@login_required
def create_folder():
"""创建文件夹"""
if not isinstance(current_user, User):
return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403
try:
data = request.get_json() or {}
name = data.get('name', '').strip()
description = data.get('description', '').strip()
icon = data.get('icon', '📁').strip()
if not name:
return jsonify({'success': False, 'message': '文件夹名称不能为空'}), 400
# 检查同名文件夹
existing = Folder.query.filter_by(
user_id=current_user.id,
name=name
).first()
if existing:
return jsonify({'success': False, 'message': '该文件夹名称已存在'}), 400
# 创建文件夹
folder = Folder(
user_id=current_user.id,
name=name,
description=description,
icon=icon
)
db.session.add(folder)
db.session.commit()
return jsonify({
'success': True,
'message': '文件夹创建成功',
'folder': {
'id': folder.id,
'name': folder.name,
'description': folder.description,
'icon': folder.icon
}
})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': f'创建失败:{str(e)}'}), 500
@app.route('/api/folders/', methods=['PUT'])
@login_required
def update_folder(folder_id):
"""更新文件夹"""
if not isinstance(current_user, User):
return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403
try:
data = request.get_json() or {}
folder = Folder.query.filter_by(
id=folder_id,
user_id=current_user.id
).first()
if not folder:
return jsonify({'success': False, 'message': '文件夹不存在'}), 404
# 更新字段
if 'name' in data:
name = data['name'].strip()
if not name:
return jsonify({'success': False, 'message': '文件夹名称不能为空'}), 400
# 检查同名(排除自己)
existing = Folder.query.filter(
Folder.user_id == current_user.id,
Folder.name == name,
Folder.id != folder_id
).first()
if existing:
return jsonify({'success': False, 'message': '该文件夹名称已存在'}), 400
folder.name = name
if 'description' in data:
folder.description = data['description'].strip()
if 'icon' in data:
folder.icon = data['icon'].strip()
if 'sort_order' in data:
folder.sort_order = int(data['sort_order'])
db.session.commit()
return jsonify({
'success': True,
'message': '文件夹已更新'
})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': f'更新失败:{str(e)}'}), 500
@app.route('/api/folders/', methods=['DELETE'])
@login_required
def delete_folder(folder_id):
"""删除文件夹"""
if not isinstance(current_user, User):
return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403
try:
folder = Folder.query.filter_by(
id=folder_id,
user_id=current_user.id
).first()
if not folder:
return jsonify({'success': False, 'message': '文件夹不存在'}), 404
# 删除文件夹(级联删除收藏记录)
db.session.delete(folder)
db.session.commit()
return jsonify({
'success': True,
'message': '文件夹已删除'
})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': f'删除失败:{str(e)}'}), 500
# ========== 用户中心页面路由 ==========
@app.route('/user/profile')
@login_required
def user_profile():
"""用户中心主页"""
if not isinstance(current_user, User):
flash('仅普通用户可访问', 'error')
return redirect(url_for('index'))
# 统计信息
collections_count = Collection.query.filter_by(user_id=current_user.id).count()
folders_count = Folder.query.filter_by(user_id=current_user.id).count()
# 最近收藏(5条)
recent_collections = Collection.query.filter_by(
user_id=current_user.id
).order_by(Collection.created_at.desc()).limit(5).all()
return render_template('user/profile.html',
collections_count=collections_count,
folders_count=folders_count,
recent_collections=recent_collections)
@app.route('/user/change-password')
@login_required
def user_change_password_page():
"""修改密码页面"""
if not isinstance(current_user, User):
flash('仅普通用户可访问', 'error')
return redirect(url_for('index'))
return render_template('user/change_password.html')
@app.route('/user/collections')
@login_required
def user_collections():
"""收藏列表页面"""
if not isinstance(current_user, User):
flash('仅普通用户可访问', 'error')
return redirect(url_for('index'))
# 获取所有文件夹
folders = Folder.query.filter_by(user_id=current_user.id).order_by(
Folder.sort_order.desc(), Folder.created_at
).all()
# 为每个文件夹添加收藏计数
for folder in folders:
folder.count = Collection.query.filter_by(
user_id=current_user.id,
folder_id=folder.id
).count()
# 获取收藏(分页)
page = request.args.get('page', 1, type=int)
folder_id = request.args.get('folder_id')
query = Collection.query.filter_by(user_id=current_user.id)
if folder_id:
if folder_id == 'none':
query = query.filter_by(folder_id=None)
else:
query = query.filter_by(folder_id=int(folder_id))
query = query.order_by(Collection.created_at.desc())
pagination = query.paginate(page=page, per_page=20, error_out=False)
return render_template('user/collections.html',
folders=folders,
collections=pagination.items,
pagination=pagination,
current_folder_id=folder_id)
@app.route('/api/user/profile', methods=['PUT'])
@login_required
def update_user_profile():
"""更新用户资料"""
if not isinstance(current_user, User):
return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403
try:
data = request.get_json() or {}
if 'bio' in data:
current_user.bio = data['bio'].strip()
if 'avatar' in data:
current_user.avatar = data['avatar'].strip()
if 'is_public_profile' in data:
current_user.is_public_profile = bool(data['is_public_profile'])
db.session.commit()
return jsonify({
'success': True,
'message': '资料已更新'
})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': f'更新失败:{str(e)}'}), 500
@app.route('/api/user/change-password', methods=['PUT'])
@login_required
def user_change_password():
"""普通用户修改密码"""
if not isinstance(current_user, User):
return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403
try:
data = request.get_json() or {}
old_password = data.get('old_password', '').strip()
new_password = data.get('new_password', '').strip()
confirm_password = data.get('confirm_password', '').strip()
# 验证旧密码
if not old_password:
return jsonify({'success': False, 'message': '请输入旧密码'}), 400
if not current_user.check_password(old_password):
return jsonify({'success': False, 'message': '旧密码错误'}), 400
# 验证新密码
if not new_password:
return jsonify({'success': False, 'message': '请输入新密码'}), 400
if len(new_password) < 6:
return jsonify({'success': False, 'message': '新密码长度至少6位'}), 400
if new_password != confirm_password:
return jsonify({'success': False, 'message': '两次输入的新密码不一致'}), 400
if old_password == new_password:
return jsonify({'success': False, 'message': '新密码不能与旧密码相同'}), 400
# 更新密码
current_user.set_password(new_password)
db.session.commit()
return jsonify({
'success': True,
'message': '密码修改成功'
})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': f'修改失败:{str(e)}'}), 500
@app.route('/api/user/email', methods=['PUT'])
@login_required
def update_user_email():
"""更新用户邮箱"""
if not isinstance(current_user, User):
return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403
try:
data = request.get_json() or {}
email = data.get('email', '').strip()
# 验证邮箱格式
if not email:
return jsonify({'success': False, 'message': '请输入邮箱地址'}), 400
import re
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_pattern, email):
return jsonify({'success': False, 'message': '邮箱格式不正确'}), 400
# 检查邮箱是否已被其他用户使用
existing_user = User.query.filter(
User.email == email,
User.id != current_user.id
).first()
if existing_user:
return jsonify({'success': False, 'message': '该邮箱已被其他用户使用'}), 400
# 更新邮箱
current_user.email = email
# 重置验证状态
current_user.email_verified = False
current_user.email_verified_at = None
current_user.email_verify_token = None
current_user.email_verify_token_expires = None
db.session.commit()
return jsonify({
'success': True,
'message': '邮箱已更新,请验证新邮箱'
})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': f'更新失败:{str(e)}'}), 500
@app.route('/api/user/send-verify-email', methods=['POST'])
@login_required
def send_verify_email():
"""发送邮箱验证邮件"""
if not isinstance(current_user, User):
return jsonify({'success': False, 'message': '仅普通用户可访问'}), 403
try:
# 检查是否已绑定邮箱
if not current_user.email:
return jsonify({'success': False, 'message': '请先绑定邮箱'}), 400
# 检查是否已验证
if current_user.email_verified:
return jsonify({'success': False, 'message': '邮箱已验证,无需重复验证'}), 400
# 生成验证令牌(32位随机字符串)
token = secrets.token_urlsafe(32)
expires = datetime.now() + timedelta(hours=24)
# 保存令牌
current_user.email_verify_token = token
current_user.email_verify_token_expires = expires
db.session.commit()
# 发送验证邮件
verify_url = url_for('verify_email', token=token, _external=True)
html_content = f"""
验证您的邮箱
您好,{current_user.username}!
感谢您注册 ZJPB。请点击下面的按钮验证您的邮箱地址:
或复制以下链接到浏览器:
{verify_url}
此链接将在24小时后失效。
"""
text_content = f"""
验证您的邮箱
您好,{current_user.username}!
感谢您注册 ZJPB。请访问以下链接验证您的邮箱地址:
{verify_url}
此链接将在24小时后失效。
如果您没有注册 ZJPB,请忽略此邮件。
"""
email_sender = EmailSender()
success = email_sender.send_email(
to_email=current_user.email,
subject='验证您的邮箱 - ZJPB',
html_content=html_content,
text_content=text_content
)
if success:
return jsonify({
'success': True,
'message': '验证邮件已发送,请查收'
})
else:
return jsonify({
'success': False,
'message': '邮件发送失败,请稍后重试'
}), 500
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': f'发送失败:{str(e)}'}), 500
@app.route('/verify-email/')
def verify_email(token):
"""验证邮箱"""
try:
# 查找令牌对应的用户
user = User.query.filter_by(email_verify_token=token).first()
if not user:
flash('验证链接无效', 'error')
return redirect(url_for('index'))
# 检查令牌是否过期
if user.email_verify_token_expires < datetime.now():
flash('验证链接已过期,请重新发送', 'error')
return redirect(url_for('user_profile'))
# 验证成功
user.email_verified = True
user.email_verified_at = datetime.now()
user.email_verify_token = None
user.email_verify_token_expires = None
db.session.commit()
flash('邮箱验证成功!', 'success')
return redirect(url_for('user_profile'))
except Exception as e:
flash(f'验证失败:{str(e)}', 'error')
return redirect(url_for('index'))
@app.route('/admin/change-password', methods=['GET', 'POST'])
@login_required
def change_password():
"""修改密码"""
# 只允许管理员访问
if not isinstance(current_user, AdminModel):
flash('无权访问此页面', 'error')
return redirect(url_for('index'))
if request.method == 'POST':
old_password = request.form.get('old_password', '').strip()
new_password = request.form.get('new_password', '').strip()
confirm_password = request.form.get('confirm_password', '').strip()
# 验证旧密码
if not current_user.check_password(old_password):
flash('旧密码错误', 'error')
return render_template('admin/change_password.html')
# 验证新密码
if not new_password:
flash('新密码不能为空', 'error')
return render_template('admin/change_password.html')
if len(new_password) < 6:
flash('新密码长度至少6位', 'error')
return render_template('admin/change_password.html')
if new_password != confirm_password:
flash('两次输入的新密码不一致', 'error')
return render_template('admin/change_password.html')
if old_password == new_password:
flash('新密码不能与旧密码相同', 'error')
return render_template('admin/change_password.html')
# 修改密码
try:
current_user.set_password(new_password)
db.session.commit()
flash('密码修改成功,请重新登录', 'success')
logout_user()
return redirect(url_for('admin_login'))
except Exception as e:
db.session.rollback()
flash(f'密码修改失败:{str(e)}', 'error')
return render_template('admin/change_password.html')
return render_template('admin/change_password.html')
# ========== API路由 ==========
@app.route('/api/fetch-website-info', methods=['POST'])
@login_required
def fetch_website_info():
"""抓取网站信息API"""
# 只允许管理员访问
if not isinstance(current_user, AdminModel):
return jsonify({'success': False, 'message': '无权访问'}), 403
try:
data = request.get_json()
url = data.get('url', '').strip()
if not url:
return jsonify({
'success': False,
'message': '请提供网站URL'
}), 400
# 创建抓取器
fetcher = WebsiteFetcher(timeout=15)
# 抓取网站信息
info = fetcher.fetch_website_info(url)
if not info:
return jsonify({
'success': False,
'message': '无法获取网站信息,请检查URL是否正确或手动填写'
})
# 下载Logo到本地(如果有)
logo_path = None
if info.get('logo_url'):
logo_path = fetcher.download_logo(info['logo_url'], save_dir='static/logos')
# 如果下载失败,不返回远程URL,让用户手动上传
return jsonify({
'success': True,
'data': {
'name': info.get('name', ''),
'description': info.get('description', ''),
'logo': logo_path if logo_path else ''
}
})
except Exception as e:
return jsonify({
'success': False,
'message': f'抓取失败: {str(e)}'
}), 500
@app.route('/api/upload-logo', methods=['POST'])
@login_required
def upload_logo():
"""上传Logo图片API"""
# 只允许管理员访问
if not isinstance(current_user, AdminModel):
return jsonify({'success': False, 'message': '无权访问'}), 403
try:
# 检查文件是否存在
if 'logo' not in request.files:
return jsonify({
'success': False,
'message': '请选择要上传的图片'
}), 400
file = request.files['logo']
# 检查文件名
if file.filename == '':
return jsonify({
'success': False,
'message': '未选择文件'
}), 400
# 检查文件类型
allowed_extensions = {'png', 'jpg', 'jpeg', 'gif', 'svg', 'ico', 'webp'}
filename = file.filename.lower()
if not any(filename.endswith('.' + ext) for ext in allowed_extensions):
return jsonify({
'success': False,
'message': '不支持的文件格式,请上传图片文件(png, jpg, gif, svg, ico, webp)'
}), 400
# 创建保存目录
save_dir = 'static/logos'
os.makedirs(save_dir, exist_ok=True)
# 生成安全的文件名
import time
import hashlib
ext = os.path.splitext(filename)[1]
timestamp = str(int(time.time() * 1000))
hash_name = hashlib.md5(f"{filename}{timestamp}".encode()).hexdigest()[:16]
safe_filename = f"logo_{hash_name}{ext}"
filepath = os.path.join(save_dir, safe_filename)
# 保存文件
file.save(filepath)
# 返回相对路径
return jsonify({
'success': True,
'path': f'/{filepath.replace(os.sep, "/")}'
})
except Exception as e:
return jsonify({
'success': False,
'message': f'上传失败: {str(e)}'
}), 500
@app.route('/api/generate-features', methods=['POST'])
@login_required
def generate_features():
"""使用DeepSeek自动生成网站主要功能"""
# 只允许管理员访问
if not isinstance(current_user, AdminModel):
return jsonify({'success': False, 'message': '无权访问'}), 403
try:
data = request.get_json()
name = data.get('name', '').strip()
description = data.get('description', '').strip()
url = data.get('url', '').strip()
if not name or not description:
return jsonify({
'success': False,
'message': '请提供网站名称和描述'
}), 400
# 生成功能列表
generator = TagGenerator()
features = generator.generate_features(name, description, url)
if not features:
return jsonify({
'success': False,
'message': 'DeepSeek功能生成失败,请检查API配置'
}), 500
return jsonify({
'success': True,
'features': features
})
except ValueError as e:
return jsonify({
'success': False,
'message': str(e)
}), 400
except Exception as e:
return jsonify({
'success': False,
'message': f'生成失败: {str(e)}'
}), 500
@app.route('/api/generate-description', methods=['POST'])
@login_required
def generate_description():
"""使用DeepSeek自动生成网站详细介绍"""
# 只允许管理员访问
if not isinstance(current_user, AdminModel):
return jsonify({'success': False, 'message': '无权访问'}), 403
try:
data = request.get_json()
name = data.get('name', '').strip()
short_desc = data.get('short_desc', '').strip()
url = data.get('url', '').strip()
if not name:
return jsonify({
'success': False,
'message': '请提供网站名称'
}), 400
# 生成详细介绍
generator = TagGenerator()
description = generator.generate_description(name, short_desc, url)
if not description:
return jsonify({
'success': False,
'message': 'DeepSeek详细介绍生成失败,请检查API配置'
}), 500
return jsonify({
'success': True,
'description': description
})
except ValueError as e:
return jsonify({
'success': False,
'message': str(e)
}), 400
except Exception as e:
return jsonify({
'success': False,
'message': f'生成失败: {str(e)}'
}), 500
@app.route('/api/generate-tags', methods=['POST'])
@login_required
def generate_tags():
"""使用DeepSeek自动生成标签"""
# 只允许管理员访问
if not isinstance(current_user, AdminModel):
return jsonify({'success': False, 'message': '无权访问'}), 403
try:
data = request.get_json()
name = data.get('name', '').strip()
description = data.get('description', '').strip()
if not name or not description:
return jsonify({
'success': False,
'message': '请提供网站名称和描述'
}), 400
# 获取现有标签作为参考
existing_tags = [tag.name for tag in Tag.query.all()]
# 生成标签
generator = TagGenerator()
suggested_tags = generator.generate_tags(name, description, existing_tags)
if not suggested_tags:
return jsonify({
'success': False,
'message': 'DeepSeek标签生成失败,请检查API配置'
}), 500
return jsonify({
'success': True,
'tags': suggested_tags
})
except ValueError as e:
return jsonify({
'success': False,
'message': str(e)
}), 400
except Exception as e:
return jsonify({
'success': False,
'message': f'生成失败: {str(e)}'
}), 500
# ========== 新闻获取路由 ==========
@app.route('/api/fetch-site-news', methods=['POST'])
@login_required
def fetch_site_news():
"""为指定网站获取最新新闻"""
# 只允许管理员访问
if not isinstance(current_user, AdminModel):
return jsonify({'success': False, 'message': '无权访问'}), 403
try:
data = request.get_json()
site_id = data.get('site_id')
count = data.get('count', app.config.get('NEWS_SEARCH_COUNT', 10))
freshness = data.get('freshness', app.config.get('NEWS_SEARCH_FRESHNESS', 'oneMonth'))
if not site_id:
return jsonify({
'success': False,
'message': '请提供网站ID'
}), 400
# 获取网站信息
site = Site.query.get(site_id)
if not site:
return jsonify({
'success': False,
'message': '网站不存在'
}), 404
# 检查博查API配置
api_key = app.config.get('BOCHA_API_KEY')
if not api_key:
return jsonify({
'success': False,
'message': '博查API未配置,请在.env文件中设置BOCHA_API_KEY'
}), 500
# 创建新闻搜索器
searcher = NewsSearcher(api_key)
# 搜索新闻
news_items = searcher.search_site_news(
site_name=site.name,
site_url=site.url,
news_keywords=site.news_keywords, # v2.3新增:使用专用关键词
count=count,
freshness=freshness
)
if not news_items:
return jsonify({
'success': False,
'message': '未找到相关新闻'
}), 404
# 保存新闻到数据库
saved_count = 0
for item in news_items:
# 检查新闻是否已存在(根据URL判断)
existing_news = News.query.filter_by(
site_id=site_id,
url=item['url']
).first()
if not existing_news:
# 创建新闻记录
news = News(
site_id=site_id,
title=item['title'],
content=item.get('summary') or item.get('snippet', ''),
url=item['url'],
source_name=item.get('site_name', ''),
source_icon=item.get('site_icon', ''),
published_at=item.get('published_at'),
news_type='Search Result',
is_active=True
)
db.session.add(news)
saved_count += 1
# 提交事务
db.session.commit()
return jsonify({
'success': True,
'message': f'成功获取并保存 {saved_count} 条新闻',
'total_found': len(news_items),
'saved': saved_count,
'news_items': searcher.format_news_for_display(news_items)
})
except Exception as e:
db.session.rollback()
return jsonify({
'success': False,
'message': f'获取失败: {str(e)}'
}), 500
@app.route('/api/fetch-all-news', methods=['POST'])
@login_required
def fetch_all_news():
"""批量为所有网站获取新闻"""
# 只允许管理员访问
if not isinstance(current_user, AdminModel):
return jsonify({'success': False, 'message': '无权访问'}), 403
try:
data = request.get_json()
count_per_site = data.get('count', 5) # 每个网站获取的新闻数量
freshness = data.get('freshness', app.config.get('NEWS_SEARCH_FRESHNESS', 'oneMonth'))
limit = data.get('limit', 10) # 限制处理的网站数量
# 检查博查API配置
api_key = app.config.get('BOCHA_API_KEY')
if not api_key:
return jsonify({
'success': False,
'message': '博查API未配置,请在.env文件中设置BOCHA_API_KEY'
}), 500
# 获取启用的网站(按更新时间排序,优先处理旧的)
sites = Site.query.filter_by(is_active=True).order_by(Site.updated_at).limit(limit).all()
if not sites:
return jsonify({
'success': False,
'message': '没有可用的网站'
}), 404
# 创建新闻搜索器
searcher = NewsSearcher(api_key)
# 统计信息
total_saved = 0
total_found = 0
processed_sites = []
# 为每个网站获取新闻
for site in sites:
try:
# 搜索新闻
news_items = searcher.search_site_news(
site_name=site.name,
site_url=site.url,
news_keywords=site.news_keywords, # v2.3新增:使用专用关键词
count=count_per_site,
freshness=freshness
)
site_saved = 0
for item in news_items:
# 检查是否已存在
existing_news = News.query.filter_by(
site_id=site.id,
url=item['url']
).first()
if not existing_news:
news = News(
site_id=site.id,
title=item['title'],
content=item.get('summary') or item.get('snippet', ''),
url=item['url'],
source_name=item.get('site_name', ''),
source_icon=item.get('site_icon', ''),
published_at=item.get('published_at'),
news_type='Search Result',
is_active=True
)
db.session.add(news)
site_saved += 1
total_found += len(news_items)
total_saved += site_saved
processed_sites.append({
'id': site.id,
'name': site.name,
'found': len(news_items),
'saved': site_saved
})
except Exception as e:
# 单个网站失败不影响其他网站
processed_sites.append({
'id': site.id,
'name': site.name,
'error': str(e)
})
continue
# 提交事务
db.session.commit()
return jsonify({
'success': True,
'message': f'批量获取完成,共处理 {len(processed_sites)} 个网站',
'total_found': total_found,
'total_saved': total_saved,
'processed_sites': processed_sites
})
except Exception as e:
db.session.rollback()
return jsonify({
'success': False,
'message': f'批量获取失败: {str(e)}'
}), 500
# ========== SEO路由 (v2.4新增) ==========
@app.route('/sitemap.xml')
def sitemap():
"""动态生成sitemap.xml"""
from flask import make_response
# 获取所有启用的网站
sites = Site.query.filter_by(is_active=True).order_by(Site.updated_at.desc()).all()
# 获取所有标签
tags = Tag.query.all()
# 构建XML内容
xml_content = '''
'''
# 首页
xml_content += '''
{}
daily
1.0
'''.format(request.url_root.rstrip('/'))
# 工具详情页
for site in sites:
xml_content += '''
{}
{}
weekly
0.8
'''.format(
request.url_root.rstrip('/') + url_for('site_detail', code=site.code),
site.updated_at.strftime('%Y-%m-%d') if site.updated_at else datetime.now().strftime('%Y-%m-%d')
)
# 标签页
for tag in tags:
xml_content += '''
{}
weekly
0.6
'''.format(request.url_root.rstrip('/') + '/?tag=' + tag.slug)
xml_content += '''
'''
response = make_response(xml_content)
response.headers['Content-Type'] = 'application/xml; charset=utf-8'
return response
@app.route('/robots.txt')
def robots():
"""动态生成robots.txt"""
from flask import make_response
robots_content = '''User-agent: *
Allow: /
Disallow: /admin/
Disallow: /api/
Sitemap: {}sitemap.xml
'''.format(request.url_root)
response = make_response(robots_content)
response.headers['Content-Type'] = 'text/plain; charset=utf-8'
return response
# ========== SEO工具管理路由 (v2.4新增) ==========
@app.route('/admin/seo-tools')
@login_required
def seo_tools():
"""SEO工具管理页面"""
# 只允许管理员访问
if not isinstance(current_user, AdminModel):
flash('无权访问此页面', 'error')
return redirect(url_for('index'))
# 检查static/sitemap.xml是否存在及最后更新时间
sitemap_path = 'static/sitemap.xml'
sitemap_info = None
if os.path.exists(sitemap_path):
import time
mtime = os.path.getmtime(sitemap_path)
sitemap_info = {
'exists': True,
'last_updated': datetime.fromtimestamp(mtime).strftime('%Y-%m-%d %H:%M:%S'),
'size': os.path.getsize(sitemap_path)
}
else:
sitemap_info = {'exists': False}
return render_template('admin/seo_tools.html', sitemap_info=sitemap_info)
@app.route('/api/generate-static-sitemap', methods=['POST'])
@login_required
def generate_static_sitemap():
"""生成静态sitemap.xml文件"""
# 只允许管理员访问
if not isinstance(current_user, AdminModel):
return jsonify({'success': False, 'message': '无权访问'}), 403
try:
# 获取所有启用的网站
sites = Site.query.filter_by(is_active=True).order_by(Site.updated_at.desc()).all()
# 获取所有标签
tags = Tag.query.all()
# 构建XML内容(使用网站配置的域名)
base_url = request.url_root.rstrip('/')
xml_content = '''
'''
# 首页
xml_content += f'''
{base_url}
daily
1.0
'''
# 工具详情页
for site in sites:
xml_content += f'''
{base_url}/site/{site.code}
{site.updated_at.strftime('%Y-%m-%d') if site.updated_at else datetime.now().strftime('%Y-%m-%d')}
weekly
0.8
'''
# 标签页
for tag in tags:
xml_content += f'''
{base_url}/?tag={tag.slug}
weekly
0.6
'''
xml_content += '''
'''
# 保存到static目录
static_dir = 'static'
os.makedirs(static_dir, exist_ok=True)
sitemap_path = os.path.join(static_dir, 'sitemap.xml')
with open(sitemap_path, 'w', encoding='utf-8') as f:
f.write(xml_content)
# 统计信息
total_urls = 1 + len(sites) + len(tags) # 首页 + 工具页 + 标签页
return jsonify({
'success': True,
'message': f'静态sitemap.xml生成成功!共包含 {total_urls} 个URL',
'total_urls': total_urls,
'file_path': sitemap_path,
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
})
except Exception as e:
return jsonify({
'success': False,
'message': f'生成失败: {str(e)}'
}), 500
@app.route('/api/notify-search-engines', methods=['POST'])
@login_required
def notify_search_engines():
"""通知搜索引擎sitemap更新"""
# 只允许管理员访问
if not isinstance(current_user, AdminModel):
return jsonify({'success': False, 'message': '无权访问'}), 403
try:
import requests
from urllib.parse import quote
# 获取sitemap URL(使用当前请求的域名)
sitemap_url = request.url_root.rstrip('/') + '/sitemap.xml'
encoded_sitemap_url = quote(sitemap_url, safe='')
results = []
# 1. 通知Google
google_ping_url = f'http://www.google.com/ping?sitemap={encoded_sitemap_url}'
try:
google_response = requests.get(google_ping_url, timeout=10)
results.append({
'engine': 'Google',
'status': 'success' if google_response.status_code == 200 else 'failed',
'status_code': google_response.status_code,
'message': '提交成功' if google_response.status_code == 200 else f'HTTP {google_response.status_code}'
})
except Exception as e:
results.append({
'engine': 'Google',
'status': 'error',
'message': f'请求失败: {str(e)}'
})
# 2. 通知Baidu
baidu_ping_url = f'http://data.zz.baidu.com/ping?sitemap={encoded_sitemap_url}'
try:
baidu_response = requests.get(baidu_ping_url, timeout=10)
results.append({
'engine': 'Baidu',
'status': 'success' if baidu_response.status_code == 200 else 'failed',
'status_code': baidu_response.status_code,
'message': '提交成功' if baidu_response.status_code == 200 else f'HTTP {baidu_response.status_code}'
})
except Exception as e:
results.append({
'engine': 'Baidu',
'status': 'error',
'message': f'请求失败: {str(e)}'
})
# 3. 通知Bing
bing_ping_url = f'http://www.bing.com/ping?sitemap={encoded_sitemap_url}'
try:
bing_response = requests.get(bing_ping_url, timeout=10)
results.append({
'engine': 'Bing',
'status': 'success' if bing_response.status_code == 200 else 'failed',
'status_code': bing_response.status_code,
'message': '提交成功' if bing_response.status_code == 200 else f'HTTP {bing_response.status_code}'
})
except Exception as e:
results.append({
'engine': 'Bing',
'status': 'error',
'message': f'请求失败: {str(e)}'
})
# 统计成功数量
success_count = sum(1 for r in results if r['status'] == 'success')
return jsonify({
'success': True,
'message': f'已通知 {success_count}/{len(results)} 个搜索引擎',
'sitemap_url': sitemap_url,
'results': results
})
except Exception as e:
return jsonify({
'success': False,
'message': f'通知失败: {str(e)}'
}), 500
@app.route('/api/refresh-site-news/', methods=['POST'])
def refresh_site_news(site_code):
"""手动刷新指定网站的新闻(前台用户可访问)- v2.3新增"""
try:
# 根据code查找网站
site = Site.query.filter_by(code=site_code).first()
if not site:
return jsonify({
'success': False,
'message': '网站不存在'
}), 404
# 检查博查API配置
api_key = app.config.get('BOCHA_API_KEY')
if not api_key:
return jsonify({
'success': False,
'message': '新闻功能未启用'
}), 500
# 创建新闻搜索器
searcher = NewsSearcher(api_key)
# 搜索新闻(获取最新5条)
news_items = searcher.search_site_news(
site_name=site.name,
site_url=site.url,
news_keywords=site.news_keywords, # 使用专用关键词
count=5,
freshness='oneWeek' # 一周内的新闻
)
if not news_items:
return jsonify({
'success': False,
'message': '未找到相关新闻'
}), 404
# 保存新闻到数据库
saved_count = 0
for item in news_items:
# 检查新闻是否已存在(根据URL判断)
existing_news = News.query.filter_by(
site_id=site.id,
url=item['url']
).first()
if not existing_news:
news = News(
site_id=site.id,
title=item['title'],
content=item.get('summary') or item.get('snippet', ''),
url=item['url'],
source_name=item.get('site_name', ''),
source_icon=item.get('site_icon', ''),
published_at=item.get('published_at'),
news_type='Search Result',
is_active=True
)
db.session.add(news)
saved_count += 1
# 提交事务
db.session.commit()
return jsonify({
'success': True,
'message': f'成功获取 {saved_count} 条新资讯',
'total_found': len(news_items),
'saved_count': saved_count
})
except Exception as e:
db.session.rollback()
return jsonify({
'success': False,
'message': f'获取失败: {str(e)}'
}), 500
# ========== 批量导入路由 ==========
@app.route('/admin/batch-import', methods=['GET', 'POST'])
@login_required
def batch_import():
"""批量导入网站"""
# 只允许管理员访问
if not isinstance(current_user, AdminModel):
flash('无权访问此页面', 'error')
return redirect(url_for('index'))
from utils.bookmark_parser import BookmarkParser
from utils.website_fetcher import WebsiteFetcher
results = None
if request.method == 'POST':
import_type = request.form.get('import_type')
auto_activate = request.form.get('auto_activate') == 'on'
parser = BookmarkParser()
fetcher = WebsiteFetcher(timeout=15)
urls_to_import = []
try:
# 解析输入
if import_type == 'url_list':
url_list_text = request.form.get('url_list', '')
urls_to_import = parser.parse_url_list(url_list_text)
elif import_type == 'bookmark_file':
bookmark_file = request.files.get('bookmark_file')
if not bookmark_file:
flash('请选择书签文件', 'error')
return render_template('admin/batch_import.html')
html_content = bookmark_file.read().decode('utf-8', errors='ignore')
all_bookmarks = parser.parse_html_file(html_content)
# 筛选文件夹(如果指定)
folder_filter = request.form.get('folder_filter', '').strip()
if folder_filter:
urls_to_import = [
b for b in all_bookmarks
if folder_filter.lower() in b.get('folder', '').lower()
]
else:
urls_to_import = all_bookmarks
# 批量导入
success_list = []
failed_list = []
for idx, item in enumerate(urls_to_import, 1):
url = item['url']
name = item.get('name', '')
# 为每个URL创建独立的事务
try:
# 1. 检查URL是否已存在
try:
existing = Site.query.filter_by(url=url).first()
if existing:
failed_list.append({
'url': url,
'name': name or existing.name,
'error': f'该URL已存在(网站名称:{existing.name})'
})
continue
except Exception as e:
failed_list.append({
'url': url,
'name': name,
'error': f'检查URL时出错: {str(e)}'
})
continue
# 2. 抓取网站信息(带超时和错误处理)
info = None
try:
info = fetcher.fetch_website_info(url)
except Exception as e:
print(f"抓取 {url} 失败: {str(e)}")
# 抓取失败不是致命错误,继续尝试使用书签名称
# 3. 处理网站信息
if not info or not info.get('name'):
# 如果有书签名称,使用书签名称
if name:
info = {
'name': name,
'description': '',
'logo_url': ''
}
else:
# 尝试从URL提取域名作为名称
from urllib.parse import urlparse
try:
parsed = urlparse(url)
domain = parsed.netloc or parsed.path
if domain:
info = {
'name': domain,
'description': '',
'logo_url': ''
}
else:
failed_list.append({
'url': url,
'name': name,
'error': '无法获取网站信息且没有备用名称'
})
continue
except Exception:
failed_list.append({
'url': url,
'name': name,
'error': '无法获取网站信息且URL解析失败'
})
continue
# 4. 下载Logo到本地(失败不影响导入)
logo_path = None
if info.get('logo_url'):
try:
logo_path = fetcher.download_logo(info['logo_url'], save_dir='static/logos')
except Exception as e:
print(f"下载Logo失败 ({url}): {str(e)}")
# Logo下载失败不影响网站导入
# 5. 生成code和slug
try:
import random
from pypinyin import lazy_pinyin
import re
# 生成唯一的code
site_code = None
max_attempts = 10
for _ in range(max_attempts):
code = str(random.randint(10000000, 99999999))
if not Site.query.filter_by(code=code).first():
site_code = code
break
if not site_code:
# 如果10次都失败,使用时间戳
import time
site_code = str(int(time.time() * 1000))[-8:]
# 生成slug
site_name = info.get('name', name or 'Unknown')[:100]
slug = ''.join(lazy_pinyin(site_name))
slug = slug.lower()
slug = re.sub(r'[^\w\s-]', '', slug)
slug = re.sub(r'[-\s]+', '-', slug).strip('-')
if not slug:
slug = f"site-{site_code}"
# 确保slug唯一
base_slug = slug[:50] # 限制长度
counter = 1
final_slug = slug
while Site.query.filter_by(slug=final_slug).first():
final_slug = f"{base_slug}-{counter}"
counter += 1
if counter > 100: # 防止无限循环
final_slug = f"{base_slug}-{site_code}"
break
# 6. 创建网站记录(带code和slug)
site = Site(
code=site_code,
slug=final_slug,
name=site_name,
url=url[:500], # 限制URL长度
logo=logo_path or info.get('logo_url', '')[:500] if info.get('logo_url') else '',
short_desc=info.get('description', '')[:200] if info.get('description') else '',
description=info.get('description', '')[:2000] if info.get('description') else '',
is_active=auto_activate
)
# 添加到数据库并提交
db.session.add(site)
db.session.commit()
success_list.append({
'name': site.name,
'url': site.url
})
print(f"成功导入 [{idx}/{len(urls_to_import)}]: {site.name}")
except Exception as e:
db.session.rollback()
failed_list.append({
'url': url,
'name': name or info.get('name', 'Unknown'),
'error': f'数据库保存失败: {str(e)}'
})
continue
except Exception as e:
# 捕获所有未预期的错误
db.session.rollback()
failed_list.append({
'url': url,
'name': name,
'error': f'未知错误: {str(e)}'
})
print(f"导入 {url} 时发生未知错误: {str(e)}")
continue
results = {
'total_count': len(urls_to_import),
'success_count': len(success_list),
'failed_count': len(failed_list),
'success_list': success_list,
'failed_list': failed_list
}
if success_list:
flash(f'成功导入 {len(success_list)} 个网站!', 'success')
except Exception as e:
flash(f'导入失败: {str(e)}', 'error')
return render_template('admin/batch_import.html', results=results)
# ========== Flask-Admin 配置 ==========
class SecureModelView(ModelView):
"""需要登录的模型视图"""
# 中文化配置
can_set_page_size = True
page_size = 20
# 自定义文本
list_template = 'admin/model/list.html'
create_template = 'admin/model/create.html'
edit_template = 'admin/model/edit.html'
# 覆盖英文文本
named_filter_urls = True
def is_accessible(self):
# 只允许Admin类型的用户访问
return current_user.is_authenticated and isinstance(current_user, AdminModel)
def inaccessible_callback(self, name, **kwargs):
return redirect(url_for('admin_login'))
class SecureAdminIndexView(AdminIndexView):
"""需要登录的管理首页"""
def is_accessible(self):
# 只允许Admin类型的用户访问
return current_user.is_authenticated and isinstance(current_user, AdminModel)
def inaccessible_callback(self, name, **kwargs):
return redirect(url_for('admin_login'))
@expose('/')
def index(self):
"""控制台首页,显示统计信息"""
# 统计数据
stats = {
'sites_count': Site.query.filter_by(is_active=True).count(),
'tags_count': Tag.query.count(),
'news_count': News.query.filter_by(is_active=True).count(),
'total_views': db.session.query(db.func.sum(Site.view_count)).scalar() or 0
}
# 最近添加的工具(最多5个)
recent_sites = Site.query.order_by(Site.created_at.desc()).limit(5).all()
return self.render('admin/index.html', stats=stats, recent_sites=recent_sites)
# 网站管理视图
class SiteAdmin(SecureModelView):
# 自定义模板
create_template = 'admin/site/create.html'
edit_template = 'admin/site/edit.html'
# 启用编辑和删除
can_edit = True
can_delete = True
can_create = True
can_view_details = False # 禁用查看详情,点击名称即可查看
# 显示操作列
column_display_actions = True
action_disallowed_list = []
column_list = ['id', 'code', 'name', 'url', 'slug', 'is_active', 'is_recommended', 'view_count', 'created_at']
column_searchable_list = ['code', 'name', 'url', 'description']
column_filters = ['is_active', 'is_recommended', 'tags']
column_labels = {
'id': 'ID',
'code': '网站编码',
'name': '网站名称',
'url': 'URL',
'slug': 'URL别名',
'logo': 'Logo',
'short_desc': '简短描述',
'description': '详细介绍',
'features': '主要功能',
'news_keywords': '新闻关键词',
'is_active': '是否启用',
'is_recommended': '是否推荐',
'view_count': '浏览次数',
'sort_order': '排序权重',
'tags': '标签',
'created_at': '创建时间',
'updated_at': '更新时间'
}
form_columns = ['name', 'url', 'slug', 'logo', 'short_desc', 'description', 'features', 'news_keywords', 'tags', 'is_active', 'is_recommended', 'sort_order']
# 自定义编辑/删除文字
column_extra_row_actions = None
def on_model_change(self, form, model, is_created):
"""保存前自动生成code和slug(如果为空)"""
import re
import random
from pypinyin import lazy_pinyin
from flask import request
# 使用no_autoflush防止在查询时触发提前flush
with db.session.no_autoflush:
# 处理手动输入的新标签
new_tags_str = request.form.get('new_tags', '')
if new_tags_str:
new_tag_names = [name.strip() for name in new_tags_str.split(',') if name.strip()]
for tag_name in new_tag_names:
# 检查标签是否已存在
existing_tag = Tag.query.filter_by(name=tag_name).first()
if not existing_tag:
# 创建新标签
tag_slug = ''.join(lazy_pinyin(tag_name))
tag_slug = tag_slug.lower()
tag_slug = re.sub(r'[^\w\s-]', '', tag_slug)
tag_slug = re.sub(r'[-\s]+', '-', tag_slug).strip('-')
# 确保slug唯一
base_tag_slug = tag_slug[:50]
counter = 1
final_tag_slug = tag_slug
while Tag.query.filter_by(slug=final_tag_slug).first():
final_tag_slug = f"{base_tag_slug}-{counter}"
counter += 1
if counter > 100:
final_tag_slug = f"{base_tag_slug}-{random.randint(1000, 9999)}"
break
new_tag = Tag(name=tag_name, slug=final_tag_slug)
db.session.add(new_tag)
db.session.flush() # 确保新标签有ID
# 添加到模型的标签列表
if new_tag not in model.tags:
model.tags.append(new_tag)
else:
# 添加已存在的标签
if existing_tag not in model.tags:
model.tags.append(existing_tag)
# 如果code为空,自动生成唯一的8位数字编码
if not model.code or model.code.strip() == '':
max_attempts = 10
for attempt in range(max_attempts):
# 生成10000000-99999999之间的随机数
code = str(random.randint(10000000, 99999999))
# 检查是否已存在
existing = Site.query.filter(Site.code == code).first()
if not existing or existing.id == model.id:
model.code = code
break
# 如果10次都失败,使用时间戳
if not model.code:
import time
model.code = str(int(time.time() * 1000))[-8:]
# 如果slug为空,从name自动生成
if not model.slug or model.slug.strip() == '':
# 将中文转换为拼音
slug = ''.join(lazy_pinyin(model.name))
# 转换为小写,移除特殊字符
slug = slug.lower()
slug = re.sub(r'[^\w\s-]', '', slug)
slug = re.sub(r'[-\s]+', '-', slug).strip('-')
# 如果转换后为空,使用code
if not slug:
slug = f"site-{model.code}"
# 确保slug唯一(限制长度和重试次数)
base_slug = slug[:50]
counter = 1
final_slug = slug
max_slug_attempts = 100
while counter < max_slug_attempts:
existing = Site.query.filter(Site.slug == final_slug).first()
if not existing or existing.id == model.id:
break
final_slug = f"{base_slug}-{counter}"
counter += 1
# 如果100次都失败,使用code确保唯一
if counter >= max_slug_attempts:
final_slug = f"{base_slug}-{model.code}"
model.slug = final_slug
# 标签管理视图
class TagAdmin(SecureModelView):
can_edit = True
can_delete = True
can_create = True
can_view_details = False
# 显示操作列
column_display_actions = True
column_list = ['id', 'name', 'slug', 'description', 'sort_order']
column_searchable_list = ['name', 'description']
column_labels = {
'id': 'ID',
'name': '标签名称',
'slug': 'URL别名',
'description': '标签描述',
'seo_title': 'SEO标题 (v2.4)',
'seo_description': 'SEO描述 (v2.4)',
'seo_keywords': 'SEO关键词 (v2.4)',
'icon': '图标',
'sort_order': '排序权重',
'created_at': '创建时间'
}
form_columns = ['name', 'slug', 'description', 'seo_title', 'seo_description', 'seo_keywords', 'icon', 'sort_order']
# 管理员视图
class AdminAdmin(SecureModelView):
can_edit = True
can_delete = True
can_create = True
can_view_details = False
# 显示操作列
column_display_actions = True
column_list = ['id', 'username', 'email', 'is_active', 'last_login', 'created_at']
column_searchable_list = ['username', 'email']
column_filters = ['is_active']
column_labels = {
'id': 'ID',
'username': '用户名',
'email': '邮箱',
'is_active': '是否启用',
'created_at': '创建时间',
'last_login': '最后登录'
}
form_columns = ['username', 'email', 'is_active']
def on_model_change(self, form, model, is_created):
# 如果是新建管理员,设置默认密码
if is_created:
model.set_password('admin123') # 默认密码
# 新闻管理视图
class NewsAdmin(SecureModelView):
can_edit = True
can_delete = True
can_create = True
can_view_details = False
# 显示操作列
column_display_actions = True
column_list = ['id', 'site', 'title', 'source_name', 'news_type', 'published_at', 'is_active']
column_searchable_list = ['title', 'content', 'source_name']
column_filters = ['site', 'news_type', 'source_name', 'is_active', 'published_at']
column_labels = {
'id': 'ID',
'site': '关联网站',
'title': '新闻标题',
'content': '新闻内容',
'news_type': '新闻类型',
'url': '新闻链接',
'source_name': '来源网站',
'source_icon': '来源图标',
'published_at': '发布时间',
'is_active': '是否启用',
'created_at': '创建时间',
'updated_at': '更新时间'
}
form_columns = ['site', 'title', 'content', 'news_type', 'url', 'source_name', 'source_icon', 'published_at', 'is_active']
# 可选的新闻类型
form_choices = {
'news_type': [
('Search Result', 'Search Result'),
('Product Update', 'Product Update'),
('Industry News', 'Industry News'),
('Company News', 'Company News'),
('Other', 'Other')
]
}
# 默认排序
column_default_sort = ('published_at', True) # 按发布时间倒序排列
# Prompt模板管理视图
class PromptAdmin(SecureModelView):
can_edit = True
can_delete = False # 不允许删除,避免系统必需的prompt被删除
can_create = True
can_view_details = False
# 显示操作列
column_display_actions = True
column_list = ['id', 'key', 'name', 'description', 'is_active', 'updated_at']
column_searchable_list = ['key', 'name', 'description']
column_filters = ['is_active', 'key']
column_labels = {
'id': 'ID',
'key': '唯一标识',
'name': '模板名称',
'system_prompt': '系统提示词',
'user_prompt_template': '用户提示词模板',
'description': '模板说明',
'is_active': '是否启用',
'created_at': '创建时间',
'updated_at': '更新时间'
}
form_columns = ['key', 'name', 'description', 'system_prompt', 'user_prompt_template', 'is_active']
# 字段说明
column_descriptions = {
'key': '唯一标识,如: tags, features, description',
'system_prompt': 'AI的系统角色设定',
'user_prompt_template': '用户提示词模板,支持变量如 {name}, {description}, {url}',
}
# 表单字段配置
form_widget_args = {
'system_prompt': {
'rows': 3,
'style': 'font-family: monospace;'
},
'user_prompt_template': {
'rows': 20,
'style': 'font-family: monospace;'
}
}
# 初始化 Flask-Admin
admin = Admin(
app,
name='ZJPB 焦提示词',
template_mode='bootstrap4',
index_view=SecureAdminIndexView(name='控制台', url='/admin'),
base_template='admin/master.html'
)
admin.add_view(SiteAdmin(Site, db.session, name='网站管理'))
admin.add_view(TagAdmin(Tag, db.session, name='标签管理'))
admin.add_view(NewsAdmin(News, db.session, name='新闻管理'))
admin.add_view(PromptAdmin(PromptTemplate, db.session, name='Prompt管理'))
admin.add_view(AdminAdmin(AdminModel, db.session, name='管理员', endpoint='admin_users'))
return app
if __name__ == '__main__':
app = create_app(os.getenv('FLASK_ENV', 'development'))
app.run(host='0.0.0.0', port=5000, debug=True)