"""Authentication helpers: password hashing, JWT access tokens, request user lookup."""

from datetime import datetime, timedelta, timezone
from typing import Optional

import bcrypt
from fastapi import HTTPException, Request
from jose import JWTError, jwt

from app.config import ACCESS_TOKEN_EXPIRE_MINUTES, JWT_SECRET

# Signing algorithm used for both encoding and decoding access tokens.
ALGORITHM = "HS256"

# In-process cache mapping user_id -> token version.
# NOTE(review): nothing in this module reads the cache during authentication;
# presumably callers compare it against a version claim to revoke tokens — confirm.
_token_version_cache: dict[str, int] = {}


def get_cached_token_version(user_id: str) -> int | None:
    """Return the cached token version for ``user_id``, or ``None`` if absent."""
    return _token_version_cache.get(user_id)


def set_cached_token_version(user_id: str, version: int) -> None:
    """Store ``version`` as the cached token version for ``user_id``."""
    _token_version_cache[user_id] = version


def hash_password(password: str) -> str:
    """Hash ``password`` with bcrypt using a freshly generated salt.

    The password is UTF-8 encoded before hashing and the resulting hash is
    returned as a UTF-8 decoded string.
    """
    hashed = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
    return hashed.decode("utf-8")


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return whether ``plain_password`` matches the bcrypt ``hashed_password``.

    Both arguments are UTF-8 encoded and handed to ``bcrypt.checkpw``; any
    exception raised by bcrypt propagates to the caller.
    """
    return bcrypt.checkpw(
        plain_password.encode("utf-8"),
        hashed_password.encode("utf-8"),
    )


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Encode ``data`` as a signed JWT with an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The dict is not mutated.
        expires_delta: Token lifetime measured from now (UTC). When ``None``,
            ``ACCESS_TOKEN_EXPIRE_MINUTES`` is used.

    Returns:
        The encoded token, signed with ``JWT_SECRET`` using ``ALGORITHM``.
    """
    # Compare against None rather than relying on truthiness: timedelta(0) is
    # falsy, so `expires_delta or default` would silently replace an explicit
    # zero lifetime with the default one.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    expire = datetime.now(timezone.utc) + expires_delta
    to_encode = {**data, "exp": expire}
    return jwt.encode(to_encode, JWT_SECRET, algorithm=ALGORITHM)


def decode_access_token(token: str) -> dict:
    """Decode ``token`` with ``JWT_SECRET`` and return its claims.

    Only ``ALGORITHM`` is accepted. Errors raised by ``jwt.decode`` propagate
    to the caller.
    """
    return jwt.decode(token, JWT_SECRET, algorithms=[ALGORITHM])


def get_current_user(request: Request) -> dict:
    """Return the user payload associated with ``request``.

    A truthy ``request.state.user`` is returned as-is. Otherwise the
    ``access_token`` cookie is decoded and its claims are returned.

    Raises:
        HTTPException: 401 when the cookie is missing or empty, or when
            decoding the token raises ``JWTError``.
    """
    # Reuse a user already attached to the request state, if any.
    cached_user = getattr(request.state, "user", None)
    if cached_user:
        return cached_user

    token = request.cookies.get("access_token", "")
    if not token:
        raise HTTPException(status_code=401, detail="未登录")

    try:
        return decode_access_token(token)
    except JWTError as exc:
        # Chain the original error so the decode failure stays visible in tracebacks.
        raise HTTPException(status_code=401, detail="登录已过期,请重新登录") from exc