fix: harden authentication system (JWT, cookies, rate limiting, password policy)
- Replace hardcoded JWT secret with randomly generated key persisted to file - Replace hardcoded default password with random password shown in logs - Migrate token storage from localStorage to HttpOnly SameSite=strict cookie - Add IP-based login rate limiter (5 attempts / 15 min, 429 on lockout) - Add token_version for JWT revocation on password change - Add password strength validation (min 6 chars, 3+ unique characters) - Inject decoded user payload into request.state.user in auth middleware - Add /api/auth/me and /api/auth/logout endpoints - Narrow auth middleware exception handling (JWTError only, not all Exception) - Update updated_at timestamp on password change - Remove localStorage token management from frontend (axios, router, store) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,13 +1,15 @@
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Optional
|
||||
import secrets
|
||||
|
||||
from jose import JWTError, jwt
|
||||
from passlib.context import CryptContext
|
||||
from fastapi import Request, HTTPException
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.config import JWT_SECRET, ACCESS_TOKEN_EXPIRE_MINUTES
|
||||
from app.database import get_db
|
||||
from app.models.user_settings import UserSettings
|
||||
from app.utils.logger import logger
|
||||
|
||||
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
||||
|
||||
@@ -34,8 +36,7 @@ def decode_access_token(token: str) -> dict:
|
||||
|
||||
|
||||
def get_current_user(request: Request) -> dict:
|
||||
auth_header = request.headers.get("Authorization", "")
|
||||
token = auth_header.replace("Bearer ", "")
|
||||
token = request.cookies.get("access_token", "")
|
||||
if not token:
|
||||
raise HTTPException(status_code=401, detail="未登录")
|
||||
try:
|
||||
@@ -47,5 +48,10 @@ def get_current_user(request: Request) -> dict:
|
||||
|
||||
def set_default_password(db: Session, settings: UserSettings):
|
||||
if not settings.password_hash:
|
||||
settings.password_hash = hash_password("elysia")
|
||||
password = secrets.token_urlsafe(8)[:8]
|
||||
settings.password_hash = hash_password(password)
|
||||
db.commit()
|
||||
logger.warning("=" * 50)
|
||||
logger.warning(f" 初始密码: {password}")
|
||||
logger.warning(" 请登录后立即修改!")
|
||||
logger.warning("=" * 50)
|
||||
|
||||
59
api/app/utils/rate_limiter.py
Normal file
59
api/app/utils/rate_limiter.py
Normal file
@@ -0,0 +1,59 @@
|
||||
"""
|
||||
登录频率限制器(内存实现,单用户场景适用)
|
||||
"""
|
||||
import time
|
||||
|
||||
|
||||
class LoginRateLimiter:
|
||||
"""IP 级别的登录频率限制"""
|
||||
|
||||
MAX_ATTEMPTS = 5
|
||||
WINDOW_SECONDS = 900 # 统计窗口:15 分钟
|
||||
LOCKOUT_SECONDS = 900 # 锁定时间:15 分钟
|
||||
|
||||
def __init__(self):
|
||||
self._attempts: dict[str, list[float]] = {}
|
||||
self._lockout: dict[str, float] = {}
|
||||
|
||||
def _cleanup(self):
|
||||
"""清除过期的记录"""
|
||||
now = time.time()
|
||||
self._attempts = {
|
||||
ip: [t for t in times if now - t < self.WINDOW_SECONDS]
|
||||
for ip, times in self._attempts.items()
|
||||
}
|
||||
self._attempts = {ip: times for ip, times in self._attempts.items() if times}
|
||||
|
||||
self._lockout = {
|
||||
ip: until for ip, until in self._lockout.items()
|
||||
if now < until
|
||||
}
|
||||
|
||||
def check(self, ip: str) -> tuple[bool, int | None]:
|
||||
"""返回 (是否允许, 剩余等待秒数)"""
|
||||
self._cleanup()
|
||||
|
||||
now = time.time()
|
||||
|
||||
if ip in self._lockout:
|
||||
remaining = int(self._lockout[ip] - now)
|
||||
if remaining > 0:
|
||||
return False, remaining
|
||||
del self._lockout[ip]
|
||||
|
||||
return True, None
|
||||
|
||||
def record_failure(self, ip: str):
|
||||
"""记录一次失败"""
|
||||
now = time.time()
|
||||
self._attempts.setdefault(ip, []).append(now)
|
||||
if len(self._attempts[ip]) >= self.MAX_ATTEMPTS:
|
||||
self._lockout[ip] = now + self.LOCKOUT_SECONDS
|
||||
|
||||
def reset(self, ip: str):
|
||||
"""登录成功后重置计数"""
|
||||
self._attempts.pop(ip, None)
|
||||
self._lockout.pop(ip, None)
|
||||
|
||||
|
||||
login_limiter = LoginRateLimiter()
|
||||
Reference in New Issue
Block a user