"""Flask application factory for the site directory.

Registers the public pages, the admin login/logout routes, the JSON helper
APIs used by the admin forms, the batch-import page and the Flask-Admin
model views.
"""
import os
import random
import re
import time
from datetime import datetime
from urllib.parse import urlparse

from flask import Flask, render_template, redirect, url_for, request, flash, jsonify
from flask_login import LoginManager, login_user, logout_user, login_required, current_user
from flask_admin import Admin, AdminIndexView, expose
from flask_admin.contrib.sqla import ModelView

from config import config
from models import db, Site, Tag, Admin as AdminModel, News
from utils.website_fetcher import WebsiteFetcher
from utils.tag_generator import TagGenerator

# Number of sites shown per page on the public index.
SITES_PER_PAGE = 100
# How many random 8-digit codes to try before falling back to a timestamp.
_CODE_ATTEMPTS = 10
# How many slug candidates (base, base-1, ... base-99) to try before
# falling back to "<base>-<code>".
_SLUG_ATTEMPTS = 100
# Maximum length of the slug stem used when building candidates.
_SLUG_STEM_LENGTH = 50


def _generate_site_code(exclude_id=None):
    """Return an 8-digit site code that no other ``Site`` row uses.

    Args:
        exclude_id: Primary key of the row being edited; a match on that
            row does not count as a collision.

    Returns:
        A string of 8 digits. After ``_CODE_ATTEMPTS`` collisions the last
        8 digits of the current millisecond timestamp are used instead.
    """
    for _ in range(_CODE_ATTEMPTS):
        candidate = str(random.randint(10000000, 99999999))
        existing = Site.query.filter_by(code=candidate).first()
        if existing is None or existing.id == exclude_id:
            return candidate
    return str(int(time.time() * 1000))[-8:]


def _slugify(name):
    """Convert *name* to a lowercase, hyphen-separated slug.

    Chinese characters are transliterated with ``pypinyin``. The result may
    be empty when *name* contains no word characters.
    """
    # Imported lazily, as in the original code, so the module still imports
    # when pypinyin is unavailable.
    from pypinyin import lazy_pinyin

    slug = ''.join(lazy_pinyin(name)).lower()
    slug = re.sub(r'[^\w\s-]', '', slug)
    return re.sub(r'[-\s]+', '-', slug).strip('-')


def _make_unique_slug(slug, code, exclude_id=None):
    """Return a slug derived from *slug* that no other ``Site`` row uses.

    Candidates are the stem (``slug`` truncated to ``_SLUG_STEM_LENGTH``),
    then ``stem-1`` ... ``stem-99``; if all are taken ``stem-<code>`` is
    returned. The very first candidate is truncated as well, so the length
    limit applies to every result.

    Args:
        slug: Non-empty slug to start from.
        code: Site code used for the last-resort suffix.
        exclude_id: Primary key of the row being edited; a match on that
            row does not count as a collision.
    """
    stem = slug[:_SLUG_STEM_LENGTH]
    candidate = stem
    for counter in range(1, _SLUG_ATTEMPTS + 1):
        existing = Site.query.filter_by(slug=candidate).first()
        if existing is None or existing.id == exclude_id:
            return candidate
        candidate = f"{stem}-{counter}"
    return f"{stem}-{code}"


def _json_body():
    """Return the request's JSON object, or ``{}`` if absent or not an object."""
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else {}


def _text_field(data, key):
    """Return ``data[key]`` stripped, or ``''`` if missing or not a string."""
    value = data.get(key)
    return value.strip() if isinstance(value, str) else ''


def _failure(url, name, error):
    """Build one entry of the batch-import failure list."""
    return {'url': url, 'name': name, 'error': error}


def _fallback_site_info(url, name):
    """Build minimal site info when fetching produced no usable name.

    Prefers the bookmark *name*; otherwise uses the URL's host (or path).

    Returns:
        ``(info, None)`` on success or ``(None, error_message)``.
    """
    if name:
        return {'name': name, 'description': '', 'logo_url': ''}, None
    try:
        parsed = urlparse(url)
    except (ValueError, TypeError, AttributeError):
        return None, '无法获取网站信息且URL解析失败'
    domain = parsed.netloc or parsed.path
    if not domain:
        return None, '无法获取网站信息且没有备用名称'
    return {'name': domain, 'description': '', 'logo_url': ''}, None


def _import_site(fetcher, url, name, auto_activate):
    """Import a single URL as a ``Site`` row.

    Args:
        fetcher: ``WebsiteFetcher`` used to fetch metadata and the logo.
        url: URL to import.
        name: Bookmark name, used as a fallback site name (may be empty).
        auto_activate: Value stored in ``Site.is_active``.

    Returns:
        ``(site, None)`` on success or ``(None, failure_dict)``.
    """
    # 1. Skip URLs that are already stored.
    try:
        existing = Site.query.filter_by(url=url).first()
    except Exception as exc:
        # Leave the session usable for the remaining URLs.
        db.session.rollback()
        return None, _failure(url, name, f'检查URL时出错: {exc}')
    if existing:
        return None, _failure(
            url,
            name or existing.name,
            f'该URL已存在(网站名称:{existing.name})',
        )

    # 2. Fetch metadata; a failure here is not fatal because the bookmark
    #    name or the domain can still be used.
    info = None
    try:
        info = fetcher.fetch_website_info(url)
    except Exception as exc:
        print(f"抓取 {url} 失败: {str(exc)}")

    # 3. Fall back to the bookmark name / domain when no name was fetched.
    if not info or not info.get('name'):
        info, error = _fallback_site_info(url, name)
        if info is None:
            return None, _failure(url, name, error)

    # 4. Download the logo; a failure does not block the import.
    logo_url = info.get('logo_url') or ''
    logo_path = None
    if logo_url:
        try:
            logo_path = fetcher.download_logo(logo_url)
        except Exception as exc:
            print(f"下载Logo失败 ({url}): {str(exc)}")

    # 5./6. Generate code and slug, then store the row in its own commit.
    try:
        site_code = _generate_site_code()
        site_name = info.get('name', name or 'Unknown')[:100]
        slug = _slugify(site_name) or f"site-{site_code}"
        description = info.get('description') or ''
        site = Site(
            code=site_code,
            slug=_make_unique_slug(slug, site_code),
            name=site_name,
            url=url[:500],
            logo=logo_path or logo_url[:500],
            short_desc=description[:200],
            description=description[:2000],
            is_active=auto_activate,
        )
        db.session.add(site)
        db.session.commit()
    except Exception as exc:
        db.session.rollback()
        return None, _failure(
            url,
            name or info.get('name', 'Unknown'),
            f'数据库保存失败: {exc}',
        )
    return site, None


def create_app(config_name='default'):
    """Application factory: build and configure the Flask application.

    Args:
        config_name: Key into the imported ``config`` mapping selecting the
            configuration object to load.

    Returns:
        The ``Flask`` app with database, login manager, routes and
        Flask-Admin views registered.
    """
    app = Flask(__name__)

    # Load configuration.
    app.config.from_object(config[config_name])

    # Initialise the database.
    db.init_app(app)

    # Initialise login management.
    login_manager = LoginManager()
    login_manager.init_app(app)
    login_manager.login_view = 'admin_login'
    login_manager.login_message = '请先登录'

    @login_manager.user_loader
    def load_user(user_id):
        """Load the admin for a session id; ``None`` if the id is invalid."""
        try:
            return AdminModel.query.get(int(user_id))
        except (TypeError, ValueError):
            return None

    # ========== Public routes ==========

    @app.route('/')
    def index():
        """Home page: active sites, optionally filtered by tag and search."""
        tags = Tag.query.order_by(Tag.sort_order.desc(), Tag.id).all()

        # Filter parameters.
        tag_slug = request.args.get('tag')
        search_query = request.args.get('q', '').strip()
        page = request.args.get('page', 1, type=int)

        def render(sites, pagination, selected_tag):
            return render_template(
                'index_new.html',
                sites=sites,
                tags=tags,
                selected_tag=selected_tag,
                search_query=search_query,
                pagination=pagination,
            )

        selected_tag = None
        query = Site.query.filter_by(is_active=True)

        # Tag filter: an unknown tag slug yields an empty result page.
        if tag_slug:
            selected_tag = Tag.query.filter_by(slug=tag_slug).first()
            if not selected_tag:
                return render([], None, selected_tag)
            query = query.filter(Site.tags.contains(selected_tag))

        # Search: OR across name, URL and both description fields.
        if search_query:
            search_pattern = f'%{search_query}%'
            query = query.filter(
                db.or_(
                    Site.name.like(search_pattern),
                    Site.url.like(search_pattern),
                    Site.description.like(search_pattern),
                    Site.short_desc.like(search_pattern),
                )
            )

        # Sort and paginate.
        query = query.order_by(Site.sort_order.desc(), Site.id.desc())
        pagination = query.paginate(
            page=page, per_page=SITES_PER_PAGE, error_out=False
        )
        return render(pagination.items, pagination, selected_tag)

    # The URL variable is required: the view takes ``code`` as an argument.
    @app.route('/site/<code>')
    def site_detail(code):
        """Detail page for one active site, looked up by its code."""
        site = Site.query.filter_by(code=code, is_active=True).first_or_404()

        # Count the view; tolerate rows whose counter is still NULL.
        site.view_count = (site.view_count or 0) + 1
        db.session.commit()

        # Latest news for this site (at most 5).
        news_list = (
            News.query.filter_by(site_id=site.id, is_active=True)
            .order_by(News.published_at.desc())
            .limit(5)
            .all()
        )

        # Recommendations: other active sites sharing a tag (at most 4).
        recommended_sites = []
        if site.tags:
            tag_ids = [tag.id for tag in site.tags]
            recommended_sites = (
                Site.query.filter(
                    Site.id != site.id,
                    Site.is_active == True,  # noqa: E712 - SQL expression
                    Site.tags.any(Tag.id.in_(tag_ids)),
                )
                .order_by(Site.view_count.desc())
                .limit(4)
                .all()
            )

        return render_template(
            'detail_new.html',
            site=site,
            news_list=news_list,
            recommended_sites=recommended_sites,
        )

    # ========== Admin login routes ==========

    @app.route('/admin/login', methods=['GET', 'POST'])
    def admin_login():
        """Admin login form and its submission handler."""
        if current_user.is_authenticated:
            return redirect(url_for('admin.index'))

        if request.method == 'POST':
            # Default to '' so a missing field never reaches check_password
            # as None.
            username = request.form.get('username', '')
            password = request.form.get('password', '')

            admin = AdminModel.query.filter_by(username=username).first()
            if admin and admin.check_password(password) and admin.is_active:
                login_user(admin)
                admin.last_login = datetime.now()
                db.session.commit()
                return redirect(url_for('admin.index'))
            flash('用户名或密码错误', 'error')

        return render_template('admin_login.html')

    @app.route('/admin/logout')
    @login_required
    def admin_logout():
        """Log the current admin out and return to the home page."""
        logout_user()
        return redirect(url_for('index'))

    # ========== API routes ==========

    @app.route('/api/fetch-website-info', methods=['POST'])
    @login_required
    def fetch_website_info():
        """Fetch name, description and logo for the URL in the JSON body.

        Responds 400 when no URL is supplied and 500 when fetching raises.
        """
        try:
            url = _text_field(_json_body(), 'url')
            if not url:
                return jsonify({
                    'success': False,
                    'message': '请提供网站URL'
                }), 400

            fetcher = WebsiteFetcher(timeout=15)
            info = fetcher.fetch_website_info(url)
            if not info:
                return jsonify({
                    'success': False,
                    'message': '无法获取网站信息,请检查URL是否正确或手动填写'
                })

            # Download the logo when one was found.
            logo_path = None
            if info.get('logo_url'):
                logo_path = fetcher.download_logo(info['logo_url'])

            return jsonify({
                'success': True,
                'data': {
                    'name': info.get('name', ''),
                    'description': info.get('description', ''),
                    'logo': logo_path or info.get('logo_url', '')
                }
            })
        except Exception as e:
            # Route boundary: report the failure as JSON.
            return jsonify({
                'success': False,
                'message': f'抓取失败: {str(e)}'
            }), 500

    @app.route('/api/generate-tags', methods=['POST'])
    @login_required
    def generate_tags():
        """Suggest tags for a site from its name and description.

        Responds 400 when either field is missing or ``TagGenerator`` raises
        ``ValueError``, and 500 on any other failure.
        """
        try:
            data = _json_body()
            name = _text_field(data, 'name')
            description = _text_field(data, 'description')
            if not name or not description:
                return jsonify({
                    'success': False,
                    'message': '请提供网站名称和描述'
                }), 400

            # Existing tag names are passed to the generator as reference.
            existing_tags = [tag.name for tag in Tag.query.all()]

            generator = TagGenerator()
            suggested_tags = generator.generate_tags(
                name, description, existing_tags
            )
            if not suggested_tags:
                return jsonify({
                    'success': False,
                    'message': 'DeepSeek标签生成失败,请检查API配置'
                }), 500

            return jsonify({
                'success': True,
                'tags': suggested_tags
            })
        except ValueError as e:
            return jsonify({
                'success': False,
                'message': str(e)
            }), 400
        except Exception as e:
            # Route boundary: report the failure as JSON.
            return jsonify({
                'success': False,
                'message': f'生成失败: {str(e)}'
            }), 500

    # ========== Batch import route ==========

    @app.route('/admin/batch-import', methods=['GET', 'POST'])
    @login_required
    def batch_import():
        """Import many sites from a pasted URL list or a bookmark file.

        Each URL is stored in its own commit, so one failure does not undo
        the sites imported before it.
        """
        from utils.bookmark_parser import BookmarkParser

        results = None

        if request.method == 'POST':
            import_type = request.form.get('import_type')
            auto_activate = request.form.get('auto_activate') == 'on'

            parser = BookmarkParser()
            fetcher = WebsiteFetcher(timeout=15)
            urls_to_import = []

            try:
                # Parse the input.
                if import_type == 'url_list':
                    url_list_text = request.form.get('url_list', '')
                    urls_to_import = parser.parse_url_list(url_list_text)
                elif import_type == 'bookmark_file':
                    bookmark_file = request.files.get('bookmark_file')
                    if not bookmark_file:
                        flash('请选择书签文件', 'error')
                        return render_template('admin/batch_import.html')

                    html_content = bookmark_file.read().decode(
                        'utf-8', errors='ignore'
                    )
                    all_bookmarks = parser.parse_html_file(html_content)

                    # Optional case-insensitive folder filter.
                    folder_filter = request.form.get(
                        'folder_filter', ''
                    ).strip()
                    if folder_filter:
                        wanted = folder_filter.lower()
                        urls_to_import = [
                            b for b in all_bookmarks
                            if wanted in (b.get('folder') or '').lower()
                        ]
                    else:
                        urls_to_import = all_bookmarks

                # Import one URL at a time.
                success_list = []
                failed_list = []
                total = len(urls_to_import)

                for idx, item in enumerate(urls_to_import, 1):
                    url = item['url']
                    name = item.get('name', '')

                    try:
                        site, failure = _import_site(
                            fetcher, url, name, auto_activate
                        )
                        if failure is not None:
                            failed_list.append(failure)
                            continue
                        success_list.append({
                            'name': site.name,
                            'url': site.url
                        })
                        print(f"成功导入 [{idx}/{total}]: {site.name}")
                    except Exception as e:
                        # Catch-all so one bad URL cannot abort the batch.
                        db.session.rollback()
                        failed_list.append(
                            _failure(url, name, f'未知错误: {str(e)}')
                        )
                        print(f"导入 {url} 时发生未知错误: {str(e)}")

                results = {
                    'total_count': total,
                    'success_count': len(success_list),
                    'failed_count': len(failed_list),
                    'success_list': success_list,
                    'failed_list': failed_list
                }

                if success_list:
                    flash(f'成功导入 {len(success_list)} 个网站!', 'success')

            except Exception as e:
                flash(f'导入失败: {str(e)}', 'error')

        return render_template('admin/batch_import.html', results=results)

    # ========== Flask-Admin configuration ==========

    class SecureModelView(ModelView):
        """Model view that is only accessible to logged-in admins."""

        can_set_page_size = True
        page_size = 20

        # Project templates for the list/create/edit pages.
        list_template = 'admin/model/list.html'
        create_template = 'admin/model/create.html'
        edit_template = 'admin/model/edit.html'

        named_filter_urls = True

        def is_accessible(self):
            return current_user.is_authenticated

        def inaccessible_callback(self, name, **kwargs):
            return redirect(url_for('admin_login'))

    class SecureAdminIndexView(AdminIndexView):
        """Admin dashboard that is only accessible to logged-in admins."""

        def is_accessible(self):
            return current_user.is_authenticated

        def inaccessible_callback(self, name, **kwargs):
            return redirect(url_for('admin_login'))

        @expose('/')
        def index(self):
            """Dashboard home: summary counters and the 5 newest sites."""
            stats = {
                'sites_count': Site.query.filter_by(is_active=True).count(),
                'tags_count': Tag.query.count(),
                'news_count': News.query.filter_by(is_active=True).count(),
                'total_views': db.session.query(
                    db.func.sum(Site.view_count)
                ).scalar() or 0
            }

            recent_sites = (
                Site.query.order_by(Site.created_at.desc()).limit(5).all()
            )

            return self.render(
                'admin/index.html', stats=stats, recent_sites=recent_sites
            )

    class SiteAdmin(SecureModelView):
        """Admin view for ``Site`` rows."""

        # Site-specific templates.
        create_template = 'admin/site/create.html'
        edit_template = 'admin/site/edit.html'

        can_edit = True
        can_delete = True
        can_create = True
        can_view_details = False  # no separate details page

        # Show the per-row action column.
        column_display_actions = True
        action_disallowed_list = []

        column_list = ['id', 'code', 'name', 'url', 'slug', 'is_active',
                       'view_count', 'created_at']
        column_searchable_list = ['code', 'name', 'url', 'description']
        column_filters = ['is_active', 'tags']
        column_labels = {
            'id': 'ID',
            'code': '网站编码',
            'name': '网站名称',
            'url': 'URL',
            'slug': 'URL别名',
            'logo': 'Logo',
            'short_desc': '简短描述',
            'description': '详细介绍',
            'features': '主要功能',
            'is_active': '是否启用',
            'view_count': '浏览次数',
            'sort_order': '排序权重',
            'tags': '标签',
            'created_at': '创建时间',
            'updated_at': '更新时间'
        }
        form_columns = ['name', 'url', 'slug', 'logo', 'short_desc',
                        'description', 'features', 'tags', 'is_active',
                        'sort_order']

        column_extra_row_actions = None

        def on_model_change(self, form, model, is_created):
            """Fill in ``code`` and ``slug`` before saving when blank."""
            # no_autoflush keeps the uniqueness queries from flushing the
            # half-populated model early.
            with db.session.no_autoflush:
                if not model.code or not model.code.strip():
                    model.code = _generate_site_code(exclude_id=model.id)

                if not model.slug or not model.slug.strip():
                    slug = (
                        _slugify(model.name or '') or f"site-{model.code}"
                    )
                    model.slug = _make_unique_slug(
                        slug, model.code, exclude_id=model.id
                    )

    class TagAdmin(SecureModelView):
        """Admin view for ``Tag`` rows."""

        can_edit = True
        can_delete = True
        can_create = True
        can_view_details = False

        # Show the per-row action column.
        column_display_actions = True

        column_list = ['id', 'name', 'slug', 'description', 'sort_order']
        column_searchable_list = ['name', 'description']
        column_labels = {
            'id': 'ID',
            'name': '标签名称',
            'slug': 'URL别名',
            'description': '标签描述',
            'icon': '图标',
            'sort_order': '排序权重',
            'created_at': '创建时间'
        }
        form_columns = ['name', 'slug', 'description', 'icon', 'sort_order']

    class AdminAdmin(SecureModelView):
        """Admin view for administrator accounts."""

        can_edit = True
        can_delete = True
        can_create = True
        can_view_details = False

        # Show the per-row action column.
        column_display_actions = True

        column_list = ['id', 'username', 'email', 'is_active', 'last_login',
                       'created_at']
        column_searchable_list = ['username', 'email']
        column_filters = ['is_active']
        column_labels = {
            'id': 'ID',
            'username': '用户名',
            'email': '邮箱',
            'is_active': '是否启用',
            'created_at': '创建时间',
            'last_login': '最后登录'
        }
        form_columns = ['username', 'email', 'is_active']

        def on_model_change(self, form, model, is_created):
            """Give newly created admins the default password."""
            # NOTE(review): every new admin gets the same well-known
            # password and nothing here forces a change - consider a
            # generated one-time password instead.
            if is_created:
                model.set_password('admin123')

    class NewsAdmin(SecureModelView):
        """Admin view for ``News`` rows."""

        can_edit = True
        can_delete = True
        can_create = True
        can_view_details = False

        # Show the per-row action column.
        column_display_actions = True

        column_list = ['id', 'site', 'title', 'news_type', 'published_at',
                       'is_active']
        column_searchable_list = ['title', 'content']
        column_filters = ['site', 'news_type', 'is_active', 'published_at']
        column_labels = {
            'id': 'ID',
            'site': '关联网站',
            'title': '新闻标题',
            'content': '新闻内容',
            'news_type': '新闻类型',
            'url': '新闻链接',
            'published_at': '发布时间',
            'is_active': '是否启用',
            'created_at': '创建时间',
            'updated_at': '更新时间'
        }
        form_columns = ['site', 'title', 'content', 'news_type', 'url',
                        'published_at', 'is_active']

        # Allowed values for the news type field.
        form_choices = {
            'news_type': [
                ('Product Update', 'Product Update'),
                ('Industry News', 'Industry News'),
                ('Company News', 'Company News'),
                ('Other', 'Other')
            ]
        }

    # Initialise Flask-Admin.
    admin = Admin(
        app,
        name='ZJPB 焦提示词',
        template_mode='bootstrap4',
        index_view=SecureAdminIndexView(name='控制台', url='/admin'),
        base_template='admin/master.html'
    )

    admin.add_view(SiteAdmin(Site, db.session, name='网站管理'))
    admin.add_view(TagAdmin(Tag, db.session, name='标签管理'))
    admin.add_view(NewsAdmin(News, db.session, name='新闻管理'))
    admin.add_view(AdminAdmin(AdminModel, db.session, name='管理员',
                              endpoint='admin_users'))

    return app


if __name__ == '__main__':
    app = create_app(os.getenv('FLASK_ENV', 'development'))
    # Debug mode follows the loaded configuration instead of being forced
    # on, so a non-development config does not run the debugger while the
    # server is bound to all interfaces.
    app.run(host='0.0.0.0', port=5000, debug=app.config.get('DEBUG', False))