from fastapi import APIRouter, Depends, HTTPException, Request from fastapi.responses import JSONResponse from sqlalchemy.orm import Session from app.database import get_db from app.models.user_settings import UserSettings from app.schemas.auth import LoginRequest, LoginResponse, ChangePasswordRequest, AuthStatusResponse from app.utils.auth import ( hash_password, verify_password, create_access_token, get_current_user, set_default_password ) from app.utils.datetime import utcnow from app.utils.rate_limiter import login_limiter router = APIRouter(prefix="/api/auth", tags=["认证"]) def _get_client_ip(request: Request) -> str: forwarded = request.headers.get("X-Forwarded-For") if forwarded: return forwarded.split(",")[0].strip() return request.client.host if request.client else "unknown" @router.post("/login", response_model=LoginResponse) def login(data: LoginRequest, request: Request, db: Session = Depends(get_db)): ip = _get_client_ip(request) allowed, retry_after = login_limiter.check(ip) if not allowed: return JSONResponse( status_code=429, content={ "detail": f"登录尝试过于频繁,请 {retry_after // 60 + 1} 分钟后再试", "retry_after": retry_after, }, headers={"Retry-After": str(retry_after)}, ) settings = db.query(UserSettings).filter(UserSettings.id == 1).first() if not settings: settings = UserSettings(id=1) db.add(settings) db.commit() db.refresh(settings) set_default_password(db, settings) if not verify_password(data.password, settings.password_hash): login_limiter.record_failure(ip) raise HTTPException(status_code=401, detail="密码错误") login_limiter.reset(ip) token = create_access_token({ "sub": str(settings.id), "tv": settings.token_version, }) response = JSONResponse(content={"message": "登录成功"}) response.set_cookie( key="access_token", value=token, httponly=True, samesite="strict", max_age=86400, path="/", ) return response @router.post("/logout") def logout(): response = JSONResponse(content={"message": "已退出登录"}) response.delete_cookie("access_token", path="/") return response @router.get("/me", response_model=AuthStatusResponse) def me(request: Request): user = get_current_user(request) return AuthStatusResponse(authenticated=True, user_id=user.get("sub", "")) @router.post("/change-password") def change_password( data: ChangePasswordRequest, request: Request, db: Session = Depends(get_db) ): user = get_current_user(request) settings = db.query(UserSettings).filter(UserSettings.id == 1).first() if not settings: raise HTTPException(status_code=500, detail="用户设置不存在") if not verify_password(data.old_password, settings.password_hash): raise HTTPException(status_code=400, detail="原密码错误") settings.password_hash = hash_password(data.new_password) settings.token_version = (settings.token_version or 0) + 1 settings.updated_at = utcnow() db.commit() return {"message": "密码修改成功"}