Files
ToDoList/api/app/routers/auth.py
祀梦 5048de4fa1 feat: add data backup/import, goal step ordering, and PostgreSQL migration
- Add GET /api/backup/export and POST /api/backup/import endpoints for full data backup
- Add drag-and-drop reorder for goal steps with PUT /api/goals/{id}/steps/reorder
- Auto-assign sort_order on step creation (preserves creation order)
- Fix duplicate milestone rendering in goal detail page
- Add category management button in goal dialog
- Migrate database default from SQLite to PostgreSQL
- Fix router guard redirect loop for logged-in users on setup/login pages
- Fix ALTER TABLE ADD COLUMN crash on callable defaults (uuid.uuid4)
- Add auth status rate limiter and token version caching
- Update CLAUDE.md to reflect current architecture

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 00:02:18 +08:00

179 lines
5.8 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session
import time
from app.database import get_db
from app.models.user_settings import UserSettings
from app.schemas.auth import LoginRequest, LoginResponse, ChangePasswordRequest, AuthStatusResponse, SetupPasswordRequest, AuthSetupStatusResponse
from app.utils.auth import (
hash_password, verify_password, create_access_token,
get_current_user, set_cached_token_version,
)
from app.utils.datetime import utcnow
from app.utils.rate_limiter import login_limiter
from app.config import ACCESS_TOKEN_EXPIRE_SECONDS
# Router for the authentication endpoints; every route registered on it is
# served under the /api/auth prefix.
router = APIRouter(prefix="/api/auth", tags=["认证"])
def _get_client_ip(request: Request) -> str:
    """Return the best-effort client IP address for *request*.

    The first entry of the ``X-Forwarded-For`` header is preferred. When the
    header is absent, or its first entry is blank, the socket peer address is
    used instead; ``"unknown"`` is returned if no peer information exists.

    NOTE(review): ``X-Forwarded-For`` is client-controlled unless a trusted
    reverse proxy overwrites it, so rate limits keyed on this value can be
    evaded by spoofing the header -- confirm the deployment always sits
    behind such a proxy.
    """
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        # Only the left-most entry names the originating client.
        first_hop = forwarded.split(",", 1)[0].strip()
        # A blank first entry (" " or ",10.0.0.1") must not turn into the
        # key "" shared by every such request; fall through to the peer.
        if first_hop:
            return first_hop
    return request.client.host if request.client else "unknown"
def _get_or_create_settings(db: Session) -> UserSettings:
    """Return the ``UserSettings`` row with id 1, creating it if missing.

    A newly created row is added, committed and refreshed before being
    returned.
    """
    existing = db.query(UserSettings).filter(UserSettings.id == 1).first()
    if existing:
        return existing

    created = UserSettings(id=1)
    db.add(created)
    db.commit()
    db.refresh(created)
    return created
class _StatusLimiter:
    """In-memory sliding-window rate limiter keyed by client IP.

    Each IP may make at most ``MAX_REQUESTS`` allowed calls within any
    ``WINDOW_SECONDS`` interval. State is held in this object only.
    """

    MAX_REQUESTS = 30
    WINDOW_SECONDS = 60

    def __init__(self):
        # ip -> time.monotonic() stamps of the allowed requests still inside
        # the window, oldest first.
        self._requests: dict[str, list[float]] = {}
        # When stale IPs were last swept out of ``_requests``.
        self._last_sweep = time.monotonic()

    def _evict_stale(self, now: float) -> None:
        """Drop every IP whose newest recorded request has left the window."""
        # Iterate over a snapshot so entries can be removed safely.
        for ip, stamps in list(self._requests.items()):
            if not stamps or now - stamps[-1] >= self.WINDOW_SECONDS:
                self._requests.pop(ip, None)

    def check(self, ip: str) -> bool:
        """Record a request from *ip* and report whether it is allowed.

        Returns:
            ``True`` if the request fits in the current window (it is then
            recorded); ``False`` if the IP already used its quota. Rejected
            requests are not recorded.
        """
        # Monotonic time: the window must not be distorted by wall-clock
        # adjustments.
        now = time.monotonic()
        # Without a periodic sweep every IP ever seen would stay in the dict
        # forever; sweeping once per window keeps the cost amortised.
        if now - self._last_sweep >= self.WINDOW_SECONDS:
            self._evict_stale(now)
            self._last_sweep = now
        recent = [
            stamp
            for stamp in self._requests.get(ip, ())
            if now - stamp < self.WINDOW_SECONDS
        ]
        self._requests[ip] = recent
        if len(recent) >= self.MAX_REQUESTS:
            return False
        recent.append(now)
        return True
# Single module-level limiter instance, consulted by the /status endpoint.
_status_limiter = _StatusLimiter()
@router.get("/status", response_model=AuthSetupStatusResponse)
def auth_status(request: Request, db: Session = Depends(get_db)):
    """Report whether the system password has been set.

    Raises:
        HTTPException: 429 when the caller's IP is rejected by the status
            rate limiter.
    """
    client_ip = _get_client_ip(request)
    within_limit = _status_limiter.check(client_ip)
    if not within_limit:
        raise HTTPException(status_code=429, detail="请求过于频繁")

    record = db.query(UserSettings).filter(UserSettings.id == 1).first()
    # A missing row and an empty hash both count as "no password yet".
    if record and record.password_hash:
        password_is_set = True
    else:
        password_is_set = False
    return AuthSetupStatusResponse(has_password=password_is_set)
@router.post("/setup")
def setup_password(data: SetupPasswordRequest, db: Session = Depends(get_db)):
    """Set the password for the first time and log the caller in.

    Only usable while no password hash is stored. On success the response
    carries an ``access_token`` cookie holding a freshly issued token.

    Raises:
        HTTPException: 400 if a password has already been set.
    """
    record = db.query(UserSettings).filter(UserSettings.id == 1).first()
    already_configured = bool(record and record.password_hash)
    if already_configured:
        raise HTTPException(status_code=400, detail="密码已设置,请使用登录接口")

    if not record:
        record = UserSettings(id=1)
        db.add(record)

    record.password_hash = hash_password(data.password)
    record.token_version = 1

    # Optional nickname: ignored when missing or whitespace-only.
    nickname = data.nickname.strip() if data.nickname else ""
    if nickname:
        record.nickname = nickname
        record.site_name = f"{nickname}待办"

    record.updated_at = utcnow()
    db.commit()

    claims = {
        "sub": str(record.id),
        "tv": record.token_version,
    }
    token = create_access_token(claims)

    cookie_options = {
        "key": "access_token",
        "value": token,
        "httponly": True,
        "samesite": "strict",
        "max_age": ACCESS_TOKEN_EXPIRE_SECONDS,
        "path": "/",
    }
    response = JSONResponse(content={"message": "密码设置成功"})
    response.set_cookie(**cookie_options)
    return response
@router.post("/login", response_model=LoginResponse)
def login(data: LoginRequest, request: Request, db: Session = Depends(get_db)):
    """Verify the submitted password and issue an ``access_token`` cookie.

    Attempts are gated per client IP by ``login_limiter``: a blocked caller
    receives a 429 JSON response with a ``Retry-After`` header, a wrong
    password is recorded as a failure, and a successful login resets the
    limiter for that IP.

    Raises:
        HTTPException: 400 if no password has been set yet; 401 if the
            password does not match.
    """
    client_ip = _get_client_ip(request)
    allowed, retry_after = login_limiter.check(client_ip)
    if not allowed:
        wait_minutes = retry_after // 60 + 1
        body = {
            "detail": f"登录尝试过于频繁,请 {wait_minutes} 分钟后再试",
            "retry_after": retry_after,
        }
        return JSONResponse(
            status_code=429,
            content=body,
            headers={"Retry-After": str(retry_after)},
        )

    record = _get_or_create_settings(db)
    if not record.password_hash:
        raise HTTPException(status_code=400, detail="请先设置密码")

    password_ok = verify_password(data.password, record.password_hash)
    if not password_ok:
        login_limiter.record_failure(client_ip)
        raise HTTPException(status_code=401, detail="密码错误")

    login_limiter.reset(client_ip)

    claims = {
        "sub": str(record.id),
        "tv": record.token_version,
    }
    token = create_access_token(claims)

    cookie_options = {
        "key": "access_token",
        "value": token,
        "httponly": True,
        "samesite": "strict",
        "max_age": ACCESS_TOKEN_EXPIRE_SECONDS,
        "path": "/",
    }
    response = JSONResponse(content={"message": "登录成功"})
    response.set_cookie(**cookie_options)
    return response
@router.post("/logout")
def logout():
    """Log the caller out by deleting the ``access_token`` cookie."""
    body = {"message": "已退出登录"}
    reply = JSONResponse(content=body)
    reply.delete_cookie("access_token", path="/")
    return reply
@router.get("/me", response_model=AuthStatusResponse)
def me(request: Request):
    """Return the authentication status for the current request.

    The user payload comes from ``get_current_user``; its ``sub`` entry is
    reported as the user id, defaulting to an empty string when absent.
    """
    payload = get_current_user(request)
    user_id = payload.get("sub", "")
    return AuthStatusResponse(authenticated=True, user_id=user_id)
@router.post("/change-password")
def change_password(
    data: ChangePasswordRequest,
    request: Request,
    db: Session = Depends(get_db),
):
    """Change the password after verifying the current one.

    Stores the new password hash, increments the stored token version and,
    once the change is committed, publishes the new version through
    ``set_cached_token_version``.

    Raises:
        HTTPException: 500 if the settings row (id 1) does not exist;
            400 if ``old_password`` does not match the stored hash.
    """
    # Called only for its check; the returned payload is not used here.
    # NOTE(review): presumably raises when the request is unauthenticated --
    # confirm in app.utils.auth.
    get_current_user(request)

    settings = db.query(UserSettings).filter(UserSettings.id == 1).first()
    if not settings:
        raise HTTPException(status_code=500, detail="用户设置不存在")
    if not verify_password(data.old_password, settings.password_hash):
        raise HTTPException(status_code=400, detail="原密码错误")

    new_version = (settings.token_version or 0) + 1
    settings.password_hash = hash_password(data.new_password)
    settings.token_version = new_version
    settings.updated_at = utcnow()

    # Read the cache key before committing so no attribute reload is needed
    # afterwards.
    cache_key = str(settings.id)
    db.commit()

    # Update the cache only after the commit succeeded: if the commit raised,
    # the cache would otherwise advertise a version the database never stored.
    set_cached_token_version(cache_key, new_version)
    return {"message": "密码修改成功"}