diff --git a/api/deps.py b/api/deps.py index 6b08e9e..4cab0a5 100644 --- a/api/deps.py +++ b/api/deps.py @@ -36,7 +36,6 @@ def create_scheduler_service() -> SchedulerService: queue = ValidationQueue( validator=validator, proxy_repo=proxy_repo, - db_ctx=get_db, worker_count=app_settings.validator_max_concurrency, score_valid=app_settings.score_valid, score_invalid=app_settings.score_invalid, @@ -44,7 +43,3 @@ def create_scheduler_service() -> SchedulerService: score_max=app_settings.score_max, ) return SchedulerService(validation_queue=queue, proxy_repo=proxy_repo) - - -# 避免循环导入 -from core.db import get_db diff --git a/api/routes/plugins.py b/api/routes/plugins.py index 8f160d5..57856ce 100644 --- a/api/routes/plugins.py +++ b/api/routes/plugins.py @@ -2,7 +2,6 @@ from fastapi import APIRouter, Depends from services.plugin_service import PluginService from services.scheduler_service import SchedulerService -from models.schemas import PluginToggleRequest from api.deps import get_plugin_service, get_scheduler_service from core.log import logger @@ -26,7 +25,7 @@ async def list_plugins(service: PluginService = Depends(get_plugin_service)): "plugins": [ { "id": p.id, - "name": p.display_name, # 保持旧版本兼容:name 用于展示 + "name": p.display_name, "display_name": p.display_name, "description": p.description, "enabled": p.enabled, @@ -43,18 +42,47 @@ async def list_plugins(service: PluginService = Depends(get_plugin_service)): @router.put("/{plugin_id}/toggle") async def toggle_plugin( plugin_id: str, - request: PluginToggleRequest, + request: dict, service: PluginService = Depends(get_plugin_service), ): - success = await service.toggle_plugin(plugin_id, request.enabled) + enabled = request.get("enabled") + if enabled is None: + return error_response("缺少 enabled 参数", 400) + success = await service.toggle_plugin(plugin_id, enabled) if not success: return error_response("插件不存在", 404) return success_response( - f"插件 {plugin_id} 已{'启用' if request.enabled else '禁用'}", - {"plugin_id": plugin_id, "enabled": request.enabled}, + f"插件 {plugin_id} 已{'启用' if enabled else '禁用'}", + {"plugin_id": plugin_id, "enabled": enabled}, ) +@router.get("/{plugin_id}/config") +async def get_plugin_config( + plugin_id: str, + service: PluginService = Depends(get_plugin_service), +): + config = await service.get_plugin_config(plugin_id) + if config is None: + return error_response("插件不存在", 404) + return success_response("获取插件配置成功", {"plugin_id": plugin_id, "config": config}) + + +@router.post("/{plugin_id}/config") +async def update_plugin_config( + plugin_id: str, + request: dict, + service: PluginService = Depends(get_plugin_service), +): + config = request.get("config", {}) + if not isinstance(config, dict): + return error_response("config 必须是对象", 400) + success = await service.update_plugin_config(plugin_id, config) + if not success: + return error_response("插件不存在或配置无效", 404) + return success_response("保存插件配置成功", {"plugin_id": plugin_id, "config": config}) + + @router.post("/{plugin_id}/crawl") async def crawl_plugin( plugin_id: str, diff --git a/core/db.py b/core/db.py index b0e6ea9..4a577d0 100644 --- a/core/db.py +++ b/core/db.py @@ -64,11 +64,36 @@ async def init_db(): CREATE TABLE IF NOT EXISTS plugin_settings ( plugin_id TEXT PRIMARY KEY, enabled INTEGER DEFAULT 1, + config_json TEXT DEFAULT '{}', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) + # 迁移:为旧版 plugin_settings 表增加 config_json 列 + try: + await db.execute("SELECT config_json FROM plugin_settings LIMIT 1") + except Exception: + await db.execute("ALTER TABLE plugin_settings ADD COLUMN config_json TEXT DEFAULT '{}'") + logger.info("Migrated: added config_json column to plugin_settings") + + # 验证任务队列表 + await db.execute(""" + CREATE TABLE IF NOT EXISTS validation_tasks ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ip TEXT NOT NULL, + port INTEGER NOT NULL, + protocol TEXT DEFAULT 'http', + status TEXT DEFAULT 'pending', + result TEXT, + response_time_ms REAL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """) + await db.execute("CREATE INDEX IF NOT EXISTS idx_validation_status ON validation_tasks(status)") + await db.execute("CREATE INDEX IF NOT EXISTS idx_validation_created ON validation_tasks(created_at)") + # 系统设置表 await db.execute(""" CREATE TABLE IF NOT EXISTS settings ( diff --git a/core/plugin_system/base.py b/core/plugin_system/base.py index 37aff7e..0390e3b 100644 --- a/core/plugin_system/base.py +++ b/core/plugin_system/base.py @@ -1,7 +1,7 @@ """插件基类 - 所有爬虫插件必须继承此基类""" from abc import ABC, abstractmethod from dataclasses import dataclass -from typing import List +from typing import List, Dict, Any @dataclass @@ -30,6 +30,20 @@ class BaseCrawlerPlugin(ABC): display_name: str = "" description: str = "" enabled: bool = True + default_config: Dict[str, Any] = {} + + def __init__(self): + self._config: Dict[str, Any] = dict(self.default_config or {}) + + @property + def config(self) -> Dict[str, Any]: + return self._config + + def update_config(self, updates: Dict[str, Any]) -> None: + """更新插件配置,只覆盖存在的键""" + for key, value in updates.items(): + if key in self._config: + self._config[key] = value @abstractmethod async def crawl(self) -> List[ProxyRaw]: diff --git a/core/tasks/queue.py b/core/tasks/queue.py index cc8c82c..added0e 100644 --- a/core/tasks/queue.py +++ b/core/tasks/queue.py @@ -1,24 +1,26 @@ -"""验证任务队列 - 解耦爬取与验证,支持背压控制""" +"""验证任务队列 - 解耦爬取与验证,支持背压控制和持久化""" import asyncio from typing import Optional from models.domain import ProxyRaw +from repositories.task_repo import ValidationTaskRepository +from core.db import get_db from core.log import logger class ValidationQueue: - """代理验证队列 - + """代理验证队列(支持持久化到 SQLite) + 工作流程: - 1. 爬虫将原始代理 submit() 到队列 - 2. Worker 池从队列消费并验证 + 1. 爬虫将原始代理 submit() 到队列(写入数据库 + 内存信号) + 2. Worker 池从数据库消费并验证 3. 验证通过的代理写入数据库 + 4. 服务重启时自动恢复未完成的 pending 任务 """ def __init__( self, validator, proxy_repo, - db_ctx, worker_count: int = 50, score_valid: int = 10, score_invalid: int = -5, @@ -27,16 +29,17 @@ class ValidationQueue: ): self.validator = validator self.proxy_repo = proxy_repo - self.db_ctx = db_ctx + self.task_repo = ValidationTaskRepository() self.worker_count = worker_count self.score_valid = score_valid self.score_invalid = score_invalid self.score_min = score_min self.score_max = score_max - self._queue: asyncio.Queue[Optional[ProxyRaw]] = asyncio.Queue() + self._signal: asyncio.Queue[None] = asyncio.Queue() self._workers: list[asyncio.Task] = [] self._running = False + self._db_lock = asyncio.Lock() # 统计 self.valid_count = 0 @@ -46,6 +49,16 @@ class ValidationQueue: if self._running: return self._running = True + + # 恢复之前中断的 processing 任务 + async with get_db() as db: + recovered = await self.task_repo.reset_processing(db) + pending = await self.task_repo.get_pending_count(db) + if recovered: + logger.info(f"ValidationQueue recovered {recovered} interrupted tasks") + if pending: + logger.info(f"ValidationQueue has {pending} pending tasks to process") + for i in range(self.worker_count): self._workers.append(asyncio.create_task(self._worker_loop(i))) logger.info(f"ValidationQueue started with {self.worker_count} workers") @@ -55,56 +68,75 @@ class ValidationQueue: return self._running = False for _ in self._workers: - self._queue.put_nowait(None) # sentinel + self._signal.put_nowait(None) # sentinel if self._workers: await asyncio.gather(*self._workers, return_exceptions=True) self._workers.clear() logger.info("ValidationQueue stopped") async def submit(self, proxies: list[ProxyRaw]): - """提交代理到验证队列""" - for p in proxies: - await self._queue.put(p) + """提交代理到验证队列(持久化 + 唤醒 Worker)""" + async with self._db_lock: + async with get_db() as db: + inserted = await self.task_repo.insert_batch(db, proxies) + if inserted: + for _ in range(min(inserted, self.worker_count)): + self._signal.put_nowait(None) async def submit_one(self, proxy: ProxyRaw): - await self._queue.put(proxy) + await self.submit([proxy]) async def drain(self): - """等待队列中当前所有任务处理完毕""" - await self._queue.join() + """等待队列中当前所有 pending 任务处理完毕""" + while True: + async with get_db() as db: + count = await self.task_repo.get_pending_count(db) + if count == 0: + break + await asyncio.sleep(0.5) async def _worker_loop(self, worker_id: int): while True: - item = await self._queue.get() - if item is None: - self._queue.task_done() + await self._signal.get() + self._signal.task_done() + if not self._running: break - try: - await self._validate_and_save(item) - except Exception as e: - logger.error(f"Worker {worker_id} validation error: {e}") - finally: - self._queue.task_done() + await self._process_one_task(worker_id) - async def _validate_and_save(self, proxy: ProxyRaw): - is_valid, latency = await self.validator.validate( - proxy.ip, proxy.port, proxy.protocol - ) - async with self.db_ctx() as db: - if is_valid: - await self.proxy_repo.insert_or_update( - db, proxy.ip, proxy.port, proxy.protocol, score=self.score_valid - ) - if latency: - await self.proxy_repo.update_response_time( - db, proxy.ip, proxy.port, latency + async def _process_one_task(self, worker_id: int): + """从数据库取一个任务并验证""" + async with self._db_lock: + async with get_db() as db: + task = await self.task_repo.acquire_pending(db) + if not task: + return + + proxy = ProxyRaw(task["ip"], task["port"], task["protocol"]) + try: + is_valid, latency = await self.validator.validate( + proxy.ip, proxy.port, proxy.protocol + ) + except Exception as e: + logger.error(f"Worker {worker_id} validation error: {e}") + is_valid, latency = False, 0.0 + + async with self._db_lock: + async with get_db() as db: + if is_valid: + await self.proxy_repo.insert_or_update( + db, proxy.ip, proxy.port, proxy.protocol, score=self.score_valid ) - self.valid_count += 1 - logger.debug(f"ValidationQueue: valid {proxy.ip}:{proxy.port}") - else: - # 对于新爬取的无效代理,不需要入库,直接丢弃 - self.invalid_count += 1 - logger.debug(f"ValidationQueue: invalid {proxy.ip}:{proxy.port}") + if latency: + await self.proxy_repo.update_response_time( + db, proxy.ip, proxy.port, latency + ) + await self.task_repo.complete_task(db, task["id"], True, latency) + self.valid_count += 1 + logger.debug(f"ValidationQueue: valid {proxy.ip}:{proxy.port}") + else: + await self.task_repo.complete_task(db, task["id"], False, 0.0) + self.invalid_count += 1 + logger.debug(f"ValidationQueue: invalid {proxy.ip}:{proxy.port}") def reset_stats(self): self.valid_count = 0 diff --git a/frontend/src/api/index.js b/frontend/src/api/index.js index 174edb2..ebcb3f0 100644 --- a/frontend/src/api/index.js +++ b/frontend/src/api/index.js @@ -79,6 +79,8 @@ export const proxiesAPI = { export const pluginsAPI = { getPlugins: () => api.get('/api/plugins'), togglePlugin: (pluginId, enabled) => api.put(`/api/plugins/${pluginId}/toggle`, { enabled }), + getPluginConfig: (pluginId) => api.get(`/api/plugins/${pluginId}/config`), + updatePluginConfig: (pluginId, config) => api.post(`/api/plugins/${pluginId}/config`, { config }), crawlPlugin: (pluginId) => api.post(`/api/plugins/${pluginId}/crawl`), crawlAll: () => api.post('/api/plugins/crawl-all') } diff --git a/frontend/src/services/pluginService.js b/frontend/src/services/pluginService.js index 33c0272..9c2b128 100644 --- a/frontend/src/services/pluginService.js +++ b/frontend/src/services/pluginService.js @@ -9,6 +9,14 @@ export const pluginService = { return pluginsAPI.togglePlugin(pluginId, enabled) }, + async getPluginConfig(pluginId) { + return pluginsAPI.getPluginConfig(pluginId) + }, + + async updatePluginConfig(pluginId, config) { + return pluginsAPI.updatePluginConfig(pluginId, config) + }, + async crawlPlugin(pluginId) { return pluginsAPI.crawlPlugin(pluginId) }, diff --git a/frontend/src/views/Plugins.vue b/frontend/src/views/Plugins.vue index efcfa4b..97cf040 100644 --- a/frontend/src/views/Plugins.vue +++ b/frontend/src/views/Plugins.vue @@ -74,11 +74,19 @@ - +