Files
ProxyPool/app/plugins/fpw_spys_one.py
祀梦 0131c8b408 feat: fpw plugins, validation/crawl perf, WS stats, test DB isolation
- Add Free_Proxy_Website-style fpw_* plugins and register them
- Per-plugin crawl timeout (crawl_timeout_seconds=120); remove global crawl_timeout setting
- Validator: fix connect vs total timeout on save; SOCKS session LRU cache; drop redundant semaphore
- Validation handler uses single DB connection; batch upsert after crawl; WorkerPool put_nowait
- Remove unused max_retries from settings API/UI; settings maintenance SQL + init_db cleanup of deprecated keys
- WebSocket dashboard stats; ProxyList pool_filter and API alignment
- POST /api/proxies/delete-one for IPv6-safe deletes; task poll stops on 404
- pytest uses PROXYPOOL_DB_PATH=db/proxies.test.sqlite so tests do not wipe production DB
- .gitignore: explicit proxies.test.sqlite patterns; fix plugin_service ValidationException import

Made-with: Cursor
2026-04-05 13:39:19 +08:00

149 lines
4.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""spys.one 表单 POST + 端口 XOR 解码README: GetProxyFromSPYSONE.py"""
import asyncio
import re
from typing import Dict, List, Tuple
from app.core.plugin_system import ProxyRaw
from app.plugins.base import BaseHTTPPlugin
from app.core.log import logger
class FpwSpysOnePlugin(BaseHTTPPlugin):
name = "fpw_spys_one"
display_name = "Spys.one"
description = "spys.one HTTP/SOCKS 列表POST 筛选 + XOR 端口解码)"
def __init__(self):
super().__init__()
self.pages: List[Tuple[str, str, str]] = [
("http", "http://spys.one/en/http-proxy-list/", "1"),
("socks5", "http://spys.one/en/socks-proxy-list/", "2"),
]
@staticmethod
def _exec_spys_decoder(body: str) -> Dict[str, int]:
body = re.sub(r"\s+", "", body)
stmts = [s.strip() for s in body.split(";") if s.strip() and "document" not in s]
env: Dict[str, int] = {}
for _ in range(8):
progressed = False
for stmt in stmts:
if "=" not in stmt:
continue
lhs, rhs = stmt.split("=", 1)
lhs = lhs.strip()
rhs = rhs.strip()
if lhs in env:
continue
if "^" not in rhs:
if rhs.isdigit():
env[lhs] = int(rhs)
progressed = True
continue
a, b = rhs.split("^", 1)
a, b = a.strip(), b.strip()
def gv(x: str) -> int:
if x.isdigit():
return int(x)
return env[x]
try:
env[lhs] = gv(a) ^ gv(b)
progressed = True
except KeyError:
continue
if not progressed:
break
return env
def _decoder_env_from_html(self, html: str) -> Dict[str, int]:
best: Dict[str, int] = {}
for m in re.finditer(r"<script[^>]*>([\s\S]*?)</script>", html, re.IGNORECASE):
chunk = m.group(1).strip()
if "document.write" in chunk:
continue
xor_assigns = len(re.findall(r"\w+=\d+\^\w+", chunk))
if xor_assigns < 4:
continue
env = self._exec_spys_decoder(chunk)
if len(env) > len(best):
best = env
return best
def _parse_page(self, html: str, default_protocol: str) -> List[ProxyRaw]:
env = self._decoder_env_from_html(html)
if not env:
logger.warning(f"{self.display_name} 未解析到 XOR 变量表")
return []
results: List[ProxyRaw] = []
for m in re.finditer(
r"class=spy14>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})<script[^>]*>([\s\S]+?)</script>",
html,
re.IGNORECASE,
):
ip = m.group(1)
inner = m.group(2)
dw = re.search(
r'document\.write\("[^"]*"\+((?:\(\w+\^\w+\)\+?)+)\)',
inner,
)
if not dw:
continue
pairs = re.findall(r"\((\w+)\^(\w+)\)", dw.group(1))
if not pairs:
continue
try:
digits = "".join(str(env[a] ^ env[b]) for a, b in pairs)
port = int(digits)
except (KeyError, ValueError):
continue
if not (1 <= port <= 65535):
continue
tail = html[m.end() : m.end() + 2000]
u = tail.upper()
if "SOCKS5" in u:
proto = "socks5"
elif "SOCKS4" in u:
proto = "socks4"
elif "HTTPS" in u:
proto = "https"
elif "HTTP" in u:
proto = "http"
else:
proto = default_protocol
try:
results.append(ProxyRaw(ip, port, proto))
except ValueError:
continue
return results
async def crawl(self) -> List[ProxyRaw]:
results: List[ProxyRaw] = []
form_base = {
"xpp": "3",
"xf1": "0",
"xf2": "0",
"xf4": "0",
}
async def _one(proto: str, url: str, xf5: str) -> Tuple[str, str]:
data = {**form_base, "xf5": xf5}
html = await self.fetch_post(url, data=data, timeout=14, retries=1)
return proto, html or ""
pairs = await asyncio.gather(
*(_one(proto, url, xf5) for proto, url, xf5 in self.pages)
)
for proto, html in pairs:
if not html:
continue
batch = self._parse_page(html, proto)
if batch:
results.extend(batch)
logger.info(f"{self.display_name} ({proto}): {len(batch)}")
if results:
logger.info(f"{self.display_name} 合计 {len(results)}")
return results