实现插件配置持久化与任务队列持久化

插件配置持久化:
- plugin_settings 表新增 config_json 字段,支持存储每个插件的自定义配置
- BaseCrawlerPlugin 新增 default_config 属性和 update_config 方法
- PluginSettingsRepository 新增 get_config / set_config 方法
- PluginService 新增 get_plugin_config 和 update_plugin_config
- api/routes/plugins.py 新增 GET /{id}/config 和 POST /{id}/config 接口
- 前端 Plugins.vue 增加配置编辑对话框,支持动态渲染数字/布尔/字符串类型配置
- ip3366 插件示例化:增加 max_pages 配置项,验证配置生效后会动态更新爬取 URL

任务队列持久化:
- 新建 validation_tasks 表:id, ip, port, protocol, status, result, response_time_ms, created_at, updated_at
- 新建 ValidationTaskRepository,提供 insert_batch / acquire_pending / complete_task / reset_processing 等方法
- ValidationQueue 重构:
  - submit() 时把任务写入数据库并唤醒 Worker
  - Worker 通过 acquire_pending 原子取任务并验证
  - 验证完成后更新任务状态并入库有效代理
  - 启动时自动恢复之前中断的 processing 任务为 pending
  - 支持 drain() 等待所有 pending 完成
- 调度器验证流程同样自动持久化到任务表

其他适配:
- 更新 api/deps.py 和 api/lifespan.py,移除对已删除 settings_service 的残留引用
- 更新前端 pluginService.js 和 api/index.js 增加配置相关 API
This commit is contained in:
祀梦
2026-04-02 12:35:06 +08:00
parent b77641f059
commit 66943df864
13 changed files with 472 additions and 73 deletions

View File

@@ -64,11 +64,36 @@ async def init_db():
CREATE TABLE IF NOT EXISTS plugin_settings (
plugin_id TEXT PRIMARY KEY,
enabled INTEGER DEFAULT 1,
config_json TEXT DEFAULT '{}',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Migration: add the config_json column to plugin_settings tables created by
# older versions (CREATE TABLE IF NOT EXISTS will not alter an existing table).
try:
    # Cheap probe: this SELECT only succeeds if the column already exists.
    await db.execute("SELECT config_json FROM plugin_settings LIMIT 1")
except Exception:
    # Probe failed -> column is missing; add it with the same default as the DDL
    # above. NOTE(review): broad except — presumably a SQLite OperationalError
    # ("no such column"); confirm against the driver in use.
    await db.execute("ALTER TABLE plugin_settings ADD COLUMN config_json TEXT DEFAULT '{}'")
    logger.info("Migrated: added config_json column to plugin_settings")
# Validation task queue table: persists each proxy-validation work item
# (status lifecycle pending -> processing -> done) so the queue survives restarts.
await db.execute("""
CREATE TABLE IF NOT EXISTS validation_tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ip TEXT NOT NULL,
port INTEGER NOT NULL,
protocol TEXT DEFAULT 'http',
status TEXT DEFAULT 'pending',
result TEXT,
response_time_ms REAL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Index on status: workers repeatedly look up rows with status = 'pending'.
await db.execute("CREATE INDEX IF NOT EXISTS idx_validation_status ON validation_tasks(status)")
# Index on created_at — presumably for time-ordered acquisition/cleanup; confirm usage.
await db.execute("CREATE INDEX IF NOT EXISTS idx_validation_created ON validation_tasks(created_at)")
# 系统设置表
await db.execute("""
CREATE TABLE IF NOT EXISTS settings (

View File

@@ -1,7 +1,7 @@
"""插件基类 - 所有爬虫插件必须继承此基类"""
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List
from typing import List, Dict, Any
@dataclass
@@ -30,6 +30,20 @@ class BaseCrawlerPlugin(ABC):
display_name: str = ""
description: str = ""
enabled: bool = True
default_config: Dict[str, Any] = {}
def __init__(self):
    # Seed the instance configuration from a private copy of the class-level
    # defaults, so mutating one plugin instance never leaks into another
    # (or into the shared default_config class attribute).
    defaults = self.default_config or {}
    self._config: Dict[str, Any] = dict(defaults)
@property
def config(self) -> Dict[str, Any]:
    """Current effective configuration mapping of this plugin instance."""
    current_config = self._config
    return current_config
def update_config(self, updates: Dict[str, Any]) -> None:
    """Merge *updates* into the current config, overwriting only keys
    that already exist; unknown keys are silently ignored."""
    known = self._config
    for name in updates.keys() & known.keys():
        known[name] = updates[name]
@abstractmethod
async def crawl(self) -> List[ProxyRaw]:

View File

@@ -1,24 +1,26 @@
"""验证任务队列 - 解耦爬取与验证,支持背压控制"""
"""验证任务队列 - 解耦爬取与验证,支持背压控制和持久化"""
import asyncio
from typing import Optional
from models.domain import ProxyRaw
from repositories.task_repo import ValidationTaskRepository
from core.db import get_db
from core.log import logger
class ValidationQueue:
"""代理验证队列
"""代理验证队列(支持持久化到 SQLite)
工作流程:
1. 爬虫将原始代理 submit() 到队列
2. Worker 池从队列消费并验证
1. 爬虫将原始代理 submit() 到队列(写入数据库 + 内存信号)
2. Worker 池从数据库消费并验证
3. 验证通过的代理写入数据库
4. 服务重启时自动恢复未完成的 pending 任务
"""
def __init__(
self,
validator,
proxy_repo,
db_ctx,
worker_count: int = 50,
score_valid: int = 10,
score_invalid: int = -5,
@@ -27,16 +29,17 @@ class ValidationQueue:
):
self.validator = validator
self.proxy_repo = proxy_repo
self.db_ctx = db_ctx
self.task_repo = ValidationTaskRepository()
self.worker_count = worker_count
self.score_valid = score_valid
self.score_invalid = score_invalid
self.score_min = score_min
self.score_max = score_max
self._queue: asyncio.Queue[Optional[ProxyRaw]] = asyncio.Queue()
self._signal: asyncio.Queue[None] = asyncio.Queue()
self._workers: list[asyncio.Task] = []
self._running = False
self._db_lock = asyncio.Lock()
# 统计
self.valid_count = 0
@@ -46,6 +49,16 @@ class ValidationQueue:
if self._running:
return
self._running = True
# 恢复之前中断的 processing 任务
async with get_db() as db:
recovered = await self.task_repo.reset_processing(db)
pending = await self.task_repo.get_pending_count(db)
if recovered:
logger.info(f"ValidationQueue recovered {recovered} interrupted tasks")
if pending:
logger.info(f"ValidationQueue has {pending} pending tasks to process")
for i in range(self.worker_count):
self._workers.append(asyncio.create_task(self._worker_loop(i)))
logger.info(f"ValidationQueue started with {self.worker_count} workers")
@@ -55,56 +68,75 @@ class ValidationQueue:
return
self._running = False
for _ in self._workers:
self._queue.put_nowait(None) # sentinel
self._signal.put_nowait(None) # sentinel
if self._workers:
await asyncio.gather(*self._workers, return_exceptions=True)
self._workers.clear()
logger.info("ValidationQueue stopped")
async def submit(self, proxies: list[ProxyRaw]):
    """Persist *proxies* as pending validation tasks and wake up workers.

    The rendered diff interleaved the removed in-memory-queue version with
    the added persisted version; this is the persisted implementation.

    Each proxy becomes a row in validation_tasks; workers are then nudged
    via the in-memory signal queue — one signal per inserted task, capped
    at worker_count so the signal queue is not flooded.
    """
    async with self._db_lock:
        async with get_db() as db:
            inserted = await self.task_repo.insert_batch(db, proxies)
    # Signalling needs no DB access, so do it outside the lock.
    if inserted:
        for _ in range(min(inserted, self.worker_count)):
            self._signal.put_nowait(None)
async def submit_one(self, proxy: ProxyRaw):
    """Convenience wrapper: persist a single proxy via submit().

    (The diff rendering also showed the removed direct-queue line; only
    the delegating implementation is kept.)
    """
    await self.submit([proxy])
async def drain(self):
    """Block until no pending validation tasks remain in the database.

    Polls the task table twice per second. NOTE(review): tasks currently
    in the 'processing' state may not be counted by get_pending_count —
    confirm if callers rely on full completion rather than an empty
    pending backlog.
    """
    while True:
        async with get_db() as db:
            remaining = await self.task_repo.get_pending_count(db)
        if remaining == 0:
            break
        await asyncio.sleep(0.5)
async def _worker_loop(self, worker_id: int):
    """Worker coroutine: wait for a wake-up signal, then process one task.

    A signal carries no payload — the actual work item is acquired
    atomically from the database by _process_one_task(). A signal received
    while _running is False acts as the shutdown sentinel (stop() enqueues
    one per worker). The diff-interleaved in-memory-queue version is
    dropped in favour of this DB-backed one.
    """
    while True:
        await self._signal.get()
        self._signal.task_done()
        if not self._running:
            break
        await self._process_one_task(worker_id)
async def _process_one_task(self, worker_id: int):
    """Acquire one pending task from the database, validate it, record the result.

    Replaces the removed _validate_and_save(): the rendered diff interleaved
    the old and new bodies; only the persisted implementation is kept.

    Steps:
      1. Atomically claim a pending row (acquire_pending) under the DB lock.
      2. Validate the proxy outside any lock, so slow network I/O does not
         serialize the worker pool; a validator crash counts as invalid.
      3. Under the lock again: store valid proxies (score + latency), mark
         the task completed, and update in-memory counters.
    """
    async with self._db_lock:
        async with get_db() as db:
            task = await self.task_repo.acquire_pending(db)
    if not task:
        # No pending work — the wake-up signal was stale.
        return
    proxy = ProxyRaw(task["ip"], task["port"], task["protocol"])
    try:
        is_valid, latency = await self.validator.validate(
            proxy.ip, proxy.port, proxy.protocol
        )
    except Exception as e:
        # Treat validator failures as an invalid proxy so the task is
        # still completed and never wedges in 'processing'.
        logger.error(f"Worker {worker_id} validation error: {e}")
        is_valid, latency = False, 0.0
    async with self._db_lock:
        async with get_db() as db:
            if is_valid:
                await self.proxy_repo.insert_or_update(
                    db, proxy.ip, proxy.port, proxy.protocol, score=self.score_valid
                )
                if latency:
                    await self.proxy_repo.update_response_time(
                        db, proxy.ip, proxy.port, latency
                    )
                await self.task_repo.complete_task(db, task["id"], True, latency)
                self.valid_count += 1
                logger.debug(f"ValidationQueue: valid {proxy.ip}:{proxy.port}")
            else:
                # Newly crawled invalid proxies are simply discarded —
                # only the task row records the failure.
                await self.task_repo.complete_task(db, task["id"], False, 0.0)
                self.invalid_count += 1
                logger.debug(f"ValidationQueue: invalid {proxy.ip}:{proxy.port}")
def reset_stats(self):
self.valid_count = 0