"""Proxy validation service supporting HTTP, HTTPS, SOCKS4 and SOCKS5 proxies."""

import asyncio
import random
import time
from collections import OrderedDict
from typing import Tuple, Optional, List

import aiohttp
import aiohttp_socks

from app.core.config import settings as app_settings
from app.core.log import logger


class ValidatorService:
    """Validate proxies by fetching a test URL through them.

    Test URLs are looked up on every call (an explicit override set through
    ``update_test_urls`` first, then ``app_settings.validator_test_urls``,
    then ``DEFAULT_TEST_URLS``), so they can be changed at runtime.  The
    timeout and concurrency values are captured once in ``__init__``.

    Concurrency is limited by ``AsyncWorkerPool.worker_count``; no additional
    semaphore is applied here.
    """

    # Default pool of test URLs, keyed by the scheme of the proxy under test.
    DEFAULT_TEST_URLS = {
        "http": [
            "http://httpbin.org/ip",
            "http://api.ipify.org",
            "http://www.baidu.com",
            "http://www.qq.com",
        ],
        "https": [
            "https://httpbin.org/ip",
            "https://api.ipify.org",
            "https://www.baidu.com",
            "https://www.qq.com",
        ],
    }

    # Maximum number of per-proxy SOCKS sessions kept in the LRU cache.
    _SOCKS_CACHE_CAP = 128

    def __init__(
        self,
        timeout: Optional[float] = None,
        connect_timeout: Optional[float] = None,
        max_concurrency: Optional[int] = None,
    ):
        """Create a validator.

        Args:
            timeout: Total request timeout in seconds; defaults to
                ``app_settings.validator_timeout``.
            connect_timeout: Connect timeout in seconds; defaults to
                ``app_settings.validator_connect_timeout``.
            max_concurrency: Connection limit of the shared HTTP connector;
                defaults to ``app_settings.validator_max_concurrency``.
        """
        self._init_timeout = (
            timeout if timeout is not None else app_settings.validator_timeout
        )
        self._init_connect_timeout = (
            connect_timeout
            if connect_timeout is not None
            else app_settings.validator_connect_timeout
        )
        self._init_max_concurrency = (
            max_concurrency
            if max_concurrency is not None
            else app_settings.validator_max_concurrency
        )

        # Shared session/connector used for HTTP and HTTPS proxies.
        self._http_connector: Optional[aiohttp.TCPConnector] = None
        self._http_session: Optional[aiohttp.ClientSession] = None
        self._lock = asyncio.Lock()

        # Explicit test-URL override; None means "fall back to settings".
        self._test_urls: Optional[List[str]] = None

        # LRU cache of SOCKS sessions keyed by (protocol, ip, port).
        self._socks_sessions: "OrderedDict[Tuple[str, str, int], aiohttp.ClientSession]" = OrderedDict()
        self._socks_lock = asyncio.Lock()

    @property
    def timeout(self) -> float:
        """Total request timeout in seconds."""
        return float(self._init_timeout)

    @property
    def connect_timeout(self) -> float:
        """Connect timeout in seconds."""
        return float(self._init_connect_timeout)

    @property
    def max_concurrency(self) -> int:
        """Connection limit applied to the shared HTTP connector."""
        return int(self._init_max_concurrency)

    def _client_timeout(self) -> aiohttp.ClientTimeout:
        """Build the aiohttp timeout object from the configured values.

        The connect timeout is capped at the total timeout, and the socket
        read timeout is 85% of the total (at least 2 seconds) but never more
        than the total.
        """
        total = float(self.timeout)
        connect = min(float(self.connect_timeout), total)
        sock_read = min(total, max(2.0, total * 0.85))
        return aiohttp.ClientTimeout(total=total, connect=connect, sock_read=sock_read)

    async def _ensure_session(self) -> aiohttp.ClientSession:
        """Return the shared HTTP session, creating it lazily if needed."""
        if self._http_session is None or self._http_session.closed:
            async with self._lock:
                # Re-check under the lock: another coroutine may have created
                # the session while this one was waiting.
                if self._http_session is None or self._http_session.closed:
                    connector = aiohttp.TCPConnector(
                        ssl=False,
                        limit=self.max_concurrency,
                        limit_per_host=self.max_concurrency,
                        force_close=False,
                    )
                    self._http_connector = connector
                    self._http_session = aiohttp.ClientSession(
                        connector=connector,
                        timeout=self._client_timeout(),
                    )
        return self._http_session

    def _get_test_url(self, protocol: str) -> str:
        """Pick a random test URL for the given proxy protocol.

        Lookup order: the override set via ``update_test_urls``, then
        ``app_settings.validator_test_urls``, then ``DEFAULT_TEST_URLS``.
        Within a custom list, URLs whose scheme equals ``protocol`` are
        preferred; if none match, any custom URL is used.

        Args:
            protocol: URL scheme to prefer, e.g. ``"http"`` or ``"https"``.

        Returns:
            One URL chosen at random from the selected pool.
        """
        scheme = protocol.lower()

        custom_urls = self._test_urls
        if not custom_urls:
            custom_urls = getattr(app_settings, "validator_test_urls", None)

        if custom_urls and isinstance(custom_urls, list):
            # Match on "<scheme>://" rather than the bare scheme, otherwise
            # "http" would also select every "https://..." URL.
            prefix = f"{scheme}://"
            matching = [u for u in custom_urls if u.lower().startswith(prefix)]
            if matching:
                return random.choice(matching)
            return random.choice(custom_urls)

        urls = self.DEFAULT_TEST_URLS.get(scheme, self.DEFAULT_TEST_URLS["http"])
        return random.choice(urls)

    async def validate(self, ip: str, port: int, protocol: str = "http") -> Tuple[bool, float]:
        """Validate a single proxy.

        Args:
            ip: Proxy host.
            port: Proxy port.
            protocol: ``"http"``, ``"https"``, ``"socks4"`` or ``"socks5"``
                (case-insensitive).

        Returns:
            ``(is_valid, latency_ms)``; the latency is ``0.0`` whenever the
            proxy is not valid.  Timeouts and other errors are logged at
            debug level and reported as ``(False, 0.0)``.
        """
        protocol = protocol.lower()
        start = time.time()
        try:
            if protocol in ("socks4", "socks5"):
                return await self._validate_socks(ip, port, protocol, start)
            return await self._validate_http(ip, port, protocol, start)
        except asyncio.TimeoutError:
            logger.debug(f"Validation timeout: {ip}:{port} ({protocol})")
            return False, 0.0
        except Exception as e:
            # Validation is best-effort: any failure means "proxy not usable".
            logger.debug(f"Validation error {ip}:{port} ({protocol}): {e}")
            return False, 0.0

    async def _validate_http(self, ip: str, port: int, protocol: str, start: float) -> Tuple[bool, float]:
        """Validate an HTTP/HTTPS proxy through the shared session.

        ``start`` is the ``time.time()`` value taken when validation began;
        the reported latency is measured from it.
        """
        proxy_url = f"{protocol}://{ip}:{port}"
        test_url = self._get_test_url(protocol)
        session = await self._ensure_session()
        async with session.get(test_url, proxy=proxy_url, allow_redirects=True) as response:
            if response.status in (200, 301, 302):
                latency = round((time.time() - start) * 1000, 2)
                logger.debug(f"HTTP valid: {ip}:{port} ({protocol}) {latency}ms")
                return True, latency
            return False, 0.0

    async def _get_socks_session(self, protocol: str, ip: str, port: int) -> aiohttp.ClientSession:
        """Return a cached session for a SOCKS proxy, creating one if needed.

        Sessions are cached per ``(protocol, ip, port)`` in least-recently-used
        order; once the cache holds ``_SOCKS_CACHE_CAP`` entries the oldest
        ones are closed and dropped before a new session is added.
        """
        key = (protocol, ip, port)
        async with self._socks_lock:
            sess = self._socks_sessions.get(key)
            if sess is not None:
                if sess.closed:
                    # Stale entry: drop it and build a fresh session below.
                    del self._socks_sessions[key]
                else:
                    self._socks_sessions.move_to_end(key)
                    return sess

            # NOTE(review): an evicted session is closed without checking
            # whether another coroutine is still using it - confirm that the
            # number of concurrent validations stays below the cache cap.
            while len(self._socks_sessions) >= self._SOCKS_CACHE_CAP:
                _, old = self._socks_sessions.popitem(last=False)
                if old is not None and not old.closed:
                    await old.close()

            proxy_type = (
                aiohttp_socks.ProxyType.SOCKS4
                if protocol == "socks4"
                else aiohttp_socks.ProxyType.SOCKS5
            )
            connector = aiohttp_socks.ProxyConnector(
                proxy_type=proxy_type,
                host=ip,
                port=port,
                rdns=True,
                ssl=False,
            )
            sess = aiohttp.ClientSession(connector=connector, timeout=self._client_timeout())
            self._socks_sessions[key] = sess
            return sess

    async def _validate_socks(self, ip: str, port: int, protocol: str, start: float) -> Tuple[bool, float]:
        """Validate a SOCKS4/SOCKS5 proxy using an ``http`` test URL.

        ``start`` is the ``time.time()`` value taken when validation began;
        the reported latency is measured from it.
        """
        test_url = self._get_test_url("http")
        session = await self._get_socks_session(protocol, ip, port)
        async with session.get(test_url, allow_redirects=True) as response:
            if response.status in (200, 301, 302):
                latency = round((time.time() - start) * 1000, 2)
                logger.debug(f"SOCKS valid: {ip}:{port} ({protocol}) {latency}ms")
                return True, latency
            return False, 0.0

    async def close_socks_sessions(self) -> None:
        """Close and drop every cached SOCKS session.

        Intended to be called on a settings hot-update or at process exit.
        """
        async with self._socks_lock:
            for s in list(self._socks_sessions.values()):
                if not s.closed:
                    await s.close()
            self._socks_sessions.clear()

    def update_test_urls(self, urls: List[str]) -> None:
        """Override the test URL pool; an empty or ``None`` value clears the override."""
        self._test_urls = list(urls) if urls else None

    async def close(self) -> None:
        """Release all sessions held by this validator."""
        try:
            await self.close_socks_sessions()
        finally:
            # Always release the shared HTTP session, even when closing the
            # SOCKS sessions failed, so it is never leaked.
            session = self._http_session
            self._http_session = None
            self._http_connector = None
            if session is not None and not session.closed:
                await session.close()