yangxiaoyu-crypto 426cf4d2cd Add university scraper system with backend, frontend, and configs
- Add src/university_scraper module with scraper, analyzer, and CLI
- Add backend FastAPI service with API endpoints and database models
- Add frontend React app with university management pages
- Add configs for Harvard, Manchester, and UCL universities
- Add artifacts with various scraper implementations
- Add Docker compose configuration for deployment
- Update .gitignore to exclude generated files

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-22 15:25:08 +08:00


"""
爬虫执行服务
运行爬虫脚本并保存结果
"""
import asyncio
import json
import re
import sys
import traceback
from datetime import datetime, timezone
from urllib.parse import urljoin, urlparse
from sqlalchemy.orm import Session
# Windows 上需要设置事件循环策略
if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
# 导入playwright供脚本使用
try:
from playwright.async_api import async_playwright
PLAYWRIGHT_AVAILABLE = True
except ImportError:
PLAYWRIGHT_AVAILABLE = False
async_playwright = None
from ..database import SessionLocal
from ..models import ScraperScript, ScrapeJob, ScrapeLog, ScrapeResult
def run_scraper(job_id: int, script_id: int):
    """
    Background task that executes a scraper.
    """
    db = SessionLocal()
    try:
        job = db.query(ScrapeJob).filter(ScrapeJob.id == job_id).first()
        script = db.query(ScraperScript).filter(ScraperScript.id == script_id).first()
        if not job or not script:
            return

        # Update the job status
        job.status = "running"
        job.started_at = datetime.utcnow()
        job.current_step = "Initializing..."
        job.progress = 5
        db.commit()

        _add_log(db, job_id, "info", "Starting scraper script")

        # Logging callback handed to the executed script
        def log_callback(level: str, message: str):
            _add_log(db, job_id, level, message)

        # Execute the script
        job.current_step = "Scraping data..."
        job.progress = 20
        db.commit()

        result_data = _execute_script(script.script_content, log_callback)

        if result_data:
            job.progress = 80
            job.current_step = "Saving results..."
            db.commit()
            _add_log(db, job_id, "info", "Scraping finished, saving results...")

            # Compute summary statistics
            schools = result_data.get("schools", [])
            schools_count = len(schools)
            programs_count = sum(len(s.get("programs", [])) for s in schools)
            faculty_count = sum(
                len(p.get("faculty", []))
                for s in schools
                for p in s.get("programs", [])
            )

            # Save the result
            result = ScrapeResult(
                job_id=job_id,
                university_id=job.university_id,
                result_data=result_data,
                schools_count=schools_count,
                programs_count=programs_count,
                faculty_count=faculty_count
            )
            db.add(result)

            job.status = "completed"
            job.progress = 100
            job.current_step = "Done"
            job.completed_at = datetime.utcnow()
            _add_log(
                db, job_id, "info",
                f"Scrape succeeded: {schools_count} schools, {programs_count} programs, {faculty_count} faculty members"
            )
        else:
            job.status = "failed"
            job.error_message = "Script execution returned no result"
            job.completed_at = datetime.utcnow()
            _add_log(db, job_id, "error", "Script execution failed: no result returned")

        db.commit()
    except Exception as e:
        error_msg = f"Execution error: {str(e)}\n{traceback.format_exc()}"
        _add_log(db, job_id, "error", error_msg)
        job = db.query(ScrapeJob).filter(ScrapeJob.id == job_id).first()
        if job:
            job.status = "failed"
            job.error_message = str(e)
            job.completed_at = datetime.utcnow()
            db.commit()
    finally:
        db.close()

def _execute_script(script_content: str, log_callback) -> dict:
    """
    Execute the content of a Python scraper script.
    The script runs in an isolated namespace.
    """
    if not PLAYWRIGHT_AVAILABLE:
        log_callback("error", "Playwright is not installed; run: pip install playwright && playwright install")
        return None

    # Execution environment containing every module the scripts need.
    # Note: the same dict is used as globals and locals so that functions
    # defined by the script can see each other.
    exec_namespace = {
        "__builtins__": __builtins__,
        "asyncio": asyncio,
        "json": json,
        "re": re,
        "datetime": datetime,
        "timezone": timezone,
        "urljoin": urljoin,
        "urlparse": urlparse,
        "async_playwright": async_playwright,
    }

    try:
        # Compile and run the script in the single shared namespace
        exec(script_content, exec_namespace, exec_namespace)

        # Look up the scrape() entry point defined by the script
        scrape_func = exec_namespace.get("scrape")
        if not scrape_func:
            log_callback("error", "No scrape function found in the script")
            return None

        # Run the async scraper function
        result = asyncio.run(scrape_func(output_callback=log_callback))
        return result
    except Exception as e:
        log_callback("error", f"Script execution error: {str(e)}")
        raise

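# Illustrative sketch (not part of the service) of the minimal shape of a script
# that _execute_script() accepts: the executor looks for an async function named
# `scrape`, runs it via asyncio.run() with output_callback=log_callback, and
# run_scraper() expects the returned dict to nest schools -> programs -> faculty.
# The URL and the field names other than "schools", "programs", and "faculty"
# are placeholders, not the project's real schema.
#
#     async def scrape(output_callback=None):
#         async with async_playwright() as p:  # injected through exec_namespace
#             browser = await p.chromium.launch()
#             page = await browser.new_page()
#             await page.goto("https://example.edu/schools")  # placeholder URL
#             if output_callback:
#                 output_callback("info", "Loaded school index page")
#             await browser.close()
#         return {
#             "schools": [
#                 {
#                     "name": "Example School",  # placeholder field
#                     "programs": [
#                         {"name": "Example Program", "faculty": [{"name": "Jane Doe"}]}
#                     ],
#                 }
#             ]
#         }
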
def _add_log(db: Session, job_id: int, level: str, message: str):
    """Append a log entry for a job."""
    log = ScrapeLog(
        job_id=job_id,
        level=level,
        message=message
    )
    db.add(log)
    db.commit()
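
# Hypothetical usage sketch (an assumption, not shown in this file): run_scraper
# takes only plain IDs and blocks until the job finishes, which fits FastAPI's
# BackgroundTasks. The route path, parameter handling, and import path below are
# illustrative guesses rather than the project's actual endpoint code.
#
#     from fastapi import APIRouter, BackgroundTasks
#
#     from .services.scraper_runner import run_scraper  # hypothetical module path
#
#     router = APIRouter()
#
#     @router.post("/jobs/{job_id}/run")  # hypothetical route
#     def start_job(job_id: int, script_id: int, background_tasks: BackgroundTasks):
#         # Schedule the blocking scraper run after the HTTP response is sent
#         background_tasks.add_task(run_scraper, job_id, script_id)
#         return {"status": "scheduled", "job_id": job_id}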