feat(crawl): browser-like headers, HTTP/2, curl_cffi TLS fingerprint fallback

- get_headers(url): Referer, Sec-Fetch-*, sec-ch-ua, API vs HTML Accept
- httpx AsyncClient / sync Client with optional HTTP/2 (h2 extra)
- on 403/429/503/520-523/525/567 or request errors, retry via curl_cffi chrome124 impersonation
- POST: Origin, Referer, Content-Type for form posts
- kuaidaili/ip3366: forward get_headers(url=...)

Made-with: Cursor
Author: 祀梦
Date: 2026-04-05 14:40:36 +08:00
parent ce667dba13
commit 07248ff4ee

4 changed files with 234 additions and 29 deletions
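
Reviewer note: a quick sketch of what the new get_headers() is expected to produce, read off the diffed logic below. DemoPlugin, the example URLs, and the assertion values are illustrative assumptions, not code from this commit.

    # Hypothetical sketch; assumes BaseHTTPPlugin is instantiable through
    # a trivial subclass.
    from app.plugins.base import BaseHTTPPlugin

    class DemoPlugin(BaseHTTPPlugin):
        pass

    plugin = DemoPlugin()

    # HTML navigation: document/navigate Sec-Fetch-*, host-only Referer.
    h = plugin.get_headers(url="https://www.kuaidaili.com/free/")
    assert h["Sec-Fetch-Dest"] == "document"
    assert h["Sec-Fetch-Mode"] == "navigate"
    assert h["Referer"] == "https://www.kuaidaili.com/"
    assert h["Sec-Fetch-Site"] == "same-origin"

    # API/raw-text endpoint, auto-detected via "/api/" (or ".txt", "/raw/"):
    h = plugin.get_headers(url="https://example.com/api/proxies.txt")
    assert h["Sec-Fetch-Dest"] == "empty"
    assert h["Sec-Fetch-Mode"] == "cors"
    assert "application/json" in h["Accept"]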

@@ -4,13 +4,27 @@ import random
 import asyncio
 import httpx
 from typing import Dict, List, Optional
+from urllib.parse import urlparse
 from bs4 import BeautifulSoup
 from app.core.plugin_system import BaseCrawlerPlugin
 from app.models.domain import ProxyRaw
 
+try:
+    import h2  # noqa: F401
+    _HTTPX_HTTP2 = True
+except ImportError:
+    _HTTPX_HTTP2 = False
+
 VALID_PROTOCOLS = ("http", "https", "socks4", "socks5")
 
+# On these HTTP statuses, retry via curl_cffi with a browser TLS/JA3
+# fingerprint (gets past simple anti-bot checks more often than bare httpx)
+_CURL_FALLBACK_STATUS = frozenset(
+    {403, 429, 503, 520, 521, 522, 523, 525, 567}
+)
+_CURL_IMPERSONATE = "chrome124"
 
 class BaseHTTPPlugin(BaseCrawlerPlugin):
     """Base class for HTTP-based crawler plugins."""
@@ -18,23 +32,86 @@ class BaseHTTPPlugin(BaseCrawlerPlugin):
     def __init__(self):
         super().__init__()
         self.user_agents = [
-            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
-            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
-            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36",
-            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0",
+            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
+            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
+            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
+            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
         ]
         self.urls: List[str] = []
         self.current_url: str = ""
         self._client: Optional[httpx.AsyncClient] = None
         self.max_concurrency: int = 2
 
-    def get_headers(self) -> dict:
-        return {
-            "User-Agent": random.choice(self.user_agents),
-            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
-            "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
-        }
+    def get_headers(
+        self,
+        url: Optional[str] = None,
+        *,
+        for_api: bool = False,
+        for_post: bool = False,
+    ) -> dict:
+        """Browser-like request headers; url drives Referer / Sec-Fetch-*."""
+        ua = random.choice(self.user_agents)
+        is_chrome = "Chrome/" in ua and "Edg/" not in ua
+        if for_api or (url and ("/api/" in url or url.endswith(".txt") or "/raw/" in url)):
+            accept = (
+                "text/plain,text/html,application/json,application/xhtml+xml,"
+                "application/xml;q=0.9,*/*;q=0.8"
+            )
+            sec_dest = "empty"
+            sec_mode = "cors"
+        else:
+            accept = (
+                "text/html,application/xhtml+xml,application/xml;q=0.9,"
+                "image/avif,image/webp,image/apng,*/*;q=0.8"
+            )
+            sec_dest = "document"
+            sec_mode = "navigate" if not for_post else "same-origin"
+        ref_host = ""
+        if url:
+            p = urlparse(url)
+            if p.scheme and p.netloc:
+                ref_host = p.netloc
+                referer = f"{p.scheme}://{p.netloc}/"
+            else:
+                referer = ""
+        else:
+            referer = ""
+        sec_site = "none"
+        if referer and url:
+            try:
+                req_host = urlparse(url).netloc
+                if req_host == ref_host:
+                    sec_site = "same-origin"
+                else:
+                    sec_site = "cross-site"
+            except Exception:
+                sec_site = "cross-site"
+        headers: Dict[str, str] = {
+            "User-Agent": ua,
+            "Accept": accept,
+            "Accept-Language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7",
+            "Accept-Encoding": "gzip, deflate, br",
+            "DNT": "1",
+            "Connection": "keep-alive",
+            "Upgrade-Insecure-Requests": "1",
+            "Sec-Fetch-Dest": sec_dest,
+            "Sec-Fetch-Mode": sec_mode,
+            "Sec-Fetch-Site": sec_site,
+            "Sec-Fetch-User": "?1",
+            "Cache-Control": "max-age=0",
+        }
+        if is_chrome:
+            headers["sec-ch-ua"] = (
+                '"Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"'
+            )
+            headers["sec-ch-ua-mobile"] = "?0"
+            headers["sec-ch-ua-platform"] = '"Windows"'
+        if referer:
+            headers["Referer"] = referer
+        return headers
 
     def _get_client(self) -> httpx.AsyncClient:
         """Get or create the reusable AsyncClient."""
@@ -43,11 +120,62 @@ class BaseHTTPPlugin(BaseCrawlerPlugin):
             self._client = httpx.AsyncClient(
                 transport=transport,
                 follow_redirects=True,
+                http2=_HTTPX_HTTP2,
                 # Ignore system HTTP(S)_PROXY; a misconfigured proxy would
                 # otherwise break connections to every list site
                 trust_env=False,
             )
         return self._client
 
+    async def _curl_get(self, url: str, headers: dict, timeout: float) -> str:
+        try:
+            from curl_cffi import requests as cr
+        except ImportError:
+            return ""
+
+        def _run() -> str:
+            try:
+                h = {k: v for k, v in headers.items() if k.lower() != "accept-encoding"}
+                r = cr.get(
+                    url,
+                    impersonate=_CURL_IMPERSONATE,
+                    headers=h,
+                    timeout=timeout,
+                    allow_redirects=True,
+                )
+                if r.status_code == 200:
+                    return r.text or ""
+            except Exception:
+                pass
+            return ""
+
+        return await asyncio.to_thread(_run)
+
+    async def _curl_post(
+        self, url: str, data: Dict[str, str], headers: dict, timeout: float
+    ) -> str:
+        try:
+            from curl_cffi import requests as cr
+        except ImportError:
+            return ""
+
+        def _run() -> str:
+            try:
+                h = {k: v for k, v in headers.items() if k.lower() != "accept-encoding"}
+                r = cr.post(
+                    url,
+                    impersonate=_CURL_IMPERSONATE,
+                    headers=h,
+                    data=data,
+                    timeout=timeout,
+                    allow_redirects=True,
+                )
+                if r.status_code == 200:
+                    return r.text or ""
+            except Exception:
+                pass
+            return ""
+
+        return await asyncio.to_thread(_run)
+
     @staticmethod
     def _http_timeout(seconds: float) -> httpx.Timeout:
         """Tighten the connect phase separately; otherwise AsyncClient can hang in connect in some environments."""
@@ -74,11 +202,28 @@ class BaseHTTPPlugin(BaseCrawlerPlugin):
             transport=httpx.HTTPTransport(retries=0),
             follow_redirects=True,
             trust_env=False,
+            http2=_HTTPX_HTTP2,
         ) as c:
             r = c.get(url, headers=headers, timeout=to)
-            if r.status_code != 200:
-                return ""
-            return self._decode_response_body(r)
+            if r.status_code == 200:
+                return self._decode_response_body(r)
+            if r.status_code in _CURL_FALLBACK_STATUS:
+                try:
+                    from curl_cffi import requests as cr
+
+                    h = {k: v for k, v in headers.items() if k.lower() != "accept-encoding"}
+                    r2 = cr.get(
+                        url,
+                        impersonate=_CURL_IMPERSONATE,
+                        headers=h,
+                        timeout=timeout,
+                        allow_redirects=True,
+                    )
+                    if r2.status_code == 200:
+                        return r2.text or ""
+                except Exception:
+                    pass
+            return ""
 
     def _sync_post(
         self, url: str, data: Dict[str, str], timeout: float, headers: dict
@@ -88,11 +233,38 @@ class BaseHTTPPlugin(BaseCrawlerPlugin):
             transport=httpx.HTTPTransport(retries=0),
             follow_redirects=True,
             trust_env=False,
+            http2=_HTTPX_HTTP2,
         ) as c:
             r = c.post(url, headers=headers, data=data, timeout=to)
-            if r.status_code != 200:
-                return ""
-            return self._decode_response_body(r)
+            if r.status_code == 200:
+                return self._decode_response_body(r)
+            if r.status_code in _CURL_FALLBACK_STATUS:
+                try:
+                    from curl_cffi import requests as cr
+
+                    h = {k: v for k, v in headers.items() if k.lower() != "accept-encoding"}
+                    r2 = cr.post(
+                        url,
+                        impersonate=_CURL_IMPERSONATE,
+                        headers=h,
+                        data=data,
+                        timeout=timeout,
+                        allow_redirects=True,
+                    )
+                    if r2.status_code == 200:
+                        return r2.text or ""
+                except Exception:
+                    pass
+            return ""
+
+    @staticmethod
+    def _is_textish_url(url: str) -> bool:
+        return bool(
+            url.endswith(".txt")
+            or "/api/" in url
+            or "raw.githubusercontent.com" in url
+            or "cdn.jsdelivr.net" in url
+        )
 
     async def fetch(
         self,
@@ -103,24 +275,35 @@ class BaseHTTPPlugin(BaseCrawlerPlugin):
     ) -> str:
         """Fetch the HTML content of the given URL asynchronously."""
         from app.core.log import logger
 
         client = self._get_client()
         to = self._http_timeout(timeout)
+        for_api = self._is_textish_url(url)
         for attempt in range(retries):
+            headers = self.get_headers(url=url, for_api=for_api)
             try:
-                response = await client.get(url, headers=self.get_headers(), timeout=to)
+                response = await client.get(url, headers=headers, timeout=to)
                 if raise_for_status:
                     response.raise_for_status()
                 if response.status_code == 200:
                     return self._decode_response_body(response)
                 logger.warning(f"Fetch {url} returned status {response.status_code}")
+                if response.status_code in _CURL_FALLBACK_STATUS:
+                    curl_text = await self._curl_get(url, headers, timeout)
+                    if curl_text:
+                        logger.info(f"Fetch {url} succeeded via browser-fingerprint fallback")
+                        return curl_text
             except Exception as e:
                 logger.warning(f"Fetch {url} failed (attempt {attempt + 1}/{retries}): {e}")
+                curl_text = await self._curl_get(url, headers, timeout)
+                if curl_text:
+                    logger.info(f"Fetch {url} succeeded via browser-fingerprint fallback after error")
+                    return curl_text
             if attempt < retries - 1:
                 await asyncio.sleep(random.uniform(1, 3))
         try:
-            text = await asyncio.to_thread(
-                self._sync_get, url, timeout, self.get_headers()
-            )
+            h = self.get_headers(url=url, for_api=for_api)
+            text = await asyncio.to_thread(self._sync_get, url, timeout, h)
             if text:
                 logger.info(f"Fetch {url} succeeded via sync fallback")
                 return text
@@ -142,23 +325,44 @@ class BaseHTTPPlugin(BaseCrawlerPlugin):
payload = data or {}
to = self._http_timeout(timeout)
for attempt in range(retries):
headers = self.get_headers(url=url, for_post=True)
p = urlparse(url)
if p.scheme and p.netloc:
headers["Origin"] = f"{p.scheme}://{p.netloc}"
headers["Referer"] = url
headers["Content-Type"] = "application/x-www-form-urlencoded"
try:
response = await client.post(
url,
headers=self.get_headers(),
headers=headers,
data=payload,
timeout=to,
)
if response.status_code == 200:
return self._decode_response_body(response)
logger.warning(f"POST {url} returned status {response.status_code}")
if response.status_code in _CURL_FALLBACK_STATUS:
curl_text = await self._curl_post(url, payload, headers, timeout)
if curl_text:
logger.info(f"POST {url} 使用浏览器指纹回退成功")
return curl_text
except Exception as e:
logger.warning(f"POST {url} failed (attempt {attempt + 1}/{retries}): {e}")
curl_text = await self._curl_post(url, payload, headers, timeout)
if curl_text:
logger.info(f"POST {url} 异常后浏览器指纹回退成功")
return curl_text
if attempt < retries - 1:
await asyncio.sleep(random.uniform(1, 3))
try:
headers = self.get_headers(url=url, for_post=True)
p = urlparse(url)
if p.scheme and p.netloc:
headers["Origin"] = f"{p.scheme}://{p.netloc}"
headers["Referer"] = url
headers["Content-Type"] = "application/x-www-form-urlencoded"
text = await asyncio.to_thread(
self._sync_post, url, payload, timeout, self.get_headers()
self._sync_post, url, payload, timeout, headers
)
if text:
logger.info(f"POST {url} 使用同步回退成功")

@@ -1,5 +1,5 @@
 import re
-from typing import List
+from typing import List, Optional
 from bs4 import BeautifulSoup
 from app.core.plugin_system import ProxyRaw
 from app.plugins.base import BaseHTTPPlugin
@@ -26,8 +26,8 @@ class Ip3366Plugin(BaseHTTPPlugin):
             f"http://www.ip3366.net/free/?stype=2&page={i}" for i in range(1, max_pages + 1)
         ]
 
-    def get_headers(self) -> dict:
-        headers = super().get_headers()
+    def get_headers(self, url: Optional[str] = None, **kwargs) -> dict:
+        headers = super().get_headers(url=url, **kwargs)
         headers["Referer"] = "http://www.ip3366.net/free/"
         return headers
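
The subclass override pattern these call sites now rely on, sketched with a hypothetical plugin (ExamplePlugin and its Referer are placeholders):

    from typing import Optional
    from app.plugins.base import BaseHTTPPlugin

    class ExamplePlugin(BaseHTTPPlugin):
        def get_headers(self, url: Optional[str] = None, **kwargs) -> dict:
            # Forward url (and for_api/for_post via **kwargs) so the base
            # class can derive Referer and Sec-Fetch-*, then pin the
            # site-specific Referer on top.
            headers = super().get_headers(url=url, **kwargs)
            headers["Referer"] = "https://example.com/free/"
            return headers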

@@ -1,7 +1,7 @@
 import re
 import asyncio
 import random
-from typing import List
+from typing import List, Optional
 from bs4 import BeautifulSoup
 from app.core.plugin_system import ProxyRaw
 from app.plugins.base import BaseHTTPPlugin
@@ -26,8 +26,8 @@ class KuaiDaiLiPlugin(BaseHTTPPlugin):
             "https://www.kuaidaili.com/free/intr/1/",
         ]
 
-    def get_headers(self) -> dict:
-        headers = super().get_headers()
+    def get_headers(self, url: Optional[str] = None, **kwargs) -> dict:
+        headers = super().get_headers(url=url, **kwargs)
         headers["Referer"] = "https://www.kuaidaili.com/free/"
         headers["Accept"] = "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8"
         headers["Accept-Encoding"] = "gzip, deflate"