"""Job 定义 - 所有后台异步任务的统一抽象""" import uuid from abc import ABC, abstractmethod from dataclasses import dataclass, field from datetime import datetime from enum import Enum from typing import Any, Dict, List, Optional from app.models.domain import ProxyRaw class JobStatus(Enum): PENDING = "pending" RUNNING = "running" COMPLETED = "completed" FAILED = "failed" CANCELLED = "cancelled" @dataclass class Job(ABC): """后台任务基类""" id: str = field(default_factory=lambda: str(uuid.uuid4())) status: JobStatus = JobStatus.PENDING progress: float = 0.0 # 0-100 result: Any = None error: Optional[str] = None created_at: datetime = field(default_factory=datetime.now) updated_at: datetime = field(default_factory=datetime.now) _cancelled: bool = field(default=False, repr=False) def cancel(self) -> None: """请求取消任务""" self._cancelled = True if self.status in (JobStatus.PENDING, JobStatus.RUNNING): self.status = JobStatus.CANCELLED self._touch() def _touch(self) -> None: self.updated_at = datetime.now() def _set_running(self) -> None: self.status = JobStatus.RUNNING self._touch() def _set_completed(self, result: Any = None) -> None: self.status = JobStatus.COMPLETED self.result = result self.progress = 100.0 self._touch() def _set_failed(self, error: str) -> None: self.status = JobStatus.FAILED self.error = error self._touch() @property def is_cancelled(self) -> bool: return self._cancelled @abstractmethod async def run(self) -> Any: """执行任务的核心逻辑,子类必须实现""" raise NotImplementedError def to_dict(self) -> Dict[str, Any]: return { "id": self.id, "status": self.status.value, "progress": round(self.progress, 2), "result": self.result, "error": self.error, "created_at": self.created_at.isoformat(), "updated_at": self.updated_at.isoformat(), } @dataclass class CrawlJob(Job): """插件爬取任务""" plugin_id: str = "" plugin_runner: Any = field(repr=False, default=None) proxy_service: Any = field(repr=False, default=None) validator_pool: Any = field(repr=False, default=None) async def run(self) -> Dict[str, Any]: from app.services.plugin_service import PluginService from app.core.log import logger self._set_running() if not self.plugin_runner: raise RuntimeError("plugin_runner is not set") plugin_service = self.proxy_service or PluginService() plugin = plugin_service.get_plugin(self.plugin_id) if not plugin: raise ValueError(f"Plugin '{self.plugin_id}' not found") result = await self.plugin_runner.run(plugin) proxies: List[ProxyRaw] = result.proxies if result else [] if proxies: from app.core.config import settings as app_settings from app.core.db import transaction from app.repositories.proxy_repo import ProxyRepository try: initial = max( app_settings.score_min, min(app_settings.score_max, int(app_settings.score_valid)), ) async with transaction() as db: await ProxyRepository.upsert_many_from_crawl( db, proxies, initial ) logger.info( f"CrawlJob {self.id}: persisted {len(proxies)} crawled proxies " f"as pending (initial score={initial})" ) except Exception as e: logger.error( f"CrawlJob {self.id}: failed to persist crawled proxies: {e}", exc_info=True, ) raise if proxies and self.validator_pool: from app.core.db import get_db as _get_db from app.repositories.settings_repo import ( SettingsRepository, DEFAULT_SETTINGS, ) async with _get_db() as db: db_settings = await SettingsRepository.get_all(db) if db_settings.get( "auto_validate_after_crawl", DEFAULT_SETTINGS["auto_validate_after_crawl"], ): await self.validator_pool.submit(proxies) logger.info( f"CrawlJob {self.id}: submitted {len(proxies)} proxies for immediate validation" ) crawl_failed = bool(result and (result.failure_count > 0 or result.error)) payload = { "plugin_id": self.plugin_id, "proxy_count": len(proxies), "crawl_failed": crawl_failed, "error": result.error if result else None, # 与持久化统计一致:success_count=本次爬到的条数,failure_count=是否失败(0/1) "success_count": len(proxies), "failure_count": result.failure_count if result else 0, } self._set_completed(payload) return payload @dataclass class ValidateAllJob(Job): """全量验证任务 - 验证数据库中所有存量代理""" proxy_repo: Any = field(repr=False, default=None) validator_pool: Any = field(repr=False, default=None) batch_size: int = 100 async def run(self) -> Dict[str, Any]: from app.repositories.proxy_repo import ProxyRepository from app.core.db import get_db from app.core.log import logger self._set_running() repo = self.proxy_repo or ProxyRepository() async with get_db() as db: proxies = await repo.list_for_validation(db) if not proxies: self._set_completed({"total": 0, "submitted": 0}) return self.result total = len(proxies) submitted = 0 for i in range(0, total, self.batch_size): if self.is_cancelled: logger.info(f"ValidateAllJob {self.id}: cancelled") break batch = proxies[i : i + self.batch_size] raws = [ProxyRaw(p.ip, p.port, p.protocol) for p in batch] if self.validator_pool: await self.validator_pool.submit(raws) submitted += len(raws) self.progress = min(100.0, (submitted / total) * 100) self._touch() payload = {"total": total, "submitted": submitted} if self.is_cancelled: self.status = JobStatus.CANCELLED self._touch() else: self._set_completed(payload) logger.info(f"ValidateAllJob {self.id}: submitted {submitted}/{total} proxies") return payload