first commit
This commit is contained in:
89
core/auth.py
Normal file
89
core/auth.py
Normal file
@@ -0,0 +1,89 @@
|
||||
from fastapi import HTTPException, Depends, Header, status
|
||||
from typing import Optional
|
||||
from config import Config
|
||||
from core.log import logger
|
||||
|
||||
class PermissionLevel:
|
||||
READ_ONLY = "read_only"
|
||||
ADMIN = "admin"
|
||||
|
||||
def verify_api_key(
|
||||
x_api_key: Optional[str] = Header(None, alias="X-API-Key"),
|
||||
authorization: Optional[str] = Header(None)
|
||||
) -> str:
|
||||
"""
|
||||
验证API Key并返回权限级别
|
||||
|
||||
Args:
|
||||
x_api_key: X-API-Key header中的API Key
|
||||
authorization: Authorization header中的Bearer token
|
||||
|
||||
Returns:
|
||||
str: 权限级别
|
||||
|
||||
Raises:
|
||||
HTTPException: 认证失败时抛出401错误
|
||||
"""
|
||||
api_key = x_api_key
|
||||
|
||||
if authorization and authorization.startswith("Bearer "):
|
||||
api_key = authorization.replace("Bearer ", "")
|
||||
|
||||
if not api_key:
|
||||
logger.warning("API请求缺少API Key")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="缺少API Key,请在请求头中添加 X-API-Key 或 Authorization: Bearer <key>",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
|
||||
if api_key == Config.ADMIN_API_KEY:
|
||||
logger.info(f"管理员API认证成功: {api_key[:8]}...")
|
||||
return PermissionLevel.ADMIN
|
||||
elif api_key == Config.API_KEY:
|
||||
logger.info(f"普通用户API认证成功: {api_key[:8]}...")
|
||||
return PermissionLevel.READ_ONLY
|
||||
else:
|
||||
logger.warning(f"无效的API Key尝试: {api_key[:8]}...")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="无效的API Key",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
|
||||
def require_admin(permission_level: str = Depends(verify_api_key)) -> str:
|
||||
"""
|
||||
要求管理员权限的依赖函数
|
||||
|
||||
Args:
|
||||
permission_level: 从verify_api_key获得的权限级别
|
||||
|
||||
Returns:
|
||||
str: 权限级别
|
||||
|
||||
Raises:
|
||||
HTTPException: 权限不足时抛出403错误
|
||||
"""
|
||||
if permission_level != PermissionLevel.ADMIN:
|
||||
logger.warning(f"非管理员用户尝试访问管理接口")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail="需要管理员权限才能执行此操作"
|
||||
)
|
||||
return permission_level
|
||||
|
||||
def skip_auth_for_dev() -> Optional[str]:
|
||||
"""
|
||||
开发环境跳过认证(仅在开发模式下使用)
|
||||
|
||||
Returns:
|
||||
Optional[str]: 返回管理员权限级别
|
||||
|
||||
Warning:
|
||||
仅用于开发环境,生产环境务必使用真实认证
|
||||
"""
|
||||
import os
|
||||
if os.getenv("SKIP_AUTH", "false").lower() == "true":
|
||||
logger.warning("开发模式:跳过API Key认证")
|
||||
return PermissionLevel.ADMIN
|
||||
return None
|
||||
Reference in New Issue
Block a user