"""爬取任务API""" from typing import List from datetime import datetime from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks from sqlalchemy.orm import Session from ..database import get_db from ..models import University, ScraperScript, ScrapeJob, ScrapeLog from ..schemas.job import JobResponse, JobStatusResponse, LogResponse from ..services.scraper_runner import run_scraper router = APIRouter() @router.post("/start/{university_id}", response_model=JobResponse) async def start_scrape_job( university_id: int, background_tasks: BackgroundTasks, db: Session = Depends(get_db) ): """ 一键运行爬虫 启动爬取任务,抓取大学项目和导师数据 """ # 检查大学是否存在 university = db.query(University).filter(University.id == university_id).first() if not university: raise HTTPException(status_code=404, detail="大学不存在") # 检查是否有活跃的脚本 script = db.query(ScraperScript).filter( ScraperScript.university_id == university_id, ScraperScript.status == "active" ).first() if not script: raise HTTPException(status_code=400, detail="没有可用的爬虫脚本,请先生成脚本") # 检查是否有正在运行的任务 running_job = db.query(ScrapeJob).filter( ScrapeJob.university_id == university_id, ScrapeJob.status == "running" ).first() if running_job: raise HTTPException(status_code=400, detail="已有正在运行的任务") # 创建任务 job = ScrapeJob( university_id=university_id, script_id=script.id, status="pending", progress=0, current_step="准备中..." ) db.add(job) db.commit() db.refresh(job) # 在后台执行爬虫 background_tasks.add_task( run_scraper, job_id=job.id, script_id=script.id ) return job @router.get("/{job_id}", response_model=JobResponse) def get_job( job_id: int, db: Session = Depends(get_db) ): """获取任务详情""" job = db.query(ScrapeJob).filter(ScrapeJob.id == job_id).first() if not job: raise HTTPException(status_code=404, detail="任务不存在") return job @router.get("/{job_id}/status", response_model=JobStatusResponse) def get_job_status( job_id: int, db: Session = Depends(get_db) ): """获取任务状态和日志""" job = db.query(ScrapeJob).filter(ScrapeJob.id == job_id).first() if not job: raise HTTPException(status_code=404, detail="任务不存在") # 获取最近的日志 logs = db.query(ScrapeLog).filter( ScrapeLog.job_id == job_id ).order_by(ScrapeLog.created_at.desc()).limit(50).all() return JobStatusResponse( id=job.id, status=job.status, progress=job.progress, current_step=job.current_step, logs=[LogResponse( id=log.id, level=log.level, message=log.message, created_at=log.created_at ) for log in reversed(logs)] ) @router.get("/university/{university_id}", response_model=List[JobResponse]) def get_university_jobs( university_id: int, db: Session = Depends(get_db) ): """获取大学的所有任务""" jobs = db.query(ScrapeJob).filter( ScrapeJob.university_id == university_id ).order_by(ScrapeJob.created_at.desc()).limit(20).all() return jobs @router.post("/{job_id}/cancel") def cancel_job( job_id: int, db: Session = Depends(get_db) ): """取消任务""" job = db.query(ScrapeJob).filter(ScrapeJob.id == job_id).first() if not job: raise HTTPException(status_code=404, detail="任务不存在") if job.status not in ["pending", "running"]: raise HTTPException(status_code=400, detail="任务已结束,无法取消") job.status = "cancelled" job.completed_at = datetime.utcnow() db.commit() return {"message": "任务已取消"}