from fastapi import FastAPI, HTTPException, Request, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, JSONResponse
from pydantic import BaseModel, Field, field_validator, ValidationError
from typing import Optional, List
import asyncio
import json
from datetime import datetime
import re
import os
from contextlib import asynccontextmanager
from core.sqlite import SQLiteManager
from core.plugin_manager import PluginManager
from core.scheduler import ValidationScheduler
from core.log import logger
from config import config

# Global validation-scheduler instance shared by the lifespan hook and all routes.
scheduler = ValidationScheduler()

# Strong references to fire-and-forget background tasks. asyncio's event loop
# keeps only a weak reference to tasks, so without this set a task created by
# asyncio.create_task() could be garbage-collected before it finishes.
_background_tasks: set = set()

# Path of the persisted settings file (JSON), relative to this module.
SETTINGS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data', 'settings.json')

# Default settings, used as the base that saved settings are merged over.
DEFAULT_SETTINGS = {
    "crawl_timeout": 30,
    "validation_timeout": config.VALIDATOR_TIMEOUT,
    "max_retries": 3,
    "default_concurrency": config.VALIDATOR_MAX_CONCURRENCY,
    "min_proxy_score": config.SCORE_MIN,
    "proxy_expiry_days": 7,
    "auto_validate": True,
    "validate_interval_minutes": 30
}


def load_settings() -> dict:
    """Load settings from SETTINGS_FILE, merged over DEFAULT_SETTINGS.

    Returns a fresh dict; falls back to a copy of the defaults when the
    file is missing or unreadable (errors are logged, never raised).
    """
    try:
        if os.path.exists(SETTINGS_FILE):
            with open(SETTINGS_FILE, 'r', encoding='utf-8') as f:
                saved_settings = json.load(f)
            # Merge: saved values override defaults, missing keys keep defaults.
            settings = DEFAULT_SETTINGS.copy()
            settings.update(saved_settings)
            return settings
    except Exception as e:
        logger.error(f"加载设置失败: {e}")
    return DEFAULT_SETTINGS.copy()


def save_settings_to_file(settings: dict) -> bool:
    """Persist *settings* to SETTINGS_FILE as pretty-printed UTF-8 JSON.

    Returns True on success, False on any error (logged, never raised).
    """
    try:
        # Make sure the data directory exists before writing.
        os.makedirs(os.path.dirname(SETTINGS_FILE), exist_ok=True)
        with open(SETTINGS_FILE, 'w', encoding='utf-8') as f:
            json.dump(settings, f, ensure_ascii=False, indent=2)
        return True
    except Exception as e:
        logger.error(f"保存设置失败: {e}")
        return False


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: init DB and scheduler on startup, stop on shutdown."""
    db = SQLiteManager()
    await db.init_db()
    # Apply persisted settings to the scheduler.
    settings = load_settings()
    scheduler.interval_minutes = settings.get('validate_interval_minutes', 30)
    # Start the scheduler only when auto-validation is enabled.
    if settings.get('auto_validate', True):
        await scheduler.start()
    logger.info("API服务器启动")
    yield
    # Shut down the scheduler cleanly.
    await scheduler.stop()
    logger.info("API服务器关闭")


app = FastAPI(title="代理池API", version="1.3.0", lifespan=lifespan)


def format_datetime(datetime_str: str) -> Optional[str]:
    """Normalize a DB timestamp string to ISO 8601.

    'YYYY-MM-DD HH:MM:SS' becomes 'YYYY-MM-DDTHH:MM:SS.000Z'; strings already
    containing 'T' pass through unchanged. Falsy input returns None; any other
    value is returned as-is.
    """
    if not datetime_str:
        return None
    if isinstance(datetime_str, str):
        if 'T' in datetime_str:
            return datetime_str
        if re.match(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}', datetime_str):
            return datetime_str.replace(' ', 'T') + '.000Z'
    return datetime_str


@app.exception_handler(ValidationError)
async def validation_exception_handler(request: Request, exc: ValidationError):
    """Map pydantic validation failures to a uniform 422 JSON envelope."""
    logger.error(f"参数验证失败: {exc}")
    return JSONResponse(
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
        content={"code": 422, "message": "参数验证失败", "data": exc.errors()}
    )


@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """Map HTTPException to the uniform {code, message, data} envelope."""
    logger.error(f"HTTP异常: {exc.status_code} - {exc.detail}")
    return JSONResponse(
        status_code=exc.status_code,
        content={"code": exc.status_code, "message": exc.detail, "data": None}
    )


@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
    """Last-resort handler: log with traceback, return a generic 500 envelope."""
    logger.error(f"未处理的异常: {exc}", exc_info=True)
    return JSONResponse(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        content={"code": 500, "message": "服务器内部错误", "data": None}
    )


app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

plugin_manager = PluginManager()


class ProxyRequest(BaseModel):
    """Query parameters for the paginated proxy listing endpoint."""
    page: int = Field(default=1, ge=1, description="页码,必须大于等于1")
    page_size: int = Field(default=20, ge=1, le=100, description="每页数量,必须在1-100之间")
    protocol: Optional[str] = None
    min_score: int = Field(default=0, ge=0, description="最低分数")
    max_score: Optional[int] = Field(default=None, ge=0, description="最高分数")
    sort_by: str = 'last_check'
    sort_order: str = 'DESC'

    @field_validator('protocol')
    @classmethod
    def validate_protocol(cls, v):
        # Accept only known proxy protocols; normalize to lowercase.
        if v is not None and v.lower() not in ['http', 'https', 'socks4', 'socks5']:
            raise ValueError('协议类型必须是 http, https, socks4 或 socks5')
        return v.lower() if v else v

    @field_validator('sort_by')
    @classmethod
    def validate_sort_by(cls, v):
        # Whitelist sortable columns to prevent arbitrary ORDER BY input.
        if v not in ['ip', 'port', 'protocol', 'score', 'last_check']:
            raise ValueError('排序字段必须是 ip, port, protocol, score 或 last_check')
        return v

    @field_validator('sort_order')
    @classmethod
    def validate_sort_order(cls, v):
        # Normalize sort direction to uppercase ASC/DESC.
        if v.upper() not in ['ASC', 'DESC']:
            raise ValueError('排序方式必须是 ASC 或 DESC')
        return v.upper()


class ProxyDeleteItem(BaseModel):
    """A single (ip, port) pair to delete."""
    ip: str
    port: int

    @field_validator('port')
    @classmethod
    def validate_port(cls, v):
        if not 1 <= v <= 65535:
            raise ValueError('端口号必须在1-65535范围内')
        return v


class DeleteProxiesRequest(BaseModel):
    """Batch-delete request body; capped at 1000 items per call."""
    proxies: List[ProxyDeleteItem]

    @field_validator('proxies')
    @classmethod
    def validate_proxies_count(cls, v):
        if len(v) > 1000:
            raise ValueError('单次最多删除1000个代理')
        return v


@app.get("/")
async def root():
    """Liveness banner for the API root."""
    return {"message": "欢迎使用代理池API", "status": "running", "data": None}


@app.get("/health")
async def health_check():
    """Health probe: checks DB connectivity and reports scheduler state."""
    try:
        db = SQLiteManager()
        await db.count_proxies()
        return {
            "status": "healthy",
            "timestamp": datetime.now().isoformat(),
            "database": "connected",
            "scheduler": "running" if scheduler.running else "stopped",
            "version": "1.3.0"
        }
    except Exception as e:
        logger.error(f"健康检查失败: {e}")
        return {
            "status": "unhealthy",
            "timestamp": datetime.now().isoformat(),
            "database": "disconnected",
            "error": str(e)
        }


@app.get("/api/stats")
async def get_stats():
    """Return pool statistics plus today's new-proxy count and scheduler state."""
    try:
        db = SQLiteManager()
        stats = await db.get_stats()
        today_new = await db.get_today_new_count()
        stats['today_new'] = today_new
        stats['scheduler_running'] = scheduler.running
        return {"code": 200, "message": "获取统计信息成功", "data": stats}
    except Exception as e:
        logger.error(f"获取统计信息失败: {e}")
        return {"code": 500, "message": "获取统计信息失败", "data": None}


@app.post("/api/proxies")
async def get_proxies(request: ProxyRequest):
    """Return a filtered, sorted, paginated proxy list plus the total count."""
    try:
        db = SQLiteManager()
        proxies = await db.get_proxies_paginated(
            page=request.page,
            page_size=request.page_size,
            protocol=request.protocol,
            min_score=request.min_score,
            max_score=request.max_score,
            sort_by=request.sort_by,
            sort_order=request.sort_order
        )
        total = await db.get_proxies_total(
            protocol=request.protocol,
            min_score=request.min_score,
            max_score=request.max_score
        )
        # Rows are tuples: (ip, port, protocol, score, last_check).
        proxy_list = []
        for proxy in proxies:
            proxy_list.append({
                "ip": proxy[0],
                "port": proxy[1],
                "protocol": proxy[2],
                "score": proxy[3],
                "last_check": format_datetime(proxy[4])
            })
        return {
            "code": 200,
            "message": "获取代理列表成功",
            "data": {
                "list": proxy_list,
                "total": total,
                "page": request.page,
                "page_size": request.page_size
            }
        }
    except Exception as e:
        logger.error(f"获取代理列表失败: {e}")
        return {"code": 500, "message": "获取代理列表失败", "data": None}


@app.get("/api/proxies/random")
async def get_random_proxy():
    """Return one random proxy, or a 404-coded envelope when the pool is empty."""
    db = SQLiteManager()
    proxy = await db.get_random_proxy()
    if proxy:
        return {
            "code": 200,
            "message": "获取随机代理成功",
            "data": {
                "ip": proxy[0],
                "port": proxy[1],
                "protocol": proxy[2],
                "score": proxy[3],
                "last_check": format_datetime(proxy[4])
            }
        }
    return {"code": 404, "message": "没有找到可用的代理", "data": None}


@app.get("/api/proxies/export/{format}")
async def export_proxies(format: str, protocol: Optional[str] = None, limit: int = 10000):
    """Stream the proxy list as a csv, txt, or json attachment.

    Optional `protocol` filters case-insensitively; `limit` caps the row
    count (max 100000). Raises 400 for unknown formats or excessive limits.
    """
    try:
        db = SQLiteManager()
        if format not in ['csv', 'txt', 'json']:
            raise HTTPException(status_code=400, detail="不支持的导出格式")
        if limit > 100000:
            raise HTTPException(status_code=400, detail="导出数量不能超过100000条")

        async def generate_csv():
            # Header row, then one "ip,port,protocol,score,last_check" line per proxy.
            proxies = await db.get_all_proxies()
            if protocol:
                proxies = [p for p in proxies if p[2].lower() == protocol.lower()]
            proxies = proxies[:limit]
            output = []
            output.append('IP,Port,Protocol,Score,Last Check')
            for proxy in proxies:
                output.append(f"{proxy[0]},{proxy[1]},{proxy[2]},{proxy[3]},{format_datetime(proxy[4])}")
            for line in output:
                yield line + '\n'

        async def generate_txt():
            # Plain "ip:port" lines.
            proxies = await db.get_all_proxies()
            if protocol:
                proxies = [p for p in proxies if p[2].lower() == protocol.lower()]
            proxies = proxies[:limit]
            for proxy in proxies:
                yield f"{proxy[0]}:{proxy[1]}\n"

        async def generate_json():
            # A streamed JSON array of proxy objects.
            proxies = await db.get_all_proxies()
            if protocol:
                proxies = [p for p in proxies if p[2].lower() == protocol.lower()]
            proxies = proxies[:limit]
            proxy_list = []
            for proxy in proxies:
                proxy_list.append({'ip': proxy[0], 'port': proxy[1], 'protocol': proxy[2], 'score': proxy[3], 'last_check': format_datetime(proxy[4])})
            yield '[\n'
            for i, item in enumerate(proxy_list):
                if i > 0:
                    yield ',\n'
                yield json.dumps(item, ensure_ascii=False, indent=2)
            yield '\n]'

        if format == 'csv':
            return StreamingResponse(
                generate_csv(),
                media_type='text/csv',
                headers={'Content-Disposition': 'attachment; filename=proxies.csv'}
            )
        elif format == 'txt':
            return StreamingResponse(
                generate_txt(),
                media_type='text/plain',
                headers={'Content-Disposition': 'attachment; filename=proxies.txt'}
            )
        elif format == 'json':
            return StreamingResponse(
                generate_json(),
                media_type='application/json',
                headers={'Content-Disposition': 'attachment; filename=proxies.json'}
            )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"导出代理失败: {e}")
        raise HTTPException(status_code=500, detail="导出代理失败")


@app.get("/api/proxies/{ip}/{port}")
async def get_proxy_detail(ip: str, port: int):
    """Return details for one proxy; raises 404 when it does not exist."""
    db = SQLiteManager()
    proxy = await db.get_proxy_detail(ip, port)
    if proxy:
        return {
            "code": 200,
            "message": "获取代理详情成功",
            "data": {
                "ip": proxy[0],
                "port": proxy[1],
                "protocol": proxy[2],
                "score": proxy[3],
                "last_check": format_datetime(proxy[4])
            }
        }
    raise HTTPException(status_code=404, detail="代理不存在")


@app.delete("/api/proxies/{ip}/{port}")
async def delete_proxy(ip: str, port: int):
    """Delete one proxy. Succeeds regardless of whether the proxy existed."""
    db = SQLiteManager()
    await db.delete_proxy(ip, port)
    return {"code": 200, "message": "删除代理成功", "data": None}


@app.post("/api/proxies/batch-delete")
async def batch_delete_proxies(request: DeleteProxiesRequest):
    """Delete up to 1000 proxies in one call; returns how many were removed."""
    db = SQLiteManager()
    proxy_tuples = [(item.ip, item.port) for item in request.proxies]
    deleted_count = await db.batch_delete_proxies(proxy_tuples)
    return {"code": 200, "message": f"批量删除 {deleted_count} 个代理成功", "data": {"deleted_count": deleted_count}}


@app.delete("/api/proxies/clean-invalid")
async def clean_invalid_proxies():
    """Purge invalid proxies from the pool; returns how many were removed."""
    db = SQLiteManager()
    deleted_count = await db.clean_invalid_proxies()
    return {"code": 200, "message": f"清理了 {deleted_count} 个无效代理", "data": {"deleted_count": deleted_count}}


@app.get("/api/plugins")
async def get_plugins():
    """List all crawler plugins and their metadata."""
    try:
        plugins_info = plugin_manager.get_all_plugin_info()
        return {
            "code": 200,
            "message": "获取插件列表成功",
            "data": {
                "plugins": plugins_info
            }
        }
    except Exception as e:
        logger.error(f"获取插件列表失败: {e}")
        return {"code": 500, "message": "获取插件列表失败", "data": None}


class PluginToggleRequest(BaseModel):
    """Body for enabling/disabling a plugin."""
    enabled: bool


@app.put("/api/plugins/{plugin_id}/toggle")
async def toggle_plugin(plugin_id: str, request: PluginToggleRequest):
    """Enable or disable a crawler plugin by id."""
    try:
        success = plugin_manager.toggle_plugin(plugin_id, request.enabled)
        if success:
            return {
                "code": 200,
                "message": f"插件 {plugin_id} 已{'启用' if request.enabled else '禁用'}",
                "data": {
                    "plugin_id": plugin_id,
                    "enabled": request.enabled
                }
            }
        else:
            return {"code": 404, "message": "插件不存在", "data": None}
    except Exception as e:
        logger.error(f"切换插件状态失败: {e}")
        return {"code": 500, "message": "切换插件状态失败", "data": None}


@app.post("/api/plugins/{plugin_id}/crawl")
async def crawl_plugin(plugin_id: str):
    """Run one plugin, validate its results, and store only the valid proxies."""
    try:
        # Step 1: crawl.
        results = await plugin_manager.run_plugin(plugin_id)
        if not results:
            return {
                "code": 200,
                "message": f"插件 {plugin_id} 爬取完成,未获取到代理",
                "data": {
                    "plugin_id": plugin_id,
                    "proxy_count": 0,
                    "valid_count": 0
                }
            }
        logger.info(f"插件 {plugin_id} 爬取完成,获取 {len(results)} 个代理,开始验证...")
        # Step 2: validate the freshly crawled proxies.
        valid_proxies, invalid_proxies = await scheduler.validate_proxies_batch(results)
        # Step 3: persist only the valid proxies.
        db = SQLiteManager()
        inserted_count = 0
        for ip, port, protocol in valid_proxies:
            success = await db.insert_proxy(ip, port, protocol, score=config.SCORE_VALID)
            if success:
                inserted_count += 1
        logger.info(f"插件 {plugin_id} 处理完成: 有效 {inserted_count}, 无效 {len(invalid_proxies)}")
        return {
            "code": 200,
            "message": f"插件 {plugin_id} 爬取并验证完成",
            "data": {
                "plugin_id": plugin_id,
                "proxy_count": len(results),
                "valid_count": inserted_count,
                "invalid_count": len(invalid_proxies)
            }
        }
    except Exception as e:
        logger.error(f"插件爬取失败: {e}")
        return {"code": 500, "message": f"插件爬取失败: {str(e)}", "data": None}


@app.post("/api/plugins/crawl-all")
async def crawl_all_plugins():
    """Run every enabled plugin, dedupe, validate, and store valid proxies."""
    try:
        all_results = []
        all_valid = []
        all_invalid = []
        for plugin in plugin_manager.plugins:
            if not plugin.enabled:
                continue
            try:
                results = await plugin_manager.run_plugin(plugin.name)
                if results:
                    all_results.extend(results)
            except Exception as e:
                # One failing plugin must not abort the whole run.
                logger.error(f"插件 {plugin.name} 执行失败: {e}")
                continue
        if all_results:
            # Deduplicate across plugins before validating.
            unique_proxies = list(set(all_results))
            logger.info(f"所有插件爬取完成,共 {len(unique_proxies)} 个唯一代理,开始验证...")
            valid_proxies, invalid_proxies = await scheduler.validate_proxies_batch(unique_proxies)
            # Persist only the valid proxies.
            db = SQLiteManager()
            inserted_count = 0
            for ip, port, protocol in valid_proxies:
                success = await db.insert_proxy(ip, port, protocol, score=config.SCORE_VALID)
                if success:
                    inserted_count += 1
            return {
                "code": 200,
                "message": "所有插件爬取并验证完成",
                "data": {
                    "total_crawled": len(unique_proxies),
                    "valid_count": inserted_count,
                    "invalid_count": len(invalid_proxies)
                }
            }
        return {
            "code": 200,
            "message": "所有插件爬取完成,未获取到代理",
            "data": {
                "total_crawled": 0,
                "valid_count": 0,
                "invalid_count": 0
            }
        }
    except Exception as e:
        logger.error(f"批量爬取失败: {e}")
        return {"code": 500, "message": f"批量爬取失败: {str(e)}", "data": None}


# --- Validation scheduler control -------------------------------------------

@app.post("/api/scheduler/start")
async def start_scheduler():
    """Start the validation scheduler and persist auto_validate=True."""
    try:
        if scheduler.running:
            return {"code": 200, "message": "验证调度器已在运行", "data": {"running": True}}
        await scheduler.start()
        # Persist the new state so it survives restarts.
        settings = load_settings()
        settings['auto_validate'] = True
        save_settings_to_file(settings)
        return {"code": 200, "message": "验证调度器已启动", "data": {"running": True}}
    except Exception as e:
        logger.error(f"启动调度器失败: {e}")
        return {"code": 500, "message": f"启动调度器失败: {str(e)}", "data": None}


@app.post("/api/scheduler/stop")
async def stop_scheduler():
    """Stop the validation scheduler and persist auto_validate=False."""
    try:
        if not scheduler.running:
            return {"code": 200, "message": "验证调度器未运行", "data": {"running": False}}
        await scheduler.stop()
        # Persist the new state so it survives restarts.
        settings = load_settings()
        settings['auto_validate'] = False
        save_settings_to_file(settings)
        return {"code": 200, "message": "验证调度器已停止", "data": {"running": False}}
    except Exception as e:
        logger.error(f"停止调度器失败: {e}")
        return {"code": 500, "message": f"停止调度器失败: {str(e)}", "data": None}


@app.post("/api/scheduler/validate-now")
async def validate_now():
    """Kick off a full validation pass in the background without blocking."""
    try:
        # Hold a strong reference to the task; the loop alone keeps only a
        # weak one, so an unreferenced task could be GC'd before completion.
        task = asyncio.create_task(scheduler.validate_all_proxies())
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)
        return {"code": 200, "message": "已开始全量验证", "data": {"started": True}}
    except Exception as e:
        logger.error(f"启动验证失败: {e}")
        return {"code": 500, "message": f"启动验证失败: {str(e)}", "data": None}


@app.get("/api/scheduler/status")
async def get_scheduler_status():
    """Report whether the scheduler is running and its interval."""
    return {
        "code": 200,
        "message": "获取状态成功",
        "data": {
            "running": scheduler.running,
            "interval_minutes": scheduler.interval_minutes
        }
    }


# --- Settings management -----------------------------------------------------

class SettingsRequest(BaseModel):
    """Validated body for saving system settings; bounds mirror the UI limits."""
    crawl_timeout: int = Field(default=30, ge=5, le=120)
    validation_timeout: int = Field(default=10, ge=3, le=60)
    max_retries: int = Field(default=3, ge=0, le=10)
    default_concurrency: int = Field(default=50, ge=10, le=200)
    min_proxy_score: int = Field(default=0, ge=0, le=100)
    proxy_expiry_days: int = Field(default=7, ge=1, le=30)
    auto_validate: bool = True
    validate_interval_minutes: int = Field(default=30, ge=5, le=1440)


@app.get("/api/settings")
async def get_settings():
    """Return the current (defaults merged with persisted) settings."""
    try:
        settings = load_settings()
        return {"code": 200, "message": "获取设置成功", "data": settings}
    except Exception as e:
        logger.error(f"获取设置失败: {e}")
        return {"code": 500, "message": "获取设置失败", "data": None}


@app.post("/api/settings")
async def save_settings(request: SettingsRequest):
    """Persist settings and apply scheduler-related changes immediately."""
    try:
        settings = {
            "crawl_timeout": request.crawl_timeout,
            "validation_timeout": request.validation_timeout,
            "max_retries": request.max_retries,
            "default_concurrency": request.default_concurrency,
            "min_proxy_score": request.min_proxy_score,
            "proxy_expiry_days": request.proxy_expiry_days,
            "auto_validate": request.auto_validate,
            "validate_interval_minutes": request.validate_interval_minutes
        }
        if save_settings_to_file(settings):
            # Apply the interval to the running scheduler.
            scheduler.interval_minutes = request.validate_interval_minutes
            # Start/stop the scheduler when auto-validation toggled.
            if request.auto_validate and not scheduler.running:
                await scheduler.start()
            elif not request.auto_validate and scheduler.running:
                await scheduler.stop()
            return {"code": 200, "message": "保存设置成功", "data": settings}
        else:
            return {"code": 500, "message": "保存设置失败", "data": None}
    except Exception as e:
        logger.error(f"保存设置失败: {e}")
        return {"code": 500, "message": f"保存设置失败: {str(e)}", "data": None}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host=config.HOST, port=config.PORT)