fix: 修复设置系统脱节、队列计数漂移、资源泄露等全量问题
- 统一设置系统:create_scheduler_service 读取 DB 设置覆盖默认值 - 修复 ProxyRepository.update_score 误删所有无效代理的 SQL - ValidationQueue:修复 Worker 计数漂移与启动恢复任务饿死 - SchedulerService:移除 drain() 阻塞,主循环可正常响应 stop - TaskService:在调度器周期内自动清理过期任务,防止内存泄漏 - lifespan/conftest:规范关闭顺序,消除 Event loop closed 警告 - Repository:异常日志增加 exc_info,今日新增按 created_at 统计 - ValidatorService:防止 HTTP session 重复关闭,移除 SOCKS 多余 close - 前端:补全 pluginsStore.isEmpty,ProxyList 最低分数上限改为 100 - 删除 config.py 中冗余的 cors_origins_list property
This commit is contained in:
@@ -39,7 +39,6 @@ class ValidationQueue:
|
||||
self._signal: asyncio.Queue[None] = asyncio.Queue()
|
||||
self._workers: list[asyncio.Task] = []
|
||||
self._running = False
|
||||
self._db_lock = asyncio.Lock()
|
||||
self._pending_count = 0
|
||||
self._condition = asyncio.Condition()
|
||||
|
||||
@@ -72,9 +71,9 @@ class ValidationQueue:
|
||||
for i in range(self.worker_count):
|
||||
self._workers.append(asyncio.create_task(self._worker_loop(i)))
|
||||
|
||||
# 唤醒 Worker 处理恢复的 pending 任务
|
||||
# 唤醒 Worker 处理恢复的 pending 任务(每个 Worker 一次唤醒即可,内部会循环处理)
|
||||
if pending:
|
||||
for _ in range(min(pending, self.worker_count)):
|
||||
for _ in range(self.worker_count):
|
||||
self._signal.put_nowait(None)
|
||||
|
||||
logger.info(f"ValidationQueue started with {self.worker_count} workers")
|
||||
@@ -92,9 +91,8 @@ class ValidationQueue:
|
||||
|
||||
async def submit(self, proxies: list[ProxyRaw]):
|
||||
"""提交代理到验证队列(持久化 + 唤醒 Worker)"""
|
||||
async with self._db_lock:
|
||||
async with get_db() as db:
|
||||
inserted = await self.task_repo.insert_batch(db, proxies)
|
||||
async with get_db() as db:
|
||||
inserted = await self.task_repo.insert_batch(db, proxies)
|
||||
if inserted:
|
||||
async with self._condition:
|
||||
self._pending_count += inserted
|
||||
@@ -117,46 +115,53 @@ class ValidationQueue:
|
||||
self._signal.task_done()
|
||||
if not self._running:
|
||||
break
|
||||
await self._process_one_task(worker_id)
|
||||
# 持续处理任务直到没有 pending 为止,避免信号数不足导致任务饿死
|
||||
while self._running:
|
||||
processed = await self._process_one_task(worker_id)
|
||||
if not processed:
|
||||
break
|
||||
|
||||
async def _process_one_task(self, worker_id: int):
|
||||
"""从数据库取一个任务并验证"""
|
||||
async with self._db_lock:
|
||||
async with get_db() as db:
|
||||
task = await self.task_repo.acquire_pending(db)
|
||||
if not task:
|
||||
return
|
||||
async def _process_one_task(self, worker_id: int) -> bool:
|
||||
"""从数据库取一个任务并验证。返回 True 表示确实处理了一个任务。"""
|
||||
async with get_db() as db:
|
||||
task = await self.task_repo.acquire_pending(db)
|
||||
if not task:
|
||||
return False
|
||||
|
||||
proxy = ProxyRaw(task["ip"], task["port"], task["protocol"])
|
||||
try:
|
||||
is_valid, latency = await self.validator.validate(
|
||||
proxy.ip, proxy.port, proxy.protocol
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Worker {worker_id} validation error: {e}")
|
||||
is_valid, latency = False, 0.0
|
||||
proxy = ProxyRaw(task["ip"], task["port"], task["protocol"])
|
||||
try:
|
||||
is_valid, latency = await self.validator.validate(
|
||||
proxy.ip, proxy.port, proxy.protocol
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Worker {worker_id} validation error: {e}", exc_info=True)
|
||||
is_valid, latency = False, 0.0
|
||||
|
||||
async with self._db_lock:
|
||||
async with get_db() as db:
|
||||
if is_valid:
|
||||
await self.proxy_repo.insert_or_update(
|
||||
db, proxy.ip, proxy.port, proxy.protocol, score=self.score_valid
|
||||
if is_valid:
|
||||
await self.proxy_repo.insert_or_update(
|
||||
db, proxy.ip, proxy.port, proxy.protocol, score=self.score_valid
|
||||
)
|
||||
if latency:
|
||||
await self.proxy_repo.update_response_time(
|
||||
db, proxy.ip, proxy.port, latency
|
||||
)
|
||||
if latency:
|
||||
await self.proxy_repo.update_response_time(
|
||||
db, proxy.ip, proxy.port, latency
|
||||
)
|
||||
await self.task_repo.complete_task(db, task["id"], True, latency)
|
||||
self.valid_count += 1
|
||||
logger.debug(f"ValidationQueue: valid {proxy.ip}:{proxy.port}")
|
||||
else:
|
||||
await self.task_repo.complete_task(db, task["id"], False, 0.0)
|
||||
self.invalid_count += 1
|
||||
logger.debug(f"ValidationQueue: invalid {proxy.ip}:{proxy.port}")
|
||||
await self.task_repo.complete_task(db, task["id"], True, latency)
|
||||
self.valid_count += 1
|
||||
logger.debug(f"ValidationQueue: valid {proxy.ip}:{proxy.port}")
|
||||
else:
|
||||
# 对已有代理扣分,分数<=0时自动删除
|
||||
await self.proxy_repo.update_score(
|
||||
db, proxy.ip, proxy.port, self.score_invalid,
|
||||
self.score_min, self.score_max
|
||||
)
|
||||
await self.task_repo.complete_task(db, task["id"], False, 0.0)
|
||||
self.invalid_count += 1
|
||||
logger.debug(f"ValidationQueue: invalid {proxy.ip}:{proxy.port}")
|
||||
async with self._condition:
|
||||
self._pending_count = max(0, self._pending_count - 1)
|
||||
if self._pending_count == 0:
|
||||
self._condition.notify_all()
|
||||
return True
|
||||
|
||||
def reset_stats(self):
|
||||
self.valid_count = 0
|
||||
|
||||
Reference in New Issue
Block a user