Files
ProxyPool/app/api/routes/proxies.py

129 lines
4.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""代理相关路由(含统计信息)"""
from typing import Optional
from fastapi import APIRouter, Depends, Query
from fastapi.responses import StreamingResponse
from app.services.proxy_service import ProxyService
from app.services.scheduler_service import SchedulerService
from app.services.dashboard_stats import get_dashboard_stats
from app.models.schemas import ProxyListRequest, BatchDeleteRequest, ProxyDeleteItem
from app.api.deps import get_proxy_service, get_scheduler_service
from app.api.common import success_response, format_proxy
from app.core.exceptions import ProxyPoolException, ProxyNotFoundException
from app.core.config import settings as app_settings
# Router for the proxy endpoints defined in this module; every path below is
# registered under the "/api/proxies" prefix and tagged "proxies".
router = APIRouter(prefix="/api/proxies", tags=["proxies"])
# Export record cap read from settings, coerced to int. It serves as both the
# default and the upper bound of the `limit` query parameter of the export route.
_EXPORT_MAX = int(app_settings.export_max_records)
@router.get("/stats")
async def get_stats(
    scheduler_service: SchedulerService = Depends(get_scheduler_service),
):
    """Return dashboard statistics for the proxy pool.

    The scheduler's ``running`` attribute is forwarded to
    ``get_dashboard_stats`` and the result is wrapped with
    ``success_response``.
    """
    scheduler_running = scheduler_service.running
    dashboard = await get_dashboard_stats(scheduler_running)
    return success_response("获取统计信息成功", dashboard)
@router.post("")
async def list_proxies(
    request: ProxyListRequest,
    service: ProxyService = Depends(get_proxy_service),
):
    """Return one page of proxies selected by the JSON request body.

    The paging, protocol, score-range, sorting and pool-filter fields of
    ``request`` are forwarded unchanged to ``service.list_proxies``. The
    response payload carries the formatted proxies, the total reported by the
    service, and an echo of the requested page and page size.
    """
    # Collect the query arguments once so the service call stays on one line.
    query = {
        "page": request.page,
        "page_size": request.page_size,
        "protocol": request.protocol,
        "min_score": request.min_score,
        "max_score": request.max_score,
        "sort_by": request.sort_by,
        "sort_order": request.sort_order,
        "pool_filter": request.pool_filter,
    }
    rows, total = await service.list_proxies(**query)

    payload = {
        "list": [format_proxy(row) for row in rows],
        "total": total,
        "page": request.page,
        "page_size": request.page_size,
    }
    return success_response("获取代理列表成功", payload)
@router.get("/random")
async def get_random_proxy(
    service: ProxyService = Depends(get_proxy_service),
):
    """Return a single proxy picked by ``service.get_random_proxy``.

    Raises:
        ProxyPoolException: when the service returns a falsy value; raised
            with 404 as its second argument (presumably the HTTP status).
    """
    chosen = await service.get_random_proxy()
    if chosen:
        return success_response("获取随机代理成功", format_proxy(chosen))
    raise ProxyPoolException("暂无可用代理", 404)
@router.get("/export/{fmt}")
async def export_proxies(
    fmt: str,
    protocol: Optional[str] = None,
    limit: int = Query(default=_EXPORT_MAX, ge=1, le=_EXPORT_MAX),
    service: ProxyService = Depends(get_proxy_service),
):
    """Stream an export of proxies as a downloadable attachment.

    Args:
        fmt: Export format taken from the path; one of "csv", "txt", "json".
        protocol: Optional protocol value passed through to the service.
        limit: Number of records requested, between 1 and ``_EXPORT_MAX``.
        service: Injected proxy service whose ``export_proxies`` result is
            used as the response body.

    Raises:
        ProxyPoolException: when ``fmt`` is not a supported format; raised
            with 400 as its second argument (presumably the HTTP status).
    """
    # The mapping doubles as the whitelist of supported formats.
    media_types = {"csv": "text/csv", "txt": "text/plain", "json": "application/json"}
    media_type = media_types.get(fmt)
    if media_type is None:
        raise ProxyPoolException("不支持的导出格式", 400)

    disposition = f"attachment; filename=proxies.{fmt}"
    return StreamingResponse(
        service.export_proxies(fmt, protocol, limit),
        media_type=media_type,
        headers={"Content-Disposition": disposition},
    )
@router.post("/delete-one")
async def delete_proxy_one(
    item: ProxyDeleteItem,
    service: ProxyService = Depends(get_proxy_service),
):
    """Delete one proxy identified by a JSON body (the recommended way).

    IPs that contain colons, such as IPv6 addresses, are not affected by URL
    path segmentation when sent in the body.
    """
    ip, port = item.ip, item.port
    await service.delete_proxy(ip, port)
    return success_response("删除代理成功")
@router.delete("/{ip}/{port}")
async def delete_proxy(
    ip: str,
    port: int,
    service: ProxyService = Depends(get_proxy_service),
):
    """Delete the proxy addressed by the ``ip`` and ``port`` path segments."""
    await service.delete_proxy(ip, port)
    response = success_response("删除代理成功")
    return response
@router.post("/batch-delete")
async def batch_delete(
    request: BatchDeleteRequest,
    service: ProxyService = Depends(get_proxy_service),
):
    """Delete several proxies in one call.

    Each entry of ``request.proxies`` is reduced to an ``(ip, port)`` tuple
    before being handed to ``service.batch_delete``; the value the service
    returns is reported back as ``deleted_count``.
    """
    targets = [(entry.ip, entry.port) for entry in request.proxies]
    deleted = await service.batch_delete(targets)
    payload = {"deleted_count": deleted}
    return success_response(f"批量删除 {deleted} 个代理成功", payload)
@router.delete("/clean-invalid")
async def clean_invalid(
    service: ProxyService = Depends(get_proxy_service),
):
    """Run ``service.clean_invalid`` and report the value it returns as ``deleted_count``."""
    count = await service.clean_invalid()
    payload = {"deleted_count": count}
    return success_response(f"清理了 {count} 个无效代理", payload)
@router.get("/latency-distribution")
async def get_latency_distribution(
    service: ProxyService = Depends(get_proxy_service),
):
    """Return latency distribution data, intended for a histogram display."""
    latency_buckets = await service.get_latency_distribution()
    return success_response("获取延迟分布成功", latency_buckets)
@router.get("/score-distribution")
async def get_score_distribution(
    service: ProxyService = Depends(get_proxy_service),
):
    """Return score distribution data, intended for a bar chart display."""
    score_buckets = await service.get_score_distribution()
    return success_response("获取评分分布成功", score_buckets)