"""Application lifecycle management."""
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

from fastapi import FastAPI

from app.core.db import init_db, get_db
from app.core.config import settings as app_settings
from app.core.log import logger
from app.core.execution import AsyncWorkerPool, JobExecutor
from app.core.plugin_system.registry import registry
from app.repositories.proxy_repo import ProxyRepository
from app.repositories.settings_repo import SettingsRepository, DEFAULT_SETTINGS
from app.services.validator_service import ValidatorService
from app.services.plugin_runner import PluginRunner
from app.services.scheduler_service import SchedulerService

settings_repo = SettingsRepository()
proxy_repo = ProxyRepository()


async def _load_settings() -> dict:
    """Load the persisted settings, falling back to a copy of the defaults.

    Returns:
        The mapping returned by ``settings_repo.get_all`` on success, or a
        copy of ``DEFAULT_SETTINGS`` when the lookup raises (the failure is
        logged, not propagated).
    """
    db_settings = DEFAULT_SETTINGS.copy()
    try:
        async with get_db() as db:
            db_settings = await settings_repo.get_all(db)
    except Exception as e:
        logger.error(f"Failed to load settings on startup: {e}")
    return db_settings


async def _close_quietly(resource, label: str) -> None:
    """Await ``resource.close()``; log a failure instead of raising it.

    Closing is best-effort during shutdown: one resource failing to close
    must not prevent the remaining ones from being released.
    """
    try:
        await resource.close()
    except Exception as e:
        logger.warning(f"Failed to close {label} during shutdown: {e}")


async def _release_clients(validator) -> None:
    """Close the validator and every registered plugin that exposes ``close``."""
    await _close_quietly(validator, "validator")

    for plugin in registry.list_plugins():
        if hasattr(plugin, "close"):
            await _close_quietly(plugin, f"plugin {plugin!r}")

    # Give aiosqlite / aiohttp background threads a moment to wind down.
    await asyncio.sleep(0.1)


async def _shutdown_services(scheduler, executor, validator) -> None:
    """Run the shutdown sequence so that every step is attempted.

    Order: stop the scheduler, cancel running jobs, then release clients.
    An error raised by ``scheduler.stop()`` or ``executor.cancel_all()``
    still propagates to the caller, but only after the later steps ran.
    """
    try:
        await scheduler.stop()
    finally:
        try:
            await executor.cancel_all()
        finally:
            await _release_clients(validator)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application startup and shutdown.

    Startup initialises the database, loads settings, builds the validator,
    worker pool, job executor, plugin runner and scheduler, and publishes
    them on ``app.state``. Shutdown always runs once startup has completed,
    including when an exception or cancellation is delivered at the
    ``yield``.

    Args:
        app: The FastAPI application whose ``state`` receives the services.
    """
    await init_db()
    db_settings = await _load_settings()

    # Shared by the validator and the worker pool.
    concurrency = db_settings.get(
        "default_concurrency", app_settings.validator_max_concurrency
    )

    async with AsyncExitStack() as stack:
        # Validator
        validator = ValidatorService(
            timeout=db_settings.get(
                "validation_timeout", app_settings.validator_timeout
            ),
            connect_timeout=app_settings.validator_connect_timeout,
            max_concurrency=concurrency,
        )
        if db_settings.get("validation_targets") is not None:
            validator.update_test_urls(db_settings["validation_targets"])

        # Validation worker pool
        async def validation_handler(proxy):
            """Validate one proxy and persist the outcome."""
            is_valid, latency = await validator.validate(
                proxy.ip, proxy.port, proxy.protocol
            )
            async with get_db() as db:
                if is_valid:
                    await proxy_repo.insert_or_update(
                        db,
                        proxy.ip,
                        proxy.port,
                        proxy.protocol,
                        score=app_settings.score_valid,
                    )
                    # NOTE(review): a falsy latency (None or 0) is not
                    # recorded - confirm 0 is never a meaningful value.
                    if latency:
                        await proxy_repo.update_response_time(
                            db, proxy.ip, proxy.port, latency
                        )
                else:
                    await proxy_repo.update_score(
                        db,
                        proxy.ip,
                        proxy.port,
                        app_settings.score_invalid,
                        app_settings.score_min,
                        app_settings.score_max,
                    )

        worker_pool = AsyncWorkerPool(
            worker_count=concurrency,
            handler=validation_handler,
            name="ValidationPool",
        )
        await stack.enter_async_context(worker_pool)

        # Job executor
        executor = JobExecutor(worker_pool=worker_pool, max_concurrent_jobs=10)
        await stack.enter_async_context(executor)

        # Plugin runner
        plugin_runner = PluginRunner(timeout=db_settings.get("crawl_timeout", 30))

        # Scheduler
        scheduler = SchedulerService(
            executor=executor,
            worker_pool=worker_pool,
            interval_minutes=db_settings.get("validate_interval_minutes", 30),
        )

        # Publish the services on app.state
        app.state.validator = validator
        app.state.worker_pool = worker_pool
        app.state.executor = executor
        app.state.plugin_runner = plugin_runner
        app.state.scheduler = scheduler

        # Start the scheduler; a failure here is logged and startup continues.
        if db_settings.get("auto_validate", True):
            try:
                await scheduler.start()
            except Exception as e:
                logger.error(f"Failed to start scheduler on startup: {e}")

        logger.info("API server started")

        try:
            yield
        finally:
            # The AsyncExitStack closes executor and worker_pool afterwards.
            await _shutdown_services(scheduler, executor, validator)

        logger.info("API server shutdown")