Files
ProxyPool/app/plugins/fpw_spys_one.py
祀梦 e582067316 fix(plugins): fpw parsers for JSON API, mirrors, and looser HTML
- fpw_proxy_list_download: parse JSON list/proxies bodies; jsDelivr monosans tier; crawl timeout 300s
- fpw_socks_ssl: try parse_html_table before regex
- fpw_hidemy: loose row scan when fixed columns fail
- fpw_proxynova: plain IP/port row fallback
- fpw_spys_one: HTTPS endpoints; crawl timeout 180s
- fpw_gatherproxy: HTTPS + extra JSON key patterns
- fpw_checkerproxy: lower min HTML length for parse
- fpw_premproxy: ip:port regex fallback when few table rows

Made-with: Cursor
2026-04-05 14:16:03 +08:00

150 lines
5.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""spys.one 表单 POST + 端口 XOR 解码README: GetProxyFromSPYSONE.py"""
import asyncio
import re
from typing import Dict, List, Tuple
from app.core.plugin_system import ProxyRaw
from app.plugins.base import BaseHTTPPlugin
from app.core.log import logger
class FpwSpysOnePlugin(BaseHTTPPlugin):
    """spys.one proxy-list crawler.

    spys.one hides every proxy's port behind an inline ``document.write``
    that XOR-combines variables defined in a separate obfuscated <script>
    block.  This plugin POSTs the list-filter form for the HTTP and SOCKS
    pages, rebuilds that XOR variable table, and decodes the ports
    (see README: GetProxyFromSPYSONE.py).
    """

    name = "fpw_spys_one"
    display_name = "Spys.one"
    description = "spys.one HTTP/SOCKS 列表（POST 筛选 + XOR 端口解码）"
    # Decoding both pages is slow; allow a generous crawl budget.
    crawl_timeout_seconds = 180.0

    def __init__(self):
        super().__init__()
        # (default protocol, list URL, "xf5" form value selecting list type)
        self.pages: List[Tuple[str, str, str]] = [
            ("http", "https://spys.one/en/http-proxy-list/", "1"),
            ("socks5", "https://spys.one/en/socks-proxy-list/", "2"),
        ]

    @staticmethod
    def _exec_spys_decoder(body: str) -> Dict[str, int]:
        """Evaluate the obfuscated assignment script into ``{var: int}``.

        After whitespace removal the script is a flat ``;``-separated list
        of ``name=digits`` and ``name=a^b`` statements.  Assignments may
        reference variables bound only by a later statement, so we iterate
        to a fixed point (capped at 8 passes), skipping statements whose
        operands are not resolvable yet.  First binding of a name wins.
        """
        body = re.sub(r"\s+", "", body)
        stmts = [s.strip() for s in body.split(";") if s.strip() and "document" not in s]
        env: Dict[str, int] = {}

        def resolve(token: str) -> int:
            # Either a literal integer or a previously bound variable
            # (KeyError if the variable is not bound yet).
            if token.isdigit():
                return int(token)
            return env[token]

        for _ in range(8):
            progressed = False
            for stmt in stmts:
                if "=" not in stmt:
                    continue
                lhs, rhs = stmt.split("=", 1)
                lhs = lhs.strip()
                rhs = rhs.strip()
                if lhs in env:
                    continue  # already bound on an earlier pass
                if "^" not in rhs:
                    if rhs.isdigit():
                        env[lhs] = int(rhs)
                        progressed = True
                    continue
                a, b = rhs.split("^", 1)
                try:
                    env[lhs] = resolve(a.strip()) ^ resolve(b.strip())
                    progressed = True
                except KeyError:
                    # An operand is not bound yet; retry on a later pass.
                    continue
            if not progressed:
                break
        return env

    def _decoder_env_from_html(self, html: str) -> Dict[str, int]:
        """Locate the <script> block defining the XOR table and decode it.

        Heuristic: the decoder block contains many ``x=digits^y``
        assignments and no ``document.write``.  Among candidates, the one
        yielding the largest variable environment wins.
        """
        best: Dict[str, int] = {}
        for m in re.finditer(r"<script[^>]*>([\s\S]*?)</script>", html, re.IGNORECASE):
            chunk = m.group(1).strip()
            if "document.write" in chunk:
                continue
            xor_assigns = len(re.findall(r"\w+=\d+\^\w+", chunk))
            if xor_assigns < 4:
                continue
            env = self._exec_spys_decoder(chunk)
            if len(env) > len(best):
                best = env
        return best

    def _parse_page(self, html: str, default_protocol: str) -> List[ProxyRaw]:
        """Extract proxies from one list page.

        Each row is ``class=spy14>IP<script>document.write(...XOR pairs...)``;
        the XOR pairs, evaluated against the page's variable table, spell
        out the port digits.  The protocol is sniffed from the HTML that
        follows the row, falling back to *default_protocol*.
        """
        env = self._decoder_env_from_html(html)
        if not env:
            logger.warning(f"{self.display_name} 未解析到 XOR 变量表")
            return []
        results: List[ProxyRaw] = []
        for m in re.finditer(
            r"class=spy14>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})<script[^>]*>([\s\S]+?)</script>",
            html,
            re.IGNORECASE,
        ):
            ip = m.group(1)
            inner = m.group(2)
            dw = re.search(
                r'document\.write\("[^"]*"\+((?:\(\w+\^\w+\)\+?)+)\)',
                inner,
            )
            if not dw:
                continue
            pairs = re.findall(r"\((\w+)\^(\w+)\)", dw.group(1))
            if not pairs:
                continue
            try:
                # Each (a^b) pair contributes one decimal digit of the port.
                digits = "".join(str(env[a] ^ env[b]) for a, b in pairs)
                port = int(digits)
            except (KeyError, ValueError):
                continue
            if not (1 <= port <= 65535):
                continue
            # Protocol label appears shortly after the row; check SOCKS5
            # and HTTPS before their substrings SOCKS4/HTTP.
            tail = html[m.end() : m.end() + 2000]
            u = tail.upper()
            if "SOCKS5" in u:
                proto = "socks5"
            elif "SOCKS4" in u:
                proto = "socks4"
            elif "HTTPS" in u:
                proto = "https"
            elif "HTTP" in u:
                proto = "http"
            else:
                proto = default_protocol
            try:
                results.append(ProxyRaw(ip, port, proto))
            except ValueError:
                continue
        return results

    async def crawl(self) -> List[ProxyRaw]:
        """Fetch both list pages concurrently and decode their proxies."""
        results: List[ProxyRaw] = []
        # Form filters: xpp=3 requests the largest page size; xf1/xf2/xf4=0
        # disable anonymity/SSL/type filtering.  xf5 selects HTTP vs SOCKS.
        form_base = {
            "xpp": "3",
            "xf1": "0",
            "xf2": "0",
            "xf4": "0",
        }

        async def _one(proto: str, url: str, xf5: str) -> Tuple[str, str]:
            data = {**form_base, "xf5": xf5}
            try:
                html = await self.fetch_post(url, data=data, timeout=25, retries=2)
            except Exception as exc:
                # One failing page must not abort the whole gather and
                # discard the other page's results.
                logger.warning(f"{self.display_name} ({proto}) 抓取失败: {exc}")
                return proto, ""
            return proto, html or ""

        pairs = await asyncio.gather(
            *(_one(proto, url, xf5) for proto, url, xf5 in self.pages)
        )
        for proto, html in pairs:
            if not html:
                continue
            batch = self._parse_page(html, proto)
            if batch:
                results.extend(batch)
                logger.info(f"{self.display_name} ({proto}): {len(batch)}")
        if results:
            logger.info(f"{self.display_name} 合计 {len(results)}")
        return results