"""Authentication routes: login, registration, logout, and account settings."""

import math
from datetime import datetime, timezone, timedelta
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError

from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request, Response, status
from fastapi.responses import JSONResponse
from sqlalchemy import select
from sqlalchemy.orm import Session

from database import get_db
from models import User
from ntfy import notify
from schemas import LoginRequest, UserOut, UserCreate, ChangePasswordRequest, TimezoneUpdate, AuthResponse
from auth import (
    verify_password,
    hash_password,
    create_access_token,
    get_current_user,
    set_auth_cookie,
    clear_auth_cookie,
    token_to_user_payload,
)

router = APIRouter(prefix="/api/auth", tags=["auth"])

# Consecutive failed logins that trigger a lockout, and how long it lasts.
_LOGIN_MAX_ATTEMPTS = 5
_LOGIN_LOCKOUT_MINUTES = 15


def _utcnow_naive() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    ``User.locked_until`` is compared against and assigned from this value, so
    both sides of every comparison stay timezone-naive.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _client_info(request: Request) -> tuple[str, str]:
    """Return ``(ip, user_agent)`` strings for notification messages.

    The IP is the ``X-Forwarded-For`` header when present, otherwise
    ``request.client.host``, otherwise ``"unknown"``. The header is used
    verbatim and is client-supplied, so treat it as informational only.
    """
    fallback_ip = request.client.host if request.client else "unknown"
    ip = request.headers.get("X-Forwarded-For", fallback_ip)
    ua = request.headers.get("User-Agent", "unknown")
    return ip, ua


def _error_with_tasks(status_code: int, detail: str, background_tasks: BackgroundTasks) -> JSONResponse:
    """Build an error response that still runs the queued background tasks.

    The body has the same ``{"detail": ...}`` shape that a raised
    ``HTTPException`` produces. Tasks queued on the injected ``BackgroundTasks``
    only run when they are attached to the response the endpoint returns;
    raising ``HTTPException`` after ``add_task`` would drop them.
    """
    return JSONResponse(
        status_code=status_code,
        content={"detail": detail},
        background=background_tasks,
    )


def _issue(response: Response, user: User, admin_id=None) -> AuthResponse:
    """Create an access token for ``user``, set it as the auth cookie, and return its payload."""
    token = create_access_token(user.id, user.username, user.is_admin, user.timezone, admin_id=admin_id)
    set_auth_cookie(response, token)
    return AuthResponse(user=token_to_user_payload(token))


@router.post("/login", response_model=AuthResponse)
def login(
    body: LoginRequest,
    response: Response,
    request: Request,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
):
    """Authenticate a user and issue the auth cookie.

    Responses:
        200: credentials accepted; failure counters are reset and the cookie is set.
        401: unknown username or wrong password.
        403: account is currently locked, was just locked by this attempt,
            or is disabled.

    Lockout events and admin logins schedule a ``notify`` background task.
    """
    ip, ua = _client_info(request)

    user = db.scalars(select(User).where(User.username == body.username)).first()

    # Check lockout before verifying the password.
    # NOTE(review): this 403 is only reachable for usernames that exist, while
    # unknown usernames get the 401 below, so the two cases are distinguishable.
    if user and user.locked_until:
        now = _utcnow_naive()
        if user.locked_until > now:
            remaining = math.ceil((user.locked_until - now).total_seconds() / 60)
            background_tasks.add_task(
                notify,
                title="Yolkbook Login Attempt on Locked Account",
                message=f"User: {user.username}\nLocked for {remaining} more minute(s)\nIP: {ip}\nUA: {ua}",
                priority="high",
                tags=["lock"],
            )
            # Returned (not raised) so the notification above is actually sent.
            return _error_with_tasks(
                status.HTTP_403_FORBIDDEN,
                f"Account is temporarily locked. Try again in {remaining} minute(s).",
                background_tasks,
            )
        # Lockout expired — reset counters
        user.failed_login_attempts = 0
        user.locked_until = None
        db.commit()

    if not user or not verify_password(body.password, user.hashed_password):
        if user:
            user.failed_login_attempts += 1
            if user.failed_login_attempts >= _LOGIN_MAX_ATTEMPTS:
                user.locked_until = _utcnow_naive() + timedelta(minutes=_LOGIN_LOCKOUT_MINUTES)
                db.commit()
                background_tasks.add_task(
                    notify,
                    title="Yolkbook Account Locked",
                    message=f"User: {user.username}\nLocked after {user.failed_login_attempts} failed attempts\nIP: {ip}\nUA: {ua}",
                    priority="urgent",
                    tags=["warning"],
                )
                # Returned (not raised) so the notification above is actually sent.
                return _error_with_tasks(
                    status.HTTP_403_FORBIDDEN,
                    f"Account locked after too many failed attempts. Try again in {_LOGIN_LOCKOUT_MINUTES} minutes.",
                    background_tasks,
                )
            db.commit()
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid username or password",
        )

    if user.is_disabled:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Account is disabled. Contact your administrator.",
        )

    # Successful login — reset failure counters
    user.failed_login_attempts = 0
    user.locked_until = None
    db.commit()

    if user.is_admin:
        background_tasks.add_task(
            notify,
            title="Yolkbook Admin Login",
            message=f"User: {user.username}\nIP: {ip}\nUA: {ua}",
            priority="high",
            tags=["key"],
        )

    return _issue(response, user)


@router.post("/register", response_model=AuthResponse, status_code=201)
def register(
    body: UserCreate,
    response: Response,
    request: Request,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
):
    """Create a non-admin user with timezone ``"UTC"`` and log them in.

    Raises:
        HTTPException: 409 when the username is already present.
    """
    existing = db.scalars(select(User).where(User.username == body.username)).first()
    if existing:
        raise HTTPException(status_code=409, detail="Username already taken")

    user = User(
        username=body.username,
        hashed_password=hash_password(body.password),
        is_admin=False,
        timezone="UTC",
    )
    db.add(user)
    db.commit()
    db.refresh(user)

    ip, ua = _client_info(request)
    background_tasks.add_task(
        notify,
        title="Yolkbook New User Registered",
        message=f"Username: {user.username}\nIP: {ip}\nUA: {ua}",
        priority="default",
        tags=["bust_in_silhouette"],
    )

    return _issue(response, user)


@router.post("/logout")
def logout(response: Response):
    """Clear the auth cookie on the response."""
    clear_auth_cookie(response)
    return {"detail": "Logged out"}


@router.get("/me", response_model=UserOut)
def me(current_user: User = Depends(get_current_user)):
    """Return the currently authenticated user."""
    return current_user


@router.post("/change-password")
def change_password(
    body: ChangePasswordRequest,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Replace the current user's password after verifying the existing one.

    Raises:
        HTTPException: 400 when ``body.current_password`` does not match.
    """
    if not verify_password(body.current_password, current_user.hashed_password):
        raise HTTPException(status_code=400, detail="Current password is incorrect")
    current_user.hashed_password = hash_password(body.new_password)
    db.commit()
    return {"detail": "Password updated"}


@router.put("/timezone", response_model=AuthResponse)
def update_timezone(
    body: TimezoneUpdate,
    response: Response,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Store a new timezone for the current user and re-issue the auth cookie.

    The token is re-issued because it is created from ``user.timezone``.

    Raises:
        HTTPException: 400 when ``body.timezone`` is not a loadable zone key.
    """
    try:
        ZoneInfo(body.timezone)
    except (ZoneInfoNotFoundError, ValueError, OSError) as exc:
        # ZoneInfo raises ValueError for malformed keys (absolute paths,
        # up-level references, empty string) and can surface an OSError on some
        # Python versions (e.g. a key naming a directory). All of these are bad
        # client input, not server errors.
        raise HTTPException(status_code=400, detail=f"Unknown timezone: {body.timezone}") from exc
    current_user.timezone = body.timezone
    db.commit()
    db.refresh(current_user)
    return _issue(response, current_user)