""" 爬虫执行服务 运行爬虫脚本并保存结果 """ import asyncio import json import re import sys import traceback from datetime import datetime, timezone from urllib.parse import urljoin, urlparse from sqlalchemy.orm import Session # Windows 上需要设置事件循环策略 if sys.platform == "win32": asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) # 导入playwright供脚本使用 try: from playwright.async_api import async_playwright PLAYWRIGHT_AVAILABLE = True except ImportError: PLAYWRIGHT_AVAILABLE = False async_playwright = None from ..database import SessionLocal from ..models import ScraperScript, ScrapeJob, ScrapeLog, ScrapeResult def run_scraper(job_id: int, script_id: int): """ 执行爬虫的后台任务 """ db = SessionLocal() try: job = db.query(ScrapeJob).filter(ScrapeJob.id == job_id).first() script = db.query(ScraperScript).filter(ScraperScript.id == script_id).first() if not job or not script: return # 更新任务状态 job.status = "running" job.started_at = datetime.utcnow() job.current_step = "正在初始化..." job.progress = 5 db.commit() _add_log(db, job_id, "info", "开始执行爬虫脚本") # 创建日志回调函数 def log_callback(level: str, message: str): _add_log(db, job_id, level, message) # 执行脚本 job.current_step = "正在爬取数据..." job.progress = 20 db.commit() result_data = _execute_script(script.script_content, log_callback) if result_data: job.progress = 80 job.current_step = "正在保存结果..." db.commit() _add_log(db, job_id, "info", "爬取完成,正在保存结果...") # 计算统计信息 schools = result_data.get("schools", []) schools_count = len(schools) programs_count = sum(len(s.get("programs", [])) for s in schools) faculty_count = sum( len(p.get("faculty", [])) for s in schools for p in s.get("programs", []) ) # 保存结果 result = ScrapeResult( job_id=job_id, university_id=job.university_id, result_data=result_data, schools_count=schools_count, programs_count=programs_count, faculty_count=faculty_count ) db.add(result) job.status = "completed" job.progress = 100 job.current_step = "完成" job.completed_at = datetime.utcnow() _add_log( db, job_id, "info", f"爬取成功: {schools_count}个学院, {programs_count}个项目, {faculty_count}位导师" ) else: job.status = "failed" job.error_message = "脚本执行无返回结果" job.completed_at = datetime.utcnow() _add_log(db, job_id, "error", "脚本执行失败: 无返回结果") db.commit() except Exception as e: error_msg = f"执行出错: {str(e)}\n{traceback.format_exc()}" _add_log(db, job_id, "error", error_msg) job = db.query(ScrapeJob).filter(ScrapeJob.id == job_id).first() if job: job.status = "failed" job.error_message = str(e) job.completed_at = datetime.utcnow() db.commit() finally: db.close() def _execute_script(script_content: str, log_callback) -> dict: """ 执行Python脚本内容 安全地在隔离环境中执行脚本 """ if not PLAYWRIGHT_AVAILABLE: log_callback("error", "Playwright 未安装,请运行: pip install playwright && playwright install") return None # 创建执行环境 - 包含脚本需要的所有模块 # 注意:使用同一个字典作为 globals 和 locals,确保函数定义可以互相访问 exec_namespace = { "__builtins__": __builtins__, "asyncio": asyncio, "json": json, "re": re, "datetime": datetime, "timezone": timezone, "urljoin": urljoin, "urlparse": urlparse, "async_playwright": async_playwright, } try: # 编译并执行脚本 - 使用同一个命名空间确保函数可互相调用 exec(script_content, exec_namespace, exec_namespace) # 获取scrape函数 scrape_func = exec_namespace.get("scrape") if not scrape_func: log_callback("error", "脚本中未找到 scrape 函数") return None # 运行异步爬虫函数 result = asyncio.run(scrape_func(output_callback=log_callback)) return result except Exception as e: log_callback("error", f"脚本执行异常: {str(e)}") raise def _add_log(db: Session, job_id: int, level: str, message: str): """添加日志""" log = ScrapeLog( job_id=job_id, level=level, 
message=message ) db.add(log) db.commit()
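

# ---------------------------------------------------------------------------
# Illustrative sketch only: a minimal example of the contract that uploaded
# scraper scripts are assumed to follow, inferred from _execute_script above.
# The script must define an async `scrape(output_callback=...)` entry point
# and return a dict shaped like
#     {"schools": [{"programs": [{"faculty": [...]}]}]}
# `async_playwright` needs no import inside the script because it is injected
# into the exec namespace. The URL, CSS selector, constant name, and "name"
# fields below are hypothetical placeholders, not part of this service.
_EXAMPLE_SCRIPT = '''
async def scrape(output_callback=None):
    def log(level, message):
        # Forward progress messages to the service's log callback if provided.
        if output_callback:
            output_callback(level, message)

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()
        await page.goto("https://example.edu/graduate")  # hypothetical URL
        log("info", f"Loaded {page.url}")
        school_names = await page.locator("h2.school-name").all_inner_texts()
        await browser.close()

    return {
        "schools": [
            {"name": name, "programs": [{"name": "Example Program", "faculty": []}]}
            for name in school_names
        ]
    }
'''
# Example usage (would launch a real browser):
#     _execute_script(_EXAMPLE_SCRIPT, lambda level, msg: print(f"[{level}] {msg}"))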