Files
yolkbook/backend/routers/auth_router.py
derekc 2d3ad3a06c Add login lockout with ntfy alerts and update docs
- Lock accounts for 15 minutes after 5 consecutive failed login attempts
- Send urgent ntfy notification when an account is locked
- Send high-priority ntfy notification on login attempt against a locked account
- Auto-reset lockout on expiry; reset counter on successful login
- Add v2.4 migration for failed_login_attempts and locked_until columns
- Add ALLOWED_ORIGINS and SECURE_COOKIES to .env.example
- Update README: lockout row in security table, new ntfy events

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 23:11:30 -07:00

169 lines
6.2 KiB
Python

import math
from datetime import datetime, timezone, timedelta
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request, Response, status
from sqlalchemy import select
from sqlalchemy.orm import Session
from database import get_db
from models import User
from ntfy import notify
from schemas import LoginRequest, UserOut, UserCreate, ChangePasswordRequest, TimezoneUpdate, AuthResponse
from auth import (
verify_password, hash_password, create_access_token, get_current_user,
set_auth_cookie, clear_auth_cookie, token_to_user_payload,
)
router = APIRouter(prefix="/api/auth", tags=["auth"])
_LOGIN_MAX_ATTEMPTS = 5
_LOGIN_LOCKOUT_MINUTES = 15
def _issue(response: Response, user: User, admin_id=None) -> AuthResponse:
token = create_access_token(user.id, user.username, user.is_admin, user.timezone, admin_id=admin_id)
set_auth_cookie(response, token)
return AuthResponse(user=token_to_user_payload(token))
@router.post("/login", response_model=AuthResponse)
def login(body: LoginRequest, response: Response, request: Request, background_tasks: BackgroundTasks, db: Session = Depends(get_db)):
ip = request.headers.get("X-Forwarded-For", request.client.host if request.client else "unknown")
ua = request.headers.get("User-Agent", "unknown")
user = db.scalars(select(User).where(User.username == body.username)).first()
# Check lockout before verifying password (don't reveal whether account exists)
if user and user.locked_until:
now = datetime.now(timezone.utc).replace(tzinfo=None)
if user.locked_until > now:
remaining = math.ceil((user.locked_until - now).total_seconds() / 60)
background_tasks.add_task(
notify,
title="Yolkbook Login Attempt on Locked Account",
message=f"User: {user.username}\nLocked for {remaining} more minute(s)\nIP: {ip}\nUA: {ua}",
priority="high",
tags=["lock"],
)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Account is temporarily locked. Try again in {remaining} minute(s).",
)
else:
# Lockout expired — reset counters
user.failed_login_attempts = 0
user.locked_until = None
db.commit()
if not user or not verify_password(body.password, user.hashed_password):
if user:
user.failed_login_attempts += 1
if user.failed_login_attempts >= _LOGIN_MAX_ATTEMPTS:
user.locked_until = datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(minutes=_LOGIN_LOCKOUT_MINUTES)
db.commit()
background_tasks.add_task(
notify,
title="Yolkbook Account Locked",
message=f"User: {user.username}\nLocked after {user.failed_login_attempts} failed attempts\nIP: {ip}\nUA: {ua}",
priority="urgent",
tags=["warning"],
)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Account locked after too many failed attempts. Try again in {_LOGIN_LOCKOUT_MINUTES} minutes.",
)
db.commit()
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid username or password",
)
if user.is_disabled:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Account is disabled. Contact your administrator.",
)
# Successful login — reset failure counters
user.failed_login_attempts = 0
user.locked_until = None
db.commit()
if user.is_admin:
background_tasks.add_task(
notify,
title="Yolkbook Admin Login",
message=f"User: {user.username}\nIP: {ip}\nUA: {ua}",
priority="high",
tags=["key"],
)
return _issue(response, user)
@router.post("/register", response_model=AuthResponse, status_code=201)
def register(body: UserCreate, response: Response, request: Request, background_tasks: BackgroundTasks, db: Session = Depends(get_db)):
existing = db.scalars(select(User).where(User.username == body.username)).first()
if existing:
raise HTTPException(status_code=409, detail="Username already taken")
user = User(
username=body.username,
hashed_password=hash_password(body.password),
is_admin=False,
timezone="UTC",
)
db.add(user)
db.commit()
db.refresh(user)
ip = request.headers.get("X-Forwarded-For", request.client.host if request.client else "unknown")
ua = request.headers.get("User-Agent", "unknown")
background_tasks.add_task(
notify,
title="Yolkbook New User Registered",
message=f"Username: {user.username}\nIP: {ip}\nUA: {ua}",
priority="default",
tags=["bust_in_silhouette"],
)
return _issue(response, user)
@router.post("/logout")
def logout(response: Response):
clear_auth_cookie(response)
return {"detail": "Logged out"}
@router.get("/me", response_model=UserOut)
def me(current_user: User = Depends(get_current_user)):
return current_user
@router.post("/change-password")
def change_password(
body: ChangePasswordRequest,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
if not verify_password(body.current_password, current_user.hashed_password):
raise HTTPException(status_code=400, detail="Current password is incorrect")
current_user.hashed_password = hash_password(body.new_password)
db.commit()
return {"detail": "Password updated"}
@router.put("/timezone", response_model=AuthResponse)
def update_timezone(
body: TimezoneUpdate,
response: Response,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
try:
ZoneInfo(body.timezone)
except ZoneInfoNotFoundError:
raise HTTPException(status_code=400, detail=f"Unknown timezone: {body.timezone}")
current_user.timezone = body.timezone
db.commit()
db.refresh(current_user)
return _issue(response, current_user)