import os
import markdown
from flask import Flask, render_template, redirect, url_for, request, flash, jsonify
from flask_login import LoginManager, login_user, logout_user, login_required, current_user
from flask_admin import Admin, AdminIndexView, expose
from flask_admin.contrib.sqla import ModelView
from datetime import datetime
from config import config
from models import db, Site, Tag, Admin as AdminModel, News, site_tags, PromptTemplate
from utils.website_fetcher import WebsiteFetcher
from utils.tag_generator import TagGenerator
from utils.news_searcher import NewsSearcher
def create_app(config_name='default'):
"""应用工厂函数"""
app = Flask(__name__)
# 加载配置
app.config.from_object(config[config_name])
# 初始化数据库
db.init_app(app)
# Register the Markdown template filter
@app.template_filter('markdown')
def markdown_filter(text):
    """Render Markdown source as HTML for use in templates.

    Falsy input (``None`` or an empty string) yields an empty string.
    The ``nl2br`` and ``fenced_code`` extensions are enabled.
    """
    enabled_extensions = ['nl2br', 'fenced_code']
    return markdown.markdown(text, extensions=enabled_extensions) if text else ''
# Added in v2.4: automatic internal-link filter
@app.template_filter('auto_link')
def auto_link_filter(text, current_site_id=None):
    """Link the first mention of each active site's name to its detail page.

    Args:
        text: Content to scan (returned unchanged apart from inserted anchors).
        current_site_id: Id of the site whose page is being rendered; that
            site is never linked to itself.

    Returns:
        ``text`` with the first occurrence of every active site name wrapped
        in an ``<a>`` element, or ``''`` when ``text`` is falsy. The result is
        a plain ``str``; the template decides whether to mark it safe.
    """
    if not text:
        return ''

    import re
    from html import escape

    # Index candidate sites by name; sites without a name cannot be matched.
    targets = {}
    for site in Site.query.filter_by(is_active=True).all():
        if current_site_id and site.id == current_site_id:
            continue
        if site.name:
            targets.setdefault(site.name, site)
    if not targets:
        return text

    # Longest names first so "Foo Pro" wins over "Foo" at the same position.
    # A single regex pass never rescans inserted markup, so a shorter name
    # cannot end up linked inside an anchor that was just generated.
    names = sorted(targets, key=len, reverse=True)
    pattern = re.compile('|'.join(re.escape(name) for name in names))
    linked_names = set()

    def link_first(match):
        name = match.group(0)
        if name in linked_names:
            # Only the first occurrence of each name becomes a link.
            return name
        linked_names.add(name)
        href = escape(url_for('site_detail', code=targets[name].code), quote=True)
        return f'<a href="{href}">{name}</a>'

    return pattern.sub(link_first, text)
# Initialise login management (Flask-Login)
login_manager = LoginManager()
login_manager.init_app(app)
# 'admin_login' is the endpoint name of the admin login view defined in this module
login_manager.login_view = 'admin_login'
login_manager.login_message = '请先登录'
@login_manager.user_loader
def load_user(user_id):
    """Resolve the user id stored in the session to an admin record.

    Args:
        user_id: The id value handed over by Flask-Login.

    Returns:
        The matching ``AdminModel`` row, or ``None`` when the id is missing,
        is not an integer, or matches no row. Returning ``None`` (rather than
        raising) lets a stale or tampered session degrade to "not logged in".
    """
    try:
        admin_id = int(user_id)
    except (TypeError, ValueError):
        return None
    return AdminModel.query.get(admin_id)
# ========== Front-end routes ==========
@app.route('/')
def index():
    """Home page: list active sites with tag filter, search, tabs and paging.

    Query-string parameters read here: ``tag`` (tag slug), ``q`` (search
    text), ``tab`` (``popular`` / ``recommended`` / anything else = latest)
    and ``page``. An unknown tag slug renders the page with no sites and no
    pagination object.
    """
    from sqlalchemy import func

    all_tags = Tag.query.order_by(Tag.sort_order.desc(), Tag.id).all()

    # One grouped query gives the number of active sites per tag.
    tag_counts = {}
    if all_tags:
        count_rows = (
            db.session.query(
                site_tags.c.tag_id,
                func.count(site_tags.c.site_id).label('count'),
            )
            .join(Site, site_tags.c.site_id == Site.id)
            .filter(Site.is_active == True)
            .group_by(site_tags.c.tag_id)
            .all()
        )
        tag_counts = {row[0]: row[1] for row in count_rows}

    args = request.args
    tag_slug = args.get('tag')
    search_query = args.get('q', '').strip()
    current_tab = args.get('tab', 'latest')  # "latest" is the default tab
    page = args.get('page', 1, type=int)
    per_page = 100  # sites shown per page

    def render(sites, pagination, selected_tag):
        return render_template('index_new.html', sites=sites, tags=all_tags,
                               selected_tag=selected_tag, search_query=search_query,
                               pagination=pagination, tag_counts=tag_counts,
                               current_tab=current_tab)

    site_query = Site.query.filter_by(is_active=True)

    # Tag filter: an unknown slug short-circuits to an empty listing.
    selected_tag = None
    if tag_slug:
        selected_tag = Tag.query.filter_by(slug=tag_slug).first()
        if not selected_tag:
            return render([], None, selected_tag)
        site_query = site_query.filter(Site.tags.contains(selected_tag))

    # Search: substring match on name, URL, description or short description.
    if search_query:
        like_pattern = f'%{search_query}%'
        searchable_columns = (Site.name, Site.url, Site.description, Site.short_desc)
        site_query = site_query.filter(
            db.or_(*[column.like(like_pattern) for column in searchable_columns])
        )

    # Tab selection decides the extra filter (if any) and the sort order.
    if current_tab == 'popular':
        ordering = (Site.view_count.desc(), Site.id.desc())
    elif current_tab == 'recommended':
        site_query = site_query.filter_by(is_recommended=True)
        ordering = (Site.sort_order.desc(), Site.id.desc())
    else:
        ordering = (Site.created_at.desc(), Site.id.desc())
    site_query = site_query.order_by(*ordering)

    pagination = site_query.paginate(page=page, per_page=per_page, error_out=False)
    return render(pagination.items, pagination, selected_tag)
@app.route('/site/<code>')
def site_detail(code):
    """Render the detail page of one active site.

    Side effects: increments the site's view counter. When the newest stored
    news item of the site was not created today (or none exists) and
    ``BOCHA_API_KEY`` is configured, up to three news items from the past
    week are fetched and those whose URL is new for this site are stored.
    A failure while fetching or storing news is logged and never breaks the
    page.

    Args:
        code: The site's code taken from the URL path.

    Returns:
        The rendered ``detail_new.html`` page. Responds with 404 when no
        active site has this code.
    """
    from datetime import date

    site = Site.query.filter_by(code=code, is_active=True).first_or_404()

    # Count this view; tolerate a NULL counter on rows that never had one set.
    site.view_count = (site.view_count or 0) + 1
    db.session.commit()

    # Refresh news at most once per day per site, judged by the creation
    # time of the newest stored item.
    latest_news = News.query.filter_by(
        site_id=site.id
    ).order_by(News.created_at.desc()).first()
    need_update = (
        latest_news is None
        or latest_news.created_at is None
        or latest_news.created_at.date() < date.today()
    )

    if need_update:
        api_key = app.config.get('BOCHA_API_KEY')
        if api_key:
            try:
                searcher = NewsSearcher(api_key)
                # Limited to 3 items from the last week
                news_items = searcher.search_site_news(
                    site_name=site.name,
                    site_url=site.url,
                    news_keywords=site.news_keywords,  # v2.3: dedicated keywords
                    count=3,
                    freshness='oneWeek'
                )
                if news_items:
                    for item in news_items:
                        # De-duplicate by URL within this site
                        existing = News.query.filter_by(
                            site_id=site.id,
                            url=item['url']
                        ).first()
                        if not existing:
                            news = News(
                                site_id=site.id,
                                title=item['title'],
                                content=item.get('summary') or item.get('snippet', ''),
                                url=item['url'],
                                source_name=item.get('site_name', ''),
                                source_icon=item.get('site_icon', ''),
                                published_at=item.get('published_at'),
                                news_type='Search Result',
                                is_active=True
                            )
                            db.session.add(news)
                    db.session.commit()
            except Exception as e:
                # Best effort: the page is still rendered without fresh news.
                db.session.rollback()
                app.logger.warning(f"自动获取新闻失败:{str(e)}")

    # Up to 5 related news items for display
    news_list = News.query.filter_by(
        site_id=site.id,
        is_active=True
    ).order_by(News.published_at.desc()).limit(5).all()

    # Up to 4 other active sites sharing at least one tag, most viewed first
    recommended_sites = []
    if site.tags:
        recommended_sites = Site.query.filter(
            Site.id != site.id,
            Site.is_active == True,
            Site.tags.any(Tag.id.in_([tag.id for tag in site.tags]))
        ).order_by(Site.view_count.desc()).limit(4).all()

    return render_template('detail_new.html', site=site, news_list=news_list, recommended_sites=recommended_sites)
# ========== Social media marketing routes (added in v2.5) ==========
@app.route('/api/generate-social-share', methods=['POST'])
def generate_social_share():
    """Generate social-media share copy for a site (public endpoint).

    Expects a JSON object with ``site_code`` and an optional ``platforms``
    list. Copy is produced by the AI client when an active ``social_share``
    prompt template and a client are available; any platform the AI did not
    cover is filled from a built-in rule template. A ``universal`` entry is
    always included in the result.

    Returns:
        JSON with ``success``, ``site``, ``platforms`` and ``content`` on
        success; 400 when ``site_code`` is missing (including an absent or
        malformed JSON body), 404 for an unknown/inactive site, 500 on an
        unexpected error.
    """
    try:
        # silent=True: a missing or malformed body is reported as the 400
        # "site code required" case below instead of surfacing as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        site_code = str(data.get('site_code') or '').strip()
        platforms = data.get('platforms') or []
        if not site_code:
            return jsonify({'success': False, 'message': '请提供网站编码'}), 400

        site = Site.query.filter_by(code=site_code, is_active=True).first()
        if not site:
            return jsonify({'success': False, 'message': '网站不存在或未启用'}), 404

        # Server-side allow-list (the front end restricts this as well)
        allowed_platforms = {
            'universal',
            'xiaohongshu',
            'douyin',
            'bilibili',
            'wechat',
            'moments',
            'x',
            'linkedin'
        }
        if not isinstance(platforms, list):
            platforms = []
        platforms = [p.strip().lower() for p in platforms if isinstance(p, str) and p.strip()]
        platforms = [p for p in platforms if p in allowed_platforms]
        # Default to the universal template
        if not platforms:
            platforms = ['universal']

        site_info = {
            'name': site.name,
            'url': site.url,
            'short_desc': site.short_desc or '',
            'description': (site.description or '')[:1200],
            'features': (site.features or '')[:1200],
            'tags': [t.name for t in (site.tags or [])]
        }

        prompt = PromptTemplate.query.filter_by(key='social_share', is_active=True).first()
        generated = {}
        try:
            # Everything AI-related lives inside this try so that a client
            # construction error, a template with unexpected braces, or an
            # API failure all fall back to the rule templates.
            generator = TagGenerator()
            if prompt and generator.client:
                import json as _json

                user_prompt = prompt.user_prompt_template.format(
                    name=site_info['name'],
                    url=site_info['url'],
                    short_desc=site_info['short_desc'],
                    description=site_info['description'],
                    features=site_info['features'],
                    tags=','.join(site_info['tags']),
                    platforms=','.join(platforms)
                )
                resp = generator.client.chat.completions.create(
                    model='deepseek-chat',
                    messages=[
                        {'role': 'system', 'content': prompt.system_prompt},
                        {'role': 'user', 'content': user_prompt}
                    ],
                    temperature=0.7,
                    max_tokens=1200
                )
                text = (resp.choices[0].message.content or '').strip()
                # Convention: the model returns a JSON object keyed by
                # platform; non-JSON output is used as the universal copy.
                try:
                    parsed = _json.loads(text)
                except ValueError:
                    generated['universal'] = text
                else:
                    if isinstance(parsed, dict):
                        for k, v in parsed.items():
                            if isinstance(k, str) and isinstance(v, str) and k in allowed_platforms:
                                generated[k] = v.strip()
        except Exception as e:
            app.logger.warning(f"社媒文案AI生成失败:{str(e)}")

        def build_fallback(p):
            """Build rule-based copy for platform ``p`` within its length limit."""
            name = site_info['name']
            url = site_info['url']
            short_desc = site_info['short_desc'] or ''
            full_desc = site_info['description'] or ''
            features = site_info['features'] or ''
            tags = site_info['tags'][:6]
            tags_line = ' ' + ' '.join([f"#{t}" for t in tags]) if tags else ''

            def feature_bullets(bullet, limit):
                # Drop blank lines first, then cap, so blank lines in the
                # source text do not eat into the limit.
                lines = [ln.strip() for ln in features.split('\n') if ln.strip()]
                return '\n'.join(f"{bullet} {ln}" for ln in lines[:limit])

            # X (Twitter): 280 characters, keep it short
            if p == 'x':
                desc = (short_desc or full_desc)[:100]
                base = f"🔥 {name}\n\n{desc}\n\n🔗 {url}{tags_line}"
                return base[:280]

            # LinkedIn: 3000 characters, professional tone
            if p == 'linkedin':
                desc = (full_desc or short_desc)[:500]
                content = f"💡 推荐一个实用工具:{name}\n\n📝 简介:\n{desc}\n\n"
                if features:
                    content += "✨ 主要功能:\n" + feature_bullets('•', 3) + "\n\n"
                content += f"🔗 官网:{url}\n\n"
                if tags:
                    content += f"📌 适用场景:{', '.join(tags)}"
                return content[:3000]

            # Xiaohongshu: 2000 characters, picture-post style
            if p == 'xiaohongshu':
                desc = (short_desc or full_desc)[:300]
                content = f"✨ {name} | 我最近在用的宝藏工具\n\n"
                content += f"📌 一句话介绍:\n{desc}\n\n"
                if features:
                    content += "🎯 核心功能:\n" + feature_bullets('✓', 5) + "\n\n"
                content += f"🔗 体验入口:{url}\n\n{tags_line}"
                return content[:2000]

            # Douyin: 2000 characters, short-video caption style
            if p == 'douyin':
                desc = (short_desc or full_desc)[:200]
                content = f"🔥 发现一个超好用的工具:{name}\n\n"
                content += f"💡 {desc}\n\n"
                if features:
                    content += "✨ 亮点功能:\n" + feature_bullets('👉', 3) + "\n\n"
                content += f"🔗 想试试戳:{url}\n\n{tags_line}"
                return content[:2000]

            # Bilibili: 20000 characters, article style
            if p == 'bilibili':
                desc = (full_desc or short_desc)[:800]
                content = f"【分享】{name} - 值得一试的优质工具\n\n"
                content += f"## 📖 工具简介\n{desc}\n\n"
                if features:
                    content += f"## ✨ 主要功能\n{features}\n\n"
                content += f"## 🔗 体验地址\n{url}\n\n"
                if tags:
                    content += f"## 🏷️ 相关标签\n{' / '.join(tags)}"
                return content[:20000]

            # WeChat official account: effectively unlimited, formal tone
            if p == 'wechat':
                desc = (full_desc or short_desc)[:600]
                content = f"今天给大家分享一个实用工具:{name}\n\n"
                content += f"📝 工具介绍\n{desc}\n\n"
                if features:
                    content += f"✨ 核心功能\n{features}\n\n"
                content += f"🔗 官方网站\n{url}\n\n"
                if tags:
                    content += f"如果你也在关注「{' · '.join(tags)}」相关的工具,不妨收藏一下这个实用的选择。"
                return content

            # WeChat Moments: brief
            if p == 'moments':
                desc = (short_desc or full_desc)[:80]
                return f"💡 {name}\n{desc}\n🔗 {url}{tags_line}".strip()

            # Universal template
            desc = (full_desc or short_desc)[:300]
            return f"{name}\n\n{desc}\n\n{url}{tags_line}".strip()

        results = {}
        for p in platforms:
            results[p] = generated.get(p) or build_fallback(p)
        # Always include a universal version
        if 'universal' not in results:
            results['universal'] = generated.get('universal') or build_fallback('universal')

        return jsonify({
            'success': True,
            'site': {
                'code': site.code,
                'name': site.name,
                'url': site.url
            },
            'platforms': platforms,
            'content': results
        })
    except Exception as e:
        return jsonify({'success': False, 'message': f'生成失败: {str(e)}'}), 500
# ========== Admin login routes ==========
@app.route('/admin/login', methods=['GET', 'POST'])
def admin_login():
    """Admin sign-in page and form handler.

    An already authenticated user is sent straight to the admin index. A POST
    with valid credentials of an active admin logs the user in and records
    the login time; any other POST flashes an error and shows the form again.
    """
    if current_user.is_authenticated:
        return redirect(url_for('admin.index'))

    if request.method != 'POST':
        return render_template('admin_login.html')

    submitted_username = request.form.get('username')
    submitted_password = request.form.get('password')
    account = AdminModel.query.filter_by(username=submitted_username).first()

    if not (account and account.check_password(submitted_password) and account.is_active):
        flash('用户名或密码错误', 'error')
        return render_template('admin_login.html')

    login_user(account)
    account.last_login = datetime.now()
    db.session.commit()
    return redirect(url_for('admin.index'))
@app.route('/admin/logout')
@login_required
def admin_logout():
    """End the current admin session and return to the public home page."""
    logout_user()
    home_url = url_for('index')
    return redirect(home_url)
@app.route('/admin/change-password', methods=['GET', 'POST'])
@login_required
def change_password():
    """Let the logged-in admin change their own password.

    GET shows the form. POST validates the submitted values in a fixed order
    (old password, non-empty, minimum length 6, confirmation match, differs
    from old) and reports the first failure. On success the password is
    saved, the user is logged out and redirected to the login page.
    """
    form_template = 'admin/change_password.html'
    if request.method != 'POST':
        return render_template(form_template)

    old_password = request.form.get('old_password', '').strip()
    new_password = request.form.get('new_password', '').strip()
    confirm_password = request.form.get('confirm_password', '').strip()

    # First failing rule wins; the order matches the checks' priority.
    if not current_user.check_password(old_password):
        problem = '旧密码错误'
    elif not new_password:
        problem = '新密码不能为空'
    elif len(new_password) < 6:
        problem = '新密码长度至少6位'
    elif new_password != confirm_password:
        problem = '两次输入的新密码不一致'
    elif old_password == new_password:
        problem = '新密码不能与旧密码相同'
    else:
        problem = None

    if problem is not None:
        flash(problem, 'error')
        return render_template(form_template)

    try:
        current_user.set_password(new_password)
        db.session.commit()
        flash('密码修改成功,请重新登录', 'success')
        logout_user()
        return redirect(url_for('admin_login'))
    except Exception as e:
        db.session.rollback()
        flash(f'密码修改失败:{str(e)}', 'error')
        return render_template(form_template)
# ========== API routes ==========
@app.route('/api/fetch-website-info', methods=['POST'])
@login_required
def fetch_website_info():
    """Fetch name, description and logo for a URL (admin API).

    Expects a JSON object with ``url``. The logo, when one is found, is
    downloaded into ``static/logos``; if that download fails the ``logo``
    field is empty (the remote URL is deliberately not returned so the user
    uploads one manually).

    Returns:
        JSON ``{'success': True, 'data': {...}}`` on success; 400 when no URL
        is supplied (including an absent or malformed JSON body); a
        ``success: False`` payload when nothing could be fetched; 500 on an
        unexpected error.
    """
    try:
        # silent=True: a missing/invalid body becomes the 400 below, not a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        url = str(data.get('url') or '').strip()
        if not url:
            return jsonify({
                'success': False,
                'message': '请提供网站URL'
            }), 400

        fetcher = WebsiteFetcher(timeout=15)
        info = fetcher.fetch_website_info(url)
        if not info:
            return jsonify({
                'success': False,
                'message': '无法获取网站信息,请检查URL是否正确或手动填写'
            })

        # Download the logo locally when one was detected
        logo_path = None
        if info.get('logo_url'):
            logo_path = fetcher.download_logo(info['logo_url'], save_dir='static/logos')

        return jsonify({
            'success': True,
            'data': {
                'name': info.get('name', ''),
                'description': info.get('description', ''),
                'logo': logo_path if logo_path else ''
            }
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'抓取失败: {str(e)}'
        }), 500
@app.route('/api/upload-logo', methods=['POST'])
@login_required
def upload_logo():
    """Store an uploaded logo image under ``static/logos`` (admin API).

    The file arrives in the ``logo`` form field and must carry one of the
    accepted image extensions. It is saved under a generated name
    (``logo_<16 hex chars><ext>``) and the response returns its path with a
    leading slash.
    """
    import hashlib
    import time

    def reject(message):
        # Every validation failure answers 400 with the same payload shape.
        return jsonify({
            'success': False,
            'message': message
        }), 400

    try:
        if 'logo' not in request.files:
            return reject('请选择要上传的图片')

        upload = request.files['logo']
        if upload.filename == '':
            return reject('未选择文件')

        lowered_name = upload.filename.lower()
        accepted_suffixes = tuple(
            '.' + ext for ext in ('png', 'jpg', 'jpeg', 'gif', 'svg', 'ico', 'webp')
        )
        if not lowered_name.endswith(accepted_suffixes):
            return reject('不支持的文件格式,请上传图片文件(png, jpg, gif, svg, ico, webp)')

        target_dir = 'static/logos'
        os.makedirs(target_dir, exist_ok=True)

        # The stored name is derived from a hash of the original name plus a
        # millisecond timestamp; the client-supplied name is never used as a path.
        suffix = os.path.splitext(lowered_name)[1]
        stamp = str(int(time.time() * 1000))
        digest = hashlib.md5(f"{lowered_name}{stamp}".encode()).hexdigest()[:16]
        destination = os.path.join(target_dir, f"logo_{digest}{suffix}")

        upload.save(destination)

        return jsonify({
            'success': True,
            'path': f'/{destination.replace(os.sep, "/")}'
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'上传失败: {str(e)}'
        }), 500
@app.route('/api/generate-features', methods=['POST'])
@login_required
def generate_features():
    """Generate a site's main-features text with DeepSeek (admin API).

    Expects a JSON object with ``name`` and ``description`` (both required)
    and an optional ``url``.

    Returns:
        JSON ``{'success': True, 'features': ...}``; 400 when a required
        field is missing (including an absent or malformed JSON body) or the
        generator raises ``ValueError``; 500 when generation yields nothing
        or fails unexpectedly.
    """
    try:
        # silent=True: a missing/invalid body becomes the 400 below, not a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        name = str(data.get('name') or '').strip()
        description = str(data.get('description') or '').strip()
        url = str(data.get('url') or '').strip()
        if not name or not description:
            return jsonify({
                'success': False,
                'message': '请提供网站名称和描述'
            }), 400

        generator = TagGenerator()
        features = generator.generate_features(name, description, url)
        if not features:
            return jsonify({
                'success': False,
                'message': 'DeepSeek功能生成失败,请检查API配置'
            }), 500

        return jsonify({
            'success': True,
            'features': features
        })
    except ValueError as e:
        return jsonify({
            'success': False,
            'message': str(e)
        }), 400
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'生成失败: {str(e)}'
        }), 500
@app.route('/api/generate-description', methods=['POST'])
@login_required
def generate_description():
    """Generate a site's detailed description with DeepSeek (admin API).

    Expects a JSON object with ``name`` (required) and optional
    ``short_desc`` and ``url``.

    Returns:
        JSON ``{'success': True, 'description': ...}``; 400 when ``name`` is
        missing (including an absent or malformed JSON body) or the generator
        raises ``ValueError``; 500 when generation yields nothing or fails
        unexpectedly.
    """
    try:
        # silent=True: a missing/invalid body becomes the 400 below, not a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        name = str(data.get('name') or '').strip()
        short_desc = str(data.get('short_desc') or '').strip()
        url = str(data.get('url') or '').strip()
        if not name:
            return jsonify({
                'success': False,
                'message': '请提供网站名称'
            }), 400

        generator = TagGenerator()
        description = generator.generate_description(name, short_desc, url)
        if not description:
            return jsonify({
                'success': False,
                'message': 'DeepSeek详细介绍生成失败,请检查API配置'
            }), 500

        return jsonify({
            'success': True,
            'description': description
        })
    except ValueError as e:
        return jsonify({
            'success': False,
            'message': str(e)
        }), 400
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'生成失败: {str(e)}'
        }), 500
@app.route('/api/generate-tags', methods=['POST'])
@login_required
def generate_tags():
    """Suggest tags for a site with DeepSeek (admin API).

    Expects a JSON object with ``name`` and ``description`` (both required).
    The names of all existing tags are passed to the generator as reference.

    Returns:
        JSON ``{'success': True, 'tags': ...}``; 400 when a required field is
        missing (including an absent or malformed JSON body) or the generator
        raises ``ValueError``; 500 when generation yields nothing or fails
        unexpectedly.
    """
    try:
        # silent=True: a missing/invalid body becomes the 400 below, not a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        name = str(data.get('name') or '').strip()
        description = str(data.get('description') or '').strip()
        if not name or not description:
            return jsonify({
                'success': False,
                'message': '请提供网站名称和描述'
            }), 400

        existing_tags = [tag.name for tag in Tag.query.all()]

        generator = TagGenerator()
        suggested_tags = generator.generate_tags(name, description, existing_tags)
        if not suggested_tags:
            return jsonify({
                'success': False,
                'message': 'DeepSeek标签生成失败,请检查API配置'
            }), 500

        return jsonify({
            'success': True,
            'tags': suggested_tags
        })
    except ValueError as e:
        return jsonify({
            'success': False,
            'message': str(e)
        }), 400
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'生成失败: {str(e)}'
        }), 500
# ========== News fetching routes ==========
@app.route('/api/fetch-site-news', methods=['POST'])
@login_required
def fetch_site_news():
    """Fetch and store the latest news for one site (admin API).

    Expects a JSON object with ``site_id`` (required) and optional ``count``
    and ``freshness``; the defaults come from ``NEWS_SEARCH_COUNT`` and
    ``NEWS_SEARCH_FRESHNESS`` in the app config. Items whose URL is already
    stored for the site are skipped.

    Returns:
        JSON with ``total_found``, ``saved`` and ``news_items`` on success;
        400 when ``site_id`` is missing (including an absent or malformed
        JSON body); 404 for an unknown site or when nothing was found; 500
        when the API key is not configured or on an unexpected error (the
        DB session is rolled back in that case).
    """
    try:
        # silent=True: a missing/invalid body becomes the 400 below, not a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        site_id = data.get('site_id')
        count = data.get('count', app.config.get('NEWS_SEARCH_COUNT', 10))
        freshness = data.get('freshness', app.config.get('NEWS_SEARCH_FRESHNESS', 'oneMonth'))
        if not site_id:
            return jsonify({
                'success': False,
                'message': '请提供网站ID'
            }), 400

        site = Site.query.get(site_id)
        if not site:
            return jsonify({
                'success': False,
                'message': '网站不存在'
            }), 404

        # The Bocha API key is required for searching
        api_key = app.config.get('BOCHA_API_KEY')
        if not api_key:
            return jsonify({
                'success': False,
                'message': '博查API未配置,请在.env文件中设置BOCHA_API_KEY'
            }), 500

        searcher = NewsSearcher(api_key)
        news_items = searcher.search_site_news(
            site_name=site.name,
            site_url=site.url,
            news_keywords=site.news_keywords,  # v2.3: dedicated keywords
            count=count,
            freshness=freshness
        )
        if not news_items:
            return jsonify({
                'success': False,
                'message': '未找到相关新闻'
            }), 404

        saved_count = 0
        for item in news_items:
            # De-duplicate by URL within this site
            existing_news = News.query.filter_by(
                site_id=site_id,
                url=item['url']
            ).first()
            if not existing_news:
                news = News(
                    site_id=site_id,
                    title=item['title'],
                    content=item.get('summary') or item.get('snippet', ''),
                    url=item['url'],
                    source_name=item.get('site_name', ''),
                    source_icon=item.get('site_icon', ''),
                    published_at=item.get('published_at'),
                    news_type='Search Result',
                    is_active=True
                )
                db.session.add(news)
                saved_count += 1

        db.session.commit()

        return jsonify({
            'success': True,
            'message': f'成功获取并保存 {saved_count} 条新闻',
            'total_found': len(news_items),
            'saved': saved_count,
            'news_items': searcher.format_news_for_display(news_items)
        })
    except Exception as e:
        db.session.rollback()
        return jsonify({
            'success': False,
            'message': f'获取失败: {str(e)}'
        }), 500
@app.route('/api/fetch-all-news', methods=['POST'])
@login_required
def fetch_all_news():
    """Fetch and store news for a batch of active sites (admin API).

    The JSON body is optional; recognised keys are ``count`` (items per
    site, default 5), ``freshness`` (default from ``NEWS_SEARCH_FRESHNESS``)
    and ``limit`` (number of sites, default 10). Sites are taken in ascending
    ``updated_at`` order. Each site is committed on its own, and a failing
    site is rolled back and reported with an ``error`` entry, so one failure
    does not discard or block the others.

    Returns:
        JSON with ``total_found``, ``total_saved`` and a per-site
        ``processed_sites`` list; 404 when there is no active site; 500 when
        the API key is not configured or on an unexpected error.
    """
    try:
        # silent=True: the endpoint works with no body at all (all defaults).
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        count_per_site = data.get('count', 5)
        freshness = data.get('freshness', app.config.get('NEWS_SEARCH_FRESHNESS', 'oneMonth'))
        limit = data.get('limit', 10)

        api_key = app.config.get('BOCHA_API_KEY')
        if not api_key:
            return jsonify({
                'success': False,
                'message': '博查API未配置,请在.env文件中设置BOCHA_API_KEY'
            }), 500

        # Oldest-updated sites first
        sites = Site.query.filter_by(is_active=True).order_by(Site.updated_at).limit(limit).all()
        if not sites:
            return jsonify({
                'success': False,
                'message': '没有可用的网站'
            }), 404

        # Snapshot the fields needed below: per-site commit/rollback expires
        # ORM instances, and error reporting must not depend on reloading them.
        targets = [(s.id, s.name, s.url, s.news_keywords) for s in sites]

        searcher = NewsSearcher(api_key)
        total_saved = 0
        total_found = 0
        processed_sites = []

        for site_id, site_name, site_url, site_keywords in targets:
            try:
                # A search that yields nothing is "0 found", not an error.
                news_items = searcher.search_site_news(
                    site_name=site_name,
                    site_url=site_url,
                    news_keywords=site_keywords,  # v2.3: dedicated keywords
                    count=count_per_site,
                    freshness=freshness
                ) or []

                site_saved = 0
                for item in news_items:
                    # De-duplicate by URL within this site
                    existing_news = News.query.filter_by(
                        site_id=site_id,
                        url=item['url']
                    ).first()
                    if not existing_news:
                        news = News(
                            site_id=site_id,
                            title=item['title'],
                            content=item.get('summary') or item.get('snippet', ''),
                            url=item['url'],
                            source_name=item.get('site_name', ''),
                            source_icon=item.get('site_icon', ''),
                            published_at=item.get('published_at'),
                            news_type='Search Result',
                            is_active=True
                        )
                        db.session.add(news)
                        site_saved += 1
                db.session.commit()
            except Exception as e:
                # Undo this site's partial work and keep going with the rest.
                db.session.rollback()
                processed_sites.append({
                    'id': site_id,
                    'name': site_name,
                    'error': str(e)
                })
                continue

            total_found += len(news_items)
            total_saved += site_saved
            processed_sites.append({
                'id': site_id,
                'name': site_name,
                'found': len(news_items),
                'saved': site_saved
            })

        return jsonify({
            'success': True,
            'message': f'批量获取完成,共处理 {len(processed_sites)} 个网站',
            'total_found': total_found,
            'total_saved': total_saved,
            'processed_sites': processed_sites
        })
    except Exception as e:
        db.session.rollback()
        return jsonify({
            'success': False,
            'message': f'批量获取失败: {str(e)}'
        }), 500
# ========== SEO routes (added in v2.4) ==========
@app.route('/sitemap.xml')
def sitemap():
    """Serve a dynamically generated ``sitemap.xml``.

    The sitemap lists the home page, the detail page of every active site
    (with ``lastmod`` taken from ``updated_at``, or today's date when that is
    unset) and one filtered home-page URL per tag. URLs are XML-escaped and
    tag slugs are percent-encoded.

    Returns:
        A response with content type ``application/xml; charset=utf-8``.
    """
    from flask import make_response
    from urllib.parse import quote
    from xml.sax.saxutils import escape

    base_url = request.url_root.rstrip('/')
    today = datetime.now().strftime('%Y-%m-%d')

    sites = Site.query.filter_by(is_active=True).order_by(Site.updated_at.desc()).all()
    tags = Tag.query.all()

    def url_entry(loc, changefreq, priority, lastmod=None):
        # One <url> element; child order follows loc/lastmod/changefreq/priority.
        lines = ['  <url>', f'    <loc>{escape(loc)}</loc>']
        if lastmod:
            lines.append(f'    <lastmod>{lastmod}</lastmod>')
        lines.append(f'    <changefreq>{changefreq}</changefreq>')
        lines.append(f'    <priority>{priority}</priority>')
        lines.append('  </url>')
        return '\n'.join(lines)

    parts = [
        '<?xml version="1.0" encoding="UTF-8"?>',
        '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">',
        # Home page
        url_entry(base_url, 'daily', '1.0'),
    ]
    # Site detail pages
    for site in sites:
        parts.append(url_entry(
            base_url + url_for('site_detail', code=site.code),
            'weekly',
            '0.8',
            lastmod=site.updated_at.strftime('%Y-%m-%d') if site.updated_at else today,
        ))
    # Tag listing pages
    for tag in tags:
        parts.append(url_entry(base_url + '/?tag=' + quote(tag.slug, safe=''), 'weekly', '0.6'))
    parts.append('</urlset>')

    response = make_response('\n'.join(parts) + '\n')
    response.headers['Content-Type'] = 'application/xml; charset=utf-8'
    return response
@app.route('/robots.txt')
def robots():
    """Serve ``robots.txt``: allow everything except the admin and API paths.

    The ``Sitemap`` line points at ``sitemap.xml`` under the root URL of the
    current request.
    """
    from flask import make_response

    site_root = request.url_root
    body = f'''User-agent: *
Allow: /
Disallow: /admin/
Disallow: /api/
Sitemap: {site_root}sitemap.xml
'''
    reply = make_response(body)
    reply.headers['Content-Type'] = 'text/plain; charset=utf-8'
    return reply
# ========== SEO tool management routes (added in v2.4) ==========
@app.route('/admin/seo-tools')
@login_required
def seo_tools():
    """Admin page for the SEO tools.

    Passes the template a ``sitemap_info`` dict describing
    ``static/sitemap.xml``: ``{'exists': False}`` when the file is absent,
    otherwise its existence flag, last-modified time and size in bytes.
    """
    static_sitemap = 'static/sitemap.xml'

    if not os.path.exists(static_sitemap):
        return render_template('admin/seo_tools.html', sitemap_info={'exists': False})

    modified_at = datetime.fromtimestamp(os.path.getmtime(static_sitemap))
    sitemap_info = {
        'exists': True,
        'last_updated': modified_at.strftime('%Y-%m-%d %H:%M:%S'),
        'size': os.path.getsize(static_sitemap)
    }
    return render_template('admin/seo_tools.html', sitemap_info=sitemap_info)
@app.route('/api/generate-static-sitemap', methods=['POST'])
@login_required
def generate_static_sitemap():
    """Write a static ``sitemap.xml`` into the ``static`` directory (admin API).

    The file lists the home page, ``/site/<code>`` for every active site
    (with ``lastmod`` from ``updated_at``, or today's date when unset) and
    one ``/?tag=<slug>`` URL per tag, using the root URL of the current
    request as the base. URLs are XML-escaped and codes/slugs are
    percent-encoded.

    Returns:
        JSON with ``total_urls``, ``file_path`` and ``timestamp`` on success;
        500 with an error message on failure.
    """
    try:
        from urllib.parse import quote
        from xml.sax.saxutils import escape

        sites = Site.query.filter_by(is_active=True).order_by(Site.updated_at.desc()).all()
        tags = Tag.query.all()

        base_url = request.url_root.rstrip('/')
        today = datetime.now().strftime('%Y-%m-%d')

        def url_entry(loc, changefreq, priority, lastmod=None):
            # One <url> element; child order follows loc/lastmod/changefreq/priority.
            lines = ['  <url>', f'    <loc>{escape(loc)}</loc>']
            if lastmod:
                lines.append(f'    <lastmod>{lastmod}</lastmod>')
            lines.append(f'    <changefreq>{changefreq}</changefreq>')
            lines.append(f'    <priority>{priority}</priority>')
            lines.append('  </url>')
            return '\n'.join(lines)

        parts = [
            '<?xml version="1.0" encoding="UTF-8"?>',
            '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">',
            # Home page
            url_entry(base_url, 'daily', '1.0'),
        ]
        # Site detail pages
        for site in sites:
            parts.append(url_entry(
                f"{base_url}/site/{quote(str(site.code), safe='')}",
                'weekly',
                '0.8',
                lastmod=site.updated_at.strftime('%Y-%m-%d') if site.updated_at else today,
            ))
        # Tag listing pages
        for tag in tags:
            parts.append(url_entry(f"{base_url}/?tag={quote(tag.slug, safe='')}", 'weekly', '0.6'))
        parts.append('</urlset>')
        xml_content = '\n'.join(parts) + '\n'

        static_dir = 'static'
        os.makedirs(static_dir, exist_ok=True)
        sitemap_path = os.path.join(static_dir, 'sitemap.xml')
        with open(sitemap_path, 'w', encoding='utf-8') as f:
            f.write(xml_content)

        total_urls = 1 + len(sites) + len(tags)  # home + site pages + tag pages
        return jsonify({
            'success': True,
            'message': f'静态sitemap.xml生成成功!共包含 {total_urls} 个URL',
            'total_urls': total_urls,
            'file_path': sitemap_path,
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'生成失败: {str(e)}'
        }), 500
@app.route('/api/notify-search-engines', methods=['POST'])
@login_required
def notify_search_engines():
    """Ping Google, Baidu and Bing with the sitemap URL (admin API).

    The sitemap URL is built from the root URL of the current request. Each
    engine is contacted independently with a 10 second timeout; a request
    that raises is recorded as ``error``, a non-200 answer as ``failed``.

    Returns:
        JSON with the sitemap URL, a per-engine ``results`` list and a
        summary message; 500 on an unexpected error.
    """
    try:
        import requests
        from urllib.parse import quote

        sitemap_url = request.url_root.rstrip('/') + '/sitemap.xml'
        encoded_sitemap_url = quote(sitemap_url, safe='')

        # NOTE(review): Google and Bing have announced the retirement of
        # their sitemap ping endpoints — confirm these URLs still respond.
        ping_targets = (
            ('Google', f'http://www.google.com/ping?sitemap={encoded_sitemap_url}'),
            ('Baidu', f'http://data.zz.baidu.com/ping?sitemap={encoded_sitemap_url}'),
            ('Bing', f'http://www.bing.com/ping?sitemap={encoded_sitemap_url}'),
        )

        results = []
        for engine, ping_url in ping_targets:
            try:
                reply = requests.get(ping_url, timeout=10)
                accepted = reply.status_code == 200
                results.append({
                    'engine': engine,
                    'status': 'success' if accepted else 'failed',
                    'status_code': reply.status_code,
                    'message': '提交成功' if accepted else f'HTTP {reply.status_code}'
                })
            except Exception as e:
                results.append({
                    'engine': engine,
                    'status': 'error',
                    'message': f'请求失败: {str(e)}'
                })

        success_count = len([entry for entry in results if entry['status'] == 'success'])
        return jsonify({
            'success': True,
            'message': f'已通知 {success_count}/{len(results)} 个搜索引擎',
            'sitemap_url': sitemap_url,
            'results': results
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'message': f'通知失败: {str(e)}'
        }), 500
@app.route('/api/refresh-site-news/<site_code>', methods=['POST'])
def refresh_site_news(site_code):
    """Manually refresh the news of one site (public endpoint, added in v2.3).

    Searches for up to 5 items from the past week and stores those whose URL
    is not yet recorded for the site.

    Args:
        site_code: The site's code taken from the URL path.

    Returns:
        JSON with ``total_found`` and ``saved_count`` on success; 404 for an
        unknown site or when nothing was found; 500 when the API key is not
        configured or on an unexpected error (the DB session is rolled back
        in that case).
    """
    # NOTE(review): this route has no login requirement and triggers an
    # external search on every call — confirm rate limiting exists elsewhere.
    try:
        site = Site.query.filter_by(code=site_code).first()
        if not site:
            return jsonify({
                'success': False,
                'message': '网站不存在'
            }), 404

        api_key = app.config.get('BOCHA_API_KEY')
        if not api_key:
            return jsonify({
                'success': False,
                'message': '新闻功能未启用'
            }), 500

        searcher = NewsSearcher(api_key)
        news_items = searcher.search_site_news(
            site_name=site.name,
            site_url=site.url,
            news_keywords=site.news_keywords,  # dedicated keywords
            count=5,
            freshness='oneWeek'  # news from the last week
        )
        if not news_items:
            return jsonify({
                'success': False,
                'message': '未找到相关新闻'
            }), 404

        saved_count = 0
        for item in news_items:
            # De-duplicate by URL within this site
            existing_news = News.query.filter_by(
                site_id=site.id,
                url=item['url']
            ).first()
            if not existing_news:
                news = News(
                    site_id=site.id,
                    title=item['title'],
                    content=item.get('summary') or item.get('snippet', ''),
                    url=item['url'],
                    source_name=item.get('site_name', ''),
                    source_icon=item.get('site_icon', ''),
                    published_at=item.get('published_at'),
                    news_type='Search Result',
                    is_active=True
                )
                db.session.add(news)
                saved_count += 1

        db.session.commit()

        return jsonify({
            'success': True,
            'message': f'成功获取 {saved_count} 条新资讯',
            'total_found': len(news_items),
            'saved_count': saved_count
        })
    except Exception as e:
        db.session.rollback()
        return jsonify({
            'success': False,
            'message': f'获取失败: {str(e)}'
        }), 500
# ========== Batch import routes ==========
@app.route('/admin/batch-import', methods=['GET', 'POST'])
@login_required
def batch_import():
    """Import many sites at once from a URL list or a bookmark HTML file.

    GET renders the form. POST parses the input according to the
    ``import_type`` form field (``url_list`` or ``bookmark_file``, the latter
    optionally narrowed by ``folder_filter``), then handles every URL on its
    own: skip URLs that already exist, fetch site info (falling back to the
    bookmark name or the URL's host), download the logo, generate a unique
    code and slug, and commit the new ``Site``. Each URL ends up in either
    the success list or the failed list, both of which are passed to the
    template as ``results``.
    """
    from utils.bookmark_parser import BookmarkParser
    from utils.website_fetcher import WebsiteFetcher
    results = None
    if request.method == 'POST':
        import_type = request.form.get('import_type')
        # Checkbox value: imported sites are active only when it is ticked
        auto_activate = request.form.get('auto_activate') == 'on'
        parser = BookmarkParser()
        fetcher = WebsiteFetcher(timeout=15)
        urls_to_import = []
        try:
            # Parse the input
            if import_type == 'url_list':
                url_list_text = request.form.get('url_list', '')
                urls_to_import = parser.parse_url_list(url_list_text)
            elif import_type == 'bookmark_file':
                bookmark_file = request.files.get('bookmark_file')
                if not bookmark_file:
                    flash('请选择书签文件', 'error')
                    return render_template('admin/batch_import.html')
                html_content = bookmark_file.read().decode('utf-8', errors='ignore')
                all_bookmarks = parser.parse_html_file(html_content)
                # Filter by folder (when one is given); case-insensitive substring match
                folder_filter = request.form.get('folder_filter', '').strip()
                if folder_filter:
                    urls_to_import = [
                        b for b in all_bookmarks
                        if folder_filter.lower() in b.get('folder', '').lower()
                    ]
                else:
                    urls_to_import = all_bookmarks
            # Batch import
            success_list = []
            failed_list = []
            for idx, item in enumerate(urls_to_import, 1):
                url = item['url']
                name = item.get('name', '')
                # Each URL gets its own transaction
                try:
                    # 1. Skip URLs that already exist
                    try:
                        existing = Site.query.filter_by(url=url).first()
                        if existing:
                            failed_list.append({
                                'url': url,
                                'name': name or existing.name,
                                'error': f'该URL已存在(网站名称:{existing.name})'
                            })
                            continue
                    except Exception as e:
                        failed_list.append({
                            'url': url,
                            'name': name,
                            'error': f'检查URL时出错: {str(e)}'
                        })
                        continue
                    # 2. Fetch site info (errors are tolerated)
                    info = None
                    try:
                        info = fetcher.fetch_website_info(url)
                    except Exception as e:
                        print(f"抓取 {url} 失败: {str(e)}")
                        # A fetch failure is not fatal; fall back to the bookmark name below
                    # 3. Resolve the site info to use
                    if not info or not info.get('name'):
                        # Prefer the bookmark name when there is one
                        if name:
                            info = {
                                'name': name,
                                'description': '',
                                'logo_url': ''
                            }
                        else:
                            # Otherwise try the host part of the URL as the name
                            from urllib.parse import urlparse
                            try:
                                parsed = urlparse(url)
                                domain = parsed.netloc or parsed.path
                                if domain:
                                    info = {
                                        'name': domain,
                                        'description': '',
                                        'logo_url': ''
                                    }
                                else:
                                    failed_list.append({
                                        'url': url,
                                        'name': name,
                                        'error': '无法获取网站信息且没有备用名称'
                                    })
                                    continue
                            except Exception:
                                failed_list.append({
                                    'url': url,
                                    'name': name,
                                    'error': '无法获取网站信息且URL解析失败'
                                })
                                continue
                    # 4. Download the logo locally (a failure does not block the import)
                    logo_path = None
                    if info.get('logo_url'):
                        try:
                            logo_path = fetcher.download_logo(info['logo_url'], save_dir='static/logos')
                        except Exception as e:
                            print(f"下载Logo失败 ({url}): {str(e)}")
                            # The site is still imported without a local logo
                    # 5. Generate code and slug
                    try:
                        import random
                        from pypinyin import lazy_pinyin
                        import re
                        # Generate a unique 8-digit code
                        site_code = None
                        max_attempts = 10
                        for _ in range(max_attempts):
                            code = str(random.randint(10000000, 99999999))
                            if not Site.query.filter_by(code=code).first():
                                site_code = code
                                break
                        if not site_code:
                            # All 10 attempts collided: use the last 8 digits of the millisecond timestamp
                            import time
                            site_code = str(int(time.time() * 1000))[-8:]
                        # Generate the slug from the pinyin of the (truncated) name
                        site_name = info.get('name', name or 'Unknown')[:100]
                        slug = ''.join(lazy_pinyin(site_name))
                        slug = slug.lower()
                        slug = re.sub(r'[^\w\s-]', '', slug)
                        slug = re.sub(r'[-\s]+', '-', slug).strip('-')
                        if not slug:
                            slug = f"site-{site_code}"
                        # Make the slug unique by appending a counter
                        base_slug = slug[:50]  # length cap
                        counter = 1
                        final_slug = slug
                        while Site.query.filter_by(slug=final_slug).first():
                            final_slug = f"{base_slug}-{counter}"
                            counter += 1
                            if counter > 100:  # guard against an endless loop
                                final_slug = f"{base_slug}-{site_code}"
                                break
                        # 6. Create the site record (with code and slug)
                        site = Site(
                            code=site_code,
                            slug=final_slug,
                            name=site_name,
                            url=url[:500],  # URL length cap
                            # Local logo path when the download succeeded, else the remote logo URL
                            logo=logo_path or info.get('logo_url', '')[:500] if info.get('logo_url') else '',
                            short_desc=info.get('description', '')[:200] if info.get('description') else '',
                            description=info.get('description', '')[:2000] if info.get('description') else '',
                            is_active=auto_activate
                        )
                        # Add and commit immediately so each site is saved independently
                        db.session.add(site)
                        db.session.commit()
                        success_list.append({
                            'name': site.name,
                            'url': site.url
                        })
                        print(f"成功导入 [{idx}/{len(urls_to_import)}]: {site.name}")
                    except Exception as e:
                        db.session.rollback()
                        failed_list.append({
                            'url': url,
                            'name': name or info.get('name', 'Unknown'),
                            'error': f'数据库保存失败: {str(e)}'
                        })
                        continue
                except Exception as e:
                    # Catch-all for anything unexpected while handling this URL
                    db.session.rollback()
                    failed_list.append({
                        'url': url,
                        'name': name,
                        'error': f'未知错误: {str(e)}'
                    })
                    print(f"导入 {url} 时发生未知错误: {str(e)}")
                    continue
            results = {
                'total_count': len(urls_to_import),
                'success_count': len(success_list),
                'failed_count': len(failed_list),
                'success_list': success_list,
                'failed_list': failed_list
            }
            if success_list:
                flash(f'成功导入 {len(success_list)} 个网站!', 'success')
        except Exception as e:
            # Parsing or loop-level failure: results stays None
            flash(f'导入失败: {str(e)}', 'error')
    return render_template('admin/batch_import.html', results=results)
# ========== Flask-Admin 配置 ==========
class SecureModelView(ModelView):
    """Base Flask-Admin model view that is only reachable when logged in.

    Unauthenticated requests are redirected to the ``admin_login`` endpoint.
    Every model view in this module derives from this class.
    """

    # Templates for the list / create / edit pages.
    edit_template = 'admin/model/edit.html'
    create_template = 'admin/model/create.html'
    list_template = 'admin/model/list.html'

    # Pagination defaults.
    page_size = 20
    can_set_page_size = True

    # Flask-Admin option controlling how filters are encoded in URLs.
    named_filter_urls = True

    def inaccessible_callback(self, name, **kwargs):
        """Send a visitor who failed ``is_accessible`` to the login page."""
        login_url = url_for('admin_login')
        return redirect(login_url)

    def is_accessible(self):
        """Return True only when the current user is authenticated."""
        return current_user.is_authenticated
class SecureAdminIndexView(AdminIndexView):
    """Admin landing page (dashboard) restricted to logged-in users."""

    def inaccessible_callback(self, name, **kwargs):
        """Redirect visitors who are not logged in to the login page."""
        return redirect(url_for('admin_login'))

    def is_accessible(self):
        """Return True only when the current user is authenticated."""
        return current_user.is_authenticated

    @expose('/')
    def index(self):
        """Render the dashboard with summary counters and the newest sites."""
        # Summary counters shown on the dashboard.
        active_site_total = Site.query.filter_by(is_active=True).count()
        tag_total = Tag.query.count()
        active_news_total = News.query.filter_by(is_active=True).count()
        # SUM() yields None when there are no rows, hence the ``or 0`` below.
        view_total = db.session.query(db.func.sum(Site.view_count)).scalar()

        stats = {
            'sites_count': active_site_total,
            'tags_count': tag_total,
            'news_count': active_news_total,
            'total_views': view_total or 0,
        }

        # Five most recently created sites.
        newest_first = Site.query.order_by(Site.created_at.desc())
        recent_sites = newest_first.limit(5).all()

        return self.render('admin/index.html', stats=stats, recent_sites=recent_sites)
# 网站管理视图
class SiteAdmin(SecureModelView):
    """Admin view for ``Site`` records.

    Besides the list/form configuration, ``on_model_change`` fills in a
    missing ``code`` and ``slug`` and attaches the tags typed into the extra
    ``new_tags`` form field (comma separated), creating tags that do not
    exist yet.
    """

    # Site-specific create / edit templates.
    create_template = 'admin/site/create.html'
    edit_template = 'admin/site/edit.html'

    # Enable create, edit and delete.
    can_edit = True
    can_delete = True
    can_create = True
    can_view_details = False  # details page disabled; the name link is used instead

    # Show the per-row actions column.
    column_display_actions = True
    action_disallowed_list = []

    column_list = ['id', 'code', 'name', 'url', 'slug', 'is_active', 'is_recommended', 'view_count', 'created_at']
    column_searchable_list = ['code', 'name', 'url', 'description']
    column_filters = ['is_active', 'is_recommended', 'tags']
    column_labels = {
        'id': 'ID',
        'code': '网站编码',
        'name': '网站名称',
        'url': 'URL',
        'slug': 'URL别名',
        'logo': 'Logo',
        'short_desc': '简短描述',
        'description': '详细介绍',
        'features': '主要功能',
        'news_keywords': '新闻关键词',
        'is_active': '是否启用',
        'is_recommended': '是否推荐',
        'view_count': '浏览次数',
        'sort_order': '排序权重',
        'tags': '标签',
        'created_at': '创建时间',
        'updated_at': '更新时间'
    }
    form_columns = ['name', 'url', 'slug', 'logo', 'short_desc', 'description', 'features', 'news_keywords', 'tags', 'is_active', 'is_recommended', 'sort_order']

    # No extra row actions beyond the built-in edit / delete.
    column_extra_row_actions = None

    @staticmethod
    def _slugify(text):
        """Return a lowercase, hyphen-separated pinyin slug for *text*.

        The result is an empty string when *text* contains no word
        characters; callers must supply their own fallback in that case.
        """
        import re
        from pypinyin import lazy_pinyin

        slug = ''.join(lazy_pinyin(text or '')).lower()
        slug = re.sub(r'[^\w\s-]', '', slug)
        return re.sub(r'[-\s]+', '-', slug).strip('-')

    def on_model_change(self, form, model, is_created):
        """Fill in ``code``/``slug`` when blank and attach tags from ``new_tags``.

        Args:
            form: The submitted Flask-Admin form (not read directly; the
                extra ``new_tags`` field is taken from ``request.form``).
            model: The ``Site`` instance being created or edited.
            is_created: True when the model is new (unused here).

        The code and slug are resolved *before* the tag loop because that
        loop calls ``db.session.flush()``; flushing earlier would write the
        pending site while those columns may still be empty.
        """
        import random
        import time
        from flask import request

        # no_autoflush keeps the uniqueness queries below from flushing the
        # half-populated model as a side effect.
        with db.session.no_autoflush:
            # 1. Generate a unique 8-digit numeric code when none is set.
            #    A whitespace-only value counts as "not set".
            if not (model.code or '').strip():
                new_code = None
                for _ in range(10):
                    candidate = str(random.randint(10000000, 99999999))
                    clash = Site.query.filter(Site.code == candidate).first()
                    if not clash or clash.id == model.id:
                        new_code = candidate
                        break
                if new_code is None:
                    # All random attempts collided: fall back to the last
                    # 8 digits of the millisecond timestamp.
                    new_code = str(int(time.time() * 1000))[-8:]
                model.code = new_code

            # 2. Derive the slug from the name when none is set.
            if not (model.slug or '').strip():
                slug = self._slugify(model.name) or f"site-{model.code}"
                base_slug = slug[:50]
                final_slug = slug
                # Try the plain slug, then "<base>-1" .. "<base>-98".
                for counter in range(1, 100):
                    clash = Site.query.filter(Site.slug == final_slug).first()
                    if not clash or clash.id == model.id:
                        break
                    final_slug = f"{base_slug}-{counter}"
                else:
                    # Every candidate was taken: use the code as suffix.
                    final_slug = f"{base_slug}-{model.code}"
                model.slug = final_slug

            # 3. Attach tags typed into the free-text ``new_tags`` field.
            raw_tags = request.form.get('new_tags', '')
            tag_names = [part.strip() for part in raw_tags.split(',') if part.strip()]
            for tag_name in tag_names:
                tag = Tag.query.filter_by(name=tag_name).first()
                if tag is None:
                    # A name made only of punctuation slugifies to '';
                    # never store an empty slug.
                    tag_slug = self._slugify(tag_name) or f"tag-{random.randint(1000, 9999)}"
                    base_tag_slug = tag_slug[:50]
                    counter = 1
                    final_tag_slug = tag_slug
                    while Tag.query.filter_by(slug=final_tag_slug).first():
                        final_tag_slug = f"{base_tag_slug}-{counter}"
                        counter += 1
                        if counter > 100:
                            final_tag_slug = f"{base_tag_slug}-{random.randint(1000, 9999)}"
                            break
                    tag = Tag(name=tag_name, slug=final_tag_slug)
                    db.session.add(tag)
                    # Explicit flush so the new tag is visible to the name
                    # lookup if the same name appears again in this list.
                    db.session.flush()
                if tag not in model.tags:
                    model.tags.append(tag)
# 标签管理视图
class TagAdmin(SecureModelView):
    """Admin view for ``Tag`` records (list, search and edit form setup)."""

    # Permissions: full CRUD, no separate details page.
    can_create = True
    can_edit = True
    can_delete = True
    can_view_details = False

    # Show the per-row actions column.
    column_display_actions = True

    # List page: visible columns and searchable fields.
    column_list = [
        'id',
        'name',
        'slug',
        'description',
        'sort_order',
    ]
    column_searchable_list = ['name', 'description']

    # Edit form fields, in display order.
    form_columns = [
        'name',
        'slug',
        'description',
        'seo_title',
        'seo_description',
        'seo_keywords',
        'icon',
        'sort_order',
    ]

    # Human-readable (Chinese) labels for columns and form fields.
    column_labels = {
        'id': 'ID',
        'name': '标签名称',
        'slug': 'URL别名',
        'description': '标签描述',
        'seo_title': 'SEO标题 (v2.4)',
        'seo_description': 'SEO描述 (v2.4)',
        'seo_keywords': 'SEO关键词 (v2.4)',
        'icon': '图标',
        'sort_order': '排序权重',
        'created_at': '创建时间',
    }
# 管理员视图
class AdminAdmin(SecureModelView):
    """Admin view for administrator accounts.

    The form exposes no password field, so a newly created account receives
    an initial password in ``on_model_change``.
    """

    can_edit = True
    can_delete = True
    can_create = True
    can_view_details = False

    # Show the per-row actions column.
    column_display_actions = True

    column_list = ['id', 'username', 'email', 'is_active', 'last_login', 'created_at']
    column_searchable_list = ['username', 'email']
    column_filters = ['is_active']
    column_labels = {
        'id': 'ID',
        'username': '用户名',
        'email': '邮箱',
        'is_active': '是否启用',
        'created_at': '创建时间',
        'last_login': '最后登录'
    }
    form_columns = ['username', 'email', 'is_active']

    def on_model_change(self, form, model, is_created):
        """Assign the initial password to a newly created administrator.

        The password is read from the ``DEFAULT_ADMIN_PASSWORD`` environment
        variable and falls back to the historical value ``'admin123'`` when
        the variable is unset or empty. Existing accounts are left alone.

        Args:
            form: The submitted Flask-Admin form (unused).
            model: The administrator instance being saved.
            is_created: True when the account is being created.
        """
        if is_created:
            # SECURITY NOTE(review): a shared, well-known initial password is
            # weak. Deployments should set DEFAULT_ADMIN_PASSWORD and have
            # new admins change their password after first login.
            initial_password = os.getenv('DEFAULT_ADMIN_PASSWORD') or 'admin123'
            model.set_password(initial_password)
# 新闻管理视图
class NewsAdmin(SecureModelView):
    """Admin view for ``News`` records attached to sites."""

    # Permissions: full CRUD, no separate details page.
    can_create = True
    can_edit = True
    can_delete = True
    can_view_details = False

    # Show the per-row actions column.
    column_display_actions = True

    # List page: columns, search fields, filters and default ordering.
    column_list = [
        'id',
        'site',
        'title',
        'source_name',
        'news_type',
        'published_at',
        'is_active',
    ]
    column_searchable_list = ['title', 'content', 'source_name']
    column_filters = [
        'site',
        'news_type',
        'source_name',
        'is_active',
        'published_at',
    ]
    # Sort by publication time, newest first (True = descending).
    column_default_sort = ('published_at', True)

    # Edit form fields, in display order.
    form_columns = [
        'site',
        'title',
        'content',
        'news_type',
        'url',
        'source_name',
        'source_icon',
        'published_at',
        'is_active',
    ]

    # Allowed values for the news type drop-down.
    form_choices = {
        'news_type': [
            ('Search Result', 'Search Result'),
            ('Product Update', 'Product Update'),
            ('Industry News', 'Industry News'),
            ('Company News', 'Company News'),
            ('Other', 'Other'),
        ],
    }

    # Human-readable (Chinese) labels for columns and form fields.
    column_labels = {
        'id': 'ID',
        'site': '关联网站',
        'title': '新闻标题',
        'content': '新闻内容',
        'news_type': '新闻类型',
        'url': '新闻链接',
        'source_name': '来源网站',
        'source_icon': '来源图标',
        'published_at': '发布时间',
        'is_active': '是否启用',
        'created_at': '创建时间',
        'updated_at': '更新时间',
    }
# Prompt模板管理视图
class PromptAdmin(SecureModelView):
    """Admin view for ``PromptTemplate`` records.

    Templates can be created and edited but not deleted, so that prompts the
    system depends on cannot be removed from the admin UI.
    """

    # Permissions: create and edit only.
    can_create = True
    can_edit = True
    can_delete = False  # deletion disabled to protect system-required prompts
    can_view_details = False

    # Show the per-row actions column.
    column_display_actions = True

    # List page: columns, search fields and filters.
    column_list = [
        'id',
        'key',
        'name',
        'description',
        'is_active',
        'updated_at',
    ]
    column_searchable_list = ['key', 'name', 'description']
    column_filters = ['is_active', 'key']

    # Edit form fields, in display order.
    form_columns = [
        'key',
        'name',
        'description',
        'system_prompt',
        'user_prompt_template',
        'is_active',
    ]

    # Rendering hints for the two prompt fields: monospace font, with a
    # much taller box for the user prompt template.
    form_widget_args = {
        'system_prompt': {
            'rows': 3,
            'style': 'font-family: monospace;',
        },
        'user_prompt_template': {
            'rows': 20,
            'style': 'font-family: monospace;',
        },
    }

    # Help text attached to individual fields.
    column_descriptions = {
        'key': '唯一标识,如: tags, features, description',
        'system_prompt': 'AI的系统角色设定',
        'user_prompt_template': '用户提示词模板,支持变量如 {name}, {description}, {url}',
    }

    # Human-readable (Chinese) labels for columns and form fields.
    column_labels = {
        'id': 'ID',
        'key': '唯一标识',
        'name': '模板名称',
        'system_prompt': '系统提示词',
        'user_prompt_template': '用户提示词模板',
        'description': '模板说明',
        'is_active': '是否启用',
        'created_at': '创建时间',
        'updated_at': '更新时间',
    }
# Initialise Flask-Admin on this app. The index page is the login-protected
# SecureAdminIndexView mounted at /admin, rendered with the project's own
# base template.
admin = Admin(
app,
name='ZJPB 焦提示词',
template_mode='bootstrap4',
index_view=SecureAdminIndexView(name='控制台', url='/admin'),
base_template='admin/master.html'
)
# Register one management view per model; ``name`` is the menu label.
admin.add_view(SiteAdmin(Site, db.session, name='网站管理'))
admin.add_view(TagAdmin(Tag, db.session, name='标签管理'))
admin.add_view(NewsAdmin(News, db.session, name='新闻管理'))
admin.add_view(PromptAdmin(PromptTemplate, db.session, name='Prompt管理'))
# The administrator view is given an explicit endpoint name ('admin_users').
# NOTE(review): presumably to avoid clashing with the default endpoint
# derived from the model name — confirm.
admin.add_view(AdminAdmin(AdminModel, db.session, name='管理员', endpoint='admin_users'))
return app
if __name__ == '__main__':
    # Development entry point: build the app for the environment named by
    # FLASK_ENV (default 'development') and serve it on all interfaces.
    env_name = os.getenv('FLASK_ENV', 'development')
    app = create_app(env_name)
    # The Werkzeug debugger allows arbitrary code execution, so it must not
    # be enabled when the production configuration is selected while the
    # server listens on 0.0.0.0. Every other environment keeps debug on.
    app.run(host='0.0.0.0', port=5000, debug=(env_name != 'production'))