diff --git a/app/api/lifespan.py b/app/api/lifespan.py
index c84ac3e..d2794bf 100644
--- a/app/api/lifespan.py
+++ b/app/api/lifespan.py
@@ -106,8 +106,10 @@ async def lifespan(app: FastAPI):
         )
         await stack.enter_async_context(worker_pool)
 
-        # Job executor
-        executor = JobExecutor(worker_pool=worker_pool, max_concurrent_jobs=10)
+        # Job executor: slots must cover the N CrawlJobs of a "crawl all" run, plus aggregation jobs, full validation, etc.
+        _n_plugins = len(registry.list_plugins())
+        _max_jobs = max(24, _n_plugins + 8)
+        executor = JobExecutor(worker_pool=worker_pool, max_concurrent_jobs=_max_jobs)
         await stack.enter_async_context(executor)
 
         # Plugin runner
diff --git a/app/core/execution/crawl_gate.py b/app/core/execution/crawl_gate.py
deleted file mode 100644
index 5915145..0000000
--- a/app/core/execution/crawl_gate.py
+++ /dev/null
@@ -1,22 +0,0 @@
-"""Limit how many plugins issue HTTP requests at once during batch crawls, so crawl-all does not crowd validation/aggregation jobs out of executor slots."""
-import asyncio
-from contextlib import asynccontextmanager
-from typing import AsyncIterator
-
-# Multiplied by each plugin's internal max_concurrency this should still be friendly to external sites; too small and the total run takes much longer.
-CRAWL_MAX_CONCURRENT = 4
-
-_sem: asyncio.Semaphore | None = None
-
-
-def _get_sem() -> asyncio.Semaphore:
-    global _sem
-    if _sem is None:
-        _sem = asyncio.Semaphore(CRAWL_MAX_CONCURRENT)
-    return _sem
-
-
-@asynccontextmanager
-async def crawl_slot() -> AsyncIterator[None]:
-    async with _get_sem():
-        yield
diff --git a/app/core/execution/executor.py b/app/core/execution/executor.py
index 2aa849b..bdbbf93 100644
--- a/app/core/execution/executor.py
+++ b/app/core/execution/executor.py
@@ -1,11 +1,9 @@
 """Job executor - centrally manages the lifecycle of all background jobs"""
 import asyncio
-from contextlib import asynccontextmanager
 from datetime import datetime, timedelta
 from typing import Any, Dict, List, Optional
 
-from app.core.execution.job import CrawlJob, Job, JobStatus
-from app.core.execution.crawl_gate import crawl_slot
+from app.core.execution.job import Job, JobStatus
 from app.core.execution.worker_pool import AsyncWorkerPool
 from app.core.log import logger
 
@@ -86,14 +84,8 @@ class JobExecutor:
                 logger.error(f"Job {job.id} failed: {e}", exc_info=True)
 
         try:
-            # CrawlJobs take a crawl slot before an executor slot, so a dozen-odd jobs don't fill max_concurrent_jobs while merely queuing for the network
-            if isinstance(job, CrawlJob):
-                async with crawl_slot():
-                    async with self._semaphore:
-                        await _execute()
-            else:
-                async with self._semaphore:
-                    await _execute()
+            async with self._semaphore:
+                await _execute()
         finally:
             self._tasks.pop(job.id, None)
diff --git a/app/plugins/base.py b/app/plugins/base.py
index cff33ca..33b0828 100644
--- a/app/plugins/base.py
+++ b/app/plugins/base.py
@@ -26,7 +26,7 @@ class BaseHTTPPlugin(BaseCrawlerPlugin):
         self.urls: List[str] = []
         self.current_url: str = ""
         self._client: Optional[httpx.AsyncClient] = None
-        self.max_concurrency: int = 3
+        self.max_concurrency: int = 2
 
     def get_headers(self) -> dict:
         return {
@@ -178,6 +178,7 @@ class BaseHTTPPlugin(BaseCrawlerPlugin):
 
         async def _fetch_limited(url: str):
             async with semaphore:
+                await asyncio.sleep(random.uniform(0.08, 0.45))
                 return await self.fetch(url, timeout=timeout, retries=retries)
 
         tasks = [_fetch_limited(url) for url in urls]
diff --git a/app/plugins/fpw_proxy_list_download.py b/app/plugins/fpw_proxy_list_download.py
index a0a5c24..7b83871 100644
--- a/app/plugins/fpw_proxy_list_download.py
+++ b/app/plugins/fpw_proxy_list_download.py
@@ -13,7 +13,7 @@ class FpwProxyListDownloadPlugin(BaseHTTPPlugin):
 
     def __init__(self):
         super().__init__()
-        self.max_concurrency = 4
+        self.max_concurrency = 2
         self.api_pairs = [
             ("http", "https://www.proxy-list.download/api/v1/get?type=http"),
             ("https", "https://www.proxy-list.download/api/v1/get?type=https"),
diff --git a/app/plugins/fpw_socks_ssl_proxy.py b/app/plugins/fpw_socks_ssl_proxy.py
index 79959cc..dffdd58 100644
--- a/app/plugins/fpw_socks_ssl_proxy.py
+++ b/app/plugins/fpw_socks_ssl_proxy.py
@@ -14,7 +14,7 @@ class FpwSocksSslProxyPlugin(BaseHTTPPlugin):
 
     def __init__(self):
         super().__init__()
-        self.max_concurrency = 4
+        self.max_concurrency = 2
         # Many mirrors share the sslproxies template; socks-proxy is unreliable on some networks, so multiple sources raise the success rate
         self.urls = [
             "https://www.sslproxies.org/",