From 92c7fa19e24ee267d08cc0af406fd8ec5ef5fd59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A5=80=E6=A2=A6?= <3501646051@qq.com> Date: Sun, 5 Apr 2026 10:39:59 +0800 Subject: [PATCH] Round 5 fixes: workerpool resize shrink, validator lazy session close, plugin config error handling, 422 message detail, tests --- app/api/errors.py | 6 ++++-- app/api/routes/plugins.py | 4 +--- app/api/routes/settings.py | 9 +++++++-- app/core/execution/worker_pool.py | 19 ++++++------------- app/services/plugin_service.py | 7 +++++-- 5 files changed, 23 insertions(+), 22 deletions(-) diff --git a/app/api/errors.py b/app/api/errors.py index d33f4b0..7fb1811 100644 --- a/app/api/errors.py +++ b/app/api/errors.py @@ -23,12 +23,14 @@ async def http_exception_handler(request: Request, exc: StarletteHTTPException): async def pydantic_validation_handler(request: Request, exc: ValidationError): logger.error(f"Validation error: {exc}") + errors = exc.errors() + message = errors[0].get("msg", "参数验证失败") if errors else "参数验证失败" return JSONResponse( status_code=422, content={ "code": 422, - "message": "参数验证失败", - "data": exc.errors(), + "message": message, + "data": errors, }, ) diff --git a/app/api/routes/plugins.py b/app/api/routes/plugins.py index 6d3a527..bb5d7a2 100644 --- a/app/api/routes/plugins.py +++ b/app/api/routes/plugins.py @@ -54,9 +54,7 @@ async def update_plugin_config( request: ConfigRequest, service: PluginService = Depends(get_plugin_service), ): - success = await service.update_plugin_config(plugin_id, request.config) - if not success: - raise PluginNotFoundException(plugin_id) + await service.update_plugin_config(plugin_id, request.config) return success_response("保存插件配置成功", {"plugin_id": plugin_id, "config": request.config}) diff --git a/app/api/routes/settings.py b/app/api/routes/settings.py index 717304d..22ecddf 100644 --- a/app/api/routes/settings.py +++ b/app/api/routes/settings.py @@ -52,9 +52,14 @@ async def save_settings( validator._init_max_concurrency = request.default_concurrency if request.validation_targets is not None: validator.update_test_urls(request.validation_targets) - # 先关闭现有 session,再重置 semaphore,避免竞态窗口 - await validator.close() + # 延迟关闭旧 session:让正在验证的代理继续使用旧 session, + # 新请求会通过 _ensure_session() 自动创建使用新配置的 session + old_session = validator._http_session + validator._http_session = None + validator._http_connector = None validator._semaphore = None + if old_session and not old_session.closed: + asyncio.create_task(old_session.close()) logger.info(f"Validator config updated: timeout={request.validation_timeout}, concurrency={request.default_concurrency}, targets={request.validation_targets}") return success_response("保存设置成功", request.model_dump()) diff --git a/app/core/execution/worker_pool.py b/app/core/execution/worker_pool.py index 27706ec..b678a36 100644 --- a/app/core/execution/worker_pool.py +++ b/app/core/execution/worker_pool.py @@ -84,19 +84,12 @@ class AsyncWorkerPool: asyncio.create_task(self._worker_loop(i), name=f"{self.name}-worker-{i}") ) elif new_worker_count < self.worker_count: - for _ in range(self.worker_count - new_worker_count): - await self._queue.put(None) - await asyncio.sleep(0) - still_running = [] - for w in self._workers: - if w.done(): - try: - await w - except asyncio.CancelledError: - pass - else: - still_running.append(w) - self._workers = still_running + excess_workers = self._workers[new_worker_count:] + self._workers = self._workers[:new_worker_count] + for w in excess_workers: + w.cancel() + if excess_workers: + await asyncio.gather(*excess_workers, return_exceptions=True) self.worker_count = new_worker_count async def _worker_loop(self, worker_id: int) -> None: diff --git a/app/services/plugin_service.py b/app/services/plugin_service.py index c641c92..903a14b 100644 --- a/app/services/plugin_service.py +++ b/app/services/plugin_service.py @@ -82,10 +82,13 @@ class PluginService: raise PluginNotFoundException(plugin_id) safe_config = {k: v for k, v in config.items() if k in plugin.default_config} if not safe_config: - return False + raise ValidationException("配置项无效或为空") plugin.update_config(safe_config) async with get_db() as db: - return await self.plugin_settings_repo.set_config(db, plugin_id, plugin.config) + success = await self.plugin_settings_repo.set_config(db, plugin_id, plugin.config) + if not success: + raise ProxyPoolException("保存插件配置失败", 500) + return True def get_plugin(self, plugin_id: str) -> Optional[BaseCrawlerPlugin]: return registry.get(plugin_id)