From 07248ff4ee68479415b55a73a93aeb1565ee43f9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=A5=80=E6=A2=A6?= <3501646051@qq.com>
Date: Sun, 5 Apr 2026 14:40:36 +0800
Subject: [PATCH] feat(crawl): browser-like headers, HTTP/2, curl_cffi TLS fingerprint fallback

- get_headers(url): Referer, Sec-Fetch-*, sec-ch-ua, API vs HTML Accept
- httpx AsyncClient / sync Client with optional HTTP/2 (h2 extra)
- On 403/429/503/520-523/525/567 or request errors, retry via curl_cffi chrome124 impersonate
- POST: Origin, Referer, Content-Type for form posts
- kuaidaili/ip3366: forward get_headers(url=...)

Made-with: Cursor
---
 app/plugins/base.py      | 248 +++++++++++++++++++++++++++++++++++----
 app/plugins/ip3366.py    |   6 +-
 app/plugins/kuaidaili.py |   6 +-
 requirements.txt         |   3 +-
 4 files changed, 234 insertions(+), 29 deletions(-)

diff --git a/app/plugins/base.py b/app/plugins/base.py
index 33b0828..328e528 100644
--- a/app/plugins/base.py
+++ b/app/plugins/base.py
@@ -4,13 +4,27 @@ import random
 import asyncio
 import httpx
 from typing import Dict, List, Optional
+from urllib.parse import urlparse
+
 from bs4 import BeautifulSoup
 from app.core.plugin_system import BaseCrawlerPlugin
 from app.models.domain import ProxyRaw
 
+try:
+    import h2  # noqa: F401
+
+    _HTTPX_HTTP2 = True
+except ImportError:
+    _HTTPX_HTTP2 = False
 
 VALID_PROTOCOLS = ("http", "https", "socks4", "socks5")
 
+# On these HTTP statuses, retry via curl_cffi's browser TLS/JA3 fingerprint (clears simple anti-bot checks more easily than bare httpx)
+_CURL_FALLBACK_STATUS = frozenset(
+    {403, 429, 503, 520, 521, 522, 523, 525, 567}
+)
+_CURL_IMPERSONATE = "chrome124"
+
 
 class BaseHTTPPlugin(BaseCrawlerPlugin):
     """基于 HTTP 的爬虫插件基类"""
@@ -18,23 +32,86 @@ class BaseHTTPPlugin(BaseCrawlerPlugin):
     def __init__(self):
         super().__init__()
         self.user_agents = [
-            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
-            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
-            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36",
-            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0",
+            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
+            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
+            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
+            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
         ]
         self.urls: List[str] = []
         self.current_url: str = ""
         self._client: Optional[httpx.AsyncClient] = None
         self.max_concurrency: int = 2
 
-    def get_headers(self) -> dict:
-        return {
-            "User-Agent": random.choice(self.user_agents),
-            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
-            "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
+    def get_headers(
+        self,
+        url: Optional[str] = None,
+        *,
+        for_api: bool = False,
+        for_post: bool = False,
+    ) -> dict:
+        """Browser-like request headers; url drives Referer / Sec-Fetch-*."""
+        ua = random.choice(self.user_agents)
+        is_chrome = "Chrome/" in ua and "Edg/" not in ua
+        if for_api or (url and ("/api/" in url or url.endswith(".txt") or "/raw/" in url)):
+            accept = (
+                "text/plain,text/html,application/json,application/xhtml+xml,"
+                "application/xml;q=0.9,*/*;q=0.8"
+            )
+            sec_dest = "empty"
+            sec_mode = "cors"
+        else:
+            accept = (
+                "text/html,application/xhtml+xml,application/xml;q=0.9,"
+                "image/avif,image/webp,image/apng,*/*;q=0.8"
+            )
+            sec_dest = "document"
+            sec_mode = "navigate" if not for_post else "same-origin"
+
+        ref_host = ""
+        if url:
+            p = urlparse(url)
+            if p.scheme and p.netloc:
+                ref_host = p.netloc
+                referer = f"{p.scheme}://{p.netloc}/"
+            else:
+                referer = ""
+        else:
+            referer = ""
+
+        sec_site = "none"
+        if referer and url:
+            try:
+                req_host = urlparse(url).netloc
+                if req_host == ref_host:
+                    sec_site = "same-origin"
+                else:
+                    sec_site = "cross-site"
+            except Exception:
+                sec_site = "cross-site"
+
+        headers: Dict[str, str] = {
+            "User-Agent": ua,
+            "Accept": accept,
+            "Accept-Language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7",
+            "Accept-Encoding": "gzip, deflate, br",
+            "DNT": "1",
             "Connection": "keep-alive",
+            "Upgrade-Insecure-Requests": "1",
+            "Sec-Fetch-Dest": sec_dest,
+            "Sec-Fetch-Mode": sec_mode,
+            "Sec-Fetch-Site": sec_site,
+            "Sec-Fetch-User": "?1",
+            "Cache-Control": "max-age=0",
         }
+        if is_chrome:
+            headers["sec-ch-ua"] = (
+                '"Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"'
+            )
+            headers["sec-ch-ua-mobile"] = "?0"
+            headers["sec-ch-ua-platform"] = '"Windows"'
+        if referer:
+            headers["Referer"] = referer
+        return headers
 
     def _get_client(self) -> httpx.AsyncClient:
         """获取或创建复用的 AsyncClient"""
@@ -43,11 +120,62 @@ class BaseHTTPPlugin(BaseCrawlerPlugin):
             self._client = httpx.AsyncClient(
                 transport=transport,
                 follow_redirects=True,
-                # 忽略系统 HTTP(S)_PROXY,避免误配导致列表站全部连接失败
+                http2=_HTTPX_HTTP2,
                 trust_env=False,
             )
         return self._client
 
+    async def _curl_get(self, url: str, headers: dict, timeout: float) -> str:
+        try:
+            from curl_cffi import requests as cr
+        except ImportError:
+            return ""
+
+        def _run() -> str:
+            try:
+                h = {k: v for k, v in headers.items() if k.lower() != "accept-encoding"}
+                r = cr.get(
+                    url,
+                    impersonate=_CURL_IMPERSONATE,
+                    headers=h,
+                    timeout=timeout,
+                    allow_redirects=True,
+                )
+                if r.status_code == 200:
+                    return r.text or ""
+            except Exception:
+                pass
+            return ""
+
+        return await asyncio.to_thread(_run)
+
+    async def _curl_post(
+        self, url: str, data: Dict[str, str], headers: dict, timeout: float
+    ) -> str:
+        try:
+            from curl_cffi import requests as cr
+        except ImportError:
+            return ""
+
+        def _run() -> str:
+            try:
+                h = {k: v for k, v in headers.items() if k.lower() != "accept-encoding"}
+                r = cr.post(
+                    url,
+                    impersonate=_CURL_IMPERSONATE,
+                    headers=h,
+                    data=data,
+                    timeout=timeout,
+                    allow_redirects=True,
+                )
+                if r.status_code == 200:
+                    return r.text or ""
+            except Exception:
+                pass
+            return ""
+
+        return await asyncio.to_thread(_run)
+
     @staticmethod
     def _http_timeout(seconds: float) -> httpx.Timeout:
         """连接阶段单独收紧,避免 AsyncClient 在部分环境下长时间卡在 connect。"""
@@ -74,11 +202,28 @@ class BaseHTTPPlugin(BaseCrawlerPlugin):
                 transport=httpx.HTTPTransport(retries=0),
                 follow_redirects=True,
                 trust_env=False,
+                http2=_HTTPX_HTTP2,
             ) as c:
                 r = c.get(url, headers=headers, timeout=to)
-            if r.status_code != 200:
-                return ""
-            return self._decode_response_body(r)
+            if r.status_code == 200:
+                return self._decode_response_body(r)
+            if r.status_code in _CURL_FALLBACK_STATUS:
+                try:
+                    from curl_cffi import requests as cr
+
+                    h = {k: v for k, v in headers.items() if k.lower() != "accept-encoding"}
+                    r2 = cr.get(
+                        url,
+                        impersonate=_CURL_IMPERSONATE,
+                        headers=h,
+                        timeout=timeout,
+                        allow_redirects=True,
+                    )
+                    if r2.status_code == 200:
+                        return r2.text or ""
+                except Exception:
+                    pass
+            return ""
 
     def _sync_post(
         self, url: str, data: Dict[str, str], timeout: float, headers: dict
@@ -88,11 +233,38 @@ class BaseHTTPPlugin(BaseCrawlerPlugin):
                 transport=httpx.HTTPTransport(retries=0),
                 follow_redirects=True,
                 trust_env=False,
+                http2=_HTTPX_HTTP2,
             ) as c:
                 r = c.post(url, headers=headers, data=data, timeout=to)
-            if r.status_code != 200:
-                return ""
-            return self._decode_response_body(r)
+            if r.status_code == 200:
+                return self._decode_response_body(r)
+            if r.status_code in _CURL_FALLBACK_STATUS:
+                try:
+                    from curl_cffi import requests as cr
+
+                    h = {k: v for k, v in headers.items() if k.lower() != "accept-encoding"}
+                    r2 = cr.post(
+                        url,
+                        impersonate=_CURL_IMPERSONATE,
+                        headers=h,
+                        data=data,
+                        timeout=timeout,
+                        allow_redirects=True,
+                    )
+                    if r2.status_code == 200:
+                        return r2.text or ""
+                except Exception:
+                    pass
+            return ""
+
+    @staticmethod
+    def _is_textish_url(url: str) -> bool:
+        return bool(
+            url.endswith(".txt")
+            or "/api/" in url
+            or "raw.githubusercontent.com" in url
+            or "cdn.jsdelivr.net" in url
+        )
 
     async def fetch(
         self,
@@ -103,24 +275,35 @@ class BaseHTTPPlugin(BaseCrawlerPlugin):
     ) -> str:
         """异步抓取指定 URL 的 HTML 内容"""
         from app.core.log import logger
+
         client = self._get_client()
         to = self._http_timeout(timeout)
+        for_api = self._is_textish_url(url)
         for attempt in range(retries):
+            headers = self.get_headers(url=url, for_api=for_api)
             try:
-                response = await client.get(url, headers=self.get_headers(), timeout=to)
+                response = await client.get(url, headers=headers, timeout=to)
                 if raise_for_status:
                     response.raise_for_status()
                 if response.status_code == 200:
                     return self._decode_response_body(response)
                 logger.warning(f"Fetch {url} returned status {response.status_code}")
+                if response.status_code in _CURL_FALLBACK_STATUS:
+                    curl_text = await self._curl_get(url, headers, timeout)
+                    if curl_text:
+                        logger.info(f"Fetch {url} succeeded via browser-fingerprint fallback")
+                        return curl_text
             except Exception as e:
                 logger.warning(f"Fetch {url} failed (attempt {attempt + 1}/{retries}): {e}")
+                curl_text = await self._curl_get(url, headers, timeout)
+                if curl_text:
+                    logger.info(f"Fetch {url} browser-fingerprint fallback succeeded after error")
+                    return curl_text
             if attempt < retries - 1:
                 await asyncio.sleep(random.uniform(1, 3))
         try:
-            text = await asyncio.to_thread(
-                self._sync_get, url, timeout, self.get_headers()
-            )
+            h = self.get_headers(url=url, for_api=for_api)
+            text = await asyncio.to_thread(self._sync_get, url, timeout, h)
             if text:
                 logger.info(f"Fetch {url} 使用同步回退成功")
                 return text
@@ -142,23 +325,44 @@ class BaseHTTPPlugin(BaseCrawlerPlugin):
         payload = data or {}
         to = self._http_timeout(timeout)
         for attempt in range(retries):
+            headers = self.get_headers(url=url, for_post=True)
+            p = urlparse(url)
+            if p.scheme and p.netloc:
+                headers["Origin"] = f"{p.scheme}://{p.netloc}"
+                headers["Referer"] = url
+            headers["Content-Type"] = "application/x-www-form-urlencoded"
             try:
                 response = await client.post(
                     url,
-                    headers=self.get_headers(),
+                    headers=headers,
                     data=payload,
                     timeout=to,
                 )
                 if response.status_code == 200:
                     return self._decode_response_body(response)
                 logger.warning(f"POST {url} returned status {response.status_code}")
+                if response.status_code in _CURL_FALLBACK_STATUS:
+                    curl_text = await self._curl_post(url, payload, headers, timeout)
+                    if curl_text:
+                        logger.info(f"POST {url} succeeded via browser-fingerprint fallback")
+                        return curl_text
             except Exception as e:
                 logger.warning(f"POST {url} failed (attempt {attempt + 1}/{retries}): {e}")
+                curl_text = await self._curl_post(url, payload, headers, timeout)
+                if curl_text:
+                    logger.info(f"POST {url} browser-fingerprint fallback succeeded after error")
+                    return curl_text
             if attempt < retries - 1:
                 await asyncio.sleep(random.uniform(1, 3))
         try:
+            headers = self.get_headers(url=url, for_post=True)
+            p = urlparse(url)
+            if p.scheme and p.netloc:
+                headers["Origin"] = f"{p.scheme}://{p.netloc}"
+                headers["Referer"] = url
+            headers["Content-Type"] = "application/x-www-form-urlencoded"
             text = await asyncio.to_thread(
-                self._sync_post, url, payload, timeout, self.get_headers()
+                self._sync_post, url, payload, timeout, headers
             )
             if text:
                 logger.info(f"POST {url} 使用同步回退成功")
diff --git a/app/plugins/ip3366.py b/app/plugins/ip3366.py
index 87f0edd..ac8d07d 100644
--- a/app/plugins/ip3366.py
+++ b/app/plugins/ip3366.py
@@ -1,5 +1,5 @@
 import re
-from typing import List
+from typing import List, Optional
 from bs4 import BeautifulSoup
 from app.core.plugin_system import ProxyRaw
 from app.plugins.base import BaseHTTPPlugin
@@ -26,8 +26,8 @@ class Ip3366Plugin(BaseHTTPPlugin):
             f"http://www.ip3366.net/free/?stype=2&page={i}" for i in range(1, max_pages + 1)
         ]
 
-    def get_headers(self) -> dict:
-        headers = super().get_headers()
+    def get_headers(self, url: Optional[str] = None, **kwargs) -> dict:
+        headers = super().get_headers(url=url, **kwargs)
         headers["Referer"] = "http://www.ip3366.net/free/"
         return headers
 
diff --git a/app/plugins/kuaidaili.py b/app/plugins/kuaidaili.py
index 57003f4..9ead462 100644
--- a/app/plugins/kuaidaili.py
+++ b/app/plugins/kuaidaili.py
@@ -1,7 +1,7 @@
 import re
 import asyncio
 import random
-from typing import List
+from typing import List, Optional
 from bs4 import BeautifulSoup
 from app.core.plugin_system import ProxyRaw
 from app.plugins.base import BaseHTTPPlugin
@@ -26,8 +26,8 @@ class KuaiDaiLiPlugin(BaseHTTPPlugin):
         "https://www.kuaidaili.com/free/intr/1/",
     ]
 
-    def get_headers(self) -> dict:
-        headers = super().get_headers()
+    def get_headers(self, url: Optional[str] = None, **kwargs) -> dict:
+        headers = super().get_headers(url=url, **kwargs)
         headers["Referer"] = "https://www.kuaidaili.com/free/"
         headers["Accept"] = "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8"
         headers["Accept-Encoding"] = "gzip, deflate"
diff --git a/requirements.txt b/requirements.txt
index 526a10c..2806dfa 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,4 +6,5 @@ aiohttp-socks==0.9.1
 beautifulsoup4==4.12.3
 lxml==5.1.0
 pydantic-settings==2.8.1
-httpx==0.27.0
+httpx[http2,brotli]==0.27.0
+curl-cffi>=0.7.0
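
Reviewer notes follow; none of the snippets below are part of the patch itself.

The first bullet is easiest to see by example. The new get_headers(url=...) splits document navigation from API-ish fetches roughly like this (illustrative calls; MyPlugin stands in for any concrete BaseHTTPPlugin subclass):

h = MyPlugin().get_headers(url="https://www.kuaidaili.com/free/")
# Accept: text/html,application/xhtml+xml,...   Sec-Fetch-Dest: document
# Sec-Fetch-Mode: navigate                      Sec-Fetch-Site: same-origin
# Referer: https://www.kuaidaili.com/           sec-ch-ua* when a Chrome UA was drawn

h = MyPlugin().get_headers(url="https://example.com/api/proxies.txt", for_api=True)
# Accept: text/plain,text/html,application/json,...
# Sec-Fetch-Dest: empty                         Sec-Fetch-Mode: cors

Since the Referer is derived from the request URL itself, Sec-Fetch-Site is always "same-origin" whenever the URL parses; "cross-site" is only reachable if the two hosts ever diverge.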
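
The heart of the change is the fallback chain in fetch/fetch_post: plain httpx first, then a curl_cffi retry impersonating Chrome's TLS/JA3 fingerprint when the status looks like an anti-bot block. A minimal self-contained sketch of that chain, for experimenting outside the plugin system (fetch_with_fallback and the inline status set are illustrative names, not patch code):

import asyncio

import httpx

FALLBACK_STATUS = {403, 429, 503, 520, 521, 522, 523, 525, 567}


async def fetch_with_fallback(url: str, headers: dict, timeout: float = 10.0) -> str:
    # Attempt 1: plain httpx; a clean 200 needs no fingerprinting.
    async with httpx.AsyncClient(follow_redirects=True, trust_env=False) as client:
        try:
            r = await client.get(url, headers=headers, timeout=timeout)
            if r.status_code == 200:
                return r.text
            if r.status_code not in FALLBACK_STATUS:
                return ""  # ordinary failure; a browser fingerprint will not help
        except httpx.HTTPError:
            pass  # network/protocol error: fall through to the fingerprint retry

    # Attempt 2: curl_cffi replaying Chrome's TLS/JA3 (and HTTP/2) fingerprint.
    def _curl_get() -> str:
        try:
            from curl_cffi import requests as cr  # optional dependency
        except ImportError:
            return ""
        r = cr.get(url, impersonate="chrome124", headers=headers,
                   timeout=timeout, allow_redirects=True)
        return r.text if r.status_code == 200 else ""

    # curl_cffi's requests API is blocking, so keep it off the event loop.
    return await asyncio.to_thread(_curl_get)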
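
Finally, whether HTTP/2 actually negotiates once httpx[http2] is installed can be checked in isolation (assumed snippet):

import httpx

with httpx.Client(http2=True, trust_env=False) as c:
    r = c.get("https://www.cloudflare.com/")
    print(r.http_version)  # "HTTP/2" when h2 is present and the server supports it

One caveat to verify during review: httpx ignores the client-level http2 flag whenever an explicit transport= is supplied, so in the spots where the patch passes both transport=... and http2=_HTTPX_HTTP2, the flag only takes effect if it is also set on the transport itself, e.g. httpx.HTTPTransport(retries=0, http2=_HTTPX_HTTP2).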