""" 定期新闻获取任务脚本 用途:定期为网站批量获取最新新闻 使用:python fetch_news_cron.py [options] 可以通过crontab定时执行: # 每天早上8点执行,获取10个网站的新闻 0 8 * * * cd /path/to/zjpb && /path/to/venv/bin/python fetch_news_cron.py --limit 10 >> logs/news_fetch.log 2>&1 """ import os import sys import argparse from datetime import datetime from dotenv import load_dotenv # 加载环境变量 load_dotenv() # 添加项目根目录到Python路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from app import create_app from models import db, Site, News from utils.news_searcher import NewsSearcher def fetch_news_for_sites(limit=10, count_per_site=5, freshness='oneMonth'): """ 批量为网站获取新闻 Args: limit: 处理的网站数量限制 count_per_site: 每个网站获取的新闻数量 freshness: 新闻时间范围 """ # 创建Flask应用上下文 app = create_app(os.getenv('FLASK_ENV', 'production')) with app.app_context(): # 检查博查API配置 api_key = app.config.get('BOCHA_API_KEY') if not api_key: print(f"[{datetime.now()}] 错误:未配置BOCHA_API_KEY") return False # 获取启用的网站(按更新时间排序,优先处理旧的) sites = Site.query.filter_by(is_active=True).order_by(Site.updated_at).limit(limit).all() if not sites: print(f"[{datetime.now()}] 没有可处理的网站") return False print(f"[{datetime.now()}] 开始批量获取新闻,共 {len(sites)} 个网站") print(f"配置:每个网站 {count_per_site} 条新闻,时间范围:{freshness}") print("-" * 60) # 创建新闻搜索器 searcher = NewsSearcher(api_key) # 统计信息 total_saved = 0 total_found = 0 success_count = 0 error_count = 0 # 为每个网站获取新闻 for i, site in enumerate(sites, 1): print(f"[{i}/{len(sites)}] 处理网站: {site.name}") try: # 搜索新闻 news_items = searcher.search_site_news( site_name=site.name, site_url=site.url, count=count_per_site, freshness=freshness ) if not news_items: print(f" └─ 未找到新闻") continue site_saved = 0 for item in news_items: # 检查是否已存在 existing_news = News.query.filter_by( site_id=site.id, url=item['url'] ).first() if not existing_news: news = News( site_id=site.id, title=item['title'], content=item.get('summary') or item.get('snippet', ''), url=item['url'], source_name=item.get('site_name', ''), source_icon=item.get('site_icon', ''), published_at=item.get('published_at'), news_type='Search Result', is_active=True ) db.session.add(news) site_saved += 1 # 提交该网站的新闻 db.session.commit() total_found += len(news_items) total_saved += site_saved success_count += 1 print(f" └─ 找到 {len(news_items)} 条,保存 {site_saved} 条新闻") except Exception as e: error_count += 1 print(f" └─ 错误: {str(e)}") db.session.rollback() continue print("-" * 60) print(f"[{datetime.now()}] 批量获取完成") print(f"成功: {success_count} 个网站, 失败: {error_count} 个网站") print(f"共找到 {total_found} 条新闻,保存 {total_saved} 条新新闻") print("=" * 60) return True def main(): """主函数""" parser = argparse.ArgumentParser(description='定期新闻获取任务') parser.add_argument('--limit', type=int, default=10, help='处理的网站数量限制(默认:10)') parser.add_argument('--count', type=int, default=5, help='每个网站获取的新闻数量(默认:5)') parser.add_argument('--freshness', type=str, default='oneMonth', choices=['noLimit', 'oneDay', 'oneWeek', 'oneMonth', 'oneYear'], help='新闻时间范围(默认:oneMonth)') args = parser.parse_args() print("=" * 60) print(f"定期新闻获取任务 - 开始时间: {datetime.now()}") print("=" * 60) try: success = fetch_news_for_sites( limit=args.limit, count_per_site=args.count, freshness=args.freshness ) if success: print(f"\n任务执行成功!") sys.exit(0) else: print(f"\n任务执行失败!") sys.exit(1) except Exception as e: print(f"\n[{datetime.now()}] 严重错误: {str(e)}") import traceback traceback.print_exc() sys.exit(1) if __name__ == '__main__': main()