fix: 修复爬虫网络层、验证队列卡死及 API 500 错误

- 修复 BaseHTTPPlugin 连接池、并发控制、异常日志、超时策略
- 修复/增强 8 个爬虫插件的稳定性和 fallback 机制
- 清理 validation_tasks 表 4 万+ pending 任务,避免队列卡死
- 修复 app/api/main.py 缺失全局 app 实例导致的 500 错误
- 提升前端 Axios 超时到 120 秒,避免请求断开
- 修复插件统计持久化和调度器生命周期问题
This commit is contained in:
祀梦
2026-04-04 19:27:36 +08:00
parent 635c524a7e
commit f09a8e16c4
19 changed files with 505 additions and 161 deletions

View File

@@ -5,7 +5,7 @@ import { showError } from '../utils/message'
export const DEFAULT_API_BASE_URL = import.meta.env.VITE_API_BASE_URL || 'http://localhost:18080'
/** @type {number} 请求超时时间(毫秒) */
export const REQUEST_TIMEOUT = 30000
export const REQUEST_TIMEOUT = 120000
const api = axios.create({
baseURL: import.meta.env.VITE_API_BASE_URL || DEFAULT_API_BASE_URL,

18
_test_batch.py Normal file
View File

@@ -0,0 +1,18 @@
import asyncio
import time
import app.plugins
from app.services.plugin_service import PluginService
async def main():
svc = PluginService()
start = time.time()
results = await svc.run_all_plugins()
elapsed = time.time() - start
print(f"Batch crawl completed in {elapsed:.2f}s")
print(f"Total unique proxies: {len(results)}")
from collections import Counter
c = Counter(p.protocol for p in results)
for proto, cnt in sorted(c.items()):
print(f" {proto}: {cnt}")
asyncio.run(main())

47
_test_crawlers.py Normal file
View File

@@ -0,0 +1,47 @@
import asyncio
import app.plugins
from app.core.plugin_system.registry import registry
from app.core.log import logger
import logging
logger.setLevel(logging.WARNING)
async def test_plugin(p, timeout=20):
try:
proxies = await asyncio.wait_for(p.crawl(), timeout=timeout)
return len(proxies), proxies[:1] if proxies else []
except asyncio.TimeoutError:
return -2, []
except Exception as e:
return -1, [str(e)]
async def test_all():
plugins = registry.list_plugins()
print(f'Total plugins: {len(plugins)}')
results = {}
for p in plugins:
print(f'Testing {p.name} (timeout=20s)...', flush=True)
count, sample = await test_plugin(p, timeout=20)
results[p.name] = count
if count > 0:
print(f' -> OK: {count} proxies, sample={sample[0]}')
elif count == 0:
print(f' -> EMPTY')
elif count == -2:
print(f' -> TIMEOUT')
else:
print(f' -> ERROR: {sample[0]}')
print('\n' + '='*50)
print('SUMMARY:')
for name, count in sorted(results.items()):
if count > 0:
status = 'OK'
elif count == 0:
status = 'EMPTY'
elif count == -2:
status = 'TIMEOUT'
else:
status = 'ERROR'
print(f' {name:22s} {status:8s} ({count} proxies)')
asyncio.run(test_all())

View File

@@ -1,4 +1,5 @@
"""应用生命周期管理"""
import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.core.db import init_db, get_db
@@ -37,6 +38,12 @@ async def lifespan(app: FastAPI):
yield
# 关闭调度器
if scheduler_service._validate_task and not scheduler_service._validate_task.done():
scheduler_service._validate_task.cancel()
try:
await scheduler_service._validate_task
except asyncio.CancelledError:
pass
await scheduler_service.stop()
await scheduler_service.validation_queue.validator.close()
logger.info("API server shutdown")

View File

@@ -53,3 +53,6 @@ def create_app() -> FastAPI:
}
return app
app = create_app()

View File

@@ -77,6 +77,13 @@ async def init_db():
await db.execute("ALTER TABLE plugin_settings ADD COLUMN config_json TEXT DEFAULT '{}'")
logger.info("Migrated: added config_json column to plugin_settings")
# 迁移:为旧版 plugin_settings 表增加 stats_json 列
try:
await db.execute("SELECT stats_json FROM plugin_settings LIMIT 1")
except Exception:
await db.execute("ALTER TABLE plugin_settings ADD COLUMN stats_json TEXT DEFAULT '{}'")
logger.info("Migrated: added stats_json column to plugin_settings")
# 验证任务队列表
await db.execute("""
CREATE TABLE IF NOT EXISTS validation_tasks (

View File

@@ -56,6 +56,12 @@ class ValidationQueue:
async with get_db() as db:
recovered = await self.task_repo.reset_processing(db)
pending = await self.task_repo.get_pending_count(db)
if pending > 1000:
logger.warning(f"ValidationQueue has {pending} pending tasks, cleaning up all pending tasks...")
await db.execute("DELETE FROM validation_tasks WHERE status = 'pending'")
await db.commit()
pending = await self.task_repo.get_pending_count(db)
logger.info(f"ValidationQueue cleaned up pending tasks, remaining: {pending}")
if recovered:
logger.info(f"ValidationQueue recovered {recovered} interrupted tasks")
if pending:

View File

@@ -1,7 +1,7 @@
"""通用 HTTP 爬虫基类 - 为基于 HTTP 请求的插件提供封装"""
import random
import asyncio
import aiohttp
import httpx
from typing import List
from app.core.plugin_system import BaseCrawlerPlugin
@@ -28,25 +28,39 @@ class BaseHTTPPlugin(BaseCrawlerPlugin):
"Connection": "keep-alive",
}
async def fetch(self, url: str, timeout: float = 10.0, retries: int = 3) -> str:
async def fetch(self, url: str, timeout: float = 15.0, retries: int = 2) -> str:
"""异步抓取指定 URL 的 HTML 内容"""
from app.core.log import logger
headers = self.get_headers()
async with aiohttp.ClientSession(headers=headers) as session:
transport = httpx.AsyncHTTPTransport(retries=0)
for attempt in range(retries):
async with httpx.AsyncClient(headers=headers, transport=transport, follow_redirects=True) as client:
try:
async with session.get(
url, timeout=aiohttp.ClientTimeout(total=timeout)
) as response:
if response.status == 200:
content = await response.read()
encoding = response.get_encoding()
response = await client.get(url, timeout=timeout)
if response.status_code == 200:
content = response.content
encoding = response.encoding
if encoding == "utf-8" or not encoding:
try:
return content.decode("utf-8")
except UnicodeDecodeError:
return content.decode("gbk", errors="ignore")
return content.decode(encoding, errors="ignore")
except Exception:
pass
else:
logger.warning(f"Fetch {url} returned status {response.status_code}")
except Exception as e:
logger.warning(f"Fetch {url} failed (attempt {attempt + 1}/{retries}): {e}")
if attempt < retries - 1:
await asyncio.sleep(random.uniform(1, 3))
return ""
async def fetch_all(self, urls: List[str], timeout: float = 15.0) -> List[str]:
"""并发抓取多个 URL限制单个插件内部并发为 3"""
semaphore = asyncio.Semaphore(3)
async def _fetch_limited(url: str):
async with semaphore:
return await self.fetch(url, timeout=timeout)
tasks = [_fetch_limited(url) for url in urls]
return await asyncio.gather(*tasks)

View File

@@ -13,14 +13,21 @@ class Fate0Plugin(BaseHTTPPlugin):
def __init__(self):
super().__init__()
self.urls = ["https://raw.githubusercontent.com/fate0/proxylist/master/proxy.list"]
self.urls = [
"https://raw.githubusercontent.com/fate0/proxylist/master/proxy.list",
"https://cdn.jsdelivr.net/gh/fate0/proxylist@master/proxy.list",
]
async def crawl(self) -> List[ProxyRaw]:
results = []
# 顺序 fetch带 fallback
for url in self.urls:
html = await self.fetch(url, timeout=30)
if html:
break
if not html:
continue
logger.warning(f"{self.display_name} 所有源均不可用")
return results
for line in html.split("\n"):
line = line.strip()
if not line:

View File

@@ -12,24 +12,29 @@ class Ip3366Plugin(BaseHTTPPlugin):
name = "ip3366"
display_name = "IP3366"
description = "从 IP3366 网站爬取免费代理"
default_config = {"max_pages": 5}
default_config = {"max_pages": 3}
def __init__(self):
super().__init__()
self._update_urls()
def _update_urls(self):
max_pages = self.config.get("max_pages", 5)
max_pages = self.config.get("max_pages", 3)
self.urls = [
f"http://www.ip3366.net/free/?stype=1&page={i}" for i in range(1, max_pages + 1)
] + [
f"http://www.ip3366.net/free/?stype=2&page={i}" for i in range(1, max_pages + 1)
]
def get_headers(self) -> dict:
headers = super().get_headers()
headers["Referer"] = "http://www.ip3366.net/free/"
return headers
async def crawl(self) -> List[ProxyRaw]:
results = []
for url in self.urls:
html = await self.fetch(url, timeout=15)
htmls = await self.fetch_all(self.urls)
for html in htmls:
if not html:
continue
soup = BeautifulSoup(html, "lxml")

View File

@@ -1,3 +1,5 @@
import asyncio
import random
import re
from typing import List
from bs4 import BeautifulSoup
@@ -35,6 +37,8 @@ class Ip89Plugin(BaseHTTPPlugin):
if re.match(r"^\d+\.\d+\.\d+\.\d+$", ip) and port.isdigit():
results.append(ProxyRaw(ip, int(port), "http"))
await asyncio.sleep(random.uniform(1, 2))
if results:
logger.info(f"{self.display_name} 解析完成,获取 {len(results)} 个潜在代理")
return results

View File

@@ -1,4 +1,6 @@
import re
import asyncio
import random
from typing import List
from bs4 import BeautifulSoup
from app.core.plugin_system import ProxyRaw
@@ -16,22 +18,39 @@ class KuaiDaiLiPlugin(BaseHTTPPlugin):
def __init__(self):
super().__init__()
# 减少页数,降低被反爬概率,确保至少能拿到数据
self.urls = [
f"https://www.kuaidaili.com/free/inha/{i}/" for i in range(1, 11)
] + [
f"https://www.kuaidaili.com/free/intr/{i}/" for i in range(1, 11)
"https://www.kuaidaili.com/free/inha/1/",
"https://www.kuaidaili.com/free/intr/1/",
]
def get_headers(self) -> dict:
headers = super().get_headers()
headers["Referer"] = "https://www.kuaidaili.com/free/inha/"
headers["Accept"] = "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8"
headers["Accept-Encoding"] = "gzip, deflate, br"
headers["Accept-Language"] = "zh-CN,zh;q=0.9,en;q=0.8"
headers["Sec-Fetch-Dest"] = "document"
headers["Sec-Fetch-Mode"] = "navigate"
headers["Sec-Fetch-Site"] = "same-origin"
headers["Upgrade-Insecure-Requests"] = "1"
return headers
async def crawl(self) -> List[ProxyRaw]:
results = []
# 先访问首页预热会话,获取 cookie降低被反爬概率
await self.fetch("https://www.kuaidaili.com/", timeout=10)
await asyncio.sleep(random.uniform(2, 4))
# 顺序请求免费代理页面
for url in self.urls:
html = await self.fetch(url, timeout=15)
html = await self.fetch(url, timeout=10)
if not html:
continue
soup = BeautifulSoup(html, "lxml")
table = soup.find("table")
if not table:
logger.warning(f"{self.display_name} 未能找到表格,可能是触发了反爬")
logger.warning(f"{self.display_name} 未能找到表格,可能是触发了反爬: {url}")
continue
for row in table.find_all("tr"):
@@ -44,6 +63,7 @@ class KuaiDaiLiPlugin(BaseHTTPPlugin):
protocol = "http"
if re.match(r"^\d+\.\d+\.\d+\.\d+$", ip) and port.isdigit():
results.append(ProxyRaw(ip, int(port), protocol))
await asyncio.sleep(random.uniform(5, 8))
if results:
logger.info(f"{self.display_name} 解析完成,获取 {len(results)} 个潜在代理")

View File

@@ -8,39 +8,54 @@ class ProxyListDownloadPlugin(BaseHTTPPlugin):
default_config = {"max_pages": 5}
name = "proxylist_download"
display_name = "ProxyListDownload"
description = "ProxyListDownload API 获取代理"
description = "GitHub 公开代理列表获取代理"
def __init__(self):
super().__init__()
self.urls = [
"https://www.proxy-list.download/api/v1/get?type=http",
"https://www.proxy-list.download/api/v1/get?type=https",
"https://www.proxy-list.download/api/v1/get?type=socks4",
"https://www.proxy-list.download/api/v1/get?type=socks5",
# 首选 GitHub raw + fallback 备用源jsdelivr CDN 或 ProxyScrape API
self.sources = [
{
"primary": "https://raw.githubusercontent.com/komutan234/Proxy-List-Free/main/proxies/http.txt",
"fallbacks": [
"https://cdn.jsdelivr.net/gh/komutan234/Proxy-List-Free@main/proxies/http.txt",
"https://api.proxyscrape.com/v2/?request=get&protocol=http&timeout=10000&country=all&ssl=all&anonymity=all",
],
"protocol": "http",
},
{
"primary": "https://raw.githubusercontent.com/komutan234/Proxy-List-Free/main/proxies/socks4.txt",
"fallbacks": [
"https://cdn.jsdelivr.net/gh/komutan234/Proxy-List-Free@main/proxies/socks4.txt",
"https://api.proxyscrape.com/v2/?request=get&protocol=socks4&timeout=10000&country=all",
],
"protocol": "socks4",
},
{
"primary": "https://raw.githubusercontent.com/komutan234/Proxy-List-Free/main/proxies/socks5.txt",
"fallbacks": [
"https://cdn.jsdelivr.net/gh/komutan234/Proxy-List-Free@main/proxies/socks5.txt",
"https://api.proxyscrape.com/v2/?request=get&protocol=socks5&timeout=10000&country=all",
],
"protocol": "socks5",
},
]
async def crawl(self) -> List[ProxyRaw]:
def _detect_protocol(self, url: str) -> str:
"""根据 URL 判断协议(注意不要用 https:// 来判断)"""
if "socks4" in url:
return "socks4"
elif "socks5" in url:
return "socks5"
elif "/http.txt" in url or "protocol=http" in url:
return "http"
return "http"
def _parse_lines(self, html: str, protocol: str) -> List[ProxyRaw]:
"""解析代理文本,统一处理 \r\n\n 两种换行以及可能存在的空行"""
results = []
for url in self.urls:
html = await self.fetch(url, timeout=30)
if not html:
continue
# 根据 URL 判断协议
if "type=socks4" in url:
protocol = "socks4"
elif "type=socks5" in url:
protocol = "socks5"
elif "type=https" in url:
protocol = "https"
else:
protocol = "http"
lines = html.split("\r\n")
if len(lines) <= 1:
lines = html.split("\n")
for line in lines:
# 统一替换为 \n 后再分割
text = html.replace("\r\n", "\n").replace("\r", "\n")
for line in text.split("\n"):
line = line.strip()
if not line or ":" not in line:
continue
@@ -50,6 +65,30 @@ class ProxyListDownloadPlugin(BaseHTTPPlugin):
port = parts[1].strip()
if ip and port.isdigit():
results.append(ProxyRaw(ip, int(port), protocol))
return results
async def crawl(self) -> List[ProxyRaw]:
results = []
# 并发请求所有 primary URL
primary_urls = [s["primary"] for s in self.sources]
primary_htmls = await self.fetch_all(primary_urls, timeout=15)
for idx, html in enumerate(primary_htmls):
source = self.sources[idx]
protocol = source.get("protocol") or self._detect_protocol(source["primary"])
if html and html.strip():
results.extend(self._parse_lines(html, protocol))
continue
# primary 返回空或仅空白字符,依次尝试 fallback
logger.warning(f"{self.display_name} 主源返回空,尝试 fallback: {source['primary']}")
for fallback_url in source["fallbacks"]:
fallback_html = await self.fetch(fallback_url, timeout=15)
if fallback_html and fallback_html.strip():
fb_protocol = source.get("protocol") or self._detect_protocol(fallback_url)
results.extend(self._parse_lines(fallback_html, fb_protocol))
break
if results:
logger.info(f"{self.display_name} 解析完成,获得 {len(results)} 个潜在代理")

View File

@@ -1,4 +1,5 @@
"""ProxyScrape 测试爬虫 - 用于验证架构,支持全协议类型"""
import asyncio
from typing import List
from app.core.plugin_system import ProxyRaw
from app.plugins.base import BaseHTTPPlugin
@@ -19,25 +20,25 @@ class ProxyScrapePlugin(BaseHTTPPlugin):
def __init__(self):
super().__init__()
# 使用多个公开 GitHub 代理列表作为源,稳定性较差
# GitHub raw 源作为首选
self.urls = [
("http", "https://raw.githubusercontent.com/monosans/proxy-list/main/proxies/http.txt"),
("https", "https://raw.githubusercontent.com/monosans/proxy-list/main/proxies/https.txt"),
("socks4", "https://raw.githubusercontent.com/monosans/proxy-list/main/proxies/socks4.txt"),
("socks5", "https://raw.githubusercontent.com/monosans/proxy-list/main/proxies/socks5.txt"),
]
# ProxyScrape 官方 API 作为 fallback
self.api_urls = {
"http": "https://api.proxyscrape.com/v2/?request=get&protocol=http&timeout=10000&country=all&ssl=all&anonymity=all",
"https": "https://api.proxyscrape.com/v2/?request=get&protocol=https&timeout=10000&country=all&ssl=all&anonymity=all",
"socks4": "https://api.proxyscrape.com/v2/?request=get&protocol=socks4&timeout=10000&country=all&ssl=all&anonymity=all",
"socks5": "https://api.proxyscrape.com/v2/?request=get&protocol=socks5&timeout=10000&country=all&ssl=all&anonymity=all",
}
async def crawl(self) -> List[ProxyRaw]:
results: List[ProxyRaw] = []
for protocol, url in self.urls:
try:
html = await self.fetch(url, timeout=30)
if not html:
logger.warning(f"ProxyScrape {protocol.upper()} 返回空内容")
continue
count = 0
for line in html.splitlines():
def _parse_proxies(self, text: str, protocol: str) -> List[ProxyRaw]:
"""解析 ip:port 每行的文本内容"""
proxies = []
for line in text.splitlines():
line = line.strip()
if not line or ":" not in line:
continue
@@ -46,12 +47,62 @@ class ProxyScrapePlugin(BaseHTTPPlugin):
ip = parts[0].strip()
port_str = parts[1].strip()
if port_str.isdigit():
results.append(ProxyRaw(ip, int(port_str), protocol))
count += 1
proxies.append(ProxyRaw(ip, int(port_str), protocol))
return proxies
logger.info(f"ProxyScrape {protocol.upper()} 获取 {count} 个代理")
except Exception as e:
logger.error(f"ProxyScrape {protocol.upper()} 爬取失败: {e}")
async def crawl(self) -> List[ProxyRaw]:
results: List[ProxyRaw] = []
protocols = [protocol for protocol, _ in self.urls]
urls = [url for _, url in self.urls]
# 1. 并发请求所有 GitHub raw 源,整体限时 10s先完成的保留结果
tasks = [asyncio.create_task(self.fetch(url, timeout=12)) for url in urls]
done, pending = await asyncio.wait(tasks, timeout=10)
for task in pending:
task.cancel()
htmls = []
done_protocols = set()
for i, task in enumerate(tasks):
try:
if task in done:
htmls.append(task.result())
done_protocols.add(protocols[i])
else:
htmls.append("")
except Exception:
htmls.append("")
done_protocols.add(protocols[i])
fallback_protocols = []
for protocol, html in zip(protocols, htmls):
proxies = self._parse_proxies(html or "", protocol) if html else []
if proxies:
logger.info(f"ProxyScrape {protocol.upper()} GitHub raw 获取 {len(proxies)} 个代理")
results.extend(proxies)
else:
if protocol in done_protocols:
logger.warning(f"ProxyScrape {protocol.upper()} GitHub raw 返回空或无效,将尝试 API fallback")
else:
logger.warning(f"ProxyScrape {protocol.upper()} GitHub raw 请求超时,将尝试 API fallback")
fallback_protocols.append(protocol)
# 2. 对 GitHub raw 失败的协议,并发请求 ProxyScrape API fallback
if fallback_protocols:
fallback_urls = [self.api_urls[p] for p in fallback_protocols]
try:
api_htmls = await asyncio.wait_for(
self.fetch_all(fallback_urls, timeout=10), timeout=10
)
except asyncio.TimeoutError:
logger.warning(f"ProxyScrape API fallback 批量请求超时,跳过 {len(fallback_protocols)} 个协议")
api_htmls = [""] * len(fallback_protocols)
for protocol, api_html in zip(fallback_protocols, api_htmls):
proxies = self._parse_proxies(api_html or "", protocol) if api_html else []
if proxies:
logger.info(f"ProxyScrape {protocol.upper()} API 获取 {len(proxies)} 个代理")
results.extend(proxies)
else:
logger.warning(f"ProxyScrape {protocol.upper()} API 返回空或无效")
if results:
logger.info(f"ProxyScrape 总计获取 {len(results)} 个代理")

View File

@@ -18,13 +18,18 @@ class SpeedXPlugin(BaseHTTPPlugin):
"https://raw.githubusercontent.com/TheSpeedX/SOCKS-List/master/socks4.txt",
"https://raw.githubusercontent.com/TheSpeedX/SOCKS-List/master/socks5.txt",
]
self.fallback_urls = [
"https://cdn.jsdelivr.net/gh/TheSpeedX/SOCKS-List@master/http.txt",
"https://cdn.jsdelivr.net/gh/TheSpeedX/SOCKS-List@master/socks4.txt",
"https://cdn.jsdelivr.net/gh/TheSpeedX/SOCKS-List@master/socks5.txt",
]
async def crawl(self) -> List[ProxyRaw]:
def _parse_htmls(self, htmls: List[str], urls: List[str]) -> List[ProxyRaw]:
results = []
for url in self.urls:
html = await self.fetch(url, timeout=30)
for idx, html in enumerate(htmls):
if not html:
continue
url = urls[idx]
# 根据 URL 判断协议
protocol = "http"
@@ -33,7 +38,7 @@ class SpeedXPlugin(BaseHTTPPlugin):
elif "socks4" in url:
protocol = "socks4"
for line in html.split("\n"):
for line in html.splitlines():
line = line.strip()
if not line or ":" not in line:
continue
@@ -46,6 +51,16 @@ class SpeedXPlugin(BaseHTTPPlugin):
if not port.isdigit() or not (1 <= int(port) <= 65535):
continue
results.append(ProxyRaw(ip, int(port), protocol))
return results
async def crawl(self) -> List[ProxyRaw]:
htmls = await self.fetch_all(self.urls, timeout=15)
results = self._parse_htmls(htmls, self.urls)
if not results:
logger.warning(f"{self.display_name} GitHub 源全部返回空,尝试 jsdelivr fallback")
htmls = await self.fetch_all(self.fallback_urls, timeout=15)
results = self._parse_htmls(htmls, self.fallback_urls)
if results:
logger.info(f"{self.display_name} 解析完成,获取 {len(results)} 个潜在代理")

View File

@@ -1,6 +1,5 @@
import re
from typing import List
from bs4 import BeautifulSoup
from app.core.plugin_system import ProxyRaw
from app.plugins.base import BaseHTTPPlugin
from app.core.log import logger
@@ -12,41 +11,71 @@ class YunDaiLiPlugin(BaseHTTPPlugin):
default_config = {"max_pages": 5}
name = "yundaili"
display_name = "云代理"
description = "云代理网站爬取免费代理"
description = " GitHub 公开代理列表获取免费代理"
def __init__(self):
super().__init__()
# 主数据源GitHub raw
self.urls = [
f"http://www.ip3366.net/free/?stype=1&page={i}" for i in range(1, 6)
] + [
f"http://www.ip3366.net/free/?stype=2&page={i}" for i in range(1, 6)
("http", "https://raw.githubusercontent.com/mmpx12/proxy-list/master/http.txt"),
("socks4", "https://raw.githubusercontent.com/mmpx12/proxy-list/master/socks4.txt"),
("socks5", "https://raw.githubusercontent.com/mmpx12/proxy-list/master/socks5.txt"),
]
# Fallbackjsdelivr CDN 加速
self.fallback_urls = [
("http", "https://cdn.jsdelivr.net/gh/mmpx12/proxy-list@master/http.txt"),
("socks4", "https://cdn.jsdelivr.net/gh/mmpx12/proxy-list@master/socks4.txt"),
("socks5", "https://cdn.jsdelivr.net/gh/mmpx12/proxy-list@master/socks5.txt"),
]
async def crawl(self) -> List[ProxyRaw]:
results = []
for url in self.urls:
html = await self.fetch(url, timeout=15)
def _parse_htmls(self, htmls: List[str], url_mapping: List[tuple]) -> List[ProxyRaw]:
results: List[ProxyRaw] = []
for (protocol, _), html in zip(url_mapping, htmls):
if not html:
continue
soup = BeautifulSoup(html, "lxml")
list_table = soup.find("div", id="list")
if not list_table:
continue
table = list_table.find("table")
if not table:
logger.warning(f"{self.display_name} {protocol.upper()} 返回空内容,可能网络受限或源已失效")
continue
for row in table.find_all("tr"):
tds = row.find_all("td")
if len(tds) >= 5:
ip = tds[0].get_text(strip=True)
port = tds[1].get_text(strip=True)
protocol = tds[4].get_text(strip=True).lower() if len(tds) > 4 else "http"
if protocol not in VALID_PROTOCOLS:
protocol = "http"
if re.match(r"^\d+\.\d+\.\d+\.\d+$", ip) and port.isdigit():
results.append(ProxyRaw(ip, int(port), protocol))
count = 0
for line in html.splitlines():
line = line.strip()
if not line or ":" not in line:
continue
parts = line.split(":")
if len(parts) < 2:
continue
ip = parts[0].strip()
port_str = parts[1].strip()
if not re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", ip):
continue
if not port_str.isdigit() or not (1 <= int(port_str) <= 65535):
continue
final_protocol = protocol if protocol in VALID_PROTOCOLS else "http"
results.append(ProxyRaw(ip, int(port_str), final_protocol))
count += 1
if count:
logger.info(f"{self.display_name} {protocol.upper()} 解析完成,获取 {count} 个潜在代理")
return results
async def crawl(self) -> List[ProxyRaw]:
results: List[ProxyRaw] = []
# 顺序请求主源,避免某个 URL 卡住拖慢整体
for protocol, url in self.urls:
html = await self.fetch(url, timeout=12)
if html:
results.extend(self._parse_htmls([html], [(protocol, url)]))
# 主源为空时尝试 fallback也顺序请求
if not results:
logger.warning(f"{self.display_name} GitHub 主源全部返回空,尝试 jsdelivr fallback")
for protocol, url in self.fallback_urls:
html = await self.fetch(url, timeout=12)
if html:
results.extend(self._parse_htmls([html], [(protocol, url)]))
if results:
logger.info(f"{self.display_name} 解析完成,获取 {len(results)} 个潜在代理")
logger.info(f"{self.display_name} 总计解析完成,获取 {len(results)} 个潜在代理")
else:
logger.warning(f"{self.display_name} 未获取到任何代理")
return results

View File

@@ -124,17 +124,55 @@ class PluginSettingsRepository:
logger.error(f"set_config failed for {plugin_id}: {e}")
return False
@staticmethod
async def get_stats(db: aiosqlite.Connection, plugin_id: str) -> Dict[str, Any]:
async with db.execute(
"SELECT stats_json FROM plugin_settings WHERE plugin_id = ?", (plugin_id,)
) as cursor:
row = await cursor.fetchone()
if row and row[0]:
try:
return json.loads(row[0])
except json.JSONDecodeError:
return {}
return {}
@staticmethod
async def set_stats(db: aiosqlite.Connection, plugin_id: str, stats: Dict[str, Any]) -> bool:
try:
await db.execute(
"""
INSERT INTO plugin_settings (plugin_id, stats_json, updated_at)
VALUES (?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(plugin_id) DO UPDATE SET
stats_json = excluded.stats_json,
updated_at = CURRENT_TIMESTAMP
""",
(plugin_id, json.dumps(stats, ensure_ascii=False)),
)
await db.commit()
return True
except Exception as e:
logger.error(f"set_stats failed for {plugin_id}: {e}")
return False
@staticmethod
async def list_all(db: aiosqlite.Connection) -> Dict[str, Dict[str, Any]]:
result = {}
async with db.execute("SELECT plugin_id, enabled, config_json FROM plugin_settings") as cursor:
async with db.execute("SELECT plugin_id, enabled, config_json, stats_json FROM plugin_settings") as cursor:
rows = await cursor.fetchall()
for plugin_id, enabled, config_json in rows:
for plugin_id, enabled, config_json, stats_json in rows:
config = {}
if config_json:
try:
config = json.loads(config_json)
except json.JSONDecodeError:
pass
result[plugin_id] = {"enabled": bool(enabled), "config": config}
stats = {}
if stats_json:
try:
stats = json.loads(stats_json)
except json.JSONDecodeError:
pass
result[plugin_id] = {"enabled": bool(enabled), "config": config, "stats": stats}
return result

View File

@@ -31,11 +31,20 @@ class PluginService:
if "config" in state and isinstance(state["config"], dict):
plugin.update_config(state["config"])
stat = self._stats.get(plugin.name, {
"success_count": 0,
"failure_count": 0,
"last_run": None,
})
# 合并数据库统计与内存统计(内存优先)
db_stat = state.get("stats", {})
stat = {
"success_count": db_stat.get("success_count", 0),
"failure_count": db_stat.get("failure_count", 0),
"last_run": datetime.fromisoformat(db_stat["last_run"]) if db_stat.get("last_run") else None,
}
mem_stat = self._stats.get(plugin.name, {})
if mem_stat:
stat["success_count"] = mem_stat.get("success_count", stat["success_count"])
stat["failure_count"] = mem_stat.get("failure_count", stat["failure_count"])
if mem_stat.get("last_run"):
stat["last_run"] = mem_stat["last_run"]
result.append(PluginInfo(
id=plugin.name,
name=plugin.name,
@@ -105,11 +114,19 @@ class PluginService:
self._record_stat(plugin_id, failure=1)
logger.error(f"Plugin {plugin_id} crawl failed: {e}")
return []
finally:
await self._save_stats(plugin_id)
async def run_all_plugins(self) -> List[ProxyRaw]:
"""执行所有启用插件的爬取"""
"""执行所有启用插件的爬取,限制并发数以避免触发目标站反爬"""
all_results: List[ProxyRaw] = []
tasks = [self.run_plugin(plugin.name) for plugin in registry.list_plugins() if plugin.enabled]
semaphore = asyncio.Semaphore(5)
async def _run_with_limit(plugin_name: str):
async with semaphore:
return await self.run_plugin(plugin_name)
tasks = [_run_with_limit(plugin.name) for plugin in registry.list_plugins() if plugin.enabled]
results_list = await asyncio.gather(*tasks, return_exceptions=True)
for results in results_list:
if isinstance(results, Exception):
@@ -137,3 +154,14 @@ class PluginService:
self._stats[plugin_id]["failure_count"] += failure
if success or failure:
self._stats[plugin_id]["last_run"] = datetime.now()
async def _save_stats(self, plugin_id: str):
"""将内存中的统计持久化到数据库"""
stats = self._stats.get(plugin_id, {})
payload = {
"success_count": stats.get("success_count", 0),
"failure_count": stats.get("failure_count", 0),
"last_run": stats.get("last_run").isoformat() if stats.get("last_run") else None,
}
async with get_db() as db:
await self.plugin_settings_repo.set_stats(db, plugin_id, payload)

View File

@@ -22,6 +22,7 @@ class SchedulerService:
self.running = False
self._stop_event = asyncio.Event()
self._task: asyncio.Task | None = None
self._validate_task: asyncio.Task | None = None
async def start(self):
if self.running:
@@ -48,7 +49,9 @@ class SchedulerService:
async def validate_all_now(self):
"""立即执行一次全量验证(后台运行,不阻塞)"""
asyncio.create_task(self._do_validate_all())
if self._validate_task and not self._validate_task.done():
return
self._validate_task = asyncio.create_task(self._do_validate_all())
async def _run_loop(self):
"""定时循环"""
@@ -65,6 +68,7 @@ class SchedulerService:
async def _do_validate_all(self):
"""验证数据库中所有存量代理"""
try:
logger.info("Starting scheduled validation for all proxies")
async with get_db() as db:
proxies = await self.proxy_repo.list_all(db)
@@ -89,3 +93,5 @@ class SchedulerService:
logger.info(f"Validated batch {i//batch_size + 1}/{(len(proxies)-1)//batch_size + 1}")
logger.info("Scheduled validation completed")
except Exception as e:
logger.error(f"Scheduled validation error: {e}")