Files
yolkbook/backend/routers/auth_router.py
derekc 7cd2dfb710 Add ntfy push notifications for security-relevant events
Sends alerts on admin login, new registrations, user disable/delete, and
impersonation. NTFY_URL and NTFY_TOKEN are optional — leave blank to disable.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 23:01:13 -07:00

116 lines
4.2 KiB
Python

from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request, Response, status
from sqlalchemy import select
from sqlalchemy.orm import Session
from database import get_db
from models import User
from ntfy import notify
from schemas import LoginRequest, UserOut, UserCreate, ChangePasswordRequest, TimezoneUpdate, AuthResponse
from auth import (
verify_password, hash_password, create_access_token, get_current_user,
set_auth_cookie, clear_auth_cookie, token_to_user_payload,
)
router = APIRouter(prefix="/api/auth", tags=["auth"])
def _issue(response: Response, user: User, admin_id=None) -> AuthResponse:
token = create_access_token(user.id, user.username, user.is_admin, user.timezone, admin_id=admin_id)
set_auth_cookie(response, token)
return AuthResponse(user=token_to_user_payload(token))
@router.post("/login", response_model=AuthResponse)
def login(body: LoginRequest, response: Response, request: Request, background_tasks: BackgroundTasks, db: Session = Depends(get_db)):
user = db.scalars(select(User).where(User.username == body.username)).first()
if not user or not verify_password(body.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid username or password",
)
if user.is_disabled:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Account is disabled. Contact your administrator.",
)
if user.is_admin:
ip = request.headers.get("X-Forwarded-For", request.client.host if request.client else "unknown")
ua = request.headers.get("User-Agent", "unknown")
background_tasks.add_task(
notify,
title="Yolkbook Admin Login",
message=f"User: {user.username}\nIP: {ip}\nUA: {ua}",
priority="high",
tags=["key"],
)
return _issue(response, user)
@router.post("/register", response_model=AuthResponse, status_code=201)
def register(body: UserCreate, response: Response, request: Request, background_tasks: BackgroundTasks, db: Session = Depends(get_db)):
existing = db.scalars(select(User).where(User.username == body.username)).first()
if existing:
raise HTTPException(status_code=409, detail="Username already taken")
user = User(
username=body.username,
hashed_password=hash_password(body.password),
is_admin=False,
timezone="UTC",
)
db.add(user)
db.commit()
db.refresh(user)
ip = request.headers.get("X-Forwarded-For", request.client.host if request.client else "unknown")
ua = request.headers.get("User-Agent", "unknown")
background_tasks.add_task(
notify,
title="Yolkbook New User Registered",
message=f"Username: {user.username}\nIP: {ip}\nUA: {ua}",
priority="default",
tags=["bust_in_silhouette"],
)
return _issue(response, user)
@router.post("/logout")
def logout(response: Response):
clear_auth_cookie(response)
return {"detail": "Logged out"}
@router.get("/me", response_model=UserOut)
def me(current_user: User = Depends(get_current_user)):
return current_user
@router.post("/change-password")
def change_password(
body: ChangePasswordRequest,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
if not verify_password(body.current_password, current_user.hashed_password):
raise HTTPException(status_code=400, detail="Current password is incorrect")
current_user.hashed_password = hash_password(body.new_password)
db.commit()
return {"detail": "Password updated"}
@router.put("/timezone", response_model=AuthResponse)
def update_timezone(
body: TimezoneUpdate,
response: Response,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
try:
ZoneInfo(body.timezone)
except ZoneInfoNotFoundError:
raise HTTPException(status_code=400, detail=f"Unknown timezone: {body.timezone}")
current_user.timezone = body.timezone
db.commit()
db.refresh(current_user)
return _issue(response, current_user)