"""代理验证服务 - 支持 HTTP/HTTPS/SOCKS4/SOCKS5""" import asyncio import random import time import aiohttp import aiohttp_socks from typing import Tuple, Optional, List from app.core.config import settings as app_settings from app.core.log import logger class ValidatorService: """代理验证器 支持动态读取配置,实现设置热更新。 """ # 测试 URL 默认池 DEFAULT_TEST_URLS = { "http": [ "http://httpbin.org/ip", "http://api.ipify.org", "http://www.baidu.com", "http://www.qq.com", ], "https": [ "https://httpbin.org/ip", "https://api.ipify.org", "https://www.baidu.com", "https://www.qq.com", ], } def __init__( self, timeout: Optional[float] = None, connect_timeout: Optional[float] = None, max_concurrency: Optional[int] = None, ): # 初始化时使用传入值或默认值,但运行期会动态读取 settings self._init_timeout = timeout or app_settings.validator_timeout self._init_connect_timeout = connect_timeout or app_settings.validator_connect_timeout self._init_max_concurrency = max_concurrency or app_settings.validator_max_concurrency self._http_connector: Optional[aiohttp.TCPConnector] = None self._http_session: Optional[aiohttp.ClientSession] = None self._semaphore: Optional[asyncio.Semaphore] = None self._lock = asyncio.Lock() self._test_urls: Optional[List[str]] = None @property def timeout(self) -> float: return float(self._init_timeout) @property def connect_timeout(self) -> float: return float(self._init_connect_timeout) @property def max_concurrency(self) -> int: return int(self._init_max_concurrency) def _ensure_session(self) -> aiohttp.ClientSession: """懒加载共享 HTTP session""" if self._http_session is None or self._http_session.closed: connector = aiohttp.TCPConnector( ssl=False, limit=self.max_concurrency, limit_per_host=self.max_concurrency, force_close=False, ) timeout = aiohttp.ClientTimeout( total=self.timeout, connect=self.connect_timeout ) self._http_connector = connector self._http_session = aiohttp.ClientSession( connector=connector, timeout=timeout, ) return self._http_session def _ensure_semaphore(self) -> asyncio.Semaphore: if self._semaphore is None: self._semaphore = asyncio.Semaphore(self.max_concurrency) return self._semaphore def _get_test_url(self, protocol: str) -> str: custom_urls = self._test_urls if not custom_urls: from app.core.config import settings as app_settings custom_urls = getattr(app_settings, "validator_test_urls", None) if custom_urls and isinstance(custom_urls, list) and len(custom_urls) > 0: # 按协议过滤自定义 URL,如果没有匹配的则使用全部 filtered = [u for u in custom_urls if u.lower().startswith(protocol.lower())] if filtered: return random.choice(filtered) return random.choice(custom_urls) urls = self.DEFAULT_TEST_URLS.get(protocol.lower(), self.DEFAULT_TEST_URLS["http"]) return random.choice(urls) async def validate(self, ip: str, port: int, protocol: str = "http") -> Tuple[bool, float]: """验证单个代理,返回 (是否有效, 延迟毫秒)""" protocol = protocol.lower() semaphore = self._ensure_semaphore() async with semaphore: start = time.time() try: if protocol in ("socks4", "socks5"): return await self._validate_socks(ip, port, protocol, start) else: return await self._validate_http(ip, port, protocol, start) except asyncio.TimeoutError: logger.debug(f"Validation timeout: {ip}:{port} ({protocol})") return False, 0.0 except Exception as e: logger.debug(f"Validation error {ip}:{port} ({protocol}): {e}") return False, 0.0 async def _validate_http(self, ip: str, port: int, protocol: str, start: float) -> Tuple[bool, float]: proxy_url = f"http://{ip}:{port}" test_url = self._get_test_url(protocol) session = self._ensure_session() async with session.get(test_url, proxy=proxy_url, allow_redirects=True) as response: if response.status in (200, 301, 302): latency = round((time.time() - start) * 1000, 2) logger.info(f"HTTP valid: {ip}:{port} ({protocol}) {latency}ms") return True, latency return False, 0.0 async def _validate_socks(self, ip: str, port: int, protocol: str, start: float) -> Tuple[bool, float]: proxy_type = ( aiohttp_socks.ProxyType.SOCKS4 if protocol == "socks4" else aiohttp_socks.ProxyType.SOCKS5 ) connector = aiohttp_socks.ProxyConnector( proxy_type=proxy_type, host=ip, port=port, rdns=True, ssl=False, ) timeout = aiohttp.ClientTimeout(total=self.timeout, connect=self.connect_timeout) test_url = self._get_test_url("http") async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session: async with session.get(test_url, allow_redirects=True) as response: if response.status in (200, 301, 302): latency = round((time.time() - start) * 1000, 2) logger.info(f"SOCKS valid: {ip}:{port} ({protocol}) {latency}ms") return True, latency return False, 0.0 def update_test_urls(self, urls: List[str]) -> None: """运行时更新验证目标 URL 列表""" self._test_urls = list(urls) if urls else None async def close(self) -> None: """关闭共享的 HTTP ClientSession""" if self._http_session and not self._http_session.closed: await self._http_session.close() self._http_session = None self._http_connector = None