"""spys.one 表单 POST + 端口 XOR 解码(README: GetProxyFromSPYSONE.py)。""" import asyncio import re from typing import Dict, List, Tuple from app.core.plugin_system import ProxyRaw from app.plugins.base import BaseHTTPPlugin from app.core.log import logger class FpwSpysOnePlugin(BaseHTTPPlugin): name = "fpw_spys_one" display_name = "Spys.one" description = "spys.one HTTP/SOCKS 列表(POST 筛选 + XOR 端口解码)" def __init__(self): super().__init__() self.pages: List[Tuple[str, str, str]] = [ ("http", "http://spys.one/en/http-proxy-list/", "1"), ("socks5", "http://spys.one/en/socks-proxy-list/", "2"), ] @staticmethod def _exec_spys_decoder(body: str) -> Dict[str, int]: body = re.sub(r"\s+", "", body) stmts = [s.strip() for s in body.split(";") if s.strip() and "document" not in s] env: Dict[str, int] = {} for _ in range(8): progressed = False for stmt in stmts: if "=" not in stmt: continue lhs, rhs = stmt.split("=", 1) lhs = lhs.strip() rhs = rhs.strip() if lhs in env: continue if "^" not in rhs: if rhs.isdigit(): env[lhs] = int(rhs) progressed = True continue a, b = rhs.split("^", 1) a, b = a.strip(), b.strip() def gv(x: str) -> int: if x.isdigit(): return int(x) return env[x] try: env[lhs] = gv(a) ^ gv(b) progressed = True except KeyError: continue if not progressed: break return env def _decoder_env_from_html(self, html: str) -> Dict[str, int]: best: Dict[str, int] = {} for m in re.finditer(r"]*>([\s\S]*?)", html, re.IGNORECASE): chunk = m.group(1).strip() if "document.write" in chunk: continue xor_assigns = len(re.findall(r"\w+=\d+\^\w+", chunk)) if xor_assigns < 4: continue env = self._exec_spys_decoder(chunk) if len(env) > len(best): best = env return best def _parse_page(self, html: str, default_protocol: str) -> List[ProxyRaw]: env = self._decoder_env_from_html(html) if not env: logger.warning(f"{self.display_name} 未解析到 XOR 变量表") return [] results: List[ProxyRaw] = [] for m in re.finditer( r"class=spy14>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})]*>([\s\S]+?)", html, re.IGNORECASE, ): ip = m.group(1) inner = m.group(2) dw = re.search( r'document\.write\("[^"]*"\+((?:\(\w+\^\w+\)\+?)+)\)', inner, ) if not dw: continue pairs = re.findall(r"\((\w+)\^(\w+)\)", dw.group(1)) if not pairs: continue try: digits = "".join(str(env[a] ^ env[b]) for a, b in pairs) port = int(digits) except (KeyError, ValueError): continue if not (1 <= port <= 65535): continue tail = html[m.end() : m.end() + 2000] u = tail.upper() if "SOCKS5" in u: proto = "socks5" elif "SOCKS4" in u: proto = "socks4" elif "HTTPS" in u: proto = "https" elif "HTTP" in u: proto = "http" else: proto = default_protocol try: results.append(ProxyRaw(ip, port, proto)) except ValueError: continue return results async def crawl(self) -> List[ProxyRaw]: results: List[ProxyRaw] = [] form_base = { "xpp": "3", "xf1": "0", "xf2": "0", "xf4": "0", } async def _one(proto: str, url: str, xf5: str) -> Tuple[str, str]: data = {**form_base, "xf5": xf5} html = await self.fetch_post(url, data=data, timeout=25, retries=2) return proto, html or "" pairs = await asyncio.gather( *(_one(proto, url, xf5) for proto, url, xf5 in self.pages) ) for proto, html in pairs: if not html: continue batch = self._parse_page(html, proto) if batch: results.extend(batch) logger.info(f"{self.display_name} ({proto}): {len(batch)} 条") if results: logger.info(f"{self.display_name} 合计 {len(results)} 条") return results