"""Validation task queue.

Decouples crawling from validation. Backpressure is available by bounding
the queue with ``max_queue_size`` (unbounded by default).
"""

import asyncio
from typing import Optional

from models.domain import ProxyRaw
from core.log import logger


class ValidationQueue:
    """Proxy validation queue.

    Workflow:
        1. Crawlers ``submit()`` raw proxies to the queue.
        2. A pool of worker tasks consumes the queue and validates each proxy.
        3. Proxies that pass validation are written to the database.
    """

    def __init__(
        self,
        validator,
        proxy_repo,
        db_ctx,
        worker_count: int = 50,
        score_valid: int = 10,
        score_invalid: int = -5,
        score_min: int = 0,
        score_max: int = 100,
        max_queue_size: int = 0,
    ):
        """Store collaborators and settings; no worker is started here.

        Args:
            validator: Object exposing an awaitable
                ``validate(ip, port, protocol)`` that returns
                ``(is_valid, latency)``.
            proxy_repo: Object exposing awaitable ``insert_or_update`` and
                ``update_response_time`` methods.
            db_ctx: Zero-argument callable returning an async context manager
                that yields the handle passed to ``proxy_repo``.
            worker_count: Number of worker tasks created by ``start()``.
            score_valid: Score passed to ``insert_or_update`` for a proxy
                that validates successfully.
            score_invalid: Stored on the instance; not used by this class.
            score_min: Stored on the instance; not used by this class.
            score_max: Stored on the instance; not used by this class.
            max_queue_size: Upper bound on queued items. When positive,
                ``submit()`` / ``submit_one()`` wait while the queue is full,
                which throttles producers. ``0`` (the default) or a negative
                value leaves the queue unbounded.
        """
        self.validator = validator
        self.proxy_repo = proxy_repo
        self.db_ctx = db_ctx
        self.worker_count = worker_count
        self.score_valid = score_valid
        self.score_invalid = score_invalid
        self.score_min = score_min
        self.score_max = score_max
        self.max_queue_size = max_queue_size

        # ``None`` is the shutdown sentinel consumed by ``_worker_loop``.
        self._queue: asyncio.Queue[Optional[ProxyRaw]] = asyncio.Queue(
            maxsize=max_queue_size
        )
        self._workers: list[asyncio.Task] = []
        self._running = False

        # Statistics
        self.valid_count = 0
        self.invalid_count = 0

    async def start(self) -> None:
        """Spawn ``worker_count`` worker tasks; a no-op if already running."""
        if self._running:
            return
        self._running = True
        for i in range(self.worker_count):
            self._workers.append(asyncio.create_task(self._worker_loop(i)))
        logger.info(f"ValidationQueue started with {self.worker_count} workers")

    async def stop(self) -> None:
        """Stop the workers after they finish the items already queued.

        A no-op if the queue is not running.
        """
        if not self._running:
            return
        self._running = False
        # One sentinel per worker. Sentinels are queued behind pending items,
        # so outstanding work is processed before the workers exit. ``put`` is
        # awaited (rather than ``put_nowait``) so that a bounded, currently
        # full queue waits for room instead of raising ``QueueFull``.
        for _ in self._workers:
            await self._queue.put(None)
        if self._workers:
            await asyncio.gather(*self._workers, return_exceptions=True)
        self._workers.clear()
        logger.info("ValidationQueue stopped")

    async def submit(self, proxies: list[ProxyRaw]) -> None:
        """Submit proxies to the validation queue.

        Waits for free space on each item when the queue is bounded and full.
        """
        for p in proxies:
            await self._queue.put(p)

    async def submit_one(self, proxy: ProxyRaw) -> None:
        """Submit a single proxy; waits if the queue is bounded and full."""
        await self._queue.put(proxy)

    async def drain(self) -> None:
        """Wait until every item currently in the queue has been processed.

        Note: this only returns once each queued item has been marked done,
        which is done by the worker tasks created in ``start()``.
        """
        await self._queue.join()

    async def _worker_loop(self, worker_id: int) -> None:
        """Consume items until a ``None`` sentinel is received.

        Errors raised while handling one item are logged and do not stop the
        worker.
        """
        while True:
            item = await self._queue.get()
            if item is None:
                # The sentinel must be marked done too, or ``join()`` would hang.
                self._queue.task_done()
                break
            try:
                await self._validate_and_save(item)
            except Exception as e:
                logger.error(f"Worker {worker_id} validation error: {e}")
            finally:
                self._queue.task_done()

    async def _validate_and_save(self, proxy: ProxyRaw) -> None:
        """Validate one proxy and persist it only if it is valid."""
        is_valid, latency = await self.validator.validate(
            proxy.ip, proxy.port, proxy.protocol
        )

        if not is_valid:
            # Newly crawled proxies that fail validation are not stored, so
            # no database context is opened for them.
            self.invalid_count += 1
            logger.debug(f"ValidationQueue: invalid {proxy.ip}:{proxy.port}")
            return

        async with self.db_ctx() as db:
            await self.proxy_repo.insert_or_update(
                db, proxy.ip, proxy.port, proxy.protocol, score=self.score_valid
            )
            # A falsy latency (``None`` or 0) skips the response-time update.
            if latency:
                await self.proxy_repo.update_response_time(
                    db, proxy.ip, proxy.port, latency
                )
            self.valid_count += 1
            logger.debug(f"ValidationQueue: valid {proxy.ip}:{proxy.port}")

    def reset_stats(self) -> None:
        """Reset the valid/invalid counters to zero."""
        self.valid_count = 0
        self.invalid_count = 0