""" 爬虫脚本生成服务 分析大学网站结构,自动生成爬虫脚本 """ import re from datetime import datetime from urllib.parse import urlparse from sqlalchemy.orm import Session from ..database import SessionLocal from ..models import University, ScraperScript # 预置的大学爬虫脚本模板 SCRAPER_TEMPLATES = { "harvard.edu": "harvard_scraper", "mit.edu": "generic_scraper", "stanford.edu": "generic_scraper", } def generate_scraper_script(university_id: int, university_url: str): """ 生成爬虫脚本的后台任务 1. 分析大学网站域名 2. 如果有预置模板则使用模板 3. 否则生成通用爬虫脚本 """ db = SessionLocal() try: university = db.query(University).filter(University.id == university_id).first() if not university: return # 解析URL获取域名 parsed = urlparse(university_url) domain = parsed.netloc.replace("www.", "") # 检查是否有预置模板 template_name = None for pattern, template in SCRAPER_TEMPLATES.items(): if pattern in domain: template_name = template break # 生成脚本 script_content = _generate_script_content(domain, template_name) config_content = _generate_config_content(university.name, university_url, domain) # 计算版本号 existing_count = db.query(ScraperScript).filter( ScraperScript.university_id == university_id ).count() # 保存脚本 script = ScraperScript( university_id=university_id, script_name=f"{domain.replace('.', '_')}_scraper", script_content=script_content, config_content=config_content, version=existing_count + 1, status="active" ) db.add(script) # 更新大学状态 university.status = "ready" db.commit() except Exception as e: # 记录错误 if university: university.status = "error" db.commit() raise e finally: db.close() def _generate_script_content(domain: str, template_name: str = None) -> str: """生成Python爬虫脚本内容""" if template_name == "harvard_scraper": return '''""" Harvard University 专用爬虫脚本 自动生成 """ import asyncio import json from datetime import datetime, timezone from playwright.async_api import async_playwright # 学院URL映射 SCHOOL_MAPPING = { "gsas.harvard.edu": "Graduate School of Arts and Sciences (GSAS)", "seas.harvard.edu": "John A. Paulson School of Engineering and Applied Sciences (SEAS)", "hbs.edu": "Harvard Business School (HBS)", "gsd.harvard.edu": "Graduate School of Design (GSD)", "gse.harvard.edu": "Graduate School of Education (HGSE)", "hks.harvard.edu": "Harvard Kennedy School (HKS)", "hls.harvard.edu": "Harvard Law School (HLS)", "hms.harvard.edu": "Harvard Medical School (HMS)", "hsph.harvard.edu": "T.H. 


def _generate_script_content(domain: str, template_name: str | None = None) -> str:
    """Generate the Python scraper script content."""
    if template_name == "harvard_scraper":
        return '''"""
Harvard University scraper (auto-generated).
"""
import asyncio
import json
from datetime import datetime, timezone
from playwright.async_api import async_playwright

# School URL mapping
SCHOOL_MAPPING = {
    "gsas.harvard.edu": "Graduate School of Arts and Sciences (GSAS)",
    "seas.harvard.edu": "John A. Paulson School of Engineering and Applied Sciences (SEAS)",
    "hbs.edu": "Harvard Business School (HBS)",
    "gsd.harvard.edu": "Graduate School of Design (GSD)",
    "gse.harvard.edu": "Graduate School of Education (HGSE)",
    "hks.harvard.edu": "Harvard Kennedy School (HKS)",
    "hls.harvard.edu": "Harvard Law School (HLS)",
    "hms.harvard.edu": "Harvard Medical School (HMS)",
    "hsph.harvard.edu": "T.H. Chan School of Public Health (HSPH)",
    "hds.harvard.edu": "Harvard Divinity School (HDS)",
    "fas.harvard.edu": "Faculty of Arts and Sciences (FAS)",
}


async def scrape(output_callback=None):
    """Run the scrape."""
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()
        result = {
            "name": "Harvard University",
            "url": "https://www.harvard.edu/",
            "country": "USA",
            "scraped_at": datetime.now(timezone.utc).isoformat(),
            "schools": []
        }

        # Visit the program listing page
        if output_callback:
            output_callback("info", "Visiting the Harvard program listing...")
        await page.goto("https://www.harvard.edu/programs/?degree_levels=graduate")
        await page.wait_for_timeout(3000)

        # Extract program data
        programs = await page.evaluate("""() => {
            const items = document.querySelectorAll('[class*="records__record"]');
            const programs = [];
            items.forEach(item => {
                const btn = item.querySelector('button[class*="title-link"]');
                if (btn) {
                    programs.push({ name: btn.innerText.trim(), url: '' });
                }
            });
            return programs;
        }""")
        if output_callback:
            output_callback("info", f"Found {len(programs)} programs")

        # Simplified output
        result["schools"] = [{
            "name": "Graduate Programs",
            "url": "https://www.harvard.edu/programs/",
            "programs": [{"name": p["name"], "url": p["url"], "faculty": []} for p in programs[:50]]
        }]

        await browser.close()
        return result


if __name__ == "__main__":
    result = asyncio.run(scrape())
    print(json.dumps(result, indent=2, ensure_ascii=False))
'''

    # Generic scraper template: deep-crawls master's programs.
    # Built from separate JS blocks to avoid quote/brace clashes between
    # the f-string and the embedded JavaScript.
    return _build_generic_scraper_template(domain)
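

# Every generated script exposes `async def scrape(output_callback=None)` and can
# run standalone. A minimal runner sketch (this `run_generated_script` helper is
# an assumption for illustration, not part of this module):
#
#     import asyncio
#
#     def run_generated_script(script_content: str) -> dict:
#         namespace = {"__name__": "generated_scraper"}  # keeps the __main__ guard inert
#         exec(compile(script_content, "<scraper>", "exec"), namespace)
#         # output_callback receives (level, message); print accepts both args
#         return asyncio.run(namespace["scrape"](print))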


def _build_generic_scraper_template(domain: str) -> str:
    """Build the generic scraper template."""
    # JavaScript code blocks (raw strings avoid escaping issues)
    js_check_courses = r'''() => {
    const links = document.querySelectorAll('a[href]');
    let courseCount = 0;
    for (const a of links) {
        const href = a.href.toLowerCase();
        if (/\/\d{4,}\//.test(href) ||
            /\/(msc|ma|mba|mres|llm|med|meng)-/.test(href) ||
            /\/course\/[a-z]/.test(href)) {
            courseCount++;
        }
    }
    return courseCount;
}'''

    js_find_list_url = r'''() => {
    const links = document.querySelectorAll('a[href]');
    for (const a of links) {
        const text = a.innerText.toLowerCase();
        const href = a.href.toLowerCase();
        if ((text.includes('a-z') || text.includes('all course') ||
             text.includes('full list') || text.includes('browse all') ||
             href.includes('/list')) &&
            (href.includes('master') || href.includes('course') ||
             href.includes('postgrad'))) {
            return a.href;
        }
    }
    return null;
}'''

    js_find_courses_from_home = r'''() => {
    const links = document.querySelectorAll('a[href]');
    for (const a of links) {
        const href = a.href.toLowerCase();
        const text = a.innerText.toLowerCase();
        if ((href.includes('master') || href.includes('postgraduate') || href.includes('graduate')) &&
            (href.includes('course') || href.includes('program') || href.includes('degree'))) {
            return a.href;
        }
    }
    return null;
}'''

    js_extract_programs = r'''() => {
    const programs = [];
    const seen = new Set();
    const currentHost = window.location.hostname;
    document.querySelectorAll('a[href]').forEach(a => {
        const href = a.href;
        const text = a.innerText.trim().replace(/\s+/g, ' ');
        if (!href || seen.has(href)) return;
        if (text.length < 5 || text.length > 200) return;
        if (href.includes('#') || href.includes('javascript:') || href.includes('mailto:')) return;
        // Keep only same-site links
        try {
            const linkHost = new URL(href).hostname;
            if (!linkHost.includes(currentHost.replace('www.', '')) &&
                !currentHost.includes(linkHost.replace('www.', ''))) return;
        } catch {
            return;
        }
        const hrefLower = href.toLowerCase();
        const textLower = text.toLowerCase();
        // Skip navigation / boilerplate links
        const isNavigation =
            textLower === 'courses' || textLower === 'programmes' ||
            textLower === 'undergraduate' || textLower === 'postgraduate' ||
            textLower === 'masters' || textLower === "master's" ||
            textLower.includes('skip to') || textLower.includes('share') ||
            textLower === 'home' || textLower === 'study' ||
            textLower.startsWith('a-z') || textLower.includes('admission') ||
            textLower.includes('fees and funding') || textLower.includes('why should') ||
            textLower.includes('why manchester') || textLower.includes('teaching and learning') ||
            textLower.includes('meet us') || textLower.includes('student support') ||
            textLower.includes('contact us') || textLower.includes('how to apply') ||
            hrefLower.includes('/admissions/') || hrefLower.includes('/fees-and-funding/') ||
            hrefLower.includes('/why-') || hrefLower.includes('/meet-us/') ||
            hrefLower.includes('/contact-us/') || hrefLower.includes('/student-support/') ||
            hrefLower.includes('/teaching-and-learning/') ||
            hrefLower.endsWith('/courses/') || hrefLower.endsWith('/masters/') ||
            hrefLower.endsWith('/postgraduate/');
        if (isNavigation) return;
        // Skip non-masters degree pages
        const isExcluded =
            hrefLower.includes('/undergraduate') || hrefLower.includes('/bachelor') ||
            hrefLower.includes('/phd/') || hrefLower.includes('/doctoral') ||
            hrefLower.includes('/research-degree') ||
            textLower.includes('bachelor') || textLower.includes('undergraduate') ||
            (textLower.includes('phd') && !textLower.includes('mphil'));
        if (isExcluded) return;
        // Positive signals that this is a course page
        const hasNumericId = /\/\d{4,}\//.test(href);
        const hasDegreeSlug = /\/(msc|ma|mba|mres|llm|med|meng|mpa|mph|mphil)-[a-z]/.test(hrefLower);
        const isCoursePage = (hrefLower.includes('/course/') ||
                              hrefLower.includes('/courses/list/') ||
                              hrefLower.includes('/programme/')) &&
                             href.split('/').filter(p => p).length > 4;
        const textHasDegree = /\b(msc|ma|mba|mres|llm|med|meng|pgcert|pgdip)\b/i.test(text) ||
                              textLower.includes('master');
        if (hasNumericId || hasDegreeSlug || isCoursePage || textHasDegree) {
            seen.add(href);
            programs.push({ name: text, url: href });
        }
    });
    return programs;
}'''

    js_extract_faculty = r'''() => {
    const faculty = [];
    const seen = new Set();
    document.querySelectorAll('a[href]').forEach(a => {
        const href = a.href.toLowerCase();
        const text = a.innerText.trim();
        if (seen.has(href)) return;
        if (text.length < 3 || text.length > 100) return;
        const isStaff = href.includes('/people/') || href.includes('/staff/') ||
                        href.includes('/faculty/') || href.includes('/profile/') ||
                        href.includes('/academics/') || href.includes('/researcher/');
        if (isStaff) {
            seen.add(href);
            faculty.push({ name: text.replace(/\s+/g, ' '), url: a.href });
        }
    });
    return faculty.slice(0, 20);
}'''

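    # Note on the template below: it is one big f-string, so braces that must
    # survive literally into the generated script are doubled ({{ }}), while the
    # single-braced names (domain, university_name, and the js_* blocks above)
    # are interpolated here at generation time.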
    university_name = domain.split('.')[0].title()
    template = f'''"""
Generic university scraper for {domain} (auto-generated).
Deep-crawls master's programs and supervisor information.
"""
import asyncio
import json
import re
from datetime import datetime, timezone
from urllib.parse import urljoin, urlparse
from playwright.async_api import async_playwright

MASTERS_PATHS = [
    "/study/masters/courses/list/",
    "/study/masters/courses/",
    "/postgraduate/taught/courses/",
    "/postgraduate/courses/list/",
    "/postgraduate/courses/",
    "/graduate/programs/",
    "/academics/graduate/programs/",
    "/programmes/masters/",
    "/masters/programmes/",
    "/admissions/graduate/programs/",
]

JS_CHECK_COURSES = """{js_check_courses}"""
JS_FIND_LIST_URL = """{js_find_list_url}"""
JS_FIND_COURSES_FROM_HOME = """{js_find_courses_from_home}"""
JS_EXTRACT_PROGRAMS = """{js_extract_programs}"""
JS_EXTRACT_FACULTY = """{js_extract_faculty}"""


async def find_course_list_page(page, base_url, output_callback):
    for path in MASTERS_PATHS:
        test_url = base_url.rstrip('/') + path
        try:
            response = await page.goto(test_url, wait_until="domcontentloaded", timeout=15000)
            if response and response.status == 200:
                title = await page.title()
                if '404' not in title.lower() and 'not found' not in title.lower():
                    has_courses = await page.evaluate(JS_CHECK_COURSES)
                    if has_courses > 5:
                        if output_callback:
                            output_callback("info", f"Found course list: {{path}} ({{has_courses}} courses)")
                        return test_url
                    list_url = await page.evaluate(JS_FIND_LIST_URL)
                    if list_url:
                        if output_callback:
                            output_callback("info", f"Found full course list: {{list_url}}")
                        return list_url
        except Exception:
            continue

    # Fall back to following a likely link from the homepage
    try:
        await page.goto(base_url, wait_until="domcontentloaded", timeout=30000)
        await page.wait_for_timeout(2000)
        courses_url = await page.evaluate(JS_FIND_COURSES_FROM_HOME)
        if courses_url:
            return courses_url
    except Exception:
        pass
    return None


async def extract_course_links(page, output_callback):
    return await page.evaluate(JS_EXTRACT_PROGRAMS)


async def scrape(output_callback=None):
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context(
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        )
        page = await context.new_page()
        base_url = "https://www.{domain}/"
        result = {{
            "name": "{university_name} University",
            "url": base_url,
            "scraped_at": datetime.now(timezone.utc).isoformat(),
            "schools": []
        }}
        all_programs = []
        try:
            if output_callback:
                output_callback("info", "Searching for masters course list...")
            courses_url = await find_course_list_page(page, base_url, output_callback)
            if not courses_url:
                if output_callback:
                    output_callback("warning", "Course list not found, using homepage")
                courses_url = base_url

            if output_callback:
                output_callback("info", "Extracting masters programs...")
            await page.goto(courses_url, wait_until="domcontentloaded", timeout=30000)
            await page.wait_for_timeout(3000)

            # Click "load more" style buttons up to three times
            for _ in range(3):
                try:
                    load_more = page.locator('button:has-text("Load more"), button:has-text("Show more"), button:has-text("View more"), a:has-text("Load more")')
                    if await load_more.count() > 0:
                        await load_more.first.click()
                        await page.wait_for_timeout(2000)
                    else:
                        break
                except Exception:
                    break

            programs_data = await extract_course_links(page, output_callback)
            if output_callback:
                output_callback("info", f"Found {{len(programs_data)}} masters programs")

            # Visit detail pages for at most 30 programs to collect faculty
            max_detail_pages = min(len(programs_data), 30)
            for i, prog in enumerate(programs_data[:max_detail_pages]):
                try:
                    if output_callback and i % 10 == 0:
                        output_callback("info", f"Processing {{i+1}}/{{max_detail_pages}}: {{prog['name'][:50]}}")
                    await page.goto(prog['url'], wait_until="domcontentloaded", timeout=15000)
                    await page.wait_for_timeout(800)
                    faculty_data = await page.evaluate(JS_EXTRACT_FACULTY)
                    all_programs.append({{
                        "name": prog['name'],
                        "url": prog['url'],
                        "faculty": faculty_data
                    }})
                except Exception:
                    all_programs.append({{
                        "name": prog['name'],
                        "url": prog['url'],
                        "faculty": []
                    }})

            # Remaining programs are kept without faculty details
            for prog in programs_data[max_detail_pages:]:
                all_programs.append({{
                    "name": prog['name'],
                    "url": prog['url'],
                    "faculty": []
                }})

            result["schools"] = [{{
                "name": "Masters Programs",
                "url": courses_url,
                "programs": all_programs
            }}]

            if output_callback:
                total_faculty = sum(len(p.get('faculty', [])) for p in all_programs)
                output_callback("info", f"Done! {{len(all_programs)}} programs, {{total_faculty}} faculty")
        except Exception as e:
            if output_callback:
                output_callback("error", f"Scraping error: {{str(e)}}")
        finally:
            await browser.close()
        return result


if __name__ == "__main__":
    result = asyncio.run(scrape())
    print(json.dumps(result, indent=2, ensure_ascii=False))
'''
    return template
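

# A cheap pre-persist sanity check for the rendered template (dev-only sketch;
# "example.edu" is a placeholder domain): compile() raises SyntaxError if the
# brace-doubling in the f-string above ever breaks the generated Python.
#
#     compile(_build_generic_scraper_template("example.edu"), "<generated>", "exec")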


def _generate_config_content(name: str, url: str, domain: str) -> dict:
    """Generate the scraper configuration content."""
    return {
        "university": {
            "name": name,
            "url": url,
            "domain": domain
        },
        "scraper": {
            "headless": True,
            "timeout": 30000,
            "wait_time": 2000
        },
        "paths_to_try": [
            "/programs",
            "/academics/programs",
            "/graduate",
            "/degrees",
            "/admissions/graduate"
        ],
        "selectors": {
            "program_item": "div.program, li.program, article.program, a[href*='/program']",
            "faculty_item": "div.faculty, li.person, .profile-card"
        },
        # datetime.utcnow() is deprecated; use an explicit timezone-aware timestamp
        "generated_at": datetime.now(timezone.utc).isoformat()
    }
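

# Storage note: generate_scraper_script passes this dict straight to
# ScraperScript(config_content=...). That works if config_content is a SQLAlchemy
# JSON column; if it is a plain text column (the model is not shown here, so this
# is an assumption), serialize first:
#
#     import json
#     config_text = json.dumps(config_content, ensure_ascii=False)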