# Source: ProxyPool/app/services/scheduler_service.py
# (file-viewer header: 94 lines, 3.2 KiB, Python)

"""调度器服务 - 定时触发全量验证"""
import asyncio
from typing import Optional, Any
from app.core.execution.executor import JobExecutor
from app.core.execution.job import ValidateAllJob
from app.core.log import logger
class SchedulerService:
    """Periodic scheduler for proxy validation.

    Single responsibility: run a timed loop in a background asyncio task
    and submit a ``ValidateAllJob`` to the executor on every cycle. It does
    not hold a validation queue or a validator service itself.
    """

    def __init__(
        self,
        executor: JobExecutor,
        worker_pool: Optional[Any] = None,
        interval_minutes: int = 30,
        proxy_service: Optional[Any] = None,
        settings_repo: Optional[Any] = None,
    ):
        """Store collaborators and initialise the (stopped) scheduler state.

        Args:
            executor: Object whose ``submit_job`` receives each
                ``ValidateAllJob``.
            worker_pool: Passed to ``ValidateAllJob`` as ``validator_pool``.
            interval_minutes: Minutes to wait between two cycles.
            proxy_service: Optional object whose ``clean_expired(days)`` is
                awaited at the start of each cycle.
            settings_repo: Optional object whose ``get_all(db)`` supplies
                the ``"proxy_expiry_days"`` setting.

        Expired-proxy cleanup only runs when both ``proxy_service`` and
        ``settings_repo`` are provided.
        """
        self.executor = executor
        self.worker_pool = worker_pool
        self.interval_minutes = interval_minutes
        self._proxy_service = proxy_service
        self._settings_repo = settings_repo

        # Lifecycle state: flag, wake-up event for the sleeping loop, and
        # the handle of the background loop task.
        self.running = False
        self._stop_event = asyncio.Event()
        self._task: Optional[asyncio.Task] = None

    async def start(self) -> None:
        """Start the background loop; warn and do nothing if already running."""
        if self.running:
            logger.warning("Scheduler already running")
            return

        self._stop_event.clear()
        self.running = True
        self._task = asyncio.create_task(self._run_loop())
        logger.info("Scheduler started")

    async def stop(self) -> None:
        """Stop the background loop and wait for its task to finish.

        Does nothing when the scheduler is not running.
        """
        if not self.running:
            return

        self.running = False
        # Wake the loop if it is currently waiting between cycles.
        self._stop_event.set()

        loop_task = self._task
        if loop_task:
            loop_task.cancel()
            try:
                await loop_task
            except asyncio.CancelledError:
                # Expected outcome of the cancel() above.
                pass
            self._task = None

        logger.info("Scheduler stopped")

    def validate_all_now(self) -> str:
        """Submit one full-validation job immediately and return its job ID."""
        job_id = self.executor.submit_job(self._new_validate_all_job())
        logger.info(f"ValidateAllJob submitted: {job_id}")
        return job_id

    def _new_validate_all_job(self):
        """Build a ``ValidateAllJob`` bound to this scheduler's worker pool."""
        return ValidateAllJob(validator_pool=self.worker_pool)

    async def _run_loop(self) -> None:
        """Timed loop: purge expired proxies, submit a job, then wait."""
        while self.running:
            await self._purge_expired_proxies()
            self._submit_scheduled_job()
            await self._wait_for_next_cycle()

    async def _purge_expired_proxies(self) -> None:
        """Remove expired proxies if cleanup collaborators are configured.

        Any failure is logged and swallowed so the loop keeps running.
        """
        if self._proxy_service is None or self._settings_repo is None:
            return

        try:
            # Imported lazily, as in the original loop body.
            from app.core.db import get_db

            async with get_db() as db:
                settings = await self._settings_repo.get_all(db)
            expiry_days = int(settings.get("proxy_expiry_days", 7))
            removed_count = await self._proxy_service.clean_expired(expiry_days)
            if removed_count:
                logger.info(
                    "Scheduler removed %s proxies (last_check older than %s days)",
                    removed_count,
                    expiry_days,
                )
        except Exception as e:
            logger.error("Scheduler clean_expired failed: %s", e, exc_info=True)

    def _submit_scheduled_job(self) -> None:
        """Submit the periodic ``ValidateAllJob``; log instead of raising."""
        try:
            self.executor.submit_job(self._new_validate_all_job())
        except Exception as e:
            logger.error(f"Scheduler loop error: {e}", exc_info=True)

    async def _wait_for_next_cycle(self) -> None:
        """Wait for the next cycle, returning early once stop is signalled."""
        try:
            await asyncio.wait_for(
                self._stop_event.wait(),
                timeout=self.interval_minutes * 60,
            )
        except asyncio.TimeoutError:
            # Interval elapsed without a stop request: run the next cycle.
            pass