from fastapi import HTTPException, Depends, Header, status from typing import Optional from config import Config from core.log import logger class PermissionLevel: READ_ONLY = "read_only" ADMIN = "admin" def verify_api_key( x_api_key: Optional[str] = Header(None, alias="X-API-Key"), authorization: Optional[str] = Header(None) ) -> str: """ 验证API Key并返回权限级别 Args: x_api_key: X-API-Key header中的API Key authorization: Authorization header中的Bearer token Returns: str: 权限级别 Raises: HTTPException: 认证失败时抛出401错误 """ api_key = x_api_key if authorization and authorization.startswith("Bearer "): api_key = authorization.replace("Bearer ", "") if not api_key: logger.warning("API请求缺少API Key") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="缺少API Key,请在请求头中添加 X-API-Key 或 Authorization: Bearer ", headers={"WWW-Authenticate": "Bearer"}, ) if api_key == Config.ADMIN_API_KEY: logger.info(f"管理员API认证成功: {api_key[:8]}...") return PermissionLevel.ADMIN elif api_key == Config.API_KEY: logger.info(f"普通用户API认证成功: {api_key[:8]}...") return PermissionLevel.READ_ONLY else: logger.warning(f"无效的API Key尝试: {api_key[:8]}...") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="无效的API Key", headers={"WWW-Authenticate": "Bearer"}, ) def require_admin( x_api_key: Optional[str] = Header(None, alias="X-API-Key"), authorization: Optional[str] = Header(None) ) -> str: """ 要求管理员权限的依赖函数 Args: x_api_key: X-API-Key header中的API Key authorization: Authorization header中的Bearer token Returns: str: 权限级别 Raises: HTTPException: 权限不足时抛出403错误 """ # 如果未启用认证,直接返回管理员权限 if not Config.REQUIRE_AUTH: logger.info("开发模式:跳过管理员权限检查") return PermissionLevel.ADMIN # 验证API Key api_key = x_api_key if authorization and authorization.startswith("Bearer "): api_key = authorization.replace("Bearer ", "") if not api_key: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="缺少API Key,请在请求头中添加 X-API-Key 或 Authorization: Bearer ", headers={"WWW-Authenticate": "Bearer"}, ) # 检查权限级别 if api_key == Config.ADMIN_API_KEY: logger.info(f"管理员API认证成功: {api_key[:8]}...") return PermissionLevel.ADMIN else: logger.warning(f"非管理员用户尝试访问管理接口") raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="需要管理员权限才能执行此操作" ) def skip_auth_for_dev() -> Optional[str]: """ 开发环境跳过认证(仅在开发模式下使用) Returns: Optional[str]: 返回管理员权限级别 Warning: 仅用于开发环境,生产环境务必使用真实认证 """ import os if os.getenv("SKIP_AUTH", "false").lower() == "true": logger.warning("开发模式:跳过API Key认证") return PermissionLevel.ADMIN return None