ProxyPool/core/crawler.py

import aiohttp
import asyncio
import random

from core.log import logger


class BaseCrawler:
    def __init__(self):
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0",
            "Mozilla/5.0 (iPhone; CPU iPhone OS 17_1_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1.2 Mobile/15E148 Safari/604.1"
        ]

    def get_headers(self):
        return {
            'User-Agent': random.choice(self.user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
            'Connection': 'keep-alive',
        }
    async def fetch(self, url, method='GET', params=None, data=None, proxies=None, timeout=10, retry_count=3):
        """Fetch a URL asynchronously, with retries and encoding fallback."""
        # Reuse the rotating User-Agent headers defined above
        async with aiohttp.ClientSession(headers=self.get_headers()) as session:
            for i in range(retry_count):
                try:
                    # Note: aiohttp's proxy format differs from requests';
                    # it is typically a single URL such as http://user:pass@host:port
                    async with session.request(
                        method=method,
                        url=url,
                        params=params,
                        data=data,
                        proxy=proxies,
                        timeout=aiohttp.ClientTimeout(total=timeout)
                    ) as response:
                        if response.status == 200:
                            # Read the raw bytes first, then handle the encoding
                            content = await response.read()
                            # Try the encoding reported by the response
                            encoding = response.get_encoding()
                            if encoding == 'utf-8' or not encoding:
                                try:
                                    return content.decode('utf-8')
                                except UnicodeDecodeError:
                                    # Fall back to GBK, which is common on Chinese sites
                                    return content.decode('gbk', errors='ignore')
                            return content.decode(encoding, errors='ignore')
                        else:
                            logger.warning(f"Request failed [{response.status}]: {url}, retry {i + 1}...")
                except Exception as e:
                    logger.error(f"Request error: {url}, error: {e}, retry {i + 1}...")
                await asyncio.sleep(random.uniform(1, 3))
        return None

class BasePlugin(BaseCrawler):
    def __init__(self):
        super().__init__()
        self.name = "BasePlugin"
        self.urls = []
        self.enabled = True

    async def parse(self, html):
        """Parse the page content; implement in subclasses as an async generator that yields proxies."""
        raise NotImplementedError("Please implement parse method")

    async def run(self):
        """Run the plugin: fetch every URL and collect the proxies parsed from each page."""
        logger.info(f"Running plugin: {self.name}")
        results = []
        for url in self.urls:
            self.current_url = url  # Record the URL currently being fetched, for use by parse
            html = await self.fetch(url)
            if html:
                async for proxy in self.parse(html):
                    results.append(proxy)
            await asyncio.sleep(random.uniform(1, 2))
        return results
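
A minimal usage sketch (not part of the repository): the plugin name, the URL, and the assumed one-"ip:port"-per-line page format below are hypothetical, chosen only to illustrate that parse must be an async generator, since run() consumes it with async for.

# example_plugin.py -- illustrative sketch, not shipped with ProxyPool
import asyncio

from core.crawler import BasePlugin


class ExamplePlugin(BasePlugin):
    def __init__(self):
        super().__init__()
        self.name = "ExamplePlugin"
        # Hypothetical source that serves one "ip:port" pair per line
        self.urls = ["http://example.com/proxies.txt"]

    async def parse(self, html):
        # An async generator: yield each proxy instead of returning a list
        for line in html.splitlines():
            line = line.strip()
            if line:
                yield f"http://{line}"


if __name__ == "__main__":
    proxies = asyncio.run(ExamplePlugin().run())
    print(f"Collected {len(proxies)} proxies")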