"""通用 HTTP 爬虫基类 - 为基于 HTTP 请求的插件提供封装""" import re import random import asyncio import httpx from typing import Dict, List, Optional from urllib.parse import urlparse from bs4 import BeautifulSoup from app.core.plugin_system import BaseCrawlerPlugin from app.models.domain import ProxyRaw try: import h2 # noqa: F401 _HTTPX_HTTP2 = True except ImportError: _HTTPX_HTTP2 = False VALID_PROTOCOLS = ("http", "https", "socks4", "socks5") # 遇此类 HTTP 状态时尝试 curl_cffi 浏览器 TLS/JA3 指纹(比裸 httpx 更易过简单反爬) _CURL_FALLBACK_STATUS = frozenset( {403, 429, 503, 520, 521, 522, 523, 525, 567} ) _CURL_IMPERSONATE = "chrome124" class BaseHTTPPlugin(BaseCrawlerPlugin): """基于 HTTP 的爬虫插件基类""" def __init__(self): super().__init__() self.user_agents = [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", ] self.urls: List[str] = [] self.current_url: str = "" self._client: Optional[httpx.AsyncClient] = None self.max_concurrency: int = 2 def get_headers( self, url: Optional[str] = None, *, for_api: bool = False, for_post: bool = False, ) -> dict: """接近真实浏览器的请求头;url 用于 Referer / Sec-Fetch-*。""" ua = random.choice(self.user_agents) is_chrome = "Chrome/" in ua and "Edg/" not in ua if for_api or (url and ("/api/" in url or url.endswith(".txt") or "/raw/" in url)): accept = ( "text/plain,text/html,application/json,application/xhtml+xml," "application/xml;q=0.9,*/*;q=0.8" ) sec_dest = "empty" sec_mode = "cors" else: accept = ( "text/html,application/xhtml+xml,application/xml;q=0.9," "image/avif,image/webp,image/apng,*/*;q=0.8" ) sec_dest = "document" sec_mode = "navigate" if not for_post else "same-origin" ref_host = "" if url: p = urlparse(url) if p.scheme and p.netloc: ref_host = p.netloc referer = f"{p.scheme}://{p.netloc}/" else: referer = "" else: referer = "" sec_site = "none" if referer and url: try: req_host = urlparse(url).netloc if req_host == ref_host: sec_site = "same-origin" else: sec_site = "cross-site" except Exception: sec_site = "cross-site" headers: Dict[str, str] = { "User-Agent": ua, "Accept": accept, "Accept-Language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7", "Accept-Encoding": "gzip, deflate, br", "DNT": "1", "Connection": "keep-alive", "Upgrade-Insecure-Requests": "1", "Sec-Fetch-Dest": sec_dest, "Sec-Fetch-Mode": sec_mode, "Sec-Fetch-Site": sec_site, "Sec-Fetch-User": "?1", "Cache-Control": "max-age=0", } if is_chrome: headers["sec-ch-ua"] = ( '"Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"' ) headers["sec-ch-ua-mobile"] = "?0" headers["sec-ch-ua-platform"] = '"Windows"' if referer: headers["Referer"] = referer return headers def _get_client(self) -> httpx.AsyncClient: """获取或创建复用的 AsyncClient""" if self._client is None or self._client.is_closed: transport = httpx.AsyncHTTPTransport(retries=0) self._client = httpx.AsyncClient( transport=transport, follow_redirects=True, http2=_HTTPX_HTTP2, trust_env=False, ) return self._client async def _curl_get(self, url: str, headers: dict, timeout: float) -> str: try: from curl_cffi import requests as cr except ImportError: return "" def _run() -> str: try: h = {k: v for k, v in headers.items() if k.lower() != "accept-encoding"} r = cr.get( url, 
    async def _curl_get(self, url: str, headers: dict, timeout: float) -> str:
        try:
            from curl_cffi import requests as cr
        except ImportError:
            return ""

        def _run() -> str:
            try:
                h = {k: v for k, v in headers.items() if k.lower() != "accept-encoding"}
                r = cr.get(
                    url,
                    impersonate=_CURL_IMPERSONATE,
                    headers=h,
                    timeout=timeout,
                    allow_redirects=True,
                )
                if r.status_code == 200:
                    return r.text or ""
            except Exception:
                pass
            return ""

        return await asyncio.to_thread(_run)

    async def _curl_post(
        self, url: str, data: Dict[str, str], headers: dict, timeout: float
    ) -> str:
        try:
            from curl_cffi import requests as cr
        except ImportError:
            return ""

        def _run() -> str:
            try:
                h = {k: v for k, v in headers.items() if k.lower() != "accept-encoding"}
                r = cr.post(
                    url,
                    impersonate=_CURL_IMPERSONATE,
                    headers=h,
                    data=data,
                    timeout=timeout,
                    allow_redirects=True,
                )
                if r.status_code == 200:
                    return r.text or ""
            except Exception:
                pass
            return ""

        return await asyncio.to_thread(_run)

    @staticmethod
    def _http_timeout(seconds: float) -> httpx.Timeout:
        """Tighten the connect phase separately so AsyncClient cannot hang on connect in some environments."""
        t = max(2.0, float(seconds))
        # Too short a connect timeout causes mass timeouts on international links / bulk crawls.
        c = min(12.0, max(4.0, t * 0.4))
        return httpx.Timeout(t, connect=c)

    @staticmethod
    def _decode_response_body(response: httpx.Response) -> str:
        """Decode a response body, falling back to GBK when UTF-8 decoding fails."""
        content = response.content
        encoding = response.encoding
        if not encoding or encoding.lower() == "utf-8":
            try:
                return content.decode("utf-8")
            except UnicodeDecodeError:
                return content.decode("gbk", errors="ignore")
        return content.decode(encoding, errors="ignore")

    def _sync_get(self, url: str, timeout: float, headers: dict) -> str:
        """Synchronous GET (on Windows some sites make AsyncClient hit ConnectTimeout while a sync Client works fine)."""
        to = BaseHTTPPlugin._http_timeout(timeout)
        with httpx.Client(
            transport=httpx.HTTPTransport(retries=0),
            follow_redirects=True,
            trust_env=False,
            http2=_HTTPX_HTTP2,
        ) as c:
            r = c.get(url, headers=headers, timeout=to)
            if r.status_code == 200:
                return self._decode_response_body(r)
            if r.status_code in _CURL_FALLBACK_STATUS:
                try:
                    from curl_cffi import requests as cr

                    h = {k: v for k, v in headers.items() if k.lower() != "accept-encoding"}
                    r2 = cr.get(
                        url,
                        impersonate=_CURL_IMPERSONATE,
                        headers=h,
                        timeout=timeout,
                        allow_redirects=True,
                    )
                    if r2.status_code == 200:
                        return r2.text or ""
                except Exception:
                    pass
        return ""

    def _sync_post(
        self, url: str, data: Dict[str, str], timeout: float, headers: dict
    ) -> str:
        to = BaseHTTPPlugin._http_timeout(timeout)
        with httpx.Client(
            transport=httpx.HTTPTransport(retries=0),
            follow_redirects=True,
            trust_env=False,
            http2=_HTTPX_HTTP2,
        ) as c:
            r = c.post(url, headers=headers, data=data, timeout=to)
            if r.status_code == 200:
                return self._decode_response_body(r)
            if r.status_code in _CURL_FALLBACK_STATUS:
                try:
                    from curl_cffi import requests as cr

                    h = {k: v for k, v in headers.items() if k.lower() != "accept-encoding"}
                    r2 = cr.post(
                        url,
                        impersonate=_CURL_IMPERSONATE,
                        headers=h,
                        data=data,
                        timeout=timeout,
                        allow_redirects=True,
                    )
                    if r2.status_code == 200:
                        return r2.text or ""
                except Exception:
                    pass
        return ""

    @staticmethod
    def _is_textish_url(url: str) -> bool:
        return bool(
            url.endswith(".txt")
            or "/api/" in url
            or "raw.githubusercontent.com" in url
            or "cdn.jsdelivr.net" in url
        )

    async def fetch(
        self,
        url: str,
        timeout: float = 15.0,
        retries: int = 2,
        raise_for_status: bool = False,
    ) -> str:
        """Asynchronously fetch the HTML content of the given URL."""
        from app.core.log import logger

        client = self._get_client()
        to = self._http_timeout(timeout)
        for_api = self._is_textish_url(url)
        for attempt in range(retries):
            headers = self.get_headers(url=url, for_api=for_api)
            try:
                response = await client.get(url, headers=headers, timeout=to)
                if raise_for_status:
                    response.raise_for_status()
                if response.status_code == 200:
                    return self._decode_response_body(response)
                logger.warning(f"Fetch {url} returned status {response.status_code}")
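                # Anti-bot style status: retry this attempt once with the
                # curl_cffi browser fingerprint before giving up on it.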
                if response.status_code in _CURL_FALLBACK_STATUS:
                    curl_text = await self._curl_get(url, headers, timeout)
                    if curl_text:
                        logger.info(f"Fetch {url} succeeded via browser-fingerprint fallback")
                        return curl_text
            except Exception as e:
                logger.warning(f"Fetch {url} failed (attempt {attempt + 1}/{retries}): {e}")
                curl_text = await self._curl_get(url, headers, timeout)
                if curl_text:
                    logger.info(f"Fetch {url} browser-fingerprint fallback succeeded after error")
                    return curl_text
            if attempt < retries - 1:
                await asyncio.sleep(random.uniform(1, 3))
        try:
            h = self.get_headers(url=url, for_api=for_api)
            text = await asyncio.to_thread(self._sync_get, url, timeout, h)
            if text:
                logger.info(f"Fetch {url} succeeded via sync fallback")
                return text
        except Exception as e:
            logger.warning(f"Fetch {url} sync fallback failed: {e}")
        return ""

    async def fetch_post(
        self,
        url: str,
        data: Optional[Dict[str, str]] = None,
        timeout: float = 15.0,
        retries: int = 2,
    ) -> str:
        """POST application/x-www-form-urlencoded; used for form pages such as spys.one."""
        from app.core.log import logger

        client = self._get_client()
        payload = data or {}
        to = self._http_timeout(timeout)
        for attempt in range(retries):
            headers = self.get_headers(url=url, for_post=True)
            p = urlparse(url)
            if p.scheme and p.netloc:
                headers["Origin"] = f"{p.scheme}://{p.netloc}"
            headers["Referer"] = url
            headers["Content-Type"] = "application/x-www-form-urlencoded"
            try:
                response = await client.post(
                    url,
                    headers=headers,
                    data=payload,
                    timeout=to,
                )
                if response.status_code == 200:
                    return self._decode_response_body(response)
                logger.warning(f"POST {url} returned status {response.status_code}")
                if response.status_code in _CURL_FALLBACK_STATUS:
                    curl_text = await self._curl_post(url, payload, headers, timeout)
                    if curl_text:
                        logger.info(f"POST {url} succeeded via browser-fingerprint fallback")
                        return curl_text
            except Exception as e:
                logger.warning(f"POST {url} failed (attempt {attempt + 1}/{retries}): {e}")
                curl_text = await self._curl_post(url, payload, headers, timeout)
                if curl_text:
                    logger.info(f"POST {url} browser-fingerprint fallback succeeded after error")
                    return curl_text
            if attempt < retries - 1:
                await asyncio.sleep(random.uniform(1, 3))
        try:
            headers = self.get_headers(url=url, for_post=True)
            p = urlparse(url)
            if p.scheme and p.netloc:
                headers["Origin"] = f"{p.scheme}://{p.netloc}"
            headers["Referer"] = url
            headers["Content-Type"] = "application/x-www-form-urlencoded"
            text = await asyncio.to_thread(
                self._sync_post, url, payload, timeout, headers
            )
            if text:
                logger.info(f"POST {url} succeeded via sync fallback")
                return text
        except Exception as e:
            logger.warning(f"POST {url} sync fallback failed: {e}")
        return ""

    async def fetch_all(
        self,
        urls: List[str],
        timeout: float = 15.0,
        retries: int = 2,
    ) -> List[str]:
        """Fetch multiple URLs concurrently, capping concurrency within a single plugin."""
        semaphore = asyncio.Semaphore(self.max_concurrency)

        async def _fetch_limited(url: str):
            async with semaphore:
                # Small jitter so a plugin does not hit one host in bursts.
                await asyncio.sleep(random.uniform(0.08, 0.45))
                return await self.fetch(url, timeout=timeout, retries=retries)

        tasks = [_fetch_limited(url) for url in urls]
        return await asyncio.gather(*tasks)

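    # --- response parsing helpers ---------------------------------------------
    # parse_text_proxies handles plain "ip:port" lists; parse_html_table handles
    # simple <table> layouts. Illustrative sketch (hypothetical input; ProxyRaw's
    # exact signature comes from app.models.domain and is assumed positional here
    # as (ip, port, protocol), matching how it is constructed below):
    #
    #   text = "1.2.3.4:8080\n5.6.7.8:1080\n"
    #   proxies = plugin.parse_text_proxies(text, protocol="socks5")
    #   # -> [ProxyRaw("1.2.3.4", 8080, "socks5"), ProxyRaw("5.6.7.8", 1080, "socks5")]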
    def parse_text_proxies(self, text: str, protocol: str = "http") -> List[ProxyRaw]:
        """Parse a plain-text proxy list in ip:port format.

        Normalizes CRLF / CR line endings to LF and skips any blank lines.
        """
        results = []
        text = text.replace("\r\n", "\n").replace("\r", "\n")
        for line in text.split("\n"):
            line = line.strip()
            if not line or ":" not in line:
                continue
            ip, _, port = line.rpartition(":")
            ip = ip.strip()
            port = port.strip()
            if ip and port.isdigit() and 1 <= int(port) <= 65535:
                try:
                    results.append(ProxyRaw(ip, int(port), protocol))
                except ValueError:
                    continue
        return results

    def parse_html_table(
        self,
        html: str,
        column_map: dict,
        protocol: str = "http",
    ) -> List[ProxyRaw]:
        """Generic HTML table parser.

        Args:
            html: HTML text.
            column_map: Mapping of column names to indexes, e.g. {"ip": 0, "port": 1, "protocol": 4}.
            protocol: Default protocol, used when the table has no protocol column.
        """
        results = []
        soup = BeautifulSoup(html, "lxml")
        table = soup.find("table")
        if not table:
            return results
        ip_idx = column_map.get("ip", 0)
        port_idx = column_map.get("port", 1)
        protocol_idx = column_map.get("protocol", -1)
        for row in table.find_all("tr"):
            tds = row.find_all("td")
            if len(tds) <= max(ip_idx, port_idx):
                continue
            ip = tds[ip_idx].get_text(strip=True)
            port = tds[port_idx].get_text(strip=True)
            if protocol_idx >= 0 and len(tds) > protocol_idx:
                proto = tds[protocol_idx].get_text(strip=True).lower()
                if proto not in VALID_PROTOCOLS:
                    proto = protocol
            else:
                proto = protocol
            if re.match(r"^\d+\.\d+\.\d+\.\d+$", ip) and port.isdigit() and 1 <= int(port) <= 65535:
                try:
                    results.append(ProxyRaw(ip, int(port), proto))
                except ValueError:
                    continue
        return results

    async def close(self):
        """Close the reusable HTTP client."""
        if self._client and not self._client.is_closed:
            await self._client.aclose()
        self._client = None
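
# ---------------------------------------------------------------------------
# Usage sketch (kept as a comment so nothing here is imported or registered by
# the plugin system). The plugin name, the entry-point method name `crawl()`,
# and the source URL are assumptions for illustration only -- the interface
# BaseCrawlerPlugin requires is defined elsewhere, not in this module.
#
#   class ExampleTxtListPlugin(BaseHTTPPlugin):
#       def __init__(self):
#           super().__init__()
#           self.urls = ["https://example.com/proxies.txt"]
#
#       async def crawl(self) -> List[ProxyRaw]:
#           pages = await self.fetch_all(self.urls, timeout=15.0)
#           results: List[ProxyRaw] = []
#           for page in pages:
#               results.extend(self.parse_text_proxies(page, protocol="http"))
#           await self.close()
#           return results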