# ProxyPool/tasks_manager.py
# Last modified: 2026-01-27 21:17:36 +08:00
# 225 lines, 8.3 KiB, Python
import asyncio
from datetime import datetime
from core.plugin_manager import PluginManager
from core.sqlite import SQLiteManager
from core.validator import ProxyValidator
from core.log import logger
from typing import Optional, Callable
class TasksManager:
    """Coordinates a crawl -> validate pipeline for proxy discovery.

    A single crawler coroutine feeds (ip, port, protocol) tuples into a
    bounded asyncio.Queue; a pool of validator coroutines consumes them,
    validates each proxy and persists the good ones via the database
    manager.  Progress and status events are pushed through optional
    async callbacks registered with :meth:`set_callbacks`.
    """

    def __init__(self):
        # Lifecycle flags for the current pipeline run.
        self.is_running = False
        self.stop_requested = False
        self.current_task = None
        self.validator_tasks = []
        # Optional async callables used to push updates to the UI layer.
        self.progress_callback = None
        self.status_callback = None
        # Bounded so a fast crawler cannot outrun the validators without limit.
        self.proxy_queue = asyncio.Queue(maxsize=500)
        self.stats = {
            'total_found': 0,
            'total_verified': 0,
            'start_time': None,
            'current_url': None,
            'plugins': []
        }

    def set_callbacks(self, progress_callback: Optional[Callable] = None, status_callback: Optional[Callable] = None):
        """Register async callbacks for progress and status notifications."""
        self.progress_callback = progress_callback
        self.status_callback = status_callback

    async def _notify_progress(self, data: dict):
        """Enrich *data* with a timestamp and success rate, then forward it."""
        if self.progress_callback:
            data['timestamp'] = datetime.now().isoformat()
            if 'found' in data and 'verified' in data:
                # Guard against division by zero when nothing was found yet.
                data['success_rate'] = round((data['verified'] / data['found'] * 100), 2) if data['found'] > 0 else 0
            await self.progress_callback(data)

    async def _notify_status(self, status: str, message: str):
        """Forward a status event to the registered status callback."""
        if self.status_callback:
            await self.status_callback({
                'status': status,
                'message': message,
                'timestamp': datetime.now().isoformat()
            })

    async def run_crawler(self):
        """Run every crawler plugin and enqueue each discovered proxy."""
        await self._notify_status('crawling', '开始爬取代理啦~')
        manager = PluginManager()
        count = 0
        self.stats['plugins'] = [plugin.name for plugin in manager.plugins]
        async for ip, port, protocol in manager.run_all():
            if self.stop_requested:
                logger.info("爬虫收到停止信号")
                break
            await self.proxy_queue.put((ip, port, protocol))
            count += 1
            self.stats['total_found'] = count
            # Throttle progress events to one per ten proxies found.
            if count % 10 == 0:
                await self._notify_progress({
                    'type': 'crawling',
                    'found': count,
                    'verified': self.stats['total_verified']
                })
        if self.stop_requested:
            await self._notify_status('stopped', '爬虫已停止啦~')
        else:
            await self._notify_status('crawling_done', f'爬虫抓取完成啦,共发现 {count} 个潜在代理~')
        logger.info(f"爬虫抓取阶段完成,共发现 {count} 个潜在代理。")

    async def run_validator(self, db: "SQLiteManager", validator: "ProxyValidator"):
        """Consume proxies from the queue until a None sentinel or a stop.

        Many instances of this coroutine run concurrently; shared counters
        live in ``self.stats`` while ``verified_count`` is this worker's own.
        """
        await self._notify_status('validating', '开始验证代理啦~')
        verified_count = 0
        while True:
            proxy = await self.proxy_queue.get()
            # A None sentinel (or a pending stop request) ends this worker.
            if proxy is None or self.stop_requested:
                self.proxy_queue.task_done()
                break
            ip, port, protocol = proxy
            try:
                is_valid, latency = await validator.validate(ip, port, protocol)
                if is_valid:
                    logger.info(f"验证通过: {ip}:{port} ({protocol}) - 延迟: {latency}ms")
                    await db.insert_proxy(ip, port, protocol)
                    verified_count += 1
                    # BUG FIX: the original assigned this worker's local count
                    # into the shared stat, so concurrent workers clobbered
                    # each other; increment the shared total instead.
                    self.stats['total_verified'] += 1
                    if verified_count % 5 == 0:
                        await self._notify_progress({
                            'type': 'validating',
                            'found': self.stats['total_found'],
                            'verified': self.stats['total_verified'],
                            'current_proxy': f"{ip}:{port}"
                        })
                else:
                    logger.info(f"验证失败: {ip}:{port} ({protocol})")
            except Exception as e:
                logger.error(f"验证器异常: {e}")
            finally:
                self.proxy_queue.task_done()
        if self.stop_requested:
            await self._notify_status('stopped', '验证器已停止啦~')
        elif verified_count > 0:
            await self._notify_status('validating_done', f'验证完成啦,入库 {verified_count} 个代理~')
        logger.info(f"验证协程完成,入库 {verified_count} 个代理。")

    def _drain_queue(self):
        """Remove and acknowledge everything currently in the proxy queue.

        task_done() is called per drained item so the queue's unfinished-task
        counter stays balanced for any join() caller.
        """
        while not self.proxy_queue.empty():
            try:
                self.proxy_queue.get_nowait()
                self.proxy_queue.task_done()
            except asyncio.QueueEmpty:
                break

    async def start_task(self, db: "SQLiteManager", validator: "ProxyValidator", num_validators: int = 50):
        """Run one full crawl+validate pass.

        Returns False if a pass is already running, True when this pass
        finishes (whether completed normally or stopped early).
        """
        if self.is_running:
            await self._notify_status('error', '任务正在运行中呢~')
            return False
        self.is_running = True
        self.stop_requested = False
        self.stats = {
            'total_found': 0,
            'total_verified': 0,
            'start_time': datetime.now().isoformat(),
            'current_url': None,
            'plugins': []
        }
        await self._notify_status('running', '任务开始啦~')
        crawler_task = asyncio.create_task(self.run_crawler())
        self.validator_tasks = [asyncio.create_task(self.run_validator(db, validator)) for _ in range(num_validators)]
        await crawler_task
        # One shutdown sentinel per validator so every worker's get() returns.
        for _ in range(num_validators):
            await self.proxy_queue.put(None)
        # BUG FIX: the original awaited proxy_queue.join() here, which
        # deadlocks after stop_task() drained items without task_done() (and
        # leaves sentinels unconsumed once workers are cancelled).  Waiting on
        # the worker tasks themselves gives the same guarantee safely.
        await asyncio.gather(*self.validator_tasks, return_exceptions=True)
        # Drop any leftovers (stale proxies / sentinels) so a later run starts clean.
        self._drain_queue()
        total = await db.count_proxies()
        # BUG FIX: read the stop flag BEFORE resetting it; the original reset
        # first, so the 'completed' notification fired even after a stop.
        was_stopped = self.stop_requested
        self.is_running = False
        self.stop_requested = False
        if not was_stopped:
            await self._notify_status('completed', f'任务完成啦,当前池内总数: {total}~')
            await self._notify_progress({
                'type': 'completed',
                'found': self.stats['total_found'],
                'verified': self.stats['total_verified'],
                'total': total
            })
        logger.info(f"=== 运行结束,当前池内总数: {total} ===")
        return True

    async def stop_task(self):
        """Request a cooperative stop of the running pipeline."""
        if not self.is_running:
            return False
        self.stop_requested = True
        # Cancel workers still blocked on get(); the gather() in start_task
        # uses return_exceptions=True and tolerates the CancelledError.
        for task in self.validator_tasks:
            if not task.done():
                task.cancel()
        # Drop queued work (task_done-balanced); this also unblocks a crawler
        # stuck on put() against a full queue so it can see the stop flag.
        self._drain_queue()
        await self._notify_status('stopped', '任务已停止~')
        logger.info("任务被手动停止")
        return True

    def get_stats(self) -> dict:
        """Return a shallow copy of the current run statistics."""
        return self.stats.copy()

    def is_task_running(self) -> bool:
        """True while a crawl+validate pass is in progress."""
        return self.is_running
class ScheduledTasks:
    """Drives periodic re-runs of the crawl/validate pipeline."""

    def __init__(self, tasks_manager: TasksManager):
        # The manager that actually executes each scheduled run.
        self.tasks_manager = tasks_manager
        self.scheduler_task = None
        self.is_scheduled = False
        self.interval_minutes = 60

    async def scheduler(self):
        """Background loop: run one pipeline pass, then sleep for the interval."""
        from core.sqlite import SQLiteManager
        from core.validator import ProxyValidator
        while self.is_scheduled:
            try:
                database = SQLiteManager()
                await database.init_db()
                # NOTE(review): a fresh SQLiteManager is built every cycle and
                # never explicitly closed here — confirm it cleans up on GC.
                async with ProxyValidator(max_concurrency=200) as checker:
                    await self.tasks_manager.start_task(database, checker, num_validators=50)
                await asyncio.sleep(self.interval_minutes * 60)
            except Exception as exc:
                logger.error(f"定时任务异常: {exc}")
                # Short back-off before retrying after a failure.
                await asyncio.sleep(60)

    def start_scheduled(self, interval_minutes: int = 60):
        """Enable scheduling and spawn the background loop task."""
        self.interval_minutes = interval_minutes
        self.is_scheduled = True
        self.scheduler_task = asyncio.create_task(self.scheduler())
        logger.info(f"定时任务已启动,间隔: {interval_minutes} 分钟")

    def stop_scheduled(self):
        """Disable scheduling and cancel the loop task if one exists."""
        self.is_scheduled = False
        pending = self.scheduler_task
        if pending is not None:
            pending.cancel()
        logger.info("定时任务已停止")