90 lines
2.7 KiB
Python
90 lines
2.7 KiB
Python
from fastapi import HTTPException, Depends, Header, status
|
||
from typing import Optional
|
||
from config import Config
|
||
from core.log import logger
|
||
|
||
class PermissionLevel:
|
||
READ_ONLY = "read_only"
|
||
ADMIN = "admin"
|
||
|
||
def verify_api_key(
|
||
x_api_key: Optional[str] = Header(None, alias="X-API-Key"),
|
||
authorization: Optional[str] = Header(None)
|
||
) -> str:
|
||
"""
|
||
验证API Key并返回权限级别
|
||
|
||
Args:
|
||
x_api_key: X-API-Key header中的API Key
|
||
authorization: Authorization header中的Bearer token
|
||
|
||
Returns:
|
||
str: 权限级别
|
||
|
||
Raises:
|
||
HTTPException: 认证失败时抛出401错误
|
||
"""
|
||
api_key = x_api_key
|
||
|
||
if authorization and authorization.startswith("Bearer "):
|
||
api_key = authorization.replace("Bearer ", "")
|
||
|
||
if not api_key:
|
||
logger.warning("API请求缺少API Key")
|
||
raise HTTPException(
|
||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||
detail="缺少API Key,请在请求头中添加 X-API-Key 或 Authorization: Bearer <key>",
|
||
headers={"WWW-Authenticate": "Bearer"},
|
||
)
|
||
|
||
if api_key == Config.ADMIN_API_KEY:
|
||
logger.info(f"管理员API认证成功: {api_key[:8]}...")
|
||
return PermissionLevel.ADMIN
|
||
elif api_key == Config.API_KEY:
|
||
logger.info(f"普通用户API认证成功: {api_key[:8]}...")
|
||
return PermissionLevel.READ_ONLY
|
||
else:
|
||
logger.warning(f"无效的API Key尝试: {api_key[:8]}...")
|
||
raise HTTPException(
|
||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||
detail="无效的API Key",
|
||
headers={"WWW-Authenticate": "Bearer"},
|
||
)
|
||
|
||
def require_admin(permission_level: str = Depends(verify_api_key)) -> str:
|
||
"""
|
||
要求管理员权限的依赖函数
|
||
|
||
Args:
|
||
permission_level: 从verify_api_key获得的权限级别
|
||
|
||
Returns:
|
||
str: 权限级别
|
||
|
||
Raises:
|
||
HTTPException: 权限不足时抛出403错误
|
||
"""
|
||
if permission_level != PermissionLevel.ADMIN:
|
||
logger.warning(f"非管理员用户尝试访问管理接口")
|
||
raise HTTPException(
|
||
status_code=status.HTTP_403_FORBIDDEN,
|
||
detail="需要管理员权限才能执行此操作"
|
||
)
|
||
return permission_level
|
||
|
||
def skip_auth_for_dev() -> Optional[str]:
|
||
"""
|
||
开发环境跳过认证(仅在开发模式下使用)
|
||
|
||
Returns:
|
||
Optional[str]: 返回管理员权限级别
|
||
|
||
Warning:
|
||
仅用于开发环境,生产环境务必使用真实认证
|
||
"""
|
||
import os
|
||
if os.getenv("SKIP_AUTH", "false").lower() == "true":
|
||
logger.warning("开发模式:跳过API Key认证")
|
||
return PermissionLevel.ADMIN
|
||
return None
|