Files
homeschool/backend/app/routers/auth.py
derekc 3022bc328b Security hardening: go-live review fixes
- TV tokens upgraded from 4 to 6 digits; Regen Token button in Admin
- Nginx rate limiting on TV dashboard and WebSocket endpoints
- Login lockout after 5 failed attempts (15 min); clears on admin password reset
- HSTS header added; CSP unsafe-inline removed from script-src; CORS restricted to explicit methods/headers
- Dependency CVE fixes: PyJWT 2.12.0, aiomysql 0.3.0, cryptography 46.0.5, python-multipart 0.0.22
- datetime.utcnow() replaced with datetime.now(timezone.utc) throughout
- SQL identifier whitelist for startup migration queries
- README updated: security notes section, lockout docs, token regen, NPM proxy guidance

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-22 00:00:14 -07:00

156 lines
5.6 KiB
Python

import logging
from datetime import datetime, timezone, timedelta
from fastapi import APIRouter, Depends, HTTPException, Response, Request, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
logger = logging.getLogger("homeschool.auth")
from app.auth.jwt import (
create_access_token,
create_refresh_token,
decode_token,
hash_password,
verify_password,
)
from app.dependencies import get_db, get_current_user
from app.models.user import User
from app.models.subject import Subject
from app.schemas.auth import LoginRequest, RegisterRequest, TokenResponse
from app.schemas.user import UserOut
# Router for the authentication endpoints; every route below is mounted under /api/auth.
router = APIRouter(prefix="/api/auth", tags=["auth"])
# Name of the cookie that carries the refresh token.
REFRESH_COOKIE = "refresh_token"
# Keyword arguments passed to every set_cookie() call for the refresh cookie.
COOKIE_OPTS = {
    # Not readable from JavaScript.
    "httponly": True,
    # Sent on same-site requests and top-level cross-site navigations only.
    "samesite": "lax",
    # Only transmitted over HTTPS.
    "secure": True,
}
@router.post("/register", response_model=TokenResponse, status_code=status.HTTP_201_CREATED)
async def register(body: RegisterRequest, response: Response, db: AsyncSession = Depends(get_db)):
    """Create an account, seed its system "Meeting" subject, and sign the user in.

    The user row and the default subject are written in ONE transaction, so a
    failure while creating the subject cannot leave behind a committed account
    that is missing its system subject.

    Args:
        body: Registration payload; ``email``, ``password`` and ``full_name`` are read.
        response: Outgoing response; receives the refresh-token cookie.
        db: Async database session.

    Returns:
        A ``TokenResponse`` carrying a freshly issued access token.

    Raises:
        HTTPException: 400 when a user with the same email already exists.
    """
    existing = await db.execute(select(User).where(User.email == body.email))
    if existing.scalar_one_or_none() is not None:
        raise HTTPException(status_code=400, detail="Email already registered")

    user = User(
        email=body.email,
        hashed_password=hash_password(body.password),
        full_name=body.full_name,
    )
    db.add(user)
    # flush() emits the INSERT so the primary key is assigned, without ending
    # the transaction; the subject below is committed together with the user.
    await db.flush()
    # Capture the id before commit so it is never read from an expired instance.
    user_id = user.id

    meeting = Subject(
        user_id=user_id,
        name="Meeting",
        icon="📅",
        color="#6366f1",
        is_system=True,
    )
    db.add(meeting)
    await db.commit()

    access = create_access_token({"sub": str(user_id)})
    refresh = create_refresh_token({"sub": str(user_id)})
    response.set_cookie(REFRESH_COOKIE, refresh, **COOKIE_OPTS)
    return TokenResponse(access_token=access)
# Number of consecutive failed password checks that triggers a lockout.
_LOGIN_MAX_ATTEMPTS = 5
# How long, in minutes, an account stays locked once the limit is reached.
_LOGIN_LOCKOUT_MINUTES = 15


@router.post("/login", response_model=TokenResponse)
async def login(body: LoginRequest, response: Response, db: AsyncSession = Depends(get_db)):
    """Authenticate by email and password, enforcing the failed-attempt lockout.

    Checks run in this order: unknown email, active lockout, disabled account,
    password verification. A wrong password increments the user's failure
    counter and locks the account once ``_LOGIN_MAX_ATTEMPTS`` is reached; a
    correct password clears the counter and any lockout.

    Args:
        body: Login payload; ``email`` and ``password`` are read.
        response: Outgoing response; receives the refresh-token cookie.
        db: Async database session.

    Returns:
        A ``TokenResponse`` carrying a freshly issued access token.

    Raises:
        HTTPException: 401 for an unknown email or wrong password, 429 while
            the account is locked, 403 when the account is disabled.
    """
    result = await db.execute(select(User).where(User.email == body.email))
    user = result.scalar_one_or_none()
    if user is None:
        raise HTTPException(status_code=401, detail="Invalid credentials")

    # Naive UTC timestamp, used for the locked_until comparison and assignment.
    # NOTE(review): presumably locked_until is stored as a naive UTC datetime — confirm.
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    if user.locked_until and user.locked_until > now:
        # Whole minutes left, rounded up so the message never says "0 minute(s)".
        remaining = int((user.locked_until - now).total_seconds() / 60) + 1
        logger.warning("Locked account login attempt for email=%s", body.email)
        raise HTTPException(status_code=429, detail=f"Account locked. Try again in {remaining} minute(s).")

    if not user.is_active:
        logger.warning("Login attempt on disabled account email=%s", body.email)
        raise HTTPException(status_code=403, detail="This account has been disabled. Please contact your administrator.")

    if not verify_password(body.password, user.hashed_password):
        # Treat a NULL counter as zero so a wrong password yields a 401
        # rather than a TypeError.
        attempts = (user.failed_login_attempts or 0) + 1
        user.failed_login_attempts = attempts
        # NOTE(review): the counter is not reset when a lockout expires, so the
        # first wrong password after expiry re-locks immediately — confirm intended.
        if attempts >= _LOGIN_MAX_ATTEMPTS:
            user.locked_until = now + timedelta(minutes=_LOGIN_LOCKOUT_MINUTES)
            logger.warning("Account locked for email=%s after %d failed attempts", body.email, attempts)
        else:
            logger.warning("Failed login attempt %d/%d for email=%s", attempts, _LOGIN_MAX_ATTEMPTS, body.email)
        # Persist the counter/lockout before rejecting the request.
        await db.commit()
        raise HTTPException(status_code=401, detail="Invalid credentials")

    # Successful login: clear lockout state and record activity.
    user.failed_login_attempts = 0
    user.locked_until = None
    user.last_active_at = datetime.now(timezone.utc)
    await db.commit()

    access = create_access_token({"sub": str(user.id)})
    refresh = create_refresh_token({"sub": str(user.id)})
    response.set_cookie(REFRESH_COOKIE, refresh, **COOKIE_OPTS)
    return TokenResponse(access_token=access)
@router.post("/refresh", response_model=TokenResponse)
async def refresh_token(request: Request, response: Response, db: AsyncSession = Depends(get_db)):
    """Exchange the refresh-token cookie for a new access token and a rotated cookie.

    Args:
        request: Incoming request; the refresh token is read from its cookies.
        response: Outgoing response; receives the replacement refresh cookie.
        db: Async database session.

    Returns:
        A ``TokenResponse`` carrying a freshly issued access token.

    Raises:
        HTTPException: 401 when the cookie is missing, the token fails to
            decode, is not a refresh token, has a missing or non-numeric
            ``sub`` claim, or does not match an active user.
    """
    token = request.cookies.get(REFRESH_COOKIE)
    if not token:
        raise HTTPException(status_code=401, detail="No refresh token")

    try:
        payload = decode_token(token)
    except ValueError:
        raise HTTPException(status_code=401, detail="Invalid refresh token") from None

    if payload.get("type") != "refresh":
        raise HTTPException(status_code=401, detail="Wrong token type")

    # A token without a usable subject must be rejected as invalid rather than
    # crash with an unhandled TypeError/ValueError from int().
    try:
        user_id = int(payload.get("sub"))
    except (TypeError, ValueError):
        raise HTTPException(status_code=401, detail="Invalid refresh token") from None

    # Only active users may refresh; `== True` is a SQLAlchemy column comparison.
    result = await db.execute(
        select(User).where(User.id == user_id, User.is_active == True)  # noqa: E712
    )
    user = result.scalar_one_or_none()
    if user is None:
        raise HTTPException(status_code=401, detail="User not found")

    user.last_active_at = datetime.now(timezone.utc)
    await db.commit()

    access = create_access_token({"sub": str(user.id)})
    new_refresh = create_refresh_token({"sub": str(user.id)})
    response.set_cookie(REFRESH_COOKIE, new_refresh, **COOKIE_OPTS)
    return TokenResponse(access_token=access)
@router.post("/logout")
async def logout(response: Response):
    """Clear the refresh-token cookie on the client.

    The deletion is issued with the same attributes (``COOKIE_OPTS``) that the
    cookie is set with, so the expiring ``Set-Cookie`` header keeps its
    Secure/HttpOnly/SameSite flags and matches the original cookie.

    Args:
        response: Outgoing response; receives the cookie-deletion header.

    Returns:
        ``{"detail": "Logged out"}``.
    """
    response.delete_cookie(REFRESH_COOKIE, **COOKIE_OPTS)
    return {"detail": "Logged out"}
@router.post("/change-password")
async def change_password(
    body: dict,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Change the authenticated user's password after verifying the current one.

    Args:
        body: JSON object with string fields ``current_password`` and
            ``new_password``. Surrounding whitespace is stripped from the new
            password before it is hashed.
        current_user: The authenticated user, resolved by ``get_current_user``.
        db: Async database session.

    Returns:
        ``{"ok": True}`` on success.

    Raises:
        HTTPException: 400 when either field is missing, empty or not a
            string, or when the current password does not verify.
    """
    current = body.get("current_password")
    new = body.get("new_password")

    # The body is an untyped dict, so values may be null, numbers, lists, etc.
    # Reject anything that is not a string instead of failing on .strip().
    if not isinstance(current, str) or not isinstance(new, str):
        raise HTTPException(status_code=400, detail="current_password and new_password are required")

    new = new.strip()
    if not current or not new:
        raise HTTPException(status_code=400, detail="current_password and new_password are required")

    if not verify_password(current, current_user.hashed_password):
        raise HTTPException(status_code=400, detail="Current password is incorrect")

    current_user.hashed_password = hash_password(new)
    await db.commit()
    return {"ok": True}