Add university scraper system with backend, frontend, and configs

- Add src/university_scraper module with scraper, analyzer, and CLI
- Add backend FastAPI service with API endpoints and database models
- Add frontend React app with university management pages
- Add configs for Harvard, Manchester, and UCL universities
- Add artifacts with various scraper implementations
- Add Docker compose configuration for deployment
- Update .gitignore to exclude generated files

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Author: yangxiaoyu-crypto
Date: 2025-12-22 15:25:08 +08:00
parent 2714c8ad5c · commit 426cf4d2cd
75 changed files with 13527 additions and 2 deletions

backend/app/__init__.py (new file, 1 line)
@@ -0,0 +1 @@
"""University Scraper Web Backend"""

backend/app/api/__init__.py (new file, 15 lines)
@@ -0,0 +1,15 @@
"""API路由"""
from fastapi import APIRouter
from .universities import router as universities_router
from .scripts import router as scripts_router
from .jobs import router as jobs_router
from .results import router as results_router
api_router = APIRouter()
api_router.include_router(universities_router, prefix="/universities", tags=["大学管理"])
api_router.include_router(scripts_router, prefix="/scripts", tags=["爬虫脚本"])
api_router.include_router(jobs_router, prefix="/jobs", tags=["爬取任务"])
api_router.include_router(results_router, prefix="/results", tags=["爬取结果"])
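The aggregated router is mounted under `/api` in `main.py` (later in this commit), so the sub-routers resolve to paths such as `/api/universities` and `/api/jobs/start/{university_id}`. A minimal in-process smoke test of the wiring, assuming the package imports as `app` when run from the `backend/` directory (that import path is an assumption):

# Sketch: verify the router wiring in-process; `app.main` is an assumed import path
from fastapi.testclient import TestClient
from app.main import app

with TestClient(app) as client:  # the `with` block runs the startup event, which creates tables
    resp = client.get("/api/universities")
    print(resp.status_code, resp.json())  # expected: 200 {'total': 0, 'items': []}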

backend/app/api/jobs.py (new file, 144 lines)
@@ -0,0 +1,144 @@
"""爬取任务API"""
from typing import List
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
from sqlalchemy.orm import Session
from ..database import get_db
from ..models import University, ScraperScript, ScrapeJob, ScrapeLog
from ..schemas.job import JobResponse, JobStatusResponse, LogResponse
from ..services.scraper_runner import run_scraper
router = APIRouter()
@router.post("/start/{university_id}", response_model=JobResponse)
async def start_scrape_job(
university_id: int,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db)
):
"""
一键运行爬虫
启动爬取任务,抓取大学项目和导师数据
"""
# 检查大学是否存在
university = db.query(University).filter(University.id == university_id).first()
if not university:
raise HTTPException(status_code=404, detail="大学不存在")
# 检查是否有活跃的脚本
script = db.query(ScraperScript).filter(
ScraperScript.university_id == university_id,
ScraperScript.status == "active"
).first()
if not script:
raise HTTPException(status_code=400, detail="没有可用的爬虫脚本,请先生成脚本")
# 检查是否有正在运行的任务
running_job = db.query(ScrapeJob).filter(
ScrapeJob.university_id == university_id,
ScrapeJob.status == "running"
).first()
if running_job:
raise HTTPException(status_code=400, detail="已有正在运行的任务")
# 创建任务
job = ScrapeJob(
university_id=university_id,
script_id=script.id,
status="pending",
progress=0,
current_step="准备中..."
)
db.add(job)
db.commit()
db.refresh(job)
# 在后台执行爬虫
background_tasks.add_task(
run_scraper,
job_id=job.id,
script_id=script.id
)
return job
@router.get("/{job_id}", response_model=JobResponse)
def get_job(
job_id: int,
db: Session = Depends(get_db)
):
"""获取任务详情"""
job = db.query(ScrapeJob).filter(ScrapeJob.id == job_id).first()
if not job:
raise HTTPException(status_code=404, detail="任务不存在")
return job
@router.get("/{job_id}/status", response_model=JobStatusResponse)
def get_job_status(
job_id: int,
db: Session = Depends(get_db)
):
"""获取任务状态和日志"""
job = db.query(ScrapeJob).filter(ScrapeJob.id == job_id).first()
if not job:
raise HTTPException(status_code=404, detail="任务不存在")
# 获取最近的日志
logs = db.query(ScrapeLog).filter(
ScrapeLog.job_id == job_id
).order_by(ScrapeLog.created_at.desc()).limit(50).all()
return JobStatusResponse(
id=job.id,
status=job.status,
progress=job.progress,
current_step=job.current_step,
logs=[LogResponse(
id=log.id,
level=log.level,
message=log.message,
created_at=log.created_at
) for log in reversed(logs)]
)
@router.get("/university/{university_id}", response_model=List[JobResponse])
def get_university_jobs(
university_id: int,
db: Session = Depends(get_db)
):
"""获取大学的所有任务"""
jobs = db.query(ScrapeJob).filter(
ScrapeJob.university_id == university_id
).order_by(ScrapeJob.created_at.desc()).limit(20).all()
return jobs
@router.post("/{job_id}/cancel")
def cancel_job(
job_id: int,
db: Session = Depends(get_db)
):
"""取消任务"""
job = db.query(ScrapeJob).filter(ScrapeJob.id == job_id).first()
if not job:
raise HTTPException(status_code=404, detail="任务不存在")
if job.status not in ["pending", "running"]:
raise HTTPException(status_code=400, detail="任务已结束,无法取消")
job.status = "cancelled"
job.completed_at = datetime.utcnow()
db.commit()
return {"message": "任务已取消"}

backend/app/api/results.py (new file, 175 lines)
@@ -0,0 +1,175 @@
"""爬取结果API"""
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session
from ..database import get_db
from ..models import ScrapeResult
from ..schemas.result import ResultResponse
router = APIRouter()
@router.get("/university/{university_id}", response_model=ResultResponse)
def get_university_result(
university_id: int,
db: Session = Depends(get_db)
):
"""获取大学最新的爬取结果"""
result = db.query(ScrapeResult).filter(
ScrapeResult.university_id == university_id
).order_by(ScrapeResult.created_at.desc()).first()
if not result:
raise HTTPException(status_code=404, detail="没有爬取结果")
return result
@router.get("/university/{university_id}/schools")
def get_schools(
university_id: int,
db: Session = Depends(get_db)
):
"""获取学院列表"""
result = db.query(ScrapeResult).filter(
ScrapeResult.university_id == university_id
).order_by(ScrapeResult.created_at.desc()).first()
if not result:
raise HTTPException(status_code=404, detail="没有爬取结果")
schools = result.result_data.get("schools", [])
# 返回简化的学院列表
return {
"total": len(schools),
"schools": [
{
"name": s.get("name"),
"url": s.get("url"),
"program_count": len(s.get("programs", []))
}
for s in schools
]
}
@router.get("/university/{university_id}/programs")
def get_programs(
university_id: int,
school_name: Optional[str] = Query(None, description="按学院筛选"),
search: Optional[str] = Query(None, description="搜索项目名称"),
db: Session = Depends(get_db)
):
"""获取项目列表"""
result = db.query(ScrapeResult).filter(
ScrapeResult.university_id == university_id
).order_by(ScrapeResult.created_at.desc()).first()
if not result:
raise HTTPException(status_code=404, detail="没有爬取结果")
schools = result.result_data.get("schools", [])
programs = []
for school in schools:
if school_name and school.get("name") != school_name:
continue
for prog in school.get("programs", []):
if search and search.lower() not in prog.get("name", "").lower():
continue
programs.append({
"name": prog.get("name"),
"url": prog.get("url"),
"degree_type": prog.get("degree_type"),
"school": school.get("name"),
"faculty_count": len(prog.get("faculty", []))
})
return {
"total": len(programs),
"programs": programs
}
@router.get("/university/{university_id}/faculty")
def get_faculty(
university_id: int,
school_name: Optional[str] = Query(None, description="按学院筛选"),
program_name: Optional[str] = Query(None, description="按项目筛选"),
search: Optional[str] = Query(None, description="搜索导师姓名"),
skip: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=200),
db: Session = Depends(get_db)
):
"""获取导师列表"""
result = db.query(ScrapeResult).filter(
ScrapeResult.university_id == university_id
).order_by(ScrapeResult.created_at.desc()).first()
if not result:
raise HTTPException(status_code=404, detail="没有爬取结果")
schools = result.result_data.get("schools", [])
faculty_list = []
for school in schools:
if school_name and school.get("name") != school_name:
continue
for prog in school.get("programs", []):
if program_name and prog.get("name") != program_name:
continue
for fac in prog.get("faculty", []):
if search and search.lower() not in fac.get("name", "").lower():
continue
faculty_list.append({
"name": fac.get("name"),
"url": fac.get("url"),
"title": fac.get("title"),
"email": fac.get("email"),
"program": prog.get("name"),
"school": school.get("name")
})
total = len(faculty_list)
faculty_list = faculty_list[skip:skip + limit]
return {
"total": total,
"skip": skip,
"limit": limit,
"faculty": faculty_list
}
@router.get("/university/{university_id}/export")
def export_result(
university_id: int,
format: str = Query("json", enum=["json"]),
db: Session = Depends(get_db)
):
"""导出爬取结果"""
result = db.query(ScrapeResult).filter(
ScrapeResult.university_id == university_id
).order_by(ScrapeResult.created_at.desc()).first()
if not result:
raise HTTPException(status_code=404, detail="没有爬取结果")
if format == "json":
return JSONResponse(
content=result.result_data,
headers={
"Content-Disposition": f"attachment; filename=university_{university_id}_result.json"
}
)
raise HTTPException(status_code=400, detail="不支持的格式")
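Note that every endpoint here re-reads the latest `ScrapeResult` JSON document and filters it in memory; only `/faculty` paginates server-side. A hedged sketch of walking all faculty pages (base URL and university id are assumptions):

# Sketch: collect all faculty for university 1 page by page (requires `requests`)
import requests

BASE = "http://localhost:8000/api"
skip, limit, faculty = 0, 200, []
while True:
    page = requests.get(f"{BASE}/results/university/1/faculty",
                        params={"skip": skip, "limit": limit}).json()
    faculty.extend(page["faculty"])
    skip += limit
    if skip >= page["total"]:
        break
print(len(faculty), "faculty collected")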

backend/app/api/scripts.py (new file, 167 lines)
@@ -0,0 +1,167 @@
"""爬虫脚本API"""
from typing import List
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
from sqlalchemy.orm import Session
from ..database import get_db
from ..models import University, ScraperScript
from ..schemas.script import (
ScriptCreate,
ScriptResponse,
GenerateScriptRequest,
GenerateScriptResponse
)
from ..services.script_generator import generate_scraper_script
router = APIRouter()
@router.post("/generate", response_model=GenerateScriptResponse)
async def generate_script(
data: GenerateScriptRequest,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db)
):
"""
一键生成爬虫脚本
分析大学网站结构,自动生成爬虫脚本
"""
# 检查或创建大学记录
university = db.query(University).filter(University.url == data.university_url).first()
if not university:
# 从URL提取大学名称
name = data.university_name
if not name:
from urllib.parse import urlparse
parsed = urlparse(data.university_url)
name = parsed.netloc.replace("www.", "").split(".")[0].title()
university = University(
name=name,
url=data.university_url,
status="analyzing"
)
db.add(university)
db.commit()
db.refresh(university)
else:
# 更新状态
university.status = "analyzing"
db.commit()
# 在后台执行脚本生成
background_tasks.add_task(
generate_scraper_script,
university_id=university.id,
university_url=data.university_url
)
return GenerateScriptResponse(
success=True,
university_id=university.id,
script_id=None,
message="正在分析网站结构并生成爬虫脚本...",
status="analyzing"
)
@router.get("/university/{university_id}", response_model=List[ScriptResponse])
def get_university_scripts(
university_id: int,
db: Session = Depends(get_db)
):
"""获取大学的所有爬虫脚本"""
scripts = db.query(ScraperScript).filter(
ScraperScript.university_id == university_id
).order_by(ScraperScript.version.desc()).all()
return scripts
@router.get("/{script_id}", response_model=ScriptResponse)
def get_script(
script_id: int,
db: Session = Depends(get_db)
):
"""获取脚本详情"""
script = db.query(ScraperScript).filter(ScraperScript.id == script_id).first()
if not script:
raise HTTPException(status_code=404, detail="脚本不存在")
return script
@router.post("", response_model=ScriptResponse)
def create_script(
data: ScriptCreate,
db: Session = Depends(get_db)
):
"""手动创建脚本"""
# 检查大学是否存在
university = db.query(University).filter(University.id == data.university_id).first()
if not university:
raise HTTPException(status_code=404, detail="大学不存在")
# 获取当前最高版本
max_version = db.query(ScraperScript).filter(
ScraperScript.university_id == data.university_id
).count()
script = ScraperScript(
university_id=data.university_id,
script_name=data.script_name,
script_content=data.script_content,
config_content=data.config_content,
version=max_version + 1,
status="active"
)
db.add(script)
db.commit()
db.refresh(script)
# 更新大学状态
university.status = "ready"
db.commit()
return script
@router.put("/{script_id}", response_model=ScriptResponse)
def update_script(
script_id: int,
data: ScriptCreate,
db: Session = Depends(get_db)
):
"""更新脚本"""
script = db.query(ScraperScript).filter(ScraperScript.id == script_id).first()
if not script:
raise HTTPException(status_code=404, detail="脚本不存在")
script.script_content = data.script_content
if data.config_content:
script.config_content = data.config_content
db.commit()
db.refresh(script)
return script
@router.delete("/{script_id}")
def delete_script(
script_id: int,
db: Session = Depends(get_db)
):
"""删除脚本"""
script = db.query(ScraperScript).filter(ScraperScript.id == script_id).first()
if not script:
raise HTTPException(status_code=404, detail="脚本不存在")
db.delete(script)
db.commit()
return {"message": "删除成功"}

backend/app/api/universities.py (new file, 165 lines)
@@ -0,0 +1,165 @@
"""大学管理API"""
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from ..database import get_db
from ..models import University, ScrapeResult
from ..schemas.university import (
UniversityCreate,
UniversityUpdate,
UniversityResponse,
UniversityListResponse
)
router = APIRouter()
@router.get("", response_model=UniversityListResponse)
def list_universities(
skip: int = Query(0, ge=0),
limit: int = Query(20, ge=1, le=100),
search: Optional[str] = None,
db: Session = Depends(get_db)
):
"""获取大学列表"""
query = db.query(University)
if search:
query = query.filter(University.name.ilike(f"%{search}%"))
total = query.count()
universities = query.order_by(University.created_at.desc()).offset(skip).limit(limit).all()
# 添加统计信息
items = []
for uni in universities:
# 获取最新结果
latest_result = db.query(ScrapeResult).filter(
ScrapeResult.university_id == uni.id
).order_by(ScrapeResult.created_at.desc()).first()
items.append(UniversityResponse(
id=uni.id,
name=uni.name,
url=uni.url,
country=uni.country,
description=uni.description,
status=uni.status,
created_at=uni.created_at,
updated_at=uni.updated_at,
scripts_count=len(uni.scripts),
jobs_count=len(uni.jobs),
latest_result={
"schools_count": latest_result.schools_count,
"programs_count": latest_result.programs_count,
"faculty_count": latest_result.faculty_count,
"created_at": latest_result.created_at.isoformat()
} if latest_result else None
))
return UniversityListResponse(total=total, items=items)
@router.post("", response_model=UniversityResponse)
def create_university(
data: UniversityCreate,
db: Session = Depends(get_db)
):
"""创建大学"""
# 检查是否已存在
existing = db.query(University).filter(University.url == data.url).first()
if existing:
raise HTTPException(status_code=400, detail="该大学URL已存在")
university = University(**data.model_dump())
db.add(university)
db.commit()
db.refresh(university)
return UniversityResponse(
id=university.id,
name=university.name,
url=university.url,
country=university.country,
description=university.description,
status=university.status,
created_at=university.created_at,
updated_at=university.updated_at,
scripts_count=0,
jobs_count=0,
latest_result=None
)
@router.get("/{university_id}", response_model=UniversityResponse)
def get_university(
university_id: int,
db: Session = Depends(get_db)
):
"""获取大学详情"""
university = db.query(University).filter(University.id == university_id).first()
if not university:
raise HTTPException(status_code=404, detail="大学不存在")
# 获取最新结果
latest_result = db.query(ScrapeResult).filter(
ScrapeResult.university_id == university.id
).order_by(ScrapeResult.created_at.desc()).first()
return UniversityResponse(
id=university.id,
name=university.name,
url=university.url,
country=university.country,
description=university.description,
status=university.status,
created_at=university.created_at,
updated_at=university.updated_at,
scripts_count=len(university.scripts),
jobs_count=len(university.jobs),
latest_result={
"schools_count": latest_result.schools_count,
"programs_count": latest_result.programs_count,
"faculty_count": latest_result.faculty_count,
"created_at": latest_result.created_at.isoformat()
} if latest_result else None
)
@router.put("/{university_id}", response_model=UniversityResponse)
def update_university(
university_id: int,
data: UniversityUpdate,
db: Session = Depends(get_db)
):
"""更新大学信息"""
university = db.query(University).filter(University.id == university_id).first()
if not university:
raise HTTPException(status_code=404, detail="大学不存在")
update_data = data.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(university, field, value)
db.commit()
db.refresh(university)
return get_university(university_id, db)
@router.delete("/{university_id}")
def delete_university(
university_id: int,
db: Session = Depends(get_db)
):
"""删除大学"""
university = db.query(University).filter(University.id == university_id).first()
if not university:
raise HTTPException(status_code=404, detail="大学不存在")
db.delete(university)
db.commit()
return {"message": "删除成功"}

backend/app/config.py (new file, 37 lines)
@@ -0,0 +1,37 @@
"""应用配置"""
from pydantic_settings import BaseSettings
from typing import Optional
class Settings(BaseSettings):
"""应用设置"""
# 应用配置
APP_NAME: str = "University Scraper API"
APP_VERSION: str = "1.0.0"
DEBUG: bool = True
# 数据库配置
DATABASE_URL: str = "sqlite:///./university_scraper.db" # 开发环境使用SQLite
# 生产环境使用: postgresql://user:password@localhost/university_scraper
# Redis配置 (用于Celery任务队列)
REDIS_URL: str = "redis://localhost:6379/0"
# CORS配置
CORS_ORIGINS: list = ["http://localhost:3000", "http://127.0.0.1:3000"]
# Agent配置 (用于自动生成脚本)
OPENROUTER_API_KEY: Optional[str] = None
# 文件存储路径
SCRIPTS_DIR: str = "./scripts"
RESULTS_DIR: str = "./results"
class Config:
env_file = ".env"
case_sensitive = True
settings = Settings()
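Because `Settings` reads from the environment (and `.env`) with `case_sensitive = True`, uppercase variables override the defaults. A hedged sketch of switching to PostgreSQL, assuming the package imports as `app` from the `backend/` directory:

# Sketch: environment overrides must be set before the module is first imported
import os

os.environ["DATABASE_URL"] = "postgresql://user:password@localhost/university_scraper"
os.environ["DEBUG"] = "false"

from app.config import settings  # assumed import path
print(settings.DATABASE_URL)  # -> postgresql://user:password@localhost/university_scraper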

backend/app/database.py (new file, 35 lines)
@@ -0,0 +1,35 @@
"""数据库连接和会话管理"""
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from .config import settings
# 创建数据库引擎
engine = create_engine(
settings.DATABASE_URL,
connect_args={"check_same_thread": False} if "sqlite" in settings.DATABASE_URL else {},
echo=settings.DEBUG
)
# 创建会话工厂
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# 声明基类
Base = declarative_base()
def get_db():
"""获取数据库会话 (依赖注入)"""
db = SessionLocal()
try:
yield db
finally:
db.close()
def init_db():
"""初始化数据库 (创建所有表)"""
from .models import university, script, job, result # noqa
Base.metadata.create_all(bind=engine)
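Request handlers receive sessions through `get_db`, but background tasks open them directly from `SessionLocal`, as `run_scraper` does later in this commit. A minimal sketch of that pattern (import path is an assumption):

# Sketch: manual session lifecycle outside a request
from app.database import SessionLocal, init_db  # assumed import path
from app.models import University

init_db()  # create tables if they do not exist yet
db = SessionLocal()
try:
    print(db.query(University).count(), "universities stored")
finally:
    db.close()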

backend/app/main.py (new file, 72 lines)
@@ -0,0 +1,72 @@
"""
University Scraper Web API
主应用入口
"""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from .config import settings
from .database import init_db
from .api import api_router
# 创建应用
app = FastAPI(
title=settings.APP_NAME,
version=settings.APP_VERSION,
description="""
## 大学爬虫Web系统 API
### 功能
- 🏫 **大学管理**: 添加、编辑、删除大学
- 📜 **脚本生成**: 一键生成爬虫脚本
- 🚀 **任务执行**: 一键运行爬虫
- 📊 **数据查看**: 查看和导出爬取结果
### 数据结构
大学 → 学院 → 项目 → 导师
""",
docs_url="/docs",
redoc_url="/redoc"
)
# 配置CORS
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 注册路由
app.include_router(api_router, prefix="/api")
@app.on_event("startup")
async def startup_event():
"""应用启动时初始化数据库"""
init_db()
@app.get("/")
async def root():
"""根路由"""
return {
"name": settings.APP_NAME,
"version": settings.APP_VERSION,
"docs": "/docs",
"api": "/api"
}
@app.get("/health")
async def health_check():
"""健康检查"""
return {"status": "healthy"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
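A hedged smoke test of the bare routes; the `with` block runs the startup event so `init_db()` executes (import path is an assumption):

# Sketch: in-process check of the root and health routes
from fastapi.testclient import TestClient
from app.main import app  # assumed import path

with TestClient(app) as client:
    assert client.get("/health").json() == {"status": "healthy"}
    print(client.get("/").json())  # name, version, docs, api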

backend/app/models/__init__.py (new file, 8 lines)
@@ -0,0 +1,8 @@
"""数据库模型"""
from .university import University
from .script import ScraperScript
from .job import ScrapeJob, ScrapeLog
from .result import ScrapeResult
__all__ = ["University", "ScraperScript", "ScrapeJob", "ScrapeLog", "ScrapeResult"]

backend/app/models/job.py (new file, 56 lines)
@@ -0,0 +1,56 @@
"""爬取任务模型"""
from datetime import datetime
from sqlalchemy import Column, Integer, String, DateTime, Text, ForeignKey
from sqlalchemy.orm import relationship
from ..database import Base
class ScrapeJob(Base):
"""爬取任务表"""
__tablename__ = "scrape_jobs"
id = Column(Integer, primary_key=True, index=True)
university_id = Column(Integer, ForeignKey("universities.id"), nullable=False)
script_id = Column(Integer, ForeignKey("scraper_scripts.id"))
status = Column(String(50), default="pending") # pending, running, completed, failed, cancelled
progress = Column(Integer, default=0) # 0-100 进度百分比
current_step = Column(String(255)) # 当前步骤描述
started_at = Column(DateTime)
completed_at = Column(DateTime)
error_message = Column(Text)
created_at = Column(DateTime, default=datetime.utcnow)
# 关联
university = relationship("University", back_populates="jobs")
script = relationship("ScraperScript", back_populates="jobs")
logs = relationship("ScrapeLog", back_populates="job", cascade="all, delete-orphan")
results = relationship("ScrapeResult", back_populates="job", cascade="all, delete-orphan")
def __repr__(self):
return f"<ScrapeJob(id={self.id}, status='{self.status}')>"
class ScrapeLog(Base):
"""爬取日志表"""
__tablename__ = "scrape_logs"
id = Column(Integer, primary_key=True, index=True)
job_id = Column(Integer, ForeignKey("scrape_jobs.id"), nullable=False)
level = Column(String(20), default="info") # debug, info, warning, error
message = Column(Text, nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
# 关联
job = relationship("ScrapeJob", back_populates="logs")
def __repr__(self):
return f"<ScrapeLog(id={self.id}, level='{self.level}')>"

backend/app/models/result.py (new file, 34 lines)
@@ -0,0 +1,34 @@
"""爬取结果模型"""
from datetime import datetime
from sqlalchemy import Column, Integer, DateTime, ForeignKey, JSON
from sqlalchemy.orm import relationship
from ..database import Base
class ScrapeResult(Base):
"""爬取结果表"""
__tablename__ = "scrape_results"
id = Column(Integer, primary_key=True, index=True)
job_id = Column(Integer, ForeignKey("scrape_jobs.id"))
university_id = Column(Integer, ForeignKey("universities.id"), nullable=False)
# JSON数据: 学院 → 项目 → 导师 层级结构
result_data = Column(JSON, nullable=False)
# 统计信息
schools_count = Column(Integer, default=0)
programs_count = Column(Integer, default=0)
faculty_count = Column(Integer, default=0)
created_at = Column(DateTime, default=datetime.utcnow)
# 关联
job = relationship("ScrapeJob", back_populates="results")
university = relationship("University", back_populates="results")
def __repr__(self):
return f"<ScrapeResult(id={self.id}, programs={self.programs_count}, faculty={self.faculty_count})>"

backend/app/models/script.py (new file, 34 lines)
@@ -0,0 +1,34 @@
"""爬虫脚本模型"""
from datetime import datetime
from sqlalchemy import Column, Integer, String, DateTime, Text, ForeignKey, JSON
from sqlalchemy.orm import relationship
from ..database import Base
class ScraperScript(Base):
"""爬虫脚本表"""
__tablename__ = "scraper_scripts"
id = Column(Integer, primary_key=True, index=True)
university_id = Column(Integer, ForeignKey("universities.id"), nullable=False)
script_name = Column(String(255), nullable=False)
script_content = Column(Text, nullable=False) # Python脚本代码
config_content = Column(JSON) # YAML配置转为JSON存储
version = Column(Integer, default=1)
status = Column(String(50), default="draft") # draft, active, deprecated, error
error_message = Column(Text)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# 关联
university = relationship("University", back_populates="scripts")
jobs = relationship("ScrapeJob", back_populates="script")
def __repr__(self):
return f"<ScraperScript(id={self.id}, name='{self.script_name}')>"

backend/app/models/university.py (new file, 31 lines)
@@ -0,0 +1,31 @@
"""大学模型"""
from datetime import datetime
from sqlalchemy import Column, Integer, String, DateTime, Text
from sqlalchemy.orm import relationship
from ..database import Base
class University(Base):
"""大学表"""
__tablename__ = "universities"
id = Column(Integer, primary_key=True, index=True)
name = Column(String(255), nullable=False, index=True)
url = Column(String(500), nullable=False)
country = Column(String(100))
description = Column(Text)
status = Column(String(50), default="pending") # pending, analyzing, ready, error
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# 关联
scripts = relationship("ScraperScript", back_populates="university", cascade="all, delete-orphan")
jobs = relationship("ScrapeJob", back_populates="university", cascade="all, delete-orphan")
results = relationship("ScrapeResult", back_populates="university", cascade="all, delete-orphan")
def __repr__(self):
return f"<University(id={self.id}, name='{self.name}')>"

backend/app/schemas/__init__.py (new file, 33 lines)
@@ -0,0 +1,33 @@
"""Pydantic schemas for API"""
from .university import (
UniversityCreate,
UniversityUpdate,
UniversityResponse,
UniversityListResponse
)
from .script import (
ScriptCreate,
ScriptResponse,
GenerateScriptRequest,
GenerateScriptResponse
)
from .job import (
JobCreate,
JobResponse,
JobStatusResponse,
LogResponse
)
from .result import (
ResultResponse,
SchoolData,
ProgramData,
FacultyData
)
__all__ = [
"UniversityCreate", "UniversityUpdate", "UniversityResponse", "UniversityListResponse",
"ScriptCreate", "ScriptResponse", "GenerateScriptRequest", "GenerateScriptResponse",
"JobCreate", "JobResponse", "JobStatusResponse", "LogResponse",
"ResultResponse", "SchoolData", "ProgramData", "FacultyData"
]

backend/app/schemas/job.py (new file, 52 lines)
@@ -0,0 +1,52 @@
"""爬取任务相关的Pydantic模型"""
from datetime import datetime
from typing import Optional, List
from pydantic import BaseModel
class JobCreate(BaseModel):
"""创建任务请求"""
university_id: int
script_id: Optional[int] = None
class JobResponse(BaseModel):
"""任务响应"""
id: int
university_id: int
script_id: Optional[int] = None
status: str
progress: int
current_step: Optional[str] = None
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
error_message: Optional[str] = None
created_at: datetime
class Config:
from_attributes = True
class JobStatusResponse(BaseModel):
"""任务状态响应"""
id: int
status: str
progress: int
current_step: Optional[str] = None
logs: List["LogResponse"] = []
class LogResponse(BaseModel):
"""日志响应"""
id: int
level: str
message: str
created_at: datetime
class Config:
from_attributes = True
# 解决循环引用
JobStatusResponse.model_rebuild()

backend/app/schemas/result.py (new file, 67 lines)
@@ -0,0 +1,67 @@
"""爬取结果相关的Pydantic模型"""
from datetime import datetime
from typing import Optional, List, Dict, Any
from pydantic import BaseModel
class FacultyData(BaseModel):
"""导师数据"""
name: str
url: str
title: Optional[str] = None
email: Optional[str] = None
department: Optional[str] = None
class ProgramData(BaseModel):
"""项目数据"""
name: str
url: str
degree_type: Optional[str] = None
description: Optional[str] = None
faculty_page_url: Optional[str] = None
faculty_count: int = 0
faculty: List[FacultyData] = []
class SchoolData(BaseModel):
"""学院数据"""
name: str
url: str
description: Optional[str] = None
program_count: int = 0
programs: List[ProgramData] = []
class ResultResponse(BaseModel):
"""完整结果响应"""
id: int
university_id: int
job_id: Optional[int] = None
# 统计
schools_count: int
programs_count: int
faculty_count: int
# 完整数据
result_data: Dict[str, Any]
created_at: datetime
class Config:
from_attributes = True
class ResultSummary(BaseModel):
"""结果摘要"""
id: int
university_id: int
schools_count: int
programs_count: int
faculty_count: int
created_at: datetime
class Config:
from_attributes = True

backend/app/schemas/script.py (new file, 46 lines)
@@ -0,0 +1,46 @@
"""爬虫脚本相关的Pydantic模型"""
from datetime import datetime
from typing import Optional, Dict, Any
from pydantic import BaseModel
class ScriptBase(BaseModel):
"""脚本基础字段"""
script_name: str
script_content: str
config_content: Optional[Dict[str, Any]] = None
class ScriptCreate(ScriptBase):
"""创建脚本请求"""
university_id: int
class ScriptResponse(ScriptBase):
"""脚本响应"""
id: int
university_id: int
version: int
status: str
error_message: Optional[str] = None
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
class GenerateScriptRequest(BaseModel):
"""生成脚本请求"""
university_url: str
university_name: Optional[str] = None
class GenerateScriptResponse(BaseModel):
"""生成脚本响应"""
success: bool
university_id: int
script_id: Optional[int] = None
message: str
status: str # analyzing, completed, failed

backend/app/schemas/university.py (new file, 48 lines)
@@ -0,0 +1,48 @@
"""大学相关的Pydantic模型"""
from datetime import datetime
from typing import Optional, List
from pydantic import BaseModel, HttpUrl
class UniversityBase(BaseModel):
"""大学基础字段"""
name: str
url: str
country: Optional[str] = None
description: Optional[str] = None
class UniversityCreate(UniversityBase):
"""创建大学请求"""
pass
class UniversityUpdate(BaseModel):
"""更新大学请求"""
name: Optional[str] = None
url: Optional[str] = None
country: Optional[str] = None
description: Optional[str] = None
class UniversityResponse(UniversityBase):
"""大学响应"""
id: int
status: str
created_at: datetime
updated_at: datetime
# 统计信息
scripts_count: int = 0
jobs_count: int = 0
latest_result: Optional[dict] = None
class Config:
from_attributes = True
class UniversityListResponse(BaseModel):
"""大学列表响应"""
total: int
items: List[UniversityResponse]

backend/app/services/__init__.py (new file, 6 lines)
@@ -0,0 +1,6 @@
"""业务服务"""
from .script_generator import generate_scraper_script
from .scraper_runner import run_scraper
__all__ = ["generate_scraper_script", "run_scraper"]

backend/app/services/scraper_runner.py (new file, 177 lines)
@@ -0,0 +1,177 @@
"""
爬虫执行服务
运行爬虫脚本并保存结果
"""
import asyncio
import json
import re
import sys
import traceback
from datetime import datetime, timezone
from urllib.parse import urljoin, urlparse
from sqlalchemy.orm import Session
# Windows 上需要设置事件循环策略
if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
# 导入playwright供脚本使用
try:
from playwright.async_api import async_playwright
PLAYWRIGHT_AVAILABLE = True
except ImportError:
PLAYWRIGHT_AVAILABLE = False
async_playwright = None
from ..database import SessionLocal
from ..models import ScraperScript, ScrapeJob, ScrapeLog, ScrapeResult
def run_scraper(job_id: int, script_id: int):
    """Background task that executes a scraper script."""
    db = SessionLocal()
    try:
        job = db.query(ScrapeJob).filter(ScrapeJob.id == job_id).first()
        script = db.query(ScraperScript).filter(ScraperScript.id == script_id).first()
        if not job or not script:
            return

        # Mark the job as running
        job.status = "running"
        job.started_at = datetime.utcnow()
        job.current_step = "Initializing..."
        job.progress = 5
        db.commit()
        _add_log(db, job_id, "info", "Starting scraper script")

        # Log callback handed to the executed script
        def log_callback(level: str, message: str):
            _add_log(db, job_id, level, message)

        # Execute the script
        job.current_step = "Scraping data..."
        job.progress = 20
        db.commit()
        result_data = _execute_script(script.script_content, log_callback)

        if result_data:
            job.progress = 80
            job.current_step = "Saving results..."
            db.commit()
            _add_log(db, job_id, "info", "Scrape finished, saving results...")

            # Compute statistics
            schools = result_data.get("schools", [])
            schools_count = len(schools)
            programs_count = sum(len(s.get("programs", [])) for s in schools)
            faculty_count = sum(
                len(p.get("faculty", []))
                for s in schools
                for p in s.get("programs", [])
            )

            # Persist the result
            result = ScrapeResult(
                job_id=job_id,
                university_id=job.university_id,
                result_data=result_data,
                schools_count=schools_count,
                programs_count=programs_count,
                faculty_count=faculty_count
            )
            db.add(result)
            job.status = "completed"
            job.progress = 100
            job.current_step = "Done"
            job.completed_at = datetime.utcnow()
            _add_log(
                db, job_id, "info",
                f"Scrape succeeded: {schools_count} schools, {programs_count} programs, {faculty_count} faculty"
            )
        else:
            job.status = "failed"
            job.error_message = "Script returned no result"
            job.completed_at = datetime.utcnow()
            _add_log(db, job_id, "error", "Script failed: no result returned")
        db.commit()
    except Exception as e:
        error_msg = f"Execution error: {str(e)}\n{traceback.format_exc()}"
        _add_log(db, job_id, "error", error_msg)
        job = db.query(ScrapeJob).filter(ScrapeJob.id == job_id).first()
        if job:
            job.status = "failed"
            job.error_message = str(e)
            job.completed_at = datetime.utcnow()
            db.commit()
    finally:
        db.close()
def _execute_script(script_content: str, log_callback) -> dict:
    """
    Execute the Python script content in an isolated namespace.
    """
    if not PLAYWRIGHT_AVAILABLE:
        log_callback("error", "Playwright is not installed; run: pip install playwright && playwright install")
        return None

    # Execution environment containing every module the scripts rely on.
    # Note: the same dict serves as globals and locals so that functions
    # defined by the script can see each other.
    exec_namespace = {
        "__builtins__": __builtins__,
        "asyncio": asyncio,
        "json": json,
        "re": re,
        "datetime": datetime,
        "timezone": timezone,
        "urljoin": urljoin,
        "urlparse": urlparse,
        "async_playwright": async_playwright,
    }
    try:
        # Compile and execute the script in that single shared namespace
        exec(script_content, exec_namespace, exec_namespace)

        # The script must define a `scrape` coroutine
        scrape_func = exec_namespace.get("scrape")
        if not scrape_func:
            log_callback("error", "No `scrape` function found in the script")
            return None

        # Drive the async scraper to completion
        result = asyncio.run(scrape_func(output_callback=log_callback))
        return result
    except Exception as e:
        log_callback("error", f"Script raised an exception: {str(e)}")
        raise


def _add_log(db: Session, job_id: int, level: str, message: str):
    """Append a log entry."""
    log = ScrapeLog(
        job_id=job_id,
        level=level,
        message=message
    )
    db.add(log)
    db.commit()
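The contract `_execute_script` enforces: the stored script must define an async `scrape(output_callback=None)` that returns the school → program → faculty dict, using only names injected into `exec_namespace`. A minimal conforming sketch with illustrative data only:

# Sketch: the smallest script the runner can execute; all data below is illustrative
async def scrape(output_callback=None):
    if output_callback:
        output_callback("info", "Building a stub result")
    return {
        "name": "Example University",
        "url": "https://www.example.edu/",
        "scraped_at": datetime.now(timezone.utc).isoformat(),  # names injected by the runner
        "schools": [{
            "name": "Example School",
            "url": "https://www.example.edu/school/",
            "programs": [
                {"name": "MSc Example", "url": "https://www.example.edu/msc/", "faculty": []}
            ],
        }],
    }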

backend/app/services/script_generator.py (new file, 558 lines)
@@ -0,0 +1,558 @@
"""
爬虫脚本生成服务
分析大学网站结构,自动生成爬虫脚本
"""
import re
from datetime import datetime
from urllib.parse import urlparse
from sqlalchemy.orm import Session
from ..database import SessionLocal
from ..models import University, ScraperScript
# 预置的大学爬虫脚本模板
SCRAPER_TEMPLATES = {
"harvard.edu": "harvard_scraper",
"mit.edu": "generic_scraper",
"stanford.edu": "generic_scraper",
}
def generate_scraper_script(university_id: int, university_url: str):
    """
    Background task that generates a scraper script.

    1. Parse the university website's domain.
    2. Use a preset template if one matches.
    3. Otherwise generate the generic scraper script.
    """
    db = SessionLocal()
    university = None  # defined up front so the except block cannot hit an unbound name
    try:
        university = db.query(University).filter(University.id == university_id).first()
        if not university:
            return

        # Extract the domain from the URL
        parsed = urlparse(university_url)
        domain = parsed.netloc.replace("www.", "")

        # Look for a preset template
        template_name = None
        for pattern, template in SCRAPER_TEMPLATES.items():
            if pattern in domain:
                template_name = template
                break

        # Generate the script and its config
        script_content = _generate_script_content(domain, template_name)
        config_content = _generate_config_content(university.name, university_url, domain)

        # Derive the next version number from the count of existing scripts
        existing_count = db.query(ScraperScript).filter(
            ScraperScript.university_id == university_id
        ).count()

        # Persist the script
        script = ScraperScript(
            university_id=university_id,
            script_name=f"{domain.replace('.', '_')}_scraper",
            script_content=script_content,
            config_content=config_content,
            version=existing_count + 1,
            status="active"
        )
        db.add(script)

        # Mark the university as ready
        university.status = "ready"
        db.commit()
    except Exception as e:
        # Record the failure
        if university:
            university.status = "error"
            db.commit()
        raise e
    finally:
        db.close()
def _generate_script_content(domain: str, template_name: str = None) -> str:
    """Generate the Python scraper script content."""
    if template_name == "harvard_scraper":
        return '''"""
Harvard University scraper script
Auto-generated
"""
import asyncio
import json
from datetime import datetime, timezone
from playwright.async_api import async_playwright

# School URL mapping
SCHOOL_MAPPING = {
    "gsas.harvard.edu": "Graduate School of Arts and Sciences (GSAS)",
    "seas.harvard.edu": "John A. Paulson School of Engineering and Applied Sciences (SEAS)",
    "hbs.edu": "Harvard Business School (HBS)",
    "gsd.harvard.edu": "Graduate School of Design (GSD)",
    "gse.harvard.edu": "Graduate School of Education (HGSE)",
    "hks.harvard.edu": "Harvard Kennedy School (HKS)",
    "hls.harvard.edu": "Harvard Law School (HLS)",
    "hms.harvard.edu": "Harvard Medical School (HMS)",
    "hsph.harvard.edu": "T.H. Chan School of Public Health (HSPH)",
    "hds.harvard.edu": "Harvard Divinity School (HDS)",
    "fas.harvard.edu": "Faculty of Arts and Sciences (FAS)",
}

async def scrape(output_callback=None):
    """Run the scrape."""
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()
        result = {
            "name": "Harvard University",
            "url": "https://www.harvard.edu/",
            "country": "USA",
            "scraped_at": datetime.now(timezone.utc).isoformat(),
            "schools": []
        }

        # Open the graduate program listing
        if output_callback:
            output_callback("info", "Opening the Harvard program listing...")
        await page.goto("https://www.harvard.edu/programs/?degree_levels=graduate")
        await page.wait_for_timeout(3000)

        # Extract program data
        programs = await page.evaluate("""() => {
            const items = document.querySelectorAll('[class*="records__record"]');
            const programs = [];
            items.forEach(item => {
                const btn = item.querySelector('button[class*="title-link"]');
                if (btn) {
                    programs.push({
                        name: btn.innerText.trim(),
                        url: ''
                    });
                }
            });
            return programs;
        }""")
        if output_callback:
            output_callback("info", f"Found {len(programs)} programs")

        # Simplified output
        result["schools"] = [{
            "name": "Graduate Programs",
            "url": "https://www.harvard.edu/programs/",
            "programs": [{"name": p["name"], "url": p["url"], "faculty": []} for p in programs[:50]]
        }]
        await browser.close()
        return result

if __name__ == "__main__":
    result = asyncio.run(scrape())
    print(json.dumps(result, indent=2, ensure_ascii=False))
'''

    # Generic template: deep-crawls masters programs and faculty.
    # Built by plain string substitution to avoid f-string/JavaScript quote and brace conflicts.
    return _build_generic_scraper_template(domain)
def _build_generic_scraper_template(domain: str) -> str:
    """Build the generic scraper template."""
    # JavaScript code blocks (raw strings avoid escaping issues)
js_check_courses = r'''() => {
const links = document.querySelectorAll('a[href]');
let courseCount = 0;
for (const a of links) {
const href = a.href.toLowerCase();
if (/\/\d{4,}\//.test(href) ||
/\/(msc|ma|mba|mres|llm|med|meng)-/.test(href) ||
/\/course\/[a-z]/.test(href)) {
courseCount++;
}
}
return courseCount;
}'''
js_find_list_url = r'''() => {
const links = document.querySelectorAll('a[href]');
for (const a of links) {
const text = a.innerText.toLowerCase();
const href = a.href.toLowerCase();
if ((text.includes('a-z') || text.includes('all course') ||
text.includes('full list') || text.includes('browse all') ||
href.includes('/list')) &&
(href.includes('master') || href.includes('course') || href.includes('postgrad'))) {
return a.href;
}
}
return null;
}'''
js_find_courses_from_home = r'''() => {
const links = document.querySelectorAll('a[href]');
for (const a of links) {
const href = a.href.toLowerCase();
const text = a.innerText.toLowerCase();
if ((href.includes('master') || href.includes('postgraduate') || href.includes('graduate')) &&
(href.includes('course') || href.includes('program') || href.includes('degree'))) {
return a.href;
}
}
return null;
}'''
js_extract_programs = r'''() => {
const programs = [];
const seen = new Set();
const currentHost = window.location.hostname;
document.querySelectorAll('a[href]').forEach(a => {
const href = a.href;
const text = a.innerText.trim().replace(/\s+/g, ' ');
if (!href || seen.has(href)) return;
if (text.length < 5 || text.length > 200) return;
if (href.includes('#') || href.includes('javascript:') || href.includes('mailto:')) return;
try {
const linkHost = new URL(href).hostname;
if (!linkHost.includes(currentHost.replace('www.', '')) &&
!currentHost.includes(linkHost.replace('www.', ''))) return;
} catch {
return;
}
const hrefLower = href.toLowerCase();
const textLower = text.toLowerCase();
const isNavigation = textLower === 'courses' ||
textLower === 'programmes' ||
textLower === 'undergraduate' ||
textLower === 'postgraduate' ||
textLower === 'masters' ||
textLower === "master's" ||
textLower.includes('skip to') ||
textLower.includes('share') ||
textLower === 'home' ||
textLower === 'study' ||
textLower.startsWith('a-z') ||
textLower.includes('admission') ||
textLower.includes('fees and funding') ||
textLower.includes('why should') ||
textLower.includes('why manchester') ||
textLower.includes('teaching and learning') ||
textLower.includes('meet us') ||
textLower.includes('student support') ||
textLower.includes('contact us') ||
textLower.includes('how to apply') ||
hrefLower.includes('/admissions/') ||
hrefLower.includes('/fees-and-funding/') ||
hrefLower.includes('/why-') ||
hrefLower.includes('/meet-us/') ||
hrefLower.includes('/contact-us/') ||
hrefLower.includes('/student-support/') ||
hrefLower.includes('/teaching-and-learning/') ||
hrefLower.endsWith('/courses/') ||
hrefLower.endsWith('/masters/') ||
hrefLower.endsWith('/postgraduate/');
if (isNavigation) return;
const isExcluded = hrefLower.includes('/undergraduate') ||
hrefLower.includes('/bachelor') ||
hrefLower.includes('/phd/') ||
hrefLower.includes('/doctoral') ||
hrefLower.includes('/research-degree') ||
textLower.includes('bachelor') ||
textLower.includes('undergraduate') ||
(textLower.includes('phd') && !textLower.includes('mphil'));
if (isExcluded) return;
const hasNumericId = /\/\d{4,}\//.test(href);
const hasDegreeSlug = /\/(msc|ma|mba|mres|llm|med|meng|mpa|mph|mphil)-[a-z]/.test(hrefLower);
const isCoursePage = (hrefLower.includes('/course/') ||
hrefLower.includes('/courses/list/') ||
hrefLower.includes('/programme/')) &&
href.split('/').filter(p => p).length > 4;
const textHasDegree = /\b(msc|ma|mba|mres|llm|med|meng|pgcert|pgdip)\b/i.test(text) ||
textLower.includes('master');
if (hasNumericId || hasDegreeSlug || isCoursePage || textHasDegree) {
seen.add(href);
programs.push({
name: text,
url: href
});
}
});
return programs;
}'''
js_extract_faculty = r'''() => {
const faculty = [];
const seen = new Set();
document.querySelectorAll('a[href]').forEach(a => {
const href = a.href.toLowerCase();
const text = a.innerText.trim();
if (seen.has(href)) return;
if (text.length < 3 || text.length > 100) return;
const isStaff = href.includes('/people/') ||
href.includes('/staff/') ||
href.includes('/faculty/') ||
href.includes('/profile/') ||
href.includes('/academics/') ||
href.includes('/researcher/');
if (isStaff) {
seen.add(href);
faculty.push({
name: text.replace(/\s+/g, ' '),
url: a.href
});
}
});
return faculty.slice(0, 20);
}'''
university_name = domain.split('.')[0].title()
template = f'''"""
Generic university scraper script
Target: {domain}
Auto-generated - deep-crawls masters programs and faculty information
"""
import asyncio
import json
import re
from datetime import datetime, timezone
from urllib.parse import urljoin, urlparse
from playwright.async_api import async_playwright
MASTERS_PATHS = [
"/study/masters/courses/list/",
"/study/masters/courses/",
"/postgraduate/taught/courses/",
"/postgraduate/courses/list/",
"/postgraduate/courses/",
"/graduate/programs/",
"/academics/graduate/programs/",
"/programmes/masters/",
"/masters/programmes/",
"/admissions/graduate/programs/",
]
JS_CHECK_COURSES = """{js_check_courses}"""
JS_FIND_LIST_URL = """{js_find_list_url}"""
JS_FIND_COURSES_FROM_HOME = """{js_find_courses_from_home}"""
JS_EXTRACT_PROGRAMS = """{js_extract_programs}"""
JS_EXTRACT_FACULTY = """{js_extract_faculty}"""
async def find_course_list_page(page, base_url, output_callback):
for path in MASTERS_PATHS:
test_url = base_url.rstrip('/') + path
try:
response = await page.goto(test_url, wait_until="domcontentloaded", timeout=15000)
if response and response.status == 200:
title = await page.title()
if '404' not in title.lower() and 'not found' not in title.lower():
has_courses = await page.evaluate(JS_CHECK_COURSES)
if has_courses > 5:
if output_callback:
output_callback("info", f"Found course list: {{path}} ({{has_courses}} courses)")
return test_url
list_url = await page.evaluate(JS_FIND_LIST_URL)
if list_url:
if output_callback:
output_callback("info", f"Found full course list: {{list_url}}")
return list_url
except:
continue
try:
await page.goto(base_url, wait_until="domcontentloaded", timeout=30000)
await page.wait_for_timeout(2000)
courses_url = await page.evaluate(JS_FIND_COURSES_FROM_HOME)
if courses_url:
return courses_url
except:
pass
return None
async def extract_course_links(page, output_callback):
return await page.evaluate(JS_EXTRACT_PROGRAMS)
async def scrape(output_callback=None):
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
context = await browser.new_context(
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)
page = await context.new_page()
base_url = "https://www.{domain}/"
result = {{
"name": "{university_name} University",
"url": base_url,
"scraped_at": datetime.now(timezone.utc).isoformat(),
"schools": []
}}
all_programs = []
try:
if output_callback:
output_callback("info", "Searching for masters course list...")
courses_url = await find_course_list_page(page, base_url, output_callback)
if not courses_url:
if output_callback:
output_callback("warning", "Course list not found, using homepage")
courses_url = base_url
if output_callback:
output_callback("info", "Extracting masters programs...")
await page.goto(courses_url, wait_until="domcontentloaded", timeout=30000)
await page.wait_for_timeout(3000)
for _ in range(3):
try:
load_more = page.locator('button:has-text("Load more"), button:has-text("Show more"), button:has-text("View more"), a:has-text("Load more")')
if await load_more.count() > 0:
await load_more.first.click()
await page.wait_for_timeout(2000)
else:
break
except:
break
programs_data = await extract_course_links(page, output_callback)
if output_callback:
output_callback("info", f"Found {{len(programs_data)}} masters programs")
max_detail_pages = min(len(programs_data), 30)
for i, prog in enumerate(programs_data[:max_detail_pages]):
try:
if output_callback and i % 10 == 0:
output_callback("info", f"Processing {{i+1}}/{{max_detail_pages}}: {{prog['name'][:50]}}")
await page.goto(prog['url'], wait_until="domcontentloaded", timeout=15000)
await page.wait_for_timeout(800)
faculty_data = await page.evaluate(JS_EXTRACT_FACULTY)
all_programs.append({{
"name": prog['name'],
"url": prog['url'],
"faculty": faculty_data
}})
except:
all_programs.append({{
"name": prog['name'],
"url": prog['url'],
"faculty": []
}})
for prog in programs_data[max_detail_pages:]:
all_programs.append({{
"name": prog['name'],
"url": prog['url'],
"faculty": []
}})
result["schools"] = [{{
"name": "Masters Programs",
"url": courses_url,
"programs": all_programs
}}]
if output_callback:
total_faculty = sum(len(p.get('faculty', [])) for p in all_programs)
output_callback("info", f"Done! {{len(all_programs)}} programs, {{total_faculty}} faculty")
except Exception as e:
if output_callback:
output_callback("error", f"Scraping error: {{str(e)}}")
finally:
await browser.close()
return result
if __name__ == "__main__":
result = asyncio.run(scrape())
print(json.dumps(result, indent=2, ensure_ascii=False))
'''
return template
def _generate_config_content(name: str, url: str, domain: str) -> dict:
    """Generate the config content."""
return {
"university": {
"name": name,
"url": url,
"domain": domain
},
"scraper": {
"headless": True,
"timeout": 30000,
"wait_time": 2000
},
"paths_to_try": [
"/programs",
"/academics/programs",
"/graduate",
"/degrees",
"/admissions/graduate"
],
"selectors": {
"program_item": "div.program, li.program, article.program, a[href*='/program']",
"faculty_item": "div.faculty, li.person, .profile-card"
},
"generated_at": datetime.utcnow().isoformat()
}
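Since generated scripts are stored as strings and only executed later, a cheap safety net is to syntax-check them at generation time. A hedged sketch (import path is an assumption):

# Sketch: compile() catches syntax errors in the generated source without running it
from app.services.script_generator import _build_generic_scraper_template  # assumed import path

code = _build_generic_scraper_template("manchester.ac.uk")
compile(code, "<generated scraper>", "exec")  # raises SyntaxError if the template is malformed
print(code.splitlines()[1])  # -> Generic university scraper script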

backend/app/tasks/__init__.py (new file, 1 line)
@@ -0,0 +1 @@
"""Celery任务 (可选,用于生产环境)"""