Files
ProxyPool/app/services/plugin_runner.py
祀梦 0131c8b408 feat: fpw plugins, validation/crawl perf, WS stats, test DB isolation
- Add Free_Proxy_Website-style fpw_* plugins and register them
- Per-plugin crawl timeout (crawl_timeout_seconds=120); remove global crawl_timeout setting
- Validator: fix connect vs total timeout on save; SOCKS session LRU cache; drop redundant semaphore
- Validation handler uses single DB connection; batch upsert after crawl; WorkerPool put_nowait
- Remove unused max_retries from settings API/UI; settings maintenance SQL + init_db cleanup of deprecated keys
- WebSocket dashboard stats; ProxyList pool_filter and API alignment
- POST /api/proxies/delete-one for IPv6-safe deletes; task poll stops on 404
- pytest uses PROXYPOOL_DB_PATH=db/proxies.test.sqlite so tests do not wipe production DB
- .gitignore: explicit proxies.test.sqlite patterns; fix plugin_service ValidationException import

Made-with: Cursor
2026-04-05 13:39:19 +08:00

96 lines
3.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""插件统一执行器 - 封装超时、重试、健康检查、错误捕获"""
import asyncio
from datetime import datetime
from typing import Optional
from app.core.plugin_system.base import BaseCrawlerPlugin
from app.core.log import logger
from app.models.domain import CrawlResult, ProxyRaw
class PluginRunner:
"""统一插件执行器
- 超时:每插件独立,使用 plugin.crawl_timeout_seconds默认 120s
- 可选 crawl_timeout_override仅用于测试等场景覆盖插件自身限时
- 异常捕获和统计更新、健康检查前置、结果去重
"""
def __init__(self, crawl_timeout_override: Optional[float] = None):
self.crawl_timeout_override = crawl_timeout_override
async def run(self, plugin: BaseCrawlerPlugin) -> CrawlResult:
"""执行单个插件爬取"""
result = CrawlResult(plugin_name=plugin.name)
# 健康检查(可选)
try:
healthy = await asyncio.wait_for(
plugin.health_check(), timeout=5.0
)
if not healthy:
result.error = "health check failed"
result.failure_count = 1
await self._save_stats(plugin, result)
return result
except Exception as e:
logger.warning(f"Plugin {plugin.name} health check error: {e}")
result.error = f"health check error: {e}"
result.failure_count = 1
await self._save_stats(plugin, result)
return result
crawl_limit = float(getattr(plugin, "crawl_timeout_seconds", 120.0))
if self.crawl_timeout_override is not None:
crawl_limit = float(self.crawl_timeout_override)
try:
proxies = await asyncio.wait_for(
plugin.crawl(),
timeout=crawl_limit,
)
result.proxies = self._dedup(proxies)
result.success_count = len(result.proxies)
logger.info(
f"Plugin {plugin.name} crawled {len(result.proxies)} unique proxies"
)
except asyncio.TimeoutError:
result.error = f"crawl timeout after {crawl_limit}s"
result.failure_count = 1
logger.error(f"Plugin {plugin.name} crawl timeout")
except Exception as e:
result.error = str(e)
result.failure_count = 1
logger.error(f"Plugin {plugin.name} crawl failed: {e}", exc_info=True)
await self._save_stats(plugin, result)
return result
@staticmethod
def _dedup(proxies: list[ProxyRaw]) -> list[ProxyRaw]:
seen = set()
unique = []
for p in proxies:
key = (p.ip, p.port, p.protocol)
if key not in seen:
seen.add(key)
unique.append(p)
return unique
async def _save_stats(self, plugin: BaseCrawlerPlugin, result: CrawlResult) -> None:
"""将爬取统计持久化到数据库"""
from app.core.db import get_db
from app.repositories.settings_repo import PluginSettingsRepository
repo = PluginSettingsRepository()
payload = {
"success_count": result.success_count,
"failure_count": result.failure_count,
"last_run": datetime.now().isoformat(),
}
try:
async with get_db() as db:
await repo.set_stats(db, plugin.name, payload)
except Exception as e:
logger.error(f"Failed to save stats for {plugin.name}: {e}")