Replace the default auto-generated password with a first-run setup page that lets users choose their own nickname and password. The /api/auth/setup endpoint now accepts an optional nickname field (when provided, it is also used to set site_name). Remove set_default_password(), since setup is now mandatory before login.
154 lines
4.9 KiB
Python
154 lines
4.9 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, Request
|
|
from fastapi.responses import JSONResponse
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.database import get_db
|
|
from app.models.user_settings import UserSettings
|
|
from app.schemas.auth import LoginRequest, LoginResponse, ChangePasswordRequest, AuthStatusResponse, SetupPasswordRequest, AuthSetupStatusResponse
|
|
from app.utils.auth import (
|
|
hash_password, verify_password, create_access_token,
|
|
get_current_user,
|
|
)
|
|
from app.utils.datetime import utcnow
|
|
from app.utils.rate_limiter import login_limiter
|
|
|
|
# Router for the authentication endpoints; every route registered on it
# below is declared with the /api/auth prefix.
router = APIRouter(prefix="/api/auth", tags=["认证"])
|
|
|
|
|
|
def _get_client_ip(request: Request) -> str:
|
|
forwarded = request.headers.get("X-Forwarded-For")
|
|
if forwarded:
|
|
return forwarded.split(",")[0].strip()
|
|
return request.client.host if request.client else "unknown"
|
|
|
|
|
|
def _get_or_create_settings(db: Session) -> UserSettings:
    """Fetch the ``UserSettings`` row with ``id == 1``, inserting it if missing.

    Args:
        db: Database session used for the lookup and, if needed, the insert.

    Returns:
        The existing row, or a newly committed and refreshed one.
    """
    existing = db.query(UserSettings).filter(UserSettings.id == 1).first()
    if existing:
        return existing

    # No row yet: create it, persist it, and reload it from the database
    # before handing it back.
    created = UserSettings(id=1)
    db.add(created)
    db.commit()
    db.refresh(created)
    return created
|
|
|
|
|
|
@router.get("/status", response_model=AuthSetupStatusResponse)
def auth_status(db: Session = Depends(get_db)):
    """Report whether a password has already been set.

    Returns:
        ``AuthSetupStatusResponse`` whose ``has_password`` is true only when
        the ``id == 1`` settings row exists and has a non-empty
        ``password_hash``.
    """
    row = db.query(UserSettings).filter(UserSettings.id == 1).first()
    if row and row.password_hash:
        return AuthSetupStatusResponse(has_password=True)
    return AuthSetupStatusResponse(has_password=False)
|
|
|
|
|
|
@router.post("/setup")
def setup_password(data: SetupPasswordRequest, db: Session = Depends(get_db)):
    """Set the password for the first time and log the caller in.

    Only available while no password is stored. On success the settings row
    is written (password hash, ``token_version`` reset to 1, and optionally
    the nickname and a site name derived from it) and an ``access_token``
    cookie is attached to the response.

    Args:
        data: Setup payload carrying ``password`` and an optional ``nickname``.
        db: Database session.

    Returns:
        A JSON response with a success message and the session cookie.

    Raises:
        HTTPException: 400 when a password has already been set.
    """
    record = db.query(UserSettings).filter(UserSettings.id == 1).first()
    if not record:
        record = UserSettings(id=1)
        db.add(record)
    elif record.password_hash:
        raise HTTPException(status_code=400, detail="密码已设置,请使用登录接口")

    record.password_hash = hash_password(data.password)
    record.token_version = 1

    # A missing, empty, or whitespace-only nickname leaves both fields untouched.
    nick = (data.nickname or "").strip()
    if nick:
        record.nickname = nick
        record.site_name = f"{nick}待办"

    record.updated_at = utcnow()
    db.commit()

    claims = {
        "sub": str(record.id),
        "tv": record.token_version,
    }
    token = create_access_token(claims)

    response = JSONResponse(content={"message": "密码设置成功"})
    response.set_cookie(
        key="access_token",
        value=token,
        httponly=True,
        samesite="strict",
        max_age=86400,  # 24 hours, in seconds
        path="/",
    )
    return response
|
|
|
|
|
|
@router.post("/login", response_model=LoginResponse)
def login(data: LoginRequest, request: Request, db: Session = Depends(get_db)):
    """Verify the password and issue an ``access_token`` cookie.

    Attempts are gated per client IP by ``login_limiter``: a failed password
    check records a failure, and a successful one resets the counter.

    Args:
        data: Login payload carrying ``password``.
        request: Incoming request, used to derive the client IP.
        db: Database session.

    Returns:
        A JSON response with the session cookie on success, or a 429 JSON
        response (with a ``Retry-After`` header) when the limiter refuses
        the attempt.

    Raises:
        HTTPException: 400 when no password has been set yet; 401 when the
            password does not match.
    """
    client_ip = _get_client_ip(request)

    permitted, wait_seconds = login_limiter.check(client_ip)
    if not permitted:
        # The message rounds the wait up to the next whole minute.
        wait_minutes = wait_seconds // 60 + 1
        return JSONResponse(
            status_code=429,
            content={
                "detail": f"登录尝试过于频繁,请 {wait_minutes} 分钟后再试",
                "retry_after": wait_seconds,
            },
            headers={"Retry-After": str(wait_seconds)},
        )

    account = _get_or_create_settings(db)
    stored_hash = account.password_hash

    if not stored_hash:
        raise HTTPException(status_code=400, detail="请先设置密码")

    if not verify_password(data.password, stored_hash):
        login_limiter.record_failure(client_ip)
        raise HTTPException(status_code=401, detail="密码错误")

    login_limiter.reset(client_ip)

    claims = {
        "sub": str(account.id),
        "tv": account.token_version,
    }
    token = create_access_token(claims)

    response = JSONResponse(content={"message": "登录成功"})
    response.set_cookie(
        key="access_token",
        value=token,
        httponly=True,
        samesite="strict",
        max_age=86400,  # 24 hours, in seconds
        path="/",
    )
    return response
|
|
|
|
|
|
@router.post("/logout")
def logout():
    """Clear the ``access_token`` cookie and confirm the logout.

    Returns:
        A JSON response carrying the confirmation message and the
        cookie-deletion header.
    """
    reply = JSONResponse(content={"message": "已退出登录"})
    # Same path as the one used when the cookie is set.
    reply.delete_cookie("access_token", path="/")
    return reply
|
|
|
|
|
|
@router.get("/me", response_model=AuthStatusResponse)
def me(request: Request):
    """Return the authentication status of the current caller.

    Args:
        request: Incoming request, passed to ``get_current_user``.

    Returns:
        ``AuthStatusResponse`` with ``authenticated=True`` and the ``sub``
        value from the user mapping (empty string when absent).
    """
    claims = get_current_user(request)
    subject = claims.get("sub", "")
    return AuthStatusResponse(authenticated=True, user_id=subject)
|
|
|
|
|
|
@router.post("/change-password")
def change_password(
    data: ChangePasswordRequest,
    request: Request,
    db: Session = Depends(get_db)
):
    """Replace the stored password after verifying the old one.

    On success the new hash is stored, ``token_version`` is incremented and
    ``updated_at`` is refreshed.

    Args:
        data: Payload carrying ``old_password`` and ``new_password``.
        request: Incoming request, passed to ``get_current_user``.
        db: Database session.

    Returns:
        A dict with a success message.

    Raises:
        HTTPException: 500 when the settings row is missing; 400 when the
            old password does not match.
    """
    # The returned value is not needed here; the call is kept as the
    # authentication gate (presumably raises for an invalid session;
    # confirm in app.utils.auth).
    get_current_user(request)

    record = db.query(UserSettings).filter(UserSettings.id == 1).first()
    if not record:
        raise HTTPException(status_code=500, detail="用户设置不存在")

    old_matches = verify_password(data.old_password, record.password_hash)
    if not old_matches:
        raise HTTPException(status_code=400, detail="原密码错误")

    record.password_hash = hash_password(data.new_password)
    # A missing/zero version is treated as 0 before incrementing.
    current_version = record.token_version or 0
    record.token_version = current_version + 1
    record.updated_at = utcnow()
    db.commit()

    return {"message": "密码修改成功"}
|