"""spys.one 表单 POST + 端口 XOR 解码(README: GetProxyFromSPYSONE.py)。"""
import asyncio
import re
from typing import Dict, List, Tuple
from app.core.plugin_system import ProxyRaw
from app.plugins.base import BaseHTTPPlugin
from app.core.log import logger
class FpwSpysOnePlugin(BaseHTTPPlugin):
    """spys.one proxy-list plugin (HTTP + SOCKS pages).

    The site obfuscates every port as a JavaScript expression of the form
    ``document.write(":"+(a^b)+(c^d)+...)`` where the variables are declared
    in a separate inline ``<script>`` block as a chain of XOR assignments.
    This plugin re-implements that decoder in pure Python: it first builds
    the variable environment from the table script, then evaluates each
    row's XOR pairs to recover the port digits.
    """

    name = "fpw_spys_one"
    display_name = "Spys.one"
    description = "spys.one HTTP/SOCKS 列表(POST 筛选 + XOR 端口解码)"
    # Two page fetches with retries plus regex-heavy parsing; generous budget.
    crawl_timeout_seconds = 180.0

    def __init__(self):
        super().__init__()
        # (default protocol, listing URL, value for the "xf5" form field).
        self.pages: List[Tuple[str, str, str]] = [
            ("http", "https://spys.one/en/http-proxy-list/", "1"),
            ("socks5", "https://spys.one/en/socks-proxy-list/", "2"),
        ]

    @staticmethod
    def _exec_spys_decoder(body: str) -> Dict[str, int]:
        """Evaluate the obfuscated variable-table script into ``{name: int}``.

        ``body`` is a ``;``-separated chain of ``name=literal`` and
        ``name=a^b`` assignments.  Assignments may reference variables that
        are only defined later in the chain, so we iterate to a fixed point
        (bounded at 8 passes) and accept an XOR assignment only once both
        operands are resolvable.  The first successful assignment to a name
        wins; later ones are ignored.
        """
        # Strip ALL whitespace up front so token parsing needs no trimming.
        body = re.sub(r"\s+", "", body)
        stmts = [s for s in body.split(";") if s and "document" not in s]
        env: Dict[str, int] = {}

        def value_of(token: str) -> int:
            # Integer literal, or a previously resolved variable
            # (raises KeyError when the variable is still unknown).
            return int(token) if token.isdigit() else env[token]

        for _ in range(8):  # fixed-point iteration; converges quickly in practice
            progressed = False
            for stmt in stmts:
                if "=" not in stmt:
                    continue
                lhs, rhs = stmt.split("=", 1)
                if lhs in env:
                    continue  # already resolved on an earlier pass
                if "^" not in rhs:
                    if rhs.isdigit():
                        env[lhs] = int(rhs)
                        progressed = True
                    continue
                a, b = rhs.split("^", 1)
                try:
                    env[lhs] = value_of(a) ^ value_of(b)
                    progressed = True
                except KeyError:
                    # An operand is not resolved yet; retry on a later pass.
                    continue
            if not progressed:
                break
        return env

    def _decoder_env_from_html(self, html: str) -> Dict[str, int]:
        """Find the inline ``<script>`` holding the XOR variable table.

        A page contains many scripts; the table is the one WITHOUT
        ``document.write`` (those are the per-row port emitters) and with a
        meaningful number of ``x=123^y`` assignments.  When several
        candidates qualify, the one that decodes to the most variables wins.
        """
        best: Dict[str, int] = {}
        # NOTE: DOTALL is required — the table script spans multiple lines.
        for m in re.finditer(
            r"<script[^>]*>(.*?)</script>", html, re.IGNORECASE | re.DOTALL
        ):
            chunk = m.group(1).strip()
            if "document.write" in chunk:
                continue
            # Require a handful of XOR assignments to skip unrelated scripts.
            if len(re.findall(r"\w+=\d+\^\w+", chunk)) < 4:
                continue
            env = self._exec_spys_decoder(chunk)
            if len(env) > len(best):
                best = env
        return best

    def _parse_page(self, html: str, default_protocol: str) -> List[ProxyRaw]:
        """Extract proxies from one listing page.

        Each row looks like
        ``class=spy14>1.2.3.4<script>document.write(":"+(a^b)+(c^d)...)</script>``;
        every ``(a^b)`` pair decodes to one decimal digit of the port.
        The protocol label (HTTP/HTTPS/SOCKS4/SOCKS5) is sniffed from a
        bounded window of HTML following the row; falls back to
        ``default_protocol`` when no label is found.
        """
        env = self._decoder_env_from_html(html)
        if not env:
            logger.warning(f"{self.display_name} 未解析到 XOR 变量表")
            return []
        results: List[ProxyRaw] = []
        row_re = (
            r"class=spy14>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})"
            r"\s*<script[^>]*>(.*?)</script>"
        )
        for m in re.finditer(row_re, html, re.IGNORECASE | re.DOTALL):
            ip = m.group(1)
            inner = m.group(2)
            dw = re.search(
                r'document\.write\("[^"]*"\+((?:\(\w+\^\w+\)\+?)+)\)',
                inner,
            )
            if not dw:
                continue
            pairs = re.findall(r"\((\w+)\^(\w+)\)", dw.group(1))
            if not pairs:
                continue
            try:
                # Each XOR pair yields one decimal digit of the port.
                digits = "".join(str(env[a] ^ env[b]) for a, b in pairs)
                port = int(digits)
            except (KeyError, ValueError):
                continue
            if not (1 <= port <= 65535):
                continue
            # Most specific label first: "SOCKS5" also contains no "HTTP",
            # but "HTTPS" contains "HTTP", so order matters.
            tail = html[m.end() : m.end() + 2000].upper()
            if "SOCKS5" in tail:
                proto = "socks5"
            elif "SOCKS4" in tail:
                proto = "socks4"
            elif "HTTPS" in tail:
                proto = "https"
            elif "HTTP" in tail:
                proto = "http"
            else:
                proto = default_protocol
            try:
                results.append(ProxyRaw(ip, port, proto))
            except ValueError:
                # ProxyRaw validates its fields; skip malformed entries.
                continue
        return results

    async def crawl(self) -> List[ProxyRaw]:
        """Fetch all configured pages concurrently and merge the results.

        The POST form fields xpp/xf1/xf2/xf4 are spys.one view filters sent
        with fixed values; ``xf5`` selects the protocol family and varies
        per entry in ``self.pages``.  Pages that fail to fetch are skipped.
        """
        results: List[ProxyRaw] = []
        form_base = {
            "xpp": "3",
            "xf1": "0",
            "xf2": "0",
            "xf4": "0",
        }

        async def _one(proto: str, url: str, xf5: str) -> Tuple[str, str]:
            data = {**form_base, "xf5": xf5}
            html = await self.fetch_post(url, data=data, timeout=25, retries=2)
            return proto, html or ""

        pairs = await asyncio.gather(
            *(_one(proto, url, xf5) for proto, url, xf5 in self.pages)
        )
        for proto, html in pairs:
            if not html:
                continue
            batch = self._parse_page(html, proto)
            if batch:
                results.extend(batch)
                logger.info(f"{self.display_name} ({proto}): {len(batch)} 条")
        if results:
            logger.info(f"{self.display_name} 合计 {len(results)} 条")
        return results