Files

206 lines
6.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Job 定义 - 所有后台异步任务的统一抽象"""
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional
from app.models.domain import ProxyRaw
class JobStatus(Enum):
    """Lifecycle states of a background job, each backed by a lowercase string value."""

    # Created but run() has not started yet.
    PENDING = "pending"
    # run() is in progress.
    RUNNING = "running"
    # Terminal states.
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
@dataclass
class Job(ABC):
    """Base class for every background job.

    Carries the bookkeeping all jobs share (id, status, progress, result,
    error, timestamps and a cooperative cancel flag) together with the
    state-transition helpers; concrete jobs implement the work in ``run``.
    """

    # Random UUID4 string, generated per instance.
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    status: JobStatus = JobStatus.PENDING
    progress: float = 0.0  # percent complete, 0-100
    result: Any = None
    error: Optional[str] = None
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)
    # Raised by cancel(); subclasses poll it through ``is_cancelled``.
    _cancelled: bool = field(default=False, repr=False)

    def cancel(self) -> None:
        """Request cancellation of this job.

        The cancel flag is always raised. If the job has not finished yet
        (PENDING or RUNNING) its status also switches to CANCELLED right away;
        a job already in a terminal state keeps that status.
        """
        self._cancelled = True
        still_active = self.status in (JobStatus.PENDING, JobStatus.RUNNING)
        if not still_active:
            return
        self.status = JobStatus.CANCELLED
        self._touch()

    def _touch(self) -> None:
        """Stamp ``updated_at`` with the current naive local time."""
        self.updated_at = datetime.now()

    def _set_running(self) -> None:
        """Move the job to RUNNING."""
        self.status = JobStatus.RUNNING
        self._touch()

    def _set_completed(self, result: Any = None) -> None:
        """Move the job to COMPLETED, store ``result`` and pin progress at 100."""
        self.status, self.result, self.progress = JobStatus.COMPLETED, result, 100.0
        self._touch()

    def _set_failed(self, error: str) -> None:
        """Move the job to FAILED and record the error message."""
        self.status, self.error = JobStatus.FAILED, error
        self._touch()

    @property
    def is_cancelled(self) -> bool:
        """True once cancel() has been called, regardless of the current status."""
        return self._cancelled

    @abstractmethod
    async def run(self) -> Any:
        """Perform the job's work; every concrete subclass must override this."""
        raise NotImplementedError

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict snapshot of the job.

        The status is given as its string value, timestamps as ISO-8601
        strings, and progress rounded to two decimals; ``result`` is passed
        through unchanged.
        """
        created = self.created_at.isoformat()
        updated = self.updated_at.isoformat()
        return dict(
            id=self.id,
            status=self.status.value,
            progress=round(self.progress, 2),
            result=self.result,
            error=self.error,
            created_at=created,
            updated_at=updated,
        )
@dataclass
class CrawlJob(Job):
    """Plugin crawl job.

    Runs one crawler plugin through ``plugin_runner``, upserts the proxies it
    returns via ``ProxyRepository.upsert_many_from_crawl``, and, when a
    ``validator_pool`` is attached and the ``auto_validate_after_crawl``
    setting is on, submits them for immediate validation.
    """

    plugin_id: str = ""
    plugin_runner: Any = field(repr=False, default=None)
    # NOTE(review): despite its name, this object is only used in run() as the
    # plugin lookup service (it must provide ``get_plugin``); when unset a
    # fresh PluginService is created. Confirm what callers inject here.
    proxy_service: Any = field(repr=False, default=None)
    validator_pool: Any = field(repr=False, default=None)

    async def run(self) -> Dict[str, Any]:
        """Crawl ``plugin_id``, persist the results and optionally queue validation.

        Cancellation is cooperative: if ``cancel()`` was called while the
        plugin was crawling, nothing is persisted or submitted. The job then
        finishes as CANCELLED instead of being flipped back to COMPLETED, and
        ``result`` is left unset.

        Returns:
            Summary dict with ``plugin_id``, ``proxy_count``, ``crawl_failed``,
            ``error``, ``success_count`` and ``failure_count``.

        Raises:
            RuntimeError: ``plugin_runner`` is not set.
            ValueError: the plugin service returns nothing for ``plugin_id``.
            Exception: any error while persisting crawled proxies is logged
                and re-raised.
        """
        from app.services.plugin_service import PluginService
        from app.core.log import logger

        self._set_running()
        if not self.plugin_runner:
            raise RuntimeError("plugin_runner is not set")
        plugin_service = self.proxy_service or PluginService()
        plugin = plugin_service.get_plugin(self.plugin_id)
        if not plugin:
            raise ValueError(f"Plugin '{self.plugin_id}' not found")

        result = await self.plugin_runner.run(plugin)
        proxies: List[ProxyRaw] = result.proxies if result else []

        # The crawl is the long await; re-check the cancel flag before
        # producing any side effects, the same way ValidateAllJob checks it
        # between batches, so a cancel() is not overwritten by COMPLETED.
        if self.is_cancelled:
            logger.info(
                f"CrawlJob {self.id}: cancelled, discarding {len(proxies)} crawled proxies"
            )
            self.status = JobStatus.CANCELLED
            self._touch()
            return self._build_payload(result, proxies)

        if proxies:
            await self._persist(proxies)
            if self.validator_pool:
                await self._maybe_auto_validate(proxies)

        payload = self._build_payload(result, proxies)
        self._set_completed(payload)
        return payload

    async def _persist(self, proxies: List[ProxyRaw]) -> None:
        """Upsert crawled proxies with a clamped initial score; log and re-raise on failure."""
        from app.core.config import settings as app_settings
        from app.core.db import transaction
        from app.core.log import logger
        from app.repositories.proxy_repo import ProxyRepository

        try:
            # New proxies start at score_valid, clamped into [score_min, score_max].
            initial = max(
                app_settings.score_min,
                min(app_settings.score_max, int(app_settings.score_valid)),
            )
            async with transaction() as db:
                await ProxyRepository.upsert_many_from_crawl(db, proxies, initial)
            logger.info(
                f"CrawlJob {self.id}: persisted {len(proxies)} crawled proxies "
                f"as pending (initial score={initial})"
            )
        except Exception as e:
            logger.error(
                f"CrawlJob {self.id}: failed to persist crawled proxies: {e}",
                exc_info=True,
            )
            raise

    async def _maybe_auto_validate(self, proxies: List[ProxyRaw]) -> None:
        """Submit proxies to the validator pool if ``auto_validate_after_crawl`` is enabled."""
        from app.core.db import get_db as _get_db
        from app.core.log import logger
        from app.repositories.settings_repo import (
            SettingsRepository,
            DEFAULT_SETTINGS,
        )

        async with _get_db() as db:
            db_settings = await SettingsRepository.get_all(db)
        # Fall back to the shipped default when the key is absent from the stored settings.
        if db_settings.get(
            "auto_validate_after_crawl",
            DEFAULT_SETTINGS["auto_validate_after_crawl"],
        ):
            await self.validator_pool.submit(proxies)
            logger.info(
                f"CrawlJob {self.id}: submitted {len(proxies)} proxies for immediate validation"
            )

    def _build_payload(self, result: Any, proxies: List[ProxyRaw]) -> Dict[str, Any]:
        """Summarise a crawl outcome; ``result`` may be falsy if the runner returned nothing."""
        crawl_failed = bool(result and (result.failure_count > 0 or result.error))
        return {
            "plugin_id": self.plugin_id,
            "proxy_count": len(proxies),
            "crawl_failed": crawl_failed,
            "error": result.error if result else None,
            # Kept consistent with the persisted plugin statistics: success_count
            # is the number of proxies crawled in this run; failure_count is
            # meant to be 0/1 (whether the crawl failed), taken as reported by
            # the runner's result.
            "success_count": len(proxies),
            "failure_count": result.failure_count if result else 0,
        }
@dataclass
class ValidateAllJob(Job):
    """Full re-validation job: re-queues every stored proxy for validation.

    Proxies are loaded once, converted to ``ProxyRaw`` and submitted to the
    validator pool in slices of ``batch_size``. ``progress`` tracks the share
    submitted so far, and the cancel flag is checked before each slice.
    """

    proxy_repo: Any = field(repr=False, default=None)
    validator_pool: Any = field(repr=False, default=None)
    batch_size: int = 100

    async def run(self) -> Dict[str, Any]:
        """Submit every proxy returned by ``list_for_validation``, batch by batch.

        Returns:
            ``{"total": <proxies loaded>, "submitted": <proxies handed to the pool>}``.
            If the cancel flag is set once the loop ends, the job is marked
            CANCELLED (``result`` is not set) and the partial counts are still
            returned.
        """
        from app.repositories.proxy_repo import ProxyRepository
        from app.core.db import get_db
        from app.core.log import logger

        self._set_running()
        repository = self.proxy_repo if self.proxy_repo else ProxyRepository()
        async with get_db() as db:
            candidates = await repository.list_for_validation(db)

        if not candidates:
            self._set_completed({"total": 0, "submitted": 0})
            return self.result

        total = len(candidates)
        submitted = 0
        for offset in range(0, total, self.batch_size):
            if self.is_cancelled:
                logger.info(f"ValidateAllJob {self.id}: cancelled")
                break
            raws = [
                ProxyRaw(item.ip, item.port, item.protocol)
                for item in candidates[offset : offset + self.batch_size]
            ]
            if not self.validator_pool:
                # Without a pool nothing is submitted, so counters stay put.
                continue
            await self.validator_pool.submit(raws)
            submitted += len(raws)
            self.progress = min(100.0, submitted / total * 100)
            self._touch()

        payload = {"total": total, "submitted": submitted}
        if not self.is_cancelled:
            self._set_completed(payload)
        else:
            self.status = JobStatus.CANCELLED
            self._touch()
        logger.info(f"ValidateAllJob {self.id}: submitted {submitted}/{total} proxies")
        return payload