diff --git a/WebUI/src/api/index.js b/WebUI/src/api/index.js
index 11dac60..5424323 100644
--- a/WebUI/src/api/index.js
+++ b/WebUI/src/api/index.js
@@ -5,7 +5,7 @@ import { showError } from '../utils/message'
 export const DEFAULT_API_BASE_URL = import.meta.env.VITE_API_BASE_URL || 'http://localhost:18080'
 
 /** @type {number} Request timeout (milliseconds) */
-export const REQUEST_TIMEOUT = 30000
+export const REQUEST_TIMEOUT = 120000
 
 const api = axios.create({
   baseURL: import.meta.env.VITE_API_BASE_URL || DEFAULT_API_BASE_URL,
diff --git a/_test_batch.py b/_test_batch.py
new file mode 100644
index 0000000..a4b2ac8
--- /dev/null
+++ b/_test_batch.py
@@ -0,0 +1,18 @@
+import asyncio
+import time
+import app.plugins
+from app.services.plugin_service import PluginService
+
+async def main():
+    svc = PluginService()
+    start = time.time()
+    results = await svc.run_all_plugins()
+    elapsed = time.time() - start
+    print(f"Batch crawl completed in {elapsed:.2f}s")
+    print(f"Total unique proxies: {len(results)}")
+    from collections import Counter
+    c = Counter(p.protocol for p in results)
+    for proto, cnt in sorted(c.items()):
+        print(f"  {proto}: {cnt}")
+
+asyncio.run(main())
diff --git a/_test_crawlers.py b/_test_crawlers.py
new file mode 100644
index 0000000..3da9a47
--- /dev/null
+++ b/_test_crawlers.py
@@ -0,0 +1,47 @@
+import asyncio
+import app.plugins
+from app.core.plugin_system.registry import registry
+from app.core.log import logger
+import logging
+logger.setLevel(logging.WARNING)
+
+async def test_plugin(p, timeout=20):
+    try:
+        proxies = await asyncio.wait_for(p.crawl(), timeout=timeout)
+        return len(proxies), proxies[:1] if proxies else []
+    except asyncio.TimeoutError:
+        return -2, []
+    except Exception as e:
+        return -1, [str(e)]
+
+async def test_all():
+    plugins = registry.list_plugins()
+    print(f'Total plugins: {len(plugins)}')
+    results = {}
+    for p in plugins:
+        print(f'Testing {p.name} (timeout=20s)...', flush=True)
+        count, sample = await test_plugin(p, timeout=20)
+        results[p.name] = count
+        if count > 0:
+            print(f'  -> OK: {count} proxies, sample={sample[0]}')
+        elif count == 0:
+            print('  -> EMPTY')
+        elif count == -2:
+            print('  -> TIMEOUT')
+        else:
+            print(f'  -> ERROR: {sample[0]}')
+
+    print('\n' + '='*50)
+    print('SUMMARY:')
+    for name, count in sorted(results.items()):
+        if count > 0:
+            status = 'OK'
+        elif count == 0:
+            status = 'EMPTY'
+        elif count == -2:
+            status = 'TIMEOUT'
+        else:
+            status = 'ERROR'
+        print(f'  {name:22s} {status:8s} ({count} proxies)')
+
+asyncio.run(test_all())
diff --git a/app/api/lifespan.py b/app/api/lifespan.py
index ceda5b2..94226c1 100644
--- a/app/api/lifespan.py
+++ b/app/api/lifespan.py
@@ -1,4 +1,5 @@
 """Application lifecycle management"""
+import asyncio
 from contextlib import asynccontextmanager
 from fastapi import FastAPI
 from app.core.db import init_db, get_db
@@ -37,6 +38,12 @@ async def lifespan(app: FastAPI):
     yield
 
     # Shut down the scheduler
+    if scheduler_service._validate_task and not scheduler_service._validate_task.done():
+        scheduler_service._validate_task.cancel()
+        try:
+            await scheduler_service._validate_task
+        except asyncio.CancelledError:
+            pass
     await scheduler_service.stop()
     await scheduler_service.validation_queue.validator.close()
     logger.info("API server shutdown")
diff --git a/app/api/main.py b/app/api/main.py
index c5c2bc4..37f4dbf 100644
--- a/app/api/main.py
+++ b/app/api/main.py
@@ -53,3 +53,6 @@ def create_app() -> FastAPI:
         }
 
     return app
+
+
+app = create_app()
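Note: the lifespan shutdown above uses the standard cancel-then-await idiom for background tasks, and with the new module-level `app = create_app()` the server can presumably be launched as `uvicorn app.api.main:app`. A minimal self-contained sketch of the cancellation idiom (names here are illustrative, not from this codebase):

    import asyncio

    async def background_job():
        while True:
            await asyncio.sleep(1)  # stand-in for real work

    async def main():
        task = asyncio.create_task(background_job())
        await asyncio.sleep(0.1)
        if task and not task.done():
            task.cancel()
            try:
                await task  # let the task actually unwind
            except asyncio.CancelledError:
                pass  # expected: it was cancelled on purpose

    asyncio.run(main())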
diff --git a/app/core/db.py b/app/core/db.py
index c811999..888ef28 100644
--- a/app/core/db.py
+++ b/app/core/db.py
@@ -77,6 +77,13 @@ async def init_db():
             await db.execute("ALTER TABLE plugin_settings ADD COLUMN config_json TEXT DEFAULT '{}'")
             logger.info("Migrated: added config_json column to plugin_settings")
 
+        # Migration: add a stats_json column to legacy plugin_settings tables
+        try:
+            await db.execute("SELECT stats_json FROM plugin_settings LIMIT 1")
+        except Exception:
+            await db.execute("ALTER TABLE plugin_settings ADD COLUMN stats_json TEXT DEFAULT '{}'")
+            logger.info("Migrated: added stats_json column to plugin_settings")
+
         # Validation task queue table
         await db.execute("""
             CREATE TABLE IF NOT EXISTS validation_tasks (
diff --git a/app/core/tasks/queue.py b/app/core/tasks/queue.py
index 0cbf5a7..067403b 100644
--- a/app/core/tasks/queue.py
+++ b/app/core/tasks/queue.py
@@ -56,6 +56,12 @@ class ValidationQueue:
         async with get_db() as db:
             recovered = await self.task_repo.reset_processing(db)
             pending = await self.task_repo.get_pending_count(db)
+            if pending > 1000:
+                logger.warning(f"ValidationQueue has {pending} pending tasks, cleaning up all pending tasks...")
+                await db.execute("DELETE FROM validation_tasks WHERE status = 'pending'")
+                await db.commit()
+                pending = await self.task_repo.get_pending_count(db)
+                logger.info(f"ValidationQueue cleaned up pending tasks, remaining: {pending}")
             if recovered:
                 logger.info(f"ValidationQueue recovered {recovered} interrupted tasks")
             if pending:
diff --git a/app/plugins/base.py b/app/plugins/base.py
index 749d97d..9e3ca41 100644
--- a/app/plugins/base.py
+++ b/app/plugins/base.py
@@ -1,7 +1,7 @@
 """Generic HTTP crawler base class - provides a wrapper for plugins built on HTTP requests"""
 import random
 import asyncio
-import aiohttp
+import httpx
 from typing import List
 
 from app.core.plugin_system import BaseCrawlerPlugin
@@ -28,25 +28,39 @@ class BaseHTTPPlugin(BaseCrawlerPlugin):
             "Connection": "keep-alive",
         }
 
-    async def fetch(self, url: str, timeout: float = 10.0, retries: int = 3) -> str:
+    async def fetch(self, url: str, timeout: float = 15.0, retries: int = 2) -> str:
         """Asynchronously fetch the HTML content of the given URL"""
+        from app.core.log import logger
         headers = self.get_headers()
-        async with aiohttp.ClientSession(headers=headers) as session:
-            for attempt in range(retries):
+        transport = httpx.AsyncHTTPTransport(retries=0)
+        for attempt in range(retries):
+            async with httpx.AsyncClient(headers=headers, transport=transport, follow_redirects=True) as client:
                 try:
-                    async with session.get(
-                        url, timeout=aiohttp.ClientTimeout(total=timeout)
-                    ) as response:
-                        if response.status == 200:
-                            content = await response.read()
-                            encoding = response.get_encoding()
-                            if encoding == "utf-8" or not encoding:
-                                try:
-                                    return content.decode("utf-8")
-                                except UnicodeDecodeError:
-                                    return content.decode("gbk", errors="ignore")
-                            return content.decode(encoding, errors="ignore")
-                except Exception:
-                    pass
-                await asyncio.sleep(random.uniform(1, 3))
+                    response = await client.get(url, timeout=timeout)
+                    if response.status_code == 200:
+                        content = response.content
+                        encoding = response.encoding
+                        if encoding == "utf-8" or not encoding:
+                            try:
+                                return content.decode("utf-8")
+                            except UnicodeDecodeError:
+                                return content.decode("gbk", errors="ignore")
+                        return content.decode(encoding, errors="ignore")
+                    else:
+                        logger.warning(f"Fetch {url} returned status {response.status_code}")
+                except Exception as e:
+                    logger.warning(f"Fetch {url} failed (attempt {attempt + 1}/{retries}): {e}")
+            if attempt < retries - 1:
+                await asyncio.sleep(random.uniform(1, 3))
         return ""
+
+    async def fetch_all(self, urls: List[str], timeout: float = 15.0) -> List[str]:
+        """Fetch multiple URLs concurrently, capping in-plugin concurrency at 3"""
+        semaphore = asyncio.Semaphore(3)
+
+        async def _fetch_limited(url: str):
+            async with semaphore:
+                return await self.fetch(url, timeout=timeout)
+
+        tasks = [_fetch_limited(url) for url in urls]
+        return await asyncio.gather(*tasks)
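For reference, a minimal sketch of how a plugin subclass would use the new fetch_all helper (the plugin and its URLs are hypothetical, for illustration only):

    from typing import List
    from app.core.plugin_system import ProxyRaw
    from app.plugins.base import BaseHTTPPlugin

    class ExamplePlugin(BaseHTTPPlugin):  # hypothetical plugin
        name = "example"
        display_name = "Example"

        def __init__(self):
            super().__init__()
            self.urls = ["https://example.com/list1.txt", "https://example.com/list2.txt"]

        async def crawl(self) -> List[ProxyRaw]:
            # fetch_all caps in-plugin concurrency at 3 via its internal semaphore;
            # fetch() returns "" on failure, so empty results are simply skipped
            htmls = await self.fetch_all(self.urls, timeout=15)
            results: List[ProxyRaw] = []
            for html in htmls:
                if not html:
                    continue
                # ...parse html and append ProxyRaw(ip, port, protocol) entries...
            return results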
diff --git a/app/plugins/fate0.py b/app/plugins/fate0.py
index 60ebc70..885d65f 100644
--- a/app/plugins/fate0.py
+++ b/app/plugins/fate0.py
@@ -13,27 +13,34 @@ class Fate0Plugin(BaseHTTPPlugin):
 
     def __init__(self):
         super().__init__()
-        self.urls = ["https://raw.githubusercontent.com/fate0/proxylist/master/proxy.list"]
+        self.urls = [
+            "https://raw.githubusercontent.com/fate0/proxylist/master/proxy.list",
+            "https://cdn.jsdelivr.net/gh/fate0/proxylist@master/proxy.list",
+        ]
 
     async def crawl(self) -> List[ProxyRaw]:
         results = []
+        # Fetch sequentially, with fallback
        for url in self.urls:
             html = await self.fetch(url, timeout=30)
-            if not html:
+            if html:
+                break
+        if not html:
+            logger.warning(f"{self.display_name} all sources are unavailable")
+            return results
+        for line in html.split("\n"):
+            line = line.strip()
+            if not line:
+                continue
+            try:
+                data = json.loads(line)
+                ip = data.get("host")
+                port = data.get("port")
+                protocol = data.get("type", "http")
+                if ip and port:
+                    results.append(ProxyRaw(ip, int(port), protocol))
+            except Exception:
                 continue
-            for line in html.split("\n"):
-                line = line.strip()
-                if not line:
-                    continue
-                try:
-                    data = json.loads(line)
-                    ip = data.get("host")
-                    port = data.get("port")
-                    protocol = data.get("type", "http")
-                    if ip and port:
-                        results.append(ProxyRaw(ip, int(port), protocol))
-                except Exception:
-                    continue
 
         if results:
             logger.info(f"{self.display_name} parsing complete, got {len(results)} potential proxies")
         return results
diff --git a/app/plugins/ip3366.py b/app/plugins/ip3366.py
index e2dbec4..8b79333 100644
--- a/app/plugins/ip3366.py
+++ b/app/plugins/ip3366.py
@@ -12,24 +12,29 @@ class Ip3366Plugin(BaseHTTPPlugin):
     name = "ip3366"
     display_name = "IP3366"
     description = "Crawls free proxies from the IP3366 site"
-    default_config = {"max_pages": 5}
+    default_config = {"max_pages": 3}
 
     def __init__(self):
         super().__init__()
         self._update_urls()
 
     def _update_urls(self):
-        max_pages = self.config.get("max_pages", 5)
+        max_pages = self.config.get("max_pages", 3)
         self.urls = [
             f"http://www.ip3366.net/free/?stype=1&page={i}" for i in range(1, max_pages + 1)
         ] + [
             f"http://www.ip3366.net/free/?stype=2&page={i}" for i in range(1, max_pages + 1)
         ]
 
+    def get_headers(self) -> dict:
+        headers = super().get_headers()
+        headers["Referer"] = "http://www.ip3366.net/free/"
+        return headers
+
     async def crawl(self) -> List[ProxyRaw]:
         results = []
-        for url in self.urls:
-            html = await self.fetch(url, timeout=15)
+        htmls = await self.fetch_all(self.urls)
+        for html in htmls:
             if not html:
                 continue
             soup = BeautifulSoup(html, "lxml")
diff --git a/app/plugins/ip89.py b/app/plugins/ip89.py
index 9276a9e..9a449e6 100644
--- a/app/plugins/ip89.py
+++ b/app/plugins/ip89.py
@@ -1,3 +1,5 @@
+import asyncio
+import random
 import re
 from typing import List
 from bs4 import BeautifulSoup
@@ -35,6 +37,8 @@ class Ip89Plugin(BaseHTTPPlugin):
                     if re.match(r"^\d+\.\d+\.\d+\.\d+$", ip) and port.isdigit():
                         results.append(ProxyRaw(ip, int(port), "http"))
 
+            await asyncio.sleep(random.uniform(1, 2))
+
         if results:
             logger.info(f"{self.display_name} parsing complete, got {len(results)} potential proxies")
         return results
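fate0's crawl now walks its mirror list and keeps the first non-empty response. The same first-success pattern as a standalone helper, should other plugins want it (the helper name is made up):

    async def fetch_first_available(plugin, urls, timeout: float = 30.0) -> str:
        """Return the body of the first URL that yields non-empty content, else ''."""
        for url in urls:
            html = await plugin.fetch(url, timeout=timeout)
            if html:
                return html
        return ""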
diff --git a/app/plugins/kuaidaili.py b/app/plugins/kuaidaili.py
index 6828c3c..94fa5f1 100644
--- a/app/plugins/kuaidaili.py
+++ b/app/plugins/kuaidaili.py
@@ -1,4 +1,6 @@
 import re
+import asyncio
+import random
 from typing import List
 from bs4 import BeautifulSoup
 from app.core.plugin_system import ProxyRaw
@@ -16,22 +18,39 @@
     def __init__(self):
         super().__init__()
+        # Fewer pages: lowers the chance of triggering anti-crawling while still getting some data
         self.urls = [
-            f"https://www.kuaidaili.com/free/inha/{i}/" for i in range(1, 11)
-        ] + [
-            f"https://www.kuaidaili.com/free/intr/{i}/" for i in range(1, 11)
+            "https://www.kuaidaili.com/free/inha/1/",
+            "https://www.kuaidaili.com/free/intr/1/",
         ]
 
+    def get_headers(self) -> dict:
+        headers = super().get_headers()
+        headers["Referer"] = "https://www.kuaidaili.com/free/inha/"
+        headers["Accept"] = "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8"
+        headers["Accept-Encoding"] = "gzip, deflate, br"
+        headers["Accept-Language"] = "zh-CN,zh;q=0.9,en;q=0.8"
+        headers["Sec-Fetch-Dest"] = "document"
+        headers["Sec-Fetch-Mode"] = "navigate"
+        headers["Sec-Fetch-Site"] = "same-origin"
+        headers["Upgrade-Insecure-Requests"] = "1"
+        return headers
+
     async def crawl(self) -> List[ProxyRaw]:
         results = []
+        # Visit the homepage first to warm up the session, pick up cookies, and lower the anti-crawl risk
+        await self.fetch("https://www.kuaidaili.com/", timeout=10)
+        await asyncio.sleep(random.uniform(2, 4))
+
+        # Request the free-proxy pages sequentially
         for url in self.urls:
-            html = await self.fetch(url, timeout=15)
+            html = await self.fetch(url, timeout=10)
             if not html:
                 continue
 
             soup = BeautifulSoup(html, "lxml")
             table = soup.find("table")
             if not table:
-                logger.warning(f"{self.display_name} could not find the table; anti-crawling may have been triggered")
+                logger.warning(f"{self.display_name} could not find the table; anti-crawling may have been triggered: {url}")
                 continue
 
             for row in table.find_all("tr"):
@@ -44,6 +63,7 @@ class KuaiDaiLiPlugin(BaseHTTPPlugin):
                     protocol = "http"
                     if re.match(r"^\d+\.\d+\.\d+\.\d+$", ip) and port.isdigit():
                         results.append(ProxyRaw(ip, int(port), protocol))
+            await asyncio.sleep(random.uniform(5, 8))
 
         if results:
             logger.info(f"{self.display_name} parsing complete, got {len(results)} potential proxies")
diff --git a/app/plugins/proxylist_download.py b/app/plugins/proxylist_download.py
index 48e84f1..96863b1 100644
--- a/app/plugins/proxylist_download.py
+++ b/app/plugins/proxylist_download.py
@@ -8,48 +8,87 @@ class ProxyListDownloadPlugin(BaseHTTPPlugin):
     default_config = {"max_pages": 5}
     name = "proxylist_download"
     display_name = "ProxyListDownload"
-    description = "Fetches proxies from the ProxyListDownload API"
+    description = "Fetches proxies from public GitHub proxy lists"
 
     def __init__(self):
         super().__init__()
-        self.urls = [
-            "https://www.proxy-list.download/api/v1/get?type=http",
-            "https://www.proxy-list.download/api/v1/get?type=https",
-            "https://www.proxy-list.download/api/v1/get?type=socks4",
-            "https://www.proxy-list.download/api/v1/get?type=socks5",
+        # Prefer GitHub raw, with fallback sources (jsdelivr CDN or the ProxyScrape API)
+        self.sources = [
+            {
+                "primary": "https://raw.githubusercontent.com/komutan234/Proxy-List-Free/main/proxies/http.txt",
+                "fallbacks": [
+                    "https://cdn.jsdelivr.net/gh/komutan234/Proxy-List-Free@main/proxies/http.txt",
+                    "https://api.proxyscrape.com/v2/?request=get&protocol=http&timeout=10000&country=all&ssl=all&anonymity=all",
+                ],
+                "protocol": "http",
+            },
+            {
+                "primary": "https://raw.githubusercontent.com/komutan234/Proxy-List-Free/main/proxies/socks4.txt",
+                "fallbacks": [
+                    "https://cdn.jsdelivr.net/gh/komutan234/Proxy-List-Free@main/proxies/socks4.txt",
+                    "https://api.proxyscrape.com/v2/?request=get&protocol=socks4&timeout=10000&country=all",
+                ],
+                "protocol": "socks4",
+            },
+            {
+                "primary": "https://raw.githubusercontent.com/komutan234/Proxy-List-Free/main/proxies/socks5.txt",
+                "fallbacks": [
+                    "https://cdn.jsdelivr.net/gh/komutan234/Proxy-List-Free@main/proxies/socks5.txt",
+                    "https://api.proxyscrape.com/v2/?request=get&protocol=socks5&timeout=10000&country=all",
+                ],
+                "protocol": "socks5",
+            },
         ]
 
+    def _detect_protocol(self, url: str) -> str:
+        """Infer the protocol from the URL (note: do not key off the https:// scheme)"""
+        if "socks4" in url:
+            return "socks4"
+        elif "socks5" in url:
+            return "socks5"
+        elif "/http.txt" in url or "protocol=http" in url:
+            return "http"
+        return "http"
+
+    def _parse_lines(self, html: str, protocol: str) -> List[ProxyRaw]:
+        """Parse proxy text, normalizing \r\n and \n line endings and skipping blank lines"""
+        results = []
+        # Normalize everything to \n before splitting
+        text = html.replace("\r\n", "\n").replace("\r", "\n")
+        for line in text.split("\n"):
+            line = line.strip()
+            if not line or ":" not in line:
+                continue
+            parts = line.split(":")
+            if len(parts) >= 2:
+                ip = parts[0].strip()
+                port = parts[1].strip()
+                if ip and port.isdigit():
+                    results.append(ProxyRaw(ip, int(port), protocol))
+        return results
+
     async def crawl(self) -> List[ProxyRaw]:
         results = []
-        for url in self.urls:
-            html = await self.fetch(url, timeout=30)
-            if not html:
+        # Fetch all primary URLs concurrently
+        primary_urls = [s["primary"] for s in self.sources]
+        primary_htmls = await self.fetch_all(primary_urls, timeout=15)
+
+        for idx, html in enumerate(primary_htmls):
+            source = self.sources[idx]
+            protocol = source.get("protocol") or self._detect_protocol(source["primary"])
+
+            if html and html.strip():
+                results.extend(self._parse_lines(html, protocol))
                 continue
 
-            # Detect the protocol from the URL
-            if "type=socks4" in url:
-                protocol = "socks4"
-            elif "type=socks5" in url:
-                protocol = "socks5"
-            elif "type=https" in url:
-                protocol = "https"
-            else:
-                protocol = "http"
-
-            lines = html.split("\r\n")
-            if len(lines) <= 1:
-                lines = html.split("\n")
-
-            for line in lines:
-                line = line.strip()
-                if not line or ":" not in line:
-                    continue
-                parts = line.split(":")
-                if len(parts) >= 2:
-                    ip = parts[0].strip()
-                    port = parts[1].strip()
-                    if ip and port.isdigit():
-                        results.append(ProxyRaw(ip, int(port), protocol))
+            # The primary returned empty or whitespace only; try the fallbacks in order
+            logger.warning(f"{self.display_name} primary source returned empty, trying fallbacks: {source['primary']}")
+            for fallback_url in source["fallbacks"]:
+                fallback_html = await self.fetch(fallback_url, timeout=15)
+                if fallback_html and fallback_html.strip():
+                    fb_protocol = source.get("protocol") or self._detect_protocol(fallback_url)
+                    results.extend(self._parse_lines(fallback_html, fb_protocol))
+                    break
 
         if results:
             logger.info(f"{self.display_name} parsing complete, got {len(results)} potential proxies")
diff --git a/app/plugins/proxyscrape.py b/app/plugins/proxyscrape.py
index f41a027..2bc3c5a 100644
--- a/app/plugins/proxyscrape.py
+++ b/app/plugins/proxyscrape.py
@@ -1,4 +1,5 @@
 """ProxyScrape test crawler - used to validate the architecture; supports all protocol types"""
+import asyncio
 from typing import List
 from app.core.plugin_system import ProxyRaw
 from app.plugins.base import BaseHTTPPlugin
@@ -19,39 +20,89 @@
 
     def __init__(self):
         super().__init__()
-        # Uses several public GitHub proxy lists as sources; stability is poor
+        # GitHub raw sources as the primary choice
         self.urls = [
             ("http", "https://raw.githubusercontent.com/monosans/proxy-list/main/proxies/http.txt"),
             ("https", "https://raw.githubusercontent.com/monosans/proxy-list/main/proxies/https.txt"),
             ("socks4", "https://raw.githubusercontent.com/monosans/proxy-list/main/proxies/socks4.txt"),
             ("socks5", "https://raw.githubusercontent.com/monosans/proxy-list/main/proxies/socks5.txt"),
         ]
+        # The official ProxyScrape API as fallback
+        self.api_urls = {
+            "http": "https://api.proxyscrape.com/v2/?request=get&protocol=http&timeout=10000&country=all&ssl=all&anonymity=all",
+            "https": "https://api.proxyscrape.com/v2/?request=get&protocol=https&timeout=10000&country=all&ssl=all&anonymity=all",
+            "socks4": "https://api.proxyscrape.com/v2/?request=get&protocol=socks4&timeout=10000&country=all&ssl=all&anonymity=all",
+            "socks5": "https://api.proxyscrape.com/v2/?request=get&protocol=socks5&timeout=10000&country=all&ssl=all&anonymity=all",
+        }
+
+    def _parse_proxies(self, text: str, protocol: str) -> List[ProxyRaw]:
+        """Parse text with one ip:port per line"""
+        proxies = []
+        for line in text.splitlines():
+            line = line.strip()
+            if not line or ":" not in line:
+                continue
+            parts = line.split(":")
+            if len(parts) >= 2:
+                ip = parts[0].strip()
+                port_str = parts[1].strip()
+                if port_str.isdigit():
+                    proxies.append(ProxyRaw(ip, int(port_str), protocol))
+        return proxies
 
     async def crawl(self) -> List[ProxyRaw]:
         results: List[ProxyRaw] = []
-        for protocol, url in self.urls:
+        protocols = [protocol for protocol, _ in self.urls]
+        urls = [url for _, url in self.urls]
+
+        # 1. Fetch all GitHub raw sources concurrently with an overall 10s cap; keep whatever finishes in time
+        tasks = [asyncio.create_task(self.fetch(url, timeout=12)) for url in urls]
+        done, pending = await asyncio.wait(tasks, timeout=10)
+        for task in pending:
+            task.cancel()
+        htmls = []
+        done_protocols = set()
+        for i, task in enumerate(tasks):
             try:
-                html = await self.fetch(url, timeout=30)
-                if not html:
-                    logger.warning(f"ProxyScrape {protocol.upper()} returned empty content")
-                    continue
+                if task in done:
+                    htmls.append(task.result())
+                    done_protocols.add(protocols[i])
+                else:
+                    htmls.append("")
+            except Exception:
+                htmls.append("")
+                done_protocols.add(protocols[i])
 
-                count = 0
-                for line in html.splitlines():
-                    line = line.strip()
-                    if not line or ":" not in line:
-                        continue
-                    parts = line.split(":")
-                    if len(parts) >= 2:
-                        ip = parts[0].strip()
-                        port_str = parts[1].strip()
-                        if port_str.isdigit():
-                            results.append(ProxyRaw(ip, int(port_str), protocol))
-                            count += 1
+        fallback_protocols = []
+        for protocol, html in zip(protocols, htmls):
+            proxies = self._parse_proxies(html or "", protocol) if html else []
+            if proxies:
+                logger.info(f"ProxyScrape {protocol.upper()} GitHub raw yielded {len(proxies)} proxies")
+                results.extend(proxies)
+            else:
+                if protocol in done_protocols:
+                    logger.warning(f"ProxyScrape {protocol.upper()} GitHub raw returned empty or invalid content, trying the API fallback")
+                else:
+                    logger.warning(f"ProxyScrape {protocol.upper()} GitHub raw request timed out, trying the API fallback")
+                fallback_protocols.append(protocol)
 
-                logger.info(f"ProxyScrape {protocol.upper()} got {count} proxies")
-            except Exception as e:
-                logger.error(f"ProxyScrape {protocol.upper()} crawl failed: {e}")
+        # 2. For protocols where GitHub raw failed, fetch the ProxyScrape API fallback concurrently
+        if fallback_protocols:
+            fallback_urls = [self.api_urls[p] for p in fallback_protocols]
+            try:
+                api_htmls = await asyncio.wait_for(
+                    self.fetch_all(fallback_urls, timeout=10), timeout=10
+                )
+            except asyncio.TimeoutError:
+                logger.warning(f"ProxyScrape API fallback batch request timed out, skipping {len(fallback_protocols)} protocols")
+                api_htmls = [""] * len(fallback_protocols)
+            for protocol, api_html in zip(fallback_protocols, api_htmls):
+                proxies = self._parse_proxies(api_html or "", protocol) if api_html else []
+                if proxies:
+                    logger.info(f"ProxyScrape {protocol.upper()} API yielded {len(proxies)} proxies")
+                    results.extend(proxies)
+                else:
+                    logger.warning(f"ProxyScrape {protocol.upper()} API returned empty or invalid content")
 
         if results:
             logger.info(f"ProxyScrape got {len(results)} proxies in total")
diff --git a/app/plugins/speedx.py b/app/plugins/speedx.py
index 27804d1..2a9dc3e 100644
--- a/app/plugins/speedx.py
+++ b/app/plugins/speedx.py
@@ -18,13 +18,18 @@ class SpeedXPlugin(BaseHTTPPlugin):
             "https://raw.githubusercontent.com/TheSpeedX/SOCKS-List/master/socks4.txt",
             "https://raw.githubusercontent.com/TheSpeedX/SOCKS-List/master/socks5.txt",
         ]
+        self.fallback_urls = [
+            "https://cdn.jsdelivr.net/gh/TheSpeedX/SOCKS-List@master/http.txt",
+            "https://cdn.jsdelivr.net/gh/TheSpeedX/SOCKS-List@master/socks4.txt",
+            "https://cdn.jsdelivr.net/gh/TheSpeedX/SOCKS-List@master/socks5.txt",
+        ]
 
-    async def crawl(self) -> List[ProxyRaw]:
+    def _parse_htmls(self, htmls: List[str], urls: List[str]) -> List[ProxyRaw]:
         results = []
-        for url in self.urls:
-            html = await self.fetch(url, timeout=30)
+        for idx, html in enumerate(htmls):
             if not html:
                 continue
+            url = urls[idx]
 
             # Detect the protocol from the URL
             protocol = "http"
@@ -33,7 +38,7 @@ class SpeedXPlugin(BaseHTTPPlugin):
             elif "socks4" in url:
                 protocol = "socks4"
 
-            for line in html.split("\n"):
+            for line in html.splitlines():
                 line = line.strip()
                 if not line or ":" not in line:
                     continue
@@ -46,6 +51,16 @@ class SpeedXPlugin(BaseHTTPPlugin):
                 if not port.isdigit() or not (1 <= int(port) <= 65535):
                     continue
                 results.append(ProxyRaw(ip, int(port), protocol))
+        return results
+
+    async def crawl(self) -> List[ProxyRaw]:
+        htmls = await self.fetch_all(self.urls, timeout=15)
+        results = self._parse_htmls(htmls, self.urls)
+
+        if not results:
+            logger.warning(f"{self.display_name} all GitHub sources returned empty, trying the jsdelivr fallback")
+            htmls = await self.fetch_all(self.fallback_urls, timeout=15)
+            results = self._parse_htmls(htmls, self.fallback_urls)
 
         if results:
             logger.info(f"{self.display_name} parsing complete, got {len(results)} potential proxies")
diff --git a/app/plugins/yundaili.py b/app/plugins/yundaili.py
index 8f91746..1487916 100644
--- a/app/plugins/yundaili.py
+++ b/app/plugins/yundaili.py
@@ -1,6 +1,5 @@
 import re
 from typing import List
-from bs4 import BeautifulSoup
 from app.core.plugin_system import ProxyRaw
 from app.plugins.base import BaseHTTPPlugin
 from app.core.log import logger
@@ -12,41 +11,71 @@ class YunDaiLiPlugin(BaseHTTPPlugin):
     default_config = {"max_pages": 5}
     name = "yundaili"
     display_name = "云代理"
-    description = "Crawls free proxies from the YunDaiLi website"
+    description = "Fetches free proxies from public GitHub proxy lists"
 
     def __init__(self):
         super().__init__()
+        # Primary data source: GitHub raw
         self.urls = [
-            f"http://www.ip3366.net/free/?stype=1&page={i}" for i in range(1, 6)
-        ] + [
-            f"http://www.ip3366.net/free/?stype=2&page={i}" for i in range(1, 6)
+            ("http", "https://raw.githubusercontent.com/mmpx12/proxy-list/master/http.txt"),
+            ("socks4", "https://raw.githubusercontent.com/mmpx12/proxy-list/master/socks4.txt"),
+            ("socks5", "https://raw.githubusercontent.com/mmpx12/proxy-list/master/socks5.txt"),
+        ]
+        # Fallback: jsdelivr CDN mirrors
+        self.fallback_urls = [
+            ("http", "https://cdn.jsdelivr.net/gh/mmpx12/proxy-list@master/http.txt"),
+            ("socks4", "https://cdn.jsdelivr.net/gh/mmpx12/proxy-list@master/socks4.txt"),
+            ("socks5", "https://cdn.jsdelivr.net/gh/mmpx12/proxy-list@master/socks5.txt"),
         ]
 
-    async def crawl(self) -> List[ProxyRaw]:
-        results = []
-        for url in self.urls:
-            html = await self.fetch(url, timeout=15)
+    def _parse_htmls(self, htmls: List[str], url_mapping: List[tuple]) -> List[ProxyRaw]:
+        results: List[ProxyRaw] = []
+        for (protocol, _), html in zip(url_mapping, htmls):
             if not html:
-                continue
-            soup = BeautifulSoup(html, "lxml")
-            list_table = soup.find("div", id="list")
-            if not list_table:
-                continue
-            table = list_table.find("table")
-            if not table:
+                logger.warning(f"{self.display_name} {protocol.upper()} returned empty content; the network may be restricted or the source may be dead")
                 continue
 
-            for row in table.find_all("tr"):
-                tds = row.find_all("td")
-                if len(tds) >= 5:
-                    ip = tds[0].get_text(strip=True)
-                    port = tds[1].get_text(strip=True)
-                    protocol = tds[4].get_text(strip=True).lower() if len(tds) > 4 else "http"
-                    if protocol not in VALID_PROTOCOLS:
-                        protocol = "http"
-                    if re.match(r"^\d+\.\d+\.\d+\.\d+$", ip) and port.isdigit():
-                        results.append(ProxyRaw(ip, int(port), protocol))
+            count = 0
+            for line in html.splitlines():
+                line = line.strip()
+                if not line or ":" not in line:
+                    continue
+                parts = line.split(":")
+                if len(parts) < 2:
+                    continue
+                ip = parts[0].strip()
+                port_str = parts[1].strip()
+                if not re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", ip):
+                    continue
+                if not port_str.isdigit() or not (1 <= int(port_str) <= 65535):
+                    continue
+                final_protocol = protocol if protocol in VALID_PROTOCOLS else "http"
+                results.append(ProxyRaw(ip, int(port_str), final_protocol))
+                count += 1
+
+            if count:
+                logger.info(f"{self.display_name} {protocol.upper()} parsing complete, got {count} potential proxies")
+        return results
+
+    async def crawl(self) -> List[ProxyRaw]:
+        results: List[ProxyRaw] = []
+
+        # Fetch the primary sources sequentially so one stuck URL cannot stall the whole run
+        for protocol, url in self.urls:
+            html = await self.fetch(url, timeout=12)
+            if html:
+                results.extend(self._parse_htmls([html], [(protocol, url)]))
+
+        # If the primary sources were all empty, try the fallbacks (also sequentially)
+        if not results:
+            logger.warning(f"{self.display_name} all GitHub primary sources returned empty, trying the jsdelivr fallback")
+            for protocol, url in self.fallback_urls:
+                html = await self.fetch(url, timeout=12)
+                if html:
+                    results.extend(self._parse_htmls([html], [(protocol, url)]))
 
         if results:
-            logger.info(f"{self.display_name} parsing complete, got {len(results)} potential proxies")
+            logger.info(f"{self.display_name} parsed {len(results)} potential proxies in total")
+        else:
+            logger.warning(f"{self.display_name} got no proxies at all")
         return results
diff --git a/app/repositories/settings_repo.py b/app/repositories/settings_repo.py
index b2a0994..fa9afe7 100644
--- a/app/repositories/settings_repo.py
+++ b/app/repositories/settings_repo.py
@@ -124,17 +124,55 @@ class PluginSettingsRepository:
             logger.error(f"set_config failed for {plugin_id}: {e}")
             return False
 
+    @staticmethod
+    async def get_stats(db: aiosqlite.Connection, plugin_id: str) -> Dict[str, Any]:
+        async with db.execute(
+            "SELECT stats_json FROM plugin_settings WHERE plugin_id = ?", (plugin_id,)
+        ) as cursor:
+            row = await cursor.fetchone()
+            if row and row[0]:
+                try:
+                    return json.loads(row[0])
+                except json.JSONDecodeError:
+                    return {}
+            return {}
+
+    @staticmethod
+    async def set_stats(db: aiosqlite.Connection, plugin_id: str, stats: Dict[str, Any]) -> bool:
+        try:
+            await db.execute(
+                """
+                INSERT INTO plugin_settings (plugin_id, stats_json, updated_at)
+                VALUES (?, ?, CURRENT_TIMESTAMP)
+                ON CONFLICT(plugin_id) DO UPDATE SET
+                    stats_json = excluded.stats_json,
+                    updated_at = CURRENT_TIMESTAMP
+                """,
+                (plugin_id, json.dumps(stats, ensure_ascii=False)),
+            )
+            await db.commit()
+            return True
+        except Exception as e:
+            logger.error(f"set_stats failed for {plugin_id}: {e}")
+            return False
+
     @staticmethod
     async def list_all(db: aiosqlite.Connection) -> Dict[str, Dict[str, Any]]:
         result = {}
-        async with db.execute("SELECT plugin_id, enabled, config_json FROM plugin_settings") as cursor:
+        async with db.execute("SELECT plugin_id, enabled, config_json, stats_json FROM plugin_settings") as cursor:
             rows = await cursor.fetchall()
-            for plugin_id, enabled, config_json in rows:
+            for plugin_id, enabled, config_json, stats_json in rows:
                 config = {}
                 if config_json:
                     try:
                         config = json.loads(config_json)
                     except json.JSONDecodeError:
                         pass
-                result[plugin_id] = {"enabled": bool(enabled), "config": config}
+                stats = {}
+                if stats_json:
+                    try:
+                        stats = json.loads(stats_json)
+                    except json.JSONDecodeError:
+                        pass
+                result[plugin_id] = {"enabled": bool(enabled), "config": config, "stats": stats}
         return result
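One caveat on set_stats: the ON CONFLICT(plugin_id) clause only works if plugin_id carries a PRIMARY KEY or UNIQUE constraint on plugin_settings; SQLite requires the conflict target to match a uniqueness constraint. A minimal schema that satisfies it, shown only to make the requirement concrete (the real table definition lives elsewhere in the repo and may differ):

    import asyncio
    import aiosqlite

    async def demo():
        async with aiosqlite.connect(":memory:") as db:
            # hypothetical shape of plugin_settings, assumed for this demo
            await db.execute("""
                CREATE TABLE plugin_settings (
                    plugin_id   TEXT PRIMARY KEY,
                    enabled     INTEGER DEFAULT 1,
                    config_json TEXT DEFAULT '{}',
                    stats_json  TEXT DEFAULT '{}',
                    updated_at  TIMESTAMP
                )
            """)
            # the upsert from set_stats now has a uniqueness target to conflict on
            await db.execute(
                "INSERT INTO plugin_settings (plugin_id, stats_json, updated_at) "
                "VALUES (?, ?, CURRENT_TIMESTAMP) "
                "ON CONFLICT(plugin_id) DO UPDATE SET stats_json = excluded.stats_json, "
                "updated_at = CURRENT_TIMESTAMP",
                ("example", '{"success_count": 1}'),
            )
            await db.commit()

    asyncio.run(demo())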
diff --git a/app/services/plugin_service.py b/app/services/plugin_service.py
index 8fb9f39..6c856d7 100644
--- a/app/services/plugin_service.py
+++ b/app/services/plugin_service.py
@@ -31,11 +31,20 @@
             if "config" in state and isinstance(state["config"], dict):
                 plugin.update_config(state["config"])
 
-            stat = self._stats.get(plugin.name, {
-                "success_count": 0,
-                "failure_count": 0,
-                "last_run": None,
-            })
+            # Merge database stats with in-memory stats (memory takes precedence)
+            db_stat = state.get("stats", {})
+            stat = {
+                "success_count": db_stat.get("success_count", 0),
+                "failure_count": db_stat.get("failure_count", 0),
+                "last_run": datetime.fromisoformat(db_stat["last_run"]) if db_stat.get("last_run") else None,
+            }
+            mem_stat = self._stats.get(plugin.name, {})
+            if mem_stat:
+                stat["success_count"] = mem_stat.get("success_count", stat["success_count"])
+                stat["failure_count"] = mem_stat.get("failure_count", stat["failure_count"])
+                if mem_stat.get("last_run"):
+                    stat["last_run"] = mem_stat["last_run"]
+
             result.append(PluginInfo(
                 id=plugin.name,
                 name=plugin.name,
@@ -105,11 +114,19 @@
             self._record_stat(plugin_id, failure=1)
             logger.error(f"Plugin {plugin_id} crawl failed: {e}")
             return []
+        finally:
+            await self._save_stats(plugin_id)
 
     async def run_all_plugins(self) -> List[ProxyRaw]:
-        """Run crawls for all enabled plugins"""
+        """Run crawls for all enabled plugins, capping concurrency to avoid triggering target-site anti-crawling"""
         all_results: List[ProxyRaw] = []
-        tasks = [self.run_plugin(plugin.name) for plugin in registry.list_plugins() if plugin.enabled]
+        semaphore = asyncio.Semaphore(5)
+
+        async def _run_with_limit(plugin_name: str):
+            async with semaphore:
+                return await self.run_plugin(plugin_name)
+
+        tasks = [_run_with_limit(plugin.name) for plugin in registry.list_plugins() if plugin.enabled]
         results_list = await asyncio.gather(*tasks, return_exceptions=True)
         for results in results_list:
             if isinstance(results, Exception):
@@ -137,3 +154,14 @@
             self._stats[plugin_id]["failure_count"] += failure
         if success or failure:
             self._stats[plugin_id]["last_run"] = datetime.now()
+
+    async def _save_stats(self, plugin_id: str):
+        """Persist the in-memory stats to the database"""
+        stats = self._stats.get(plugin_id, {})
+        payload = {
+            "success_count": stats.get("success_count", 0),
+            "failure_count": stats.get("failure_count", 0),
+            "last_run": stats.get("last_run").isoformat() if stats.get("last_run") else None,
+        }
+        async with get_db() as db:
+            await self.plugin_settings_repo.set_stats(db, plugin_id, payload)
diff --git a/app/services/scheduler_service.py b/app/services/scheduler_service.py
index 9cae791..4d27e0b 100644
--- a/app/services/scheduler_service.py
+++ b/app/services/scheduler_service.py
@@ -22,6 +22,7 @@ class SchedulerService:
         self.running = False
         self._stop_event = asyncio.Event()
         self._task: asyncio.Task | None = None
+        self._validate_task: asyncio.Task | None = None
 
     async def start(self):
         if self.running:
@@ -48,7 +49,9 @@
 
     async def validate_all_now(self):
         """Run one full validation immediately (in the background, without blocking)"""
-        asyncio.create_task(self._do_validate_all())
+        if self._validate_task and not self._validate_task.done():
+            return
+        self._validate_task = asyncio.create_task(self._do_validate_all())
 
     async def _run_loop(self):
         """The scheduling loop"""
@@ -65,27 +68,30 @@
 
     async def _do_validate_all(self):
         """Validate all proxies currently stored in the database"""
-        logger.info("Starting scheduled validation for all proxies")
-        async with get_db() as db:
-            proxies = await self.proxy_repo.list_all(db)
-        if not proxies:
-            logger.info("No proxies to validate")
-            return
+        try:
+            logger.info("Starting scheduled validation for all proxies")
+            async with get_db() as db:
+                proxies = await self.proxy_repo.list_all(db)
+            if not proxies:
+                logger.info("No proxies to validate")
+                return
 
-        logger.info(f"Validating {len(proxies)} proxies from database")
-        from app.models.domain import ProxyRaw
+            logger.info(f"Validating {len(proxies)} proxies from database")
+            from app.models.domain import ProxyRaw
 
-        # Submit to the validation queue in batches
-        batch_size = 100
-        for i in range(0, len(proxies), batch_size):
-            if not self.running:
-                break
-            batch = proxies[i : i + batch_size]
-            await self.validation_queue.submit([
-                ProxyRaw(p.ip, p.port, p.protocol) for p in batch
-            ])
-            # Wait for the current batch to finish
-            await self.validation_queue.drain()
-            logger.info(f"Validated batch {i//batch_size + 1}/{(len(proxies)-1)//batch_size + 1}")
+            # Submit to the validation queue in batches
+            batch_size = 100
+            for i in range(0, len(proxies), batch_size):
+                if not self.running:
+                    break
+                batch = proxies[i : i + batch_size]
+                await self.validation_queue.submit([
+                    ProxyRaw(p.ip, p.port, p.protocol) for p in batch
+                ])
+                # Wait for the current batch to finish
+                await self.validation_queue.drain()
+                logger.info(f"Validated batch {i//batch_size + 1}/{(len(proxies)-1)//batch_size + 1}")
 
-        logger.info("Scheduled validation completed")
+            logger.info("Scheduled validation completed")
+        except Exception as e:
+            logger.error(f"Scheduled validation error: {e}")
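validate_all_now is now effectively single-flight: a second call while a validation run is still in progress is a no-op, and the lifespan hook cancels the stored task cleanly on shutdown instead of orphaning it. The guard, reduced to a generic sketch (the class name is made up):

    import asyncio

    class SingleFlight:
        """Start a coroutine as a task unless a previous run is still in progress."""

        def __init__(self):
            self._task: asyncio.Task | None = None

        def kick(self, coro_factory) -> asyncio.Task:
            if self._task and not self._task.done():
                return self._task  # a run is already in flight; reuse it
            self._task = asyncio.create_task(coro_factory())
            return self._task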