import asyncio
from datetime import datetime
from typing import Callable, Optional

from core.plugin_manager import PluginManager
from core.sqlite import SQLiteManager
from core.validator import ProxyValidator
from core.log import logger


class TasksManager:
    """Coordinates one crawl -> validate -> store pipeline over asyncio.

    A single crawler coroutine feeds (ip, port, protocol) tuples into a
    bounded queue; a pool of validator coroutines consumes the queue,
    checks each proxy and inserts the working ones into the database.
    Optional async callbacks push progress/status events (e.g. to a UI).
    """

    def __init__(self):
        self.is_running = False        # True while start_task() is executing
        self.stop_requested = False    # cooperative stop flag polled by workers
        self.current_task = None
        self.validator_tasks = []      # asyncio.Tasks running run_validator()
        self.progress_callback = None  # async callable(dict) or None
        self.status_callback = None    # async callable(dict) or None
        # Bounded so a fast crawler cannot grow memory without limit;
        # recreated at the start of every run (see start_task).
        self.proxy_queue = asyncio.Queue(maxsize=500)
        self.stats = {
            'total_found': 0,
            'total_verified': 0,
            'start_time': None,
            'current_url': None,
            'plugins': []
        }
        # Rough denominator for progress bars; not an exact count.
        self.estimated_total = 1000

    def set_callbacks(self, progress_callback: Optional[Callable] = None,
                      status_callback: Optional[Callable] = None):
        """Register async callbacks used to push progress/status updates."""
        self.progress_callback = progress_callback
        self.status_callback = status_callback

    async def _notify_progress(self, data: dict):
        """Enrich *data* with derived fields and forward it to the progress callback."""
        if not self.progress_callback:
            return
        data['timestamp'] = datetime.now().isoformat()
        if 'found' in data and 'verified' in data:
            data['success_rate'] = (
                round(data['verified'] / data['found'] * 100, 2)
                if data['found'] > 0 else 0
            )
        if 'found' in data:
            data['current'] = data['found'] + self.stats['total_verified']
            data['total'] = self.estimated_total
        await self.progress_callback(data)

    async def _notify_status(self, status: str, message: str):
        """Forward a timestamped status event to the status callback, if any."""
        if self.status_callback:
            await self.status_callback({
                'status': status,
                'message': message,
                'timestamp': datetime.now().isoformat()
            })

    async def run_crawler(self):
        """Crawl all plugin sources and enqueue every proxy candidate found."""
        await self._notify_status('crawling_start', '开始爬取代理啦~')
        manager = PluginManager()
        count = 0
        self.stats['plugins'] = [plugin.name for plugin in manager.plugins]
        async for ip, port, protocol in manager.run_all():
            if self.stop_requested:
                logger.info("爬虫收到停止信号")
                break
            await self.proxy_queue.put((ip, port, protocol))
            count += 1
            self.stats['total_found'] = count
            # Throttle UI updates: one progress event per 5 proxies.
            if count % 5 == 0:
                await self._notify_progress({
                    'type': 'crawling',
                    'found': count,
                    'verified': self.stats['total_verified'],
                    'current_proxy': f"{ip}:{port}",
                    'message': f'正在爬取:已发现 {count} 个代理'
                })
        if self.stop_requested:
            await self._notify_status('stopped', '爬虫已停止啦~')
        else:
            await self._notify_status('crawling_done', f'爬虫抓取完成啦,共发现 {count} 个潜在代理~')
        logger.info(f"爬虫抓取阶段完成,共发现 {count} 个潜在代理。")

    async def run_validator(self, db: SQLiteManager, validator: ProxyValidator):
        """Consume proxies from the queue until a None sentinel (or stop) arrives.

        Valid proxies are inserted into *db*.  Many instances of this
        coroutine run concurrently (see start_task).
        """
        await self._notify_status('validating_start', '开始验证代理啦~')
        verified_count = 0  # this coroutine's own tally, used in its final log
        while True:
            proxy = await self.proxy_queue.get()
            if proxy is None or self.stop_requested:
                self.proxy_queue.task_done()
                break
            ip, port, protocol = proxy
            try:
                is_valid, latency = await validator.validate(ip, port, protocol)
                if is_valid:
                    logger.info(f"验证通过: {ip}:{port} ({protocol}) - 延迟: {latency}ms")
                    await db.insert_proxy(ip, port, protocol)
                    verified_count += 1
                    # BUGFIX: the shared counter used to be *overwritten* with
                    # each coroutine's local count, so the ~50 concurrent
                    # validators clobbered each other's totals.  Increment the
                    # shared stat instead (single-threaded loop, no await
                    # between read and write, so this is safe).
                    total_verified = self.stats['total_verified'] + 1
                    self.stats['total_verified'] = total_verified
                    await self._notify_progress({
                        'type': 'validating',
                        'found': self.stats['total_found'],
                        'verified': total_verified,
                        'current_proxy': f"{ip}:{port}",
                        'message': f'正在验证:已验证 {total_verified} 个代理'
                    })
                else:
                    logger.info(f"验证失败: {ip}:{port} ({protocol})")
            except Exception as e:
                # Keep the pool alive: one bad proxy must not kill the worker.
                logger.error(f"验证器异常: {e}")
            finally:
                self.proxy_queue.task_done()
        if self.stop_requested:
            await self._notify_status('stopped', '验证器已停止啦~')
        elif verified_count > 0:
            await self._notify_status('validating_done', f'验证完成啦,入库 {verified_count} 个代理~')
        logger.info(f"验证协程完成,入库 {verified_count} 个代理。")

    async def start_task(self, db: SQLiteManager, num_validators: int = 50) -> bool:
        """Run one full crawl-and-validate cycle.

        Returns False (and emits an error status) if a task is already
        running, True once the cycle finishes or is stopped.
        """
        if self.is_running:
            await self._notify_status('error', '任务正在运行中呢~')
            return False
        self.is_running = True
        self.stop_requested = False
        # BUGFIX: fresh queue every run — a previous run (or stop_task) may
        # have left stale items or None sentinels behind that would corrupt
        # this cycle.
        self.proxy_queue = asyncio.Queue(maxsize=500)
        self.stats = {
            'total_found': 0,
            'total_verified': 0,
            'start_time': datetime.now().isoformat(),
            'current_url': None,
            'plugins': []
        }
        await self._notify_status('connecting', '正在连接插件源...')
        await self._notify_status('starting', '正在启动爬虫...')
        await self._notify_status('running', '任务开始啦~')
        async with ProxyValidator(max_concurrency=200) as validator:
            crawler_task = asyncio.create_task(self.run_crawler())
            self.validator_tasks = [
                asyncio.create_task(self.run_validator(db, validator))
                for _ in range(num_validators)
            ]
            await crawler_task
            if self.stop_requested:
                # Drop anything still queued so the sentinel puts below can
                # never block on a full queue whose consumers already exited.
                while not self.proxy_queue.empty():
                    self.proxy_queue.get_nowait()
            # One sentinel per validator; the queue is FIFO, so every real
            # proxy is consumed before any validator sees its sentinel.
            for _ in range(num_validators):
                await self.proxy_queue.put(None)
            # BUGFIX: the old `await self.proxy_queue.join()` could deadlock
            # after stop_task() drained items without calling task_done().
            # Awaiting the validator tasks themselves is sufficient.
            await asyncio.gather(*self.validator_tasks, return_exceptions=True)
        total = await db.count_proxies()
        # BUGFIX: snapshot the stop flag *before* clearing it — the old code
        # reset it first, which made the stopped/completed check always take
        # the 'completed' branch even after a manual stop.
        was_stopped = self.stop_requested
        self.is_running = False
        self.stop_requested = False
        if not was_stopped:
            await self._notify_status('completed', f'任务完成啦,当前池内总数: {total}~')
            await self._notify_progress({
                'type': 'completed',
                'found': self.stats['total_found'],
                'verified': self.stats['total_verified'],
                'total': total
            })
        logger.info(f"=== 运行结束,当前池内总数: {total} ===")
        return True

    async def stop_task(self) -> bool:
        """Request a graceful stop of a running task; returns False if idle."""
        if not self.is_running:
            return False
        self.stop_requested = True
        # Drop queued work so the workers wind down quickly.
        while not self.proxy_queue.empty():
            try:
                self.proxy_queue.get_nowait()
            except asyncio.QueueEmpty:
                break
        # Wake validators blocked in queue.get(); each exits via its sentinel
        # or the stop flag.  BUGFIX: the old code *cancelled* the tasks and
        # then enqueued sentinels nobody would consume, which (together with
        # queue.join() in start_task) could deadlock, and cancellation could
        # interrupt a DB insert mid-flight.
        for _ in range(len(self.validator_tasks)):
            await self.proxy_queue.put(None)
        await self._notify_status('stopped', '任务已停止~')
        logger.info("任务被手动停止")
        return True

    def get_stats(self) -> dict:
        """Return a shallow copy of the current run statistics."""
        return self.stats.copy()

    def is_task_running(self) -> bool:
        """True while start_task() is executing."""
        return self.is_running


class ScheduledTasks:
    """Periodically re-runs a TasksManager cycle at a fixed interval."""

    def __init__(self, tasks_manager: TasksManager):
        self.tasks_manager = tasks_manager
        self.scheduler_task = None      # asyncio.Task running scheduler()
        self.is_scheduled = False
        self.interval_minutes = 60

    async def scheduler(self):
        """Loop: run one full task cycle, then sleep for the configured interval."""
        while self.is_scheduled:
            try:
                db = SQLiteManager()
                await db.init_db()
                await self.tasks_manager.start_task(db, num_validators=50)
                await asyncio.sleep(self.interval_minutes * 60)
            except Exception as e:
                # Back off briefly and keep the schedule alive.
                logger.error(f"定时任务异常: {e}")
                await asyncio.sleep(60)

    def start_scheduled(self, interval_minutes: int = 60):
        """Start the background schedule (interval in minutes)."""
        self.interval_minutes = interval_minutes
        self.is_scheduled = True
        self.scheduler_task = asyncio.create_task(self.scheduler())
        logger.info(f"定时任务已启动,间隔: {interval_minutes} 分钟")

    def stop_scheduled(self):
        """Stop the schedule and cancel the background task, if any."""
        self.is_scheduled = False
        if self.scheduler_task:
            self.scheduler_task.cancel()
        logger.info("定时任务已停止")