diff --git a/app/core/execution/crawl_gate.py b/app/core/execution/crawl_gate.py new file mode 100644 index 0000000..5915145 --- /dev/null +++ b/app/core/execution/crawl_gate.py @@ -0,0 +1,22 @@ +"""批量爬取时限制同时发起 HTTP 的插件数,避免 crawl-all 与验证/聚合任务抢满执行器槽位。""" +import asyncio +from contextlib import asynccontextmanager +from typing import AsyncIterator + +# 与单插件内 max_concurrency 相乘后仍应对外网友好;过小会拉长总耗时。 +CRAWL_MAX_CONCURRENT = 4 + +_sem: asyncio.Semaphore | None = None + + +def _get_sem() -> asyncio.Semaphore: + global _sem + if _sem is None: + _sem = asyncio.Semaphore(CRAWL_MAX_CONCURRENT) + return _sem + + +@asynccontextmanager +async def crawl_slot() -> AsyncIterator[None]: + async with _get_sem(): + yield diff --git a/app/core/execution/executor.py b/app/core/execution/executor.py index dbad80d..2aa849b 100644 --- a/app/core/execution/executor.py +++ b/app/core/execution/executor.py @@ -4,7 +4,8 @@ from contextlib import asynccontextmanager from datetime import datetime, timedelta from typing import Any, Dict, List, Optional -from app.core.execution.job import Job, JobStatus +from app.core.execution.job import CrawlJob, Job, JobStatus +from app.core.execution.crawl_gate import crawl_slot from app.core.execution.worker_pool import AsyncWorkerPool from app.core.log import logger @@ -67,24 +68,32 @@ class JobExecutor: return job.id async def _run_job(self, job: Job) -> None: + async def _execute() -> None: + try: + if job.is_cancelled: + logger.info(f"Job {job.id} was cancelled before running") + return + result = await job.run() + if job.status not in (JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED): + job._set_completed(result) + logger.info(f"Job {job.id} completed: {result}") + except asyncio.CancelledError: + job.status = JobStatus.CANCELLED + job._touch() + logger.info(f"Job {job.id} cancelled during execution") + except Exception as e: + job._set_failed(str(e)) + logger.error(f"Job {job.id} failed: {e}", exc_info=True) + try: - async with self._semaphore: - try: - if job.is_cancelled: - logger.info(f"Job {job.id} was cancelled before running") - return - result = await job.run() - # 如果子类没有显式设置完成状态,自动设为 completed - if job.status not in (JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED): - job._set_completed(result) - logger.info(f"Job {job.id} completed: {result}") - except asyncio.CancelledError: - job.status = JobStatus.CANCELLED - job._touch() - logger.info(f"Job {job.id} cancelled during execution") - except Exception as e: - job._set_failed(str(e)) - logger.error(f"Job {job.id} failed: {e}", exc_info=True) + # CrawlJob 先等爬取槽位再占执行器,避免十几个任务占满 max_concurrent_jobs 却只排队等外网 + if isinstance(job, CrawlJob): + async with crawl_slot(): + async with self._semaphore: + await _execute() + else: + async with self._semaphore: + await _execute() finally: self._tasks.pop(job.id, None) diff --git a/app/plugins/base.py b/app/plugins/base.py index 85da2af..cff33ca 100644 --- a/app/plugins/base.py +++ b/app/plugins/base.py @@ -52,7 +52,8 @@ class BaseHTTPPlugin(BaseCrawlerPlugin): def _http_timeout(seconds: float) -> httpx.Timeout: """连接阶段单独收紧,避免 AsyncClient 在部分环境下长时间卡在 connect。""" t = max(2.0, float(seconds)) - c = min(6.0, max(3.0, t * 0.35)) + # 国际链路 / 批量爬取时 connect 过短易集体超时 + c = min(12.0, max(4.0, t * 0.4)) return httpx.Timeout(t, connect=c) @staticmethod diff --git a/app/plugins/fpw_checkerproxy.py b/app/plugins/fpw_checkerproxy.py index eb7be7d..bdd81ea 100644 --- a/app/plugins/fpw_checkerproxy.py +++ b/app/plugins/fpw_checkerproxy.py @@ -47,7 +47,7 @@ class FpwCheckerproxyPlugin(BaseHTTPPlugin): async def crawl(self) -> List[ProxyRaw]: merged: List[ProxyRaw] = [] seen: Set[Tuple[str, int, str]] = set() - htmls = await self.fetch_all(self.urls, timeout=12, retries=1) + htmls = await self.fetch_all(self.urls, timeout=25, retries=2) for html in htmls: if not html or len(html) < 200: continue diff --git a/app/plugins/fpw_freeproxylists.py b/app/plugins/fpw_freeproxylists.py index b496a38..9af9ab3 100644 --- a/app/plugins/fpw_freeproxylists.py +++ b/app/plugins/fpw_freeproxylists.py @@ -55,7 +55,7 @@ class FpwFreeproxylistsPlugin(BaseHTTPPlugin): async def crawl(self) -> List[ProxyRaw]: seen = set() out: List[ProxyRaw] = [] - htmls = await self.fetch_all(self.urls, timeout=10, retries=1) + htmls = await self.fetch_all(self.urls, timeout=25, retries=2) for url, html in zip(self.urls, htmls): if not html: continue diff --git a/app/plugins/fpw_gatherproxy.py b/app/plugins/fpw_gatherproxy.py index 31d7038..3e68770 100644 --- a/app/plugins/fpw_gatherproxy.py +++ b/app/plugins/fpw_gatherproxy.py @@ -47,7 +47,7 @@ class FpwGatherproxyPlugin(BaseHTTPPlugin): async def crawl(self) -> List[ProxyRaw]: seen = set() out: List[ProxyRaw] = [] - htmls = await self.fetch_all(self.urls, timeout=10, retries=1) + htmls = await self.fetch_all(self.urls, timeout=25, retries=2) for url, html in zip(self.urls, htmls): if not html: continue diff --git a/app/plugins/fpw_hidemy.py b/app/plugins/fpw_hidemy.py index 4eb698b..86aae01 100644 --- a/app/plugins/fpw_hidemy.py +++ b/app/plugins/fpw_hidemy.py @@ -21,7 +21,7 @@ class FpwHidemyPlugin(BaseHTTPPlugin): async def crawl(self) -> List[ProxyRaw]: results: List[ProxyRaw] = [] - htmls = await self.fetch_all(self.urls, timeout=12, retries=1) + htmls = await self.fetch_all(self.urls, timeout=25, retries=2) for url, html in zip(self.urls, htmls): if not html: continue diff --git a/app/plugins/fpw_premproxy.py b/app/plugins/fpw_premproxy.py index 03c44b1..9660f8c 100644 --- a/app/plugins/fpw_premproxy.py +++ b/app/plugins/fpw_premproxy.py @@ -51,7 +51,7 @@ class FpwPremproxyPlugin(BaseHTTPPlugin): async def crawl(self) -> List[ProxyRaw]: merged: List[ProxyRaw] = [] - htmls = await self.fetch_all(self.urls, timeout=12, retries=1) + htmls = await self.fetch_all(self.urls, timeout=25, retries=2) for url, html in zip(self.urls, htmls): if not html: continue diff --git a/app/plugins/fpw_proxy_list_download.py b/app/plugins/fpw_proxy_list_download.py index 2eb0e49..a0a5c24 100644 --- a/app/plugins/fpw_proxy_list_download.py +++ b/app/plugins/fpw_proxy_list_download.py @@ -13,7 +13,7 @@ class FpwProxyListDownloadPlugin(BaseHTTPPlugin): def __init__(self): super().__init__() - self.max_concurrency = 8 + self.max_concurrency = 4 self.api_pairs = [ ("http", "https://www.proxy-list.download/api/v1/get?type=http"), ("https", "https://www.proxy-list.download/api/v1/get?type=https"), @@ -30,7 +30,7 @@ class FpwProxyListDownloadPlugin(BaseHTTPPlugin): async def crawl(self) -> List[ProxyRaw]: results: List[ProxyRaw] = [] urls = [u for _, u in self.api_pairs] - htmls = await self.fetch_all(urls, timeout=10, retries=1) + htmls = await self.fetch_all(urls, timeout=25, retries=2) for (protocol, _), text in zip(self.api_pairs, htmls): if not text: continue @@ -41,7 +41,7 @@ class FpwProxyListDownloadPlugin(BaseHTTPPlugin): if not results: logger.warning(f"{self.display_name} 主 API 无数据,尝试 ProxyScrape 备用") fb_urls = [u for _, u in self.fallback_pairs] - fb_htmls = await self.fetch_all(fb_urls, timeout=10, retries=1) + fb_htmls = await self.fetch_all(fb_urls, timeout=25, retries=2) for (protocol, _), text in zip(self.fallback_pairs, fb_htmls): if not text: continue diff --git a/app/plugins/fpw_proxynova.py b/app/plugins/fpw_proxynova.py index ca7e3db..ae493e5 100644 --- a/app/plugins/fpw_proxynova.py +++ b/app/plugins/fpw_proxynova.py @@ -65,7 +65,7 @@ class FpwProxynovaPlugin(BaseHTTPPlugin): return out async def crawl(self) -> List[ProxyRaw]: - html = await self.fetch(self.urls[0], timeout=14, retries=1) + html = await self.fetch(self.urls[0], timeout=25, retries=2) if not html: return [] results = self._parse_rows(html) diff --git a/app/plugins/fpw_socks_ssl_proxy.py b/app/plugins/fpw_socks_ssl_proxy.py index d7d183e..79959cc 100644 --- a/app/plugins/fpw_socks_ssl_proxy.py +++ b/app/plugins/fpw_socks_ssl_proxy.py @@ -14,7 +14,7 @@ class FpwSocksSslProxyPlugin(BaseHTTPPlugin): def __init__(self): super().__init__() - self.max_concurrency = 6 + self.max_concurrency = 4 # 与 sslproxies 同模板的镜像站较多,socks-proxy 在部分网络下不稳定,多源提高成功率 self.urls = [ "https://www.sslproxies.org/", @@ -39,7 +39,7 @@ class FpwSocksSslProxyPlugin(BaseHTTPPlugin): async def crawl(self) -> List[ProxyRaw]: results: List[ProxyRaw] = [] - htmls = await self.fetch_all(self.urls, timeout=12, retries=1) + htmls = await self.fetch_all(self.urls, timeout=25, retries=2) for url, html in zip(self.urls, htmls): if not html: continue diff --git a/app/plugins/fpw_spys_one.py b/app/plugins/fpw_spys_one.py index c79d1f5..29ee73f 100644 --- a/app/plugins/fpw_spys_one.py +++ b/app/plugins/fpw_spys_one.py @@ -130,7 +130,7 @@ class FpwSpysOnePlugin(BaseHTTPPlugin): async def _one(proto: str, url: str, xf5: str) -> Tuple[str, str]: data = {**form_base, "xf5": xf5} - html = await self.fetch_post(url, data=data, timeout=14, retries=1) + html = await self.fetch_post(url, data=data, timeout=25, retries=2) return proto, html or "" pairs = await asyncio.gather( diff --git a/app/plugins/proxyscrape.py b/app/plugins/proxyscrape.py index 6b511b7..18ea7a1 100644 --- a/app/plugins/proxyscrape.py +++ b/app/plugins/proxyscrape.py @@ -20,13 +20,16 @@ class ProxyScrapePlugin(BaseHTTPPlugin): def __init__(self): super().__init__() - # GitHub raw 源作为首选 + # GitHub raw 首选;国内/高负载时 jsDelivr 镜像常更稳 self.urls = [ ("http", "https://raw.githubusercontent.com/monosans/proxy-list/main/proxies/http.txt"), ("https", "https://raw.githubusercontent.com/monosans/proxy-list/main/proxies/https.txt"), ("socks4", "https://raw.githubusercontent.com/monosans/proxy-list/main/proxies/socks4.txt"), ("socks5", "https://raw.githubusercontent.com/monosans/proxy-list/main/proxies/socks5.txt"), ] + self._mirror_prefix = ( + "https://cdn.jsdelivr.net/gh/monosans/proxy-list@main/proxies/" + ) # ProxyScrape 官方 API 作为 fallback self.api_urls = { "http": "https://api.proxyscrape.com/v2/?request=get&protocol=http&timeout=10000&country=all&ssl=all&anonymity=all", @@ -56,14 +59,18 @@ class ProxyScrapePlugin(BaseHTTPPlugin): results: List[ProxyRaw] = [] protocols = [protocol for protocol, _ in self.urls] urls = [url for _, url in self.urls] + fetch_timeout = 28.0 - # 1. 并发请求所有 GitHub raw 源,整体限时 10s,先完成的保留结果 - tasks = [asyncio.create_task(self.fetch(url, timeout=12)) for url in urls] - done, pending = await asyncio.wait(tasks, timeout=10) + # 1. GitHub raw:放宽总等待,避免 crawl-all 时与其它插件抢带宽导致集体超时 + tasks = [ + asyncio.create_task(self.fetch(url, timeout=fetch_timeout)) + for url in urls + ] + done, pending = await asyncio.wait(tasks, timeout=45) for task in pending: task.cancel() - htmls = [] - done_protocols = set() + htmls: list[str] = [] + done_protocols: set[str] = set() for i, task in enumerate(tasks): try: if task in done: @@ -73,35 +80,60 @@ class ProxyScrapePlugin(BaseHTTPPlugin): htmls.append("") except Exception: htmls.append("") - # 异常时不加入 done_protocols,以便触发 API fallback - fallback_protocols = [] + need_mirror: list[str] = [] for protocol, html in zip(protocols, htmls): proxies = self._parse_proxies(html or "", protocol) if html else [] if proxies: - logger.info(f"ProxyScrape {protocol.upper()} GitHub raw 获取 {len(proxies)} 个代理") + logger.info( + f"ProxyScrape {protocol.upper()} GitHub raw 获取 {len(proxies)} 个代理" + ) results.extend(proxies) else: if protocol in done_protocols: - logger.warning(f"ProxyScrape {protocol.upper()} GitHub raw 返回空或无效,将尝试 API fallback") + logger.warning( + f"ProxyScrape {protocol.upper()} GitHub raw 返回空或无效,尝试镜像与 API" + ) else: - logger.warning(f"ProxyScrape {protocol.upper()} GitHub raw 请求超时,将尝试 API fallback") - fallback_protocols.append(protocol) + logger.warning( + f"ProxyScrape {protocol.upper()} GitHub raw 请求超时,尝试镜像与 API" + ) + need_mirror.append(protocol) - # 2. 对 GitHub raw 失败的协议,并发请求 ProxyScrape API fallback - if fallback_protocols: - fallback_urls = [self.api_urls[p] for p in fallback_protocols] + # 2. jsDelivr 镜像(顺序请求,减轻与其它插件的瞬时并发叠加) + still_need_api: list[str] = [] + for protocol in need_mirror: + mirror_url = f"{self._mirror_prefix}{protocol}.txt" + text = await self.fetch(mirror_url, timeout=fetch_timeout, retries=2) + proxies = self._parse_proxies(text or "", protocol) if text else [] + if proxies: + logger.info( + f"ProxyScrape {protocol.upper()} jsDelivr 镜像获取 {len(proxies)} 个代理" + ) + results.extend(proxies) + else: + still_need_api.append(protocol) + + # 3. ProxyScrape 官方 API + if still_need_api: + fallback_urls = [self.api_urls[p] for p in still_need_api] try: api_htmls = await asyncio.wait_for( - self.fetch_all(fallback_urls, timeout=10), timeout=10 + self.fetch_all(fallback_urls, timeout=25), timeout=35 ) except asyncio.TimeoutError: - logger.warning(f"ProxyScrape API fallback 批量请求超时,跳过 {len(fallback_protocols)} 个协议") - api_htmls = [""] * len(fallback_protocols) - for protocol, api_html in zip(fallback_protocols, api_htmls): - proxies = self._parse_proxies(api_html or "", protocol) if api_html else [] + logger.warning( + f"ProxyScrape API fallback 批量请求超时,跳过 {len(still_need_api)} 个协议" + ) + api_htmls = [""] * len(still_need_api) + for protocol, api_html in zip(still_need_api, api_htmls): + proxies = ( + self._parse_proxies(api_html or "", protocol) if api_html else [] + ) if proxies: - logger.info(f"ProxyScrape {protocol.upper()} API 获取 {len(proxies)} 个代理") + logger.info( + f"ProxyScrape {protocol.upper()} API 获取 {len(proxies)} 个代理" + ) results.extend(proxies) else: logger.warning(f"ProxyScrape {protocol.upper()} API 返回空或无效")