- 删除 ValidationQueue 双轨持久化队列,替换为纯内存 AsyncWorkerPool
- 引入统一后台任务框架 JobExecutor(Job/CrawlJob/ValidateAllJob)
- 新增 PluginRunner 统一插件执行(超时、重试、健康检查、统计)
- 重构 SchedulerService 职责收敛为仅定时触发 ValidateAllJob
- 使用 AsyncExitStack 重构 lifespan,安全管理长生命周期资源
- 路由层瘦身 50%+,业务异常上抛由全局中间件统一处理
- 实现设置全热更新(WorkerPool 并发、Validator 超时即时生效)
- 前端 Store 强制写后重新拉取,消除乐观更新数据不同步
- 删除 queue.py / task_repo.py / task_service.py
- 新增 execution 单元测试,全部 85 个测试通过
98 lines
3.3 KiB
Python
98 lines
3.3 KiB
Python
"""轻量级纯内存异步 Worker Pool"""
|
||
import asyncio
|
||
from typing import Callable, Coroutine, List, TypeVar
|
||
|
||
from app.core.log import logger
|
||
|
||
T = TypeVar("T")
|
||
Handler = Callable[[T], Coroutine[None, None, None]]
|
||
|
||
|
||
class AsyncWorkerPool:
    """Lightweight, purely in-memory asynchronous worker pool.

    Single responsibility: accept batches of items via ``submit()``, fan
    them out to ``worker_count`` worker coroutines, and invoke ``handler``
    on each item.  No persistence layer, no complex state machine.

    Shutdown contract: ``stop()`` enqueues exactly one ``None`` sentinel
    per worker and, once all workers have exited, purges whatever is left
    in the queue.  Pending items are dropped by design on shutdown; the
    purge also guarantees that a later ``start()``/``resize()`` never
    dequeues a stale sentinel and that ``drain()`` cannot hang on
    orphaned queue entries.
    """

    def __init__(
        self,
        worker_count: int,
        handler: Handler,
        queue_maxsize: int = 10000,
        name: str = "WorkerPool",
    ):
        # Desired number of concurrent worker coroutines.
        self.worker_count = worker_count
        # Async callback invoked once per submitted item.
        self.handler = handler
        self.name = name
        # Bounded queue provides natural backpressure to submit().
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=queue_maxsize)
        self._workers: List[asyncio.Task] = []
        self._running = False
        self._shutdown_event = asyncio.Event()

    async def __aenter__(self):
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.stop()

    async def start(self) -> None:
        """Spawn ``worker_count`` worker tasks. No-op if already running."""
        if self._running:
            return
        self._running = True
        self._shutdown_event.clear()
        self._workers = [
            asyncio.create_task(self._worker_loop(i), name=f"{self.name}-worker-{i}")
            for i in range(self.worker_count)
        ]
        logger.info(f"{self.name} started with {self.worker_count} workers")

    async def stop(self) -> None:
        """Stop all workers and discard anything still queued."""
        if not self._running:
            return
        self._running = False
        self._shutdown_event.set()
        # Exactly one sentinel per worker.  A blocking put() cannot
        # deadlock here: the workers are still consuming, so space frees
        # up even if the queue is momentarily full.  (Enqueueing extra
        # sentinels would leave stale ones behind, which would instantly
        # kill workers started by a later start()/resize().)
        for _ in range(len(self._workers)):
            await self._queue.put(None)
        if self._workers:
            await asyncio.gather(*self._workers, return_exceptions=True)
        self._workers.clear()
        # Purge leftovers: a worker that exited on a real item (fast-stop
        # path) leaves its sentinel behind.  Calling task_done() for each
        # purged entry keeps the join() counter balanced so a subsequent
        # drain() returns instead of hanging forever.
        while True:
            try:
                self._queue.get_nowait()
            except asyncio.QueueEmpty:
                break
            self._queue.task_done()
        logger.info(f"{self.name} stopped")

    async def submit(self, items: List[T]) -> None:
        """Enqueue a batch of items (blocks when the queue is full — natural backpressure)."""
        for item in items:
            await self._queue.put(item)

    async def drain(self) -> None:
        """Block until every enqueued item has been fully processed."""
        await self._queue.join()

    async def resize(self, new_worker_count: int) -> None:
        """Change the worker count by fully stopping, then restarting.

        Stop-then-start avoids old and new workers racing for sentinels.
        Note that stop() drops any items still queued at that moment;
        callers should drain() first if pending work must not be lost.
        """
        if new_worker_count == self.worker_count:
            return
        logger.info(f"{self.name} resizing from {self.worker_count} to {new_worker_count}")
        await self.stop()
        self.worker_count = new_worker_count
        await self.start()

    async def _worker_loop(self, worker_id: int) -> None:
        """Consume items until a sentinel (None) or shutdown is observed.

        Invariant: task_done() fires exactly once per get() — it lives
        only in the finally clause, which also runs on the break path.
        (Calling it again before break would unbalance the queue's join()
        counter and could raise ValueError.)
        """
        while True:
            item = await self._queue.get()
            try:
                if item is None or not self._running:
                    break  # finally still runs: task_done() fires once
                await self.handler(item)
            except Exception as e:
                # Handler failures are logged, never fatal to the worker.
                logger.error(f"{self.name} worker {worker_id} handler error: {e}", exc_info=True)
            finally:
                self._queue.task_done()