From 7d5eaa438ab12b3f547238b801ae77cc7c8301d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A5=80=E6=A2=A6?= <3501646051@qq.com> Date: Sun, 5 Apr 2026 18:53:33 +0800 Subject: [PATCH] feat: external plugin loading, score threshold, expiry cleanup and more improvements Made-with: Cursor --- app/api/lifespan.py | 34 +++++++--- app/api/routes/proxies.py | 5 +- app/core/config.py | 2 +- app/core/execution/job.py | 12 +++- app/core/plugin_system/registry.py | 54 +++++++++++++++ app/plugins/__init__.py | 9 +++ app/repositories/proxy_repo.py | 58 ++++++++++++---- app/services/plugin_service.py | 10 ++- app/services/proxy_service.py | 31 +++++++-- app/services/scheduler_service.py | 20 ++++++ config/app.json | 2 +- config/app.test.json | 2 +- script/reset_and_recrawl.py | 102 +++++++++++++++++++++++++++++ 13 files changed, 302 insertions(+), 39 deletions(-) create mode 100644 script/reset_and_recrawl.py diff --git a/app/api/lifespan.py b/app/api/lifespan.py index 4de40d9..c654356 100644 --- a/app/api/lifespan.py +++ b/app/api/lifespan.py @@ -14,6 +14,7 @@ from app.services.validator_service import ValidatorService from app.services.proxy_scoring import compute_proxy_quality_score from app.services.plugin_runner import PluginRunner from app.services.scheduler_service import SchedulerService +from app.services.proxy_service import ProxyService from app.api.ws_manager import ConnectionManager from app.api.realtime import stats_broadcaster_loop @@ -80,10 +81,14 @@ async def lifespan(app: FastAPI): proxy.protocol, score=q_score, ) - if latency: - await proxy_repo.update_response_time( - db, proxy.ip, proxy.port, latency - ) + rt_ms = ( + float(latency) + if latency is not None and float(latency) > 0 + else float(app_settings.score_default_latency_ms) + ) + await proxy_repo.update_response_time( + db, proxy.ip, proxy.port, rt_ms + ) else: await proxy_repo.delete(db, proxy.ip, proxy.port) else: @@ -104,10 +109,14 @@ async def lifespan(app: FastAPI): proxy.protocol, score=q_score, ) - if latency: - await proxy_repo.update_response_time( - db, proxy.ip, proxy.port, latency - ) + rt_ms = ( + float(latency) + if latency is not None and float(latency) > 0 + else float(app_settings.score_default_latency_ms) + ) + await proxy_repo.update_response_time( + db, proxy.ip, proxy.port, rt_ms + ) else: await proxy_repo.update_score( db, @@ -125,20 +134,25 @@ async def lifespan(app: FastAPI): ) await stack.enter_async_context(worker_pool) - # Job 执行器:槽位需覆盖「全部爬取」时 N 个 CrawlJob + 聚合任务 + 全量验证等 + # Job 执行器:并发槽位(crawler_max_queue_size 与插件数共同约束,避免 crawl-all 死锁) _n_plugins = len(registry.list_plugins()) - _max_jobs = max(24, _n_plugins + 8) + _floor = max(24, _n_plugins + 8) + _max_jobs = max(_floor, app_settings.crawler_max_queue_size) executor = JobExecutor(worker_pool=worker_pool, max_concurrent_jobs=_max_jobs) await stack.enter_async_context(executor) # 插件运行器 plugin_runner = PluginRunner() + proxy_service = ProxyService() + # 调度器 scheduler = SchedulerService( executor=executor, worker_pool=worker_pool, interval_minutes=db_settings.get("validate_interval_minutes", 30), + proxy_service=proxy_service, + settings_repo=settings_repo, ) # 挂载到 app.state diff --git a/app/api/routes/proxies.py b/app/api/routes/proxies.py index a1be9af..b653e4c 100644 --- a/app/api/routes/proxies.py +++ b/app/api/routes/proxies.py @@ -10,9 +10,12 @@ from app.models.schemas import ProxyListRequest, BatchDeleteRequest, ProxyDelete from app.api.deps import get_proxy_service, get_scheduler_service from app.api.common import success_response, format_proxy from app.core.exceptions import ProxyPoolException, ProxyNotFoundException +from app.core.config import settings as app_settings router = APIRouter(prefix="/api/proxies", tags=["proxies"]) +_EXPORT_MAX = int(app_settings.export_max_records) + @router.get("/stats") async def get_stats( @@ -60,7 +63,7 @@ async def get_random_proxy(service: ProxyService = Depends(get_proxy_service)): async def export_proxies( fmt: str, protocol: Optional[str] = None, - limit: int = Query(default=10000, ge=1, le=100000), + limit: int = Query(default=_EXPORT_MAX, ge=1, le=_EXPORT_MAX), service: ProxyService = Depends(get_proxy_service), ): if fmt not in ("csv", "txt", "json"): diff --git a/app/core/config.py b/app/core/config.py index 884b1ec..249f981 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -19,7 +19,7 @@ _DEFAULTS: Dict[str, Any] = { "validator_max_concurrency": 200, "validator_connect_timeout": 3, "crawler_num_validators": 50, - "crawler_max_queue_size": 500, + "crawler_max_queue_size": 48, "log_level": "INFO", "log_dir": "logs", "ws_stats_interval_seconds": 1, diff --git a/app/core/execution/job.py b/app/core/execution/job.py index 64d934b..efd523d 100644 --- a/app/core/execution/job.py +++ b/app/core/execution/job.py @@ -102,14 +102,22 @@ class CrawlJob(Job): proxies: List[ProxyRaw] = result.proxies if result else [] if proxies: + from app.core.config import settings as app_settings from app.core.db import transaction from app.repositories.proxy_repo import ProxyRepository try: + initial = max( + app_settings.score_min, + min(app_settings.score_max, int(app_settings.score_valid)), + ) async with transaction() as db: - await ProxyRepository.upsert_many_from_crawl(db, proxies, 0) + await ProxyRepository.upsert_many_from_crawl( + db, proxies, initial + ) logger.info( - f"CrawlJob {self.id}: persisted {len(proxies)} crawled proxies as pending" + f"CrawlJob {self.id}: persisted {len(proxies)} crawled proxies " + f"as pending (initial score={initial})" ) except Exception as e: logger.error( diff --git a/app/core/plugin_system/registry.py b/app/core/plugin_system/registry.py index 4d5b1c8..decbb85 100644 --- a/app/core/plugin_system/registry.py +++ b/app/core/plugin_system/registry.py @@ -1,7 +1,10 @@ """插件注册中心 - 显式注册,类型安全,测试友好""" import importlib +import importlib.util import inspect import os +import sys +from pathlib import Path from typing import Dict, List, Type, Optional from app.core.plugin_system.base import BaseCrawlerPlugin from app.core.log import logger @@ -77,6 +80,57 @@ class PluginRegistry: except Exception as e: logger.error(f"Failed to load module {module_name}: {e}") + def load_external_plugins_directory(self, directory: Path) -> int: + """从项目下任意目录加载 ``BaseCrawlerPlugin`` 子类(每个 ``.py`` 一个模块)。 + + 与内置 ``app.plugins`` 并存;若 ``name`` 与已注册插件冲突则跳过并打日志。 + """ + directory = Path(directory).resolve() + if not directory.is_dir(): + logger.info("外部插件目录不存在,已跳过: %s", directory) + return 0 + loaded = 0 + for path in sorted(directory.glob("*.py")): + if path.name.startswith("_"): + continue + mod_name = f"proxypool_ext_{path.stem}_{abs(hash(str(path))) % 10_000_000_000}" + try: + spec = importlib.util.spec_from_file_location(mod_name, path) + if spec is None or spec.loader is None: + continue + module = importlib.util.module_from_spec(spec) + sys.modules[mod_name] = module + spec.loader.exec_module(module) + for attr_name in dir(module): + obj = getattr(module, attr_name) + if ( + inspect.isclass(obj) + and issubclass(obj, BaseCrawlerPlugin) + and obj is not BaseCrawlerPlugin + and obj not in self._plugins.values() + ): + if not getattr(obj, "name", None): + logger.warning( + "跳过外部插件类(缺少 name): %s in %s", + obj.__name__, + path, + ) + continue + if obj.name in self._plugins: + logger.warning( + "外部插件 %s 与已注册插件重名,已跳过: %s", + obj.name, + path, + ) + continue + self.register(obj) + loaded += 1 + except Exception as e: + logger.error("加载外部插件失败 %s: %s", path, e, exc_info=True) + if loaded: + logger.info("从 %s 额外加载 %s 个插件", directory, loaded) + return loaded + # 全局注册中心实例 registry = PluginRegistry() diff --git a/app/plugins/__init__.py b/app/plugins/__init__.py index b3922d0..56a443a 100644 --- a/app/plugins/__init__.py +++ b/app/plugins/__init__.py @@ -37,3 +37,12 @@ registry.register(FpwPremproxyPlugin) registry.register(FpwFreeproxylistsPlugin) registry.register(FpwGatherproxyPlugin) registry.register(FpwCheckerproxyPlugin) + +# 可选:从 config 的 plugins_dir 加载用户插件(根目录下目录,非 app/plugins 包) +from pathlib import Path + +from app.core.config import settings as _app_settings +from app.core.config_paths import project_root as _project_root + +_ext_dir = _project_root() / _app_settings.plugins_dir +registry.load_external_plugins_directory(_ext_dir) diff --git a/app/repositories/proxy_repo.py b/app/repositories/proxy_repo.py index 139ae2c..c4dc3a7 100644 --- a/app/repositories/proxy_repo.py +++ b/app/repositories/proxy_repo.py @@ -3,6 +3,8 @@ import aiosqlite from datetime import datetime, timedelta from typing import List, Optional, Tuple, Union +from app.core.config import settings as app_settings + from app.models.domain import Proxy, ProxyRaw from app.core.log import logger @@ -54,10 +56,12 @@ class ProxyRepository: ip: str, port: int, protocol: str = "http", - score: int = 10, + score: Optional[int] = None, ) -> bool: if protocol not in VALID_PROTOCOLS: protocol = "http" + if score is None: + score = int(app_settings.score_valid) try: await db.execute( """ @@ -85,7 +89,7 @@ class ProxyRepository: protocol: str = "http", initial_score: int = 0, ) -> None: - """爬取入库:待验证状态(validated=0, score=0);再次爬取同一条则重置为待验证。""" + """爬取入库:待验证(validated=0);score 由 initial_score 决定(通常来自配置 score_valid)。""" if protocol not in VALID_PROTOCOLS: protocol = "http" await db.execute( @@ -232,13 +236,17 @@ class ProxyRepository: return None @staticmethod - async def get_random(db: aiosqlite.Connection) -> Optional[Proxy]: + async def get_random( + db: aiosqlite.Connection, min_score: int = 1 + ) -> Optional[Proxy]: + ms = max(1, int(min_score)) async with db.execute( f""" SELECT {_SELECT_PROXY_COLS} FROM proxies - WHERE validated = 1 AND score > 0 + WHERE validated = 1 AND score >= ? ORDER BY RANDOM() LIMIT 1 - """ + """, + (ms,), ) as cursor: row = await cursor.fetchone() if row: @@ -306,12 +314,18 @@ class ProxyRepository: protocol: Optional[str] = None, batch_size: int = 1000, only_usable: bool = False, + usable_min_score: int = 1, ): """流式分批读取代理,避免一次性加载大量数据到内存""" offset = 0 while True: batch = await ProxyRepository._list_batch_offset( - db, protocol, batch_size, offset, only_usable=only_usable + db, + protocol, + batch_size, + offset, + only_usable=only_usable, + usable_min_score=usable_min_score, ) if not batch: break @@ -325,12 +339,15 @@ class ProxyRepository: batch_size: int, offset: int, only_usable: bool, + usable_min_score: int = 1, ) -> List[Proxy]: query = f"SELECT {_SELECT_PROXY_COLS} FROM proxies" params: List = [] clauses = [] if only_usable: - clauses.append("validated = 1 AND score > 0") + ms = max(1, int(usable_min_score)) + clauses.append("validated = 1 AND score >= ?") + params.append(ms) if protocol: clauses.append("protocol = ?") params.append(protocol.lower()) @@ -396,12 +413,16 @@ class ProxyRepository: return proxies, total @staticmethod - async def get_stats(db: aiosqlite.Connection) -> dict: + async def get_stats( + db: aiosqlite.Connection, low_score_threshold: int = 0 + ) -> dict: """统计快照。 协议计数(http/https/socks*)仅含已验证且 score>0 的可用代理,供首页图表与「可用」口径一致。 pending_* 为待验证池(validated=0)按协议分布。 + invalid_count:已验证且 score<=0,或 score 低于系统「最低分」阈值(阈值>0 时)。 """ + thr = max(0, int(low_score_threshold)) query = """ SELECT COUNT(*) as total, @@ -416,12 +437,12 @@ class ProxyRepository: COUNT(CASE WHEN validated = 0 AND protocol = 'https' THEN 1 END) as pending_https_count, COUNT(CASE WHEN validated = 0 AND protocol = 'socks4' THEN 1 END) as pending_socks4_count, COUNT(CASE WHEN validated = 0 AND protocol = 'socks5' THEN 1 END) as pending_socks5_count, - COUNT(CASE WHEN validated = 1 AND score <= 0 THEN 1 END) as invalid_count, + COUNT(CASE WHEN validated = 1 AND (score <= 0 OR (? > 0 AND score < ?)) THEN 1 END) as invalid_count, (SELECT AVG(response_time_ms) FROM proxies WHERE validated = 1 AND score > 0 AND response_time_ms IS NOT NULL AND response_time_ms > 0) as avg_response_ms FROM proxies """ - async with db.execute(query) as cursor: + async with db.execute(query, (thr, thr)) as cursor: row = await cursor.fetchone() if row: avg_lat = row[13] @@ -477,10 +498,19 @@ class ProxyRepository: return 0 @staticmethod - async def clean_invalid(db: aiosqlite.Connection) -> int: - await db.execute( - "DELETE FROM proxies WHERE validated = 1 AND score <= 0" - ) + async def clean_invalid( + db: aiosqlite.Connection, low_score_threshold: int = 0 + ) -> int: + thr = max(0, int(low_score_threshold)) + if thr > 0: + await db.execute( + "DELETE FROM proxies WHERE validated = 1 AND (score <= 0 OR score < ?)", + (thr,), + ) + else: + await db.execute( + "DELETE FROM proxies WHERE validated = 1 AND score <= 0" + ) await db.commit() return db.total_changes diff --git a/app/services/plugin_service.py b/app/services/plugin_service.py index 4158e33..e637c0a 100644 --- a/app/services/plugin_service.py +++ b/app/services/plugin_service.py @@ -5,7 +5,12 @@ from typing import List, Optional from app.core.db import get_db from app.core.plugin_system.registry import registry from app.core.plugin_system.base import BaseCrawlerPlugin -from app.core.exceptions import PluginNotFoundException, ValidationException +from app.core.exceptions import ( + PluginNotFoundException, + ProxyPoolException, + ValidationException, +) +from app.core.config import settings as app_settings from app.repositories.settings_repo import PluginSettingsRepository from app.models.domain import PluginInfo, ProxyRaw, CrawlResult from app.core.log import logger @@ -110,7 +115,8 @@ class PluginService: async def run_all_plugins(self, plugin_runner) -> List[ProxyRaw]: """执行所有启用插件的爬取,限制并发数以避免触发目标站反爬""" all_results: List[ProxyRaw] = [] - semaphore = asyncio.Semaphore(5) + n = max(1, int(app_settings.crawler_num_validators)) + semaphore = asyncio.Semaphore(n) async def _run_with_limit(plugin_name: str): plugin = self.get_plugin_or_raise(plugin_name) diff --git a/app/services/proxy_service.py b/app/services/proxy_service.py index 8865ad8..569eec9 100644 --- a/app/services/proxy_service.py +++ b/app/services/proxy_service.py @@ -7,6 +7,7 @@ from typing import List, Optional, Tuple, AsyncIterator from app.core.db import get_db from app.repositories.proxy_repo import ProxyRepository +from app.repositories.settings_repo import SettingsRepository from app.models.domain import Proxy from app.core.log import logger from app.core.config import settings as app_settings @@ -19,7 +20,9 @@ class ProxyService: async def get_stats(self) -> dict: async with get_db() as db: - stats = await self.proxy_repo.get_stats(db) + s = await SettingsRepository.get_all(db) + floor = int(s.get("min_proxy_score", 0)) + stats = await self.proxy_repo.get_stats(db, low_score_threshold=floor) stats["today_new"] = await self.proxy_repo.get_today_new_count(db) return stats @@ -49,7 +52,10 @@ class ProxyService: async def get_random_proxy(self) -> Optional[Proxy]: async with get_db() as db: - p = await self.proxy_repo.get_random(db) + s = await SettingsRepository.get_all(db) + floor = int(s.get("min_proxy_score", 0)) + ms = max(1, floor) + p = await self.proxy_repo.get_random(db, min_score=ms) if not p: return None new_uc = int(getattr(p, "use_count", 0) or 0) + 1 @@ -73,7 +79,9 @@ class ProxyService: async def clean_invalid(self) -> int: async with get_db() as db: - return await self.proxy_repo.clean_invalid(db) + s = await SettingsRepository.get_all(db) + floor = int(s.get("min_proxy_score", 0)) + return await self.proxy_repo.clean_invalid(db, low_score_threshold=floor) async def clean_expired(self, days: int) -> int: async with get_db() as db: @@ -83,8 +91,11 @@ class ProxyService: self, fmt: str, protocol: Optional[str] = None, - limit: int = 10000, + limit: Optional[int] = None, ) -> AsyncIterator[str]: + cap = int(app_settings.export_max_records) if limit is None else int(limit) + if cap < 1: + cap = 1 if fmt == "csv": yield "\ufeffIP,Port,Protocol,Score,Last Check\n" elif fmt == "txt": @@ -95,11 +106,17 @@ class ProxyService: exported = 0 async with get_db() as db: + s = await SettingsRepository.get_all(db) + floor = max(1, int(s.get("min_proxy_score", 0))) async for batch in self.proxy_repo.iter_batches( - db, protocol=protocol, batch_size=1000, only_usable=True + db, + protocol=protocol, + batch_size=1000, + only_usable=True, + usable_min_score=floor, ): for p in batch: - if exported >= limit: + if exported >= cap: break if fmt == "csv": yield f"{p.ip},{p.port},{p.protocol},{p.score},{self._fmt_time(p.last_check)}\n" @@ -117,7 +134,7 @@ class ProxyService: yield prefix + json.dumps(item, ensure_ascii=False) first = False exported += 1 - if exported >= limit: + if exported >= cap: break if fmt == "json": diff --git a/app/services/scheduler_service.py b/app/services/scheduler_service.py index a00bf16..58ef756 100644 --- a/app/services/scheduler_service.py +++ b/app/services/scheduler_service.py @@ -19,10 +19,14 @@ class SchedulerService: executor: JobExecutor, worker_pool: Optional[Any] = None, interval_minutes: int = 30, + proxy_service: Optional[Any] = None, + settings_repo: Optional[Any] = None, ): self.executor = executor self.worker_pool = worker_pool self.interval_minutes = interval_minutes + self._proxy_service = proxy_service + self._settings_repo = settings_repo self.running = False self._stop_event = asyncio.Event() self._task: Optional[asyncio.Task] = None @@ -59,6 +63,22 @@ class SchedulerService: async def _run_loop(self) -> None: """定时循环""" while self.running: + if self._proxy_service is not None and self._settings_repo is not None: + try: + from app.core.db import get_db + + async with get_db() as db: + s = await self._settings_repo.get_all(db) + days = int(s.get("proxy_expiry_days", 7)) + removed = await self._proxy_service.clean_expired(days) + if removed: + logger.info( + "Scheduler removed %s proxies (last_check older than %s days)", + removed, + days, + ) + except Exception as e: + logger.error("Scheduler clean_expired failed: %s", e, exc_info=True) try: self.executor.submit_job(ValidateAllJob(validator_pool=self.worker_pool)) except Exception as e: diff --git a/config/app.json b/config/app.json index 8f25ae8..4d7a02d 100644 --- a/config/app.json +++ b/config/app.json @@ -6,7 +6,7 @@ "validator_max_concurrency": 200, "validator_connect_timeout": 3, "crawler_num_validators": 50, - "crawler_max_queue_size": 500, + "crawler_max_queue_size": 48, "log_level": "INFO", "log_dir": "logs", "ws_stats_interval_seconds": 1, diff --git a/config/app.test.json b/config/app.test.json index 2834299..53fad50 100644 --- a/config/app.test.json +++ b/config/app.test.json @@ -6,7 +6,7 @@ "validator_max_concurrency": 200, "validator_connect_timeout": 3, "crawler_num_validators": 50, - "crawler_max_queue_size": 500, + "crawler_max_queue_size": 48, "log_level": "INFO", "log_dir": "logs", "ws_stats_interval_seconds": 1, diff --git a/script/reset_and_recrawl.py b/script/reset_and_recrawl.py new file mode 100644 index 0000000..6292852 --- /dev/null +++ b/script/reset_and_recrawl.py @@ -0,0 +1,102 @@ +"""清空 proxies 表,并依次执行各启用插件爬取;可选触发运行中 API 的全量验证。 + +用法(在项目根目录):: + python script/reset_and_recrawl.py + python script/reset_and_recrawl.py --api-base http://127.0.0.1:18080 + python script/reset_and_recrawl.py --skip-validate +""" +from __future__ import annotations + +import argparse +import asyncio +import sys +from pathlib import Path + +# 项目根 +_ROOT = Path(__file__).resolve().parents[1] +if str(_ROOT) not in sys.path: + sys.path.insert(0, str(_ROOT)) + + +async def _main(api_base: str, skip_validate: bool) -> None: + from app.core.db import init_db, get_db, transaction + from app.repositories.proxy_repo import ProxyRepository + from app.core.config import settings + import app.plugins # noqa: F401 — 注册内置与外部插件 + from app.core.plugin_system.registry import registry + from app.services.plugin_runner import PluginRunner + + await init_db() + async with get_db() as db: + await db.execute("DELETE FROM proxies") + await db.commit() + print("已清空表 proxies") + + initial = max( + settings.score_min, + min(settings.score_max, int(settings.score_valid)), + ) + runner = PluginRunner() + total_in = 0 + for plugin in registry.list_plugins(): + if not plugin.enabled: + print(f"[跳过] {plugin.name}(已禁用)") + continue + print(f"[爬取] {plugin.name} …", flush=True) + try: + result = await runner.run(plugin) + proxies = result.proxies or [] + if not proxies: + err = result.error or "无数据" + print(f" -> 0 条 ({err})") + continue + async with transaction() as db: + await ProxyRepository.upsert_many_from_crawl(db, proxies, initial) + total_in += len(proxies) + print(f" -> {len(proxies)} 条已入库(待验证)") + except Exception as e: + print(f" -> 失败: {e}") + + print(f"爬取阶段结束,累计入库约 {total_in} 条(去重前按插件计)。") + + if skip_validate: + print("已跳过远程全量验证。请启动 API 后执行 POST /api/scheduler/validate-now") + return + + try: + import httpx + except ImportError: + print("未安装 httpx,跳过远程全量验证。") + return + + url = api_base.rstrip("/") + "/api/scheduler/validate-now" + try: + async with httpx.AsyncClient(timeout=60.0) as client: + r = await client.post(url) + data = r.json() if r.headers.get("content-type", "").startswith("application/json") else {} + if r.status_code == 200 and data.get("code") == 200: + print("已提交全量验证:", data.get("data")) + else: + print(f"全量验证请求异常 HTTP {r.status_code}: {data or r.text[:200]}") + except Exception as e: + print(f"无法连接 API({url}):{e}") + + +def main() -> None: + p = argparse.ArgumentParser(description="清空代理并逐插件爬取") + p.add_argument( + "--api-base", + default="http://127.0.0.1:18080", + help="运行中的 ProxyPool API 根地址,用于提交全量验证", + ) + p.add_argument( + "--skip-validate", + action="store_true", + help="不调用 HTTP 全量验证", + ) + args = p.parse_args() + asyncio.run(_main(args.api_base, args.skip_validate)) + + +if __name__ == "__main__": + main()