- Replace hardcoded JWT secret with randomly generated key persisted to file - Replace hardcoded default password with random password shown in logs - Migrate token storage from localStorage to HttpOnly SameSite=strict cookie - Add IP-based login rate limiter (5 attempts / 15 min, 429 on lockout) - Add token_version for JWT revocation on password change - Add password strength validation (min 6 chars, 3+ unique characters) - Inject decoded user payload into request.state.user in auth middleware - Add /api/auth/me and /api/auth/logout endpoints - Narrow auth middleware exception handling (JWTError only, not all Exception) - Update updated_at timestamp on password change - Remove localStorage token management from frontend (axios, router, store) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
60 lines
1.7 KiB
Python
60 lines
1.7 KiB
Python
"""
Login rate limiter (in-memory implementation, suited to single-user deployments).
"""
|
||
import time
|
||
|
||
|
||
class LoginRateLimiter:
|
||
"""IP 级别的登录频率限制"""
|
||
|
||
MAX_ATTEMPTS = 5
|
||
WINDOW_SECONDS = 900 # 统计窗口:15 分钟
|
||
LOCKOUT_SECONDS = 900 # 锁定时间:15 分钟
|
||
|
||
def __init__(self):
|
||
self._attempts: dict[str, list[float]] = {}
|
||
self._lockout: dict[str, float] = {}
|
||
|
||
def _cleanup(self):
|
||
"""清除过期的记录"""
|
||
now = time.time()
|
||
self._attempts = {
|
||
ip: [t for t in times if now - t < self.WINDOW_SECONDS]
|
||
for ip, times in self._attempts.items()
|
||
}
|
||
self._attempts = {ip: times for ip, times in self._attempts.items() if times}
|
||
|
||
self._lockout = {
|
||
ip: until for ip, until in self._lockout.items()
|
||
if now < until
|
||
}
|
||
|
||
def check(self, ip: str) -> tuple[bool, int | None]:
|
||
"""返回 (是否允许, 剩余等待秒数)"""
|
||
self._cleanup()
|
||
|
||
now = time.time()
|
||
|
||
if ip in self._lockout:
|
||
remaining = int(self._lockout[ip] - now)
|
||
if remaining > 0:
|
||
return False, remaining
|
||
del self._lockout[ip]
|
||
|
||
return True, None
|
||
|
||
def record_failure(self, ip: str):
|
||
"""记录一次失败"""
|
||
now = time.time()
|
||
self._attempts.setdefault(ip, []).append(now)
|
||
if len(self._attempts[ip]) >= self.MAX_ATTEMPTS:
|
||
self._lockout[ip] = now + self.LOCKOUT_SECONDS
|
||
|
||
def reset(self, ip: str):
|
||
"""登录成功后重置计数"""
|
||
self._attempts.pop(ip, None)
|
||
self._lockout.pop(ip, None)
|
||
|
||
|
||
# Module-level LoginRateLimiter instance, created once when this module is imported.
login_limiter = LoginRateLimiter()
|