import logging from fastapi import APIRouter, Depends, HTTPException, Response from sqlalchemy import select from sqlalchemy.orm import Session from database import get_db from models import User from schemas import UserCreate, UserOut, ResetPasswordRequest, AuthResponse from auth import hash_password, create_access_token, get_current_admin, get_token_payload, set_auth_cookie, token_to_user_payload router = APIRouter(prefix="/api/admin", tags=["admin"]) logger = logging.getLogger("yolkbook") @router.get("/users", response_model=list[UserOut]) def list_users( _: User = Depends(get_current_admin), db: Session = Depends(get_db), ): return db.scalars(select(User).order_by(User.created_at).limit(500)).all() @router.post("/users", response_model=UserOut, status_code=201) def create_user( body: UserCreate, _: User = Depends(get_current_admin), db: Session = Depends(get_db), ): existing = db.scalars(select(User).where(User.username == body.username)).first() if existing: raise HTTPException(status_code=409, detail="Username already taken") user = User( username=body.username, hashed_password=hash_password(body.password), is_admin=False, ) db.add(user) db.commit() db.refresh(user) return user @router.post("/users/{user_id}/reset-password") def reset_password( user_id: int, body: ResetPasswordRequest, current_admin: User = Depends(get_current_admin), db: Session = Depends(get_db), ): user = db.get(User, user_id) if not user: raise HTTPException(status_code=404, detail="User not found") user.hashed_password = hash_password(body.new_password) db.commit() logger.warning("Admin '%s' reset password for user '%s' (id=%d).", current_admin.username, user.username, user.id) return {"detail": f"Password reset for {user.username}"} @router.post("/users/{user_id}/disable") def disable_user( user_id: int, current_admin: User = Depends(get_current_admin), db: Session = Depends(get_db), ): user = db.get(User, user_id) if not user: raise HTTPException(status_code=404, detail="User not found") if user.id == current_admin.id: raise HTTPException(status_code=400, detail="Cannot disable your own account") user.is_disabled = True db.commit() logger.warning("Admin '%s' disabled user '%s' (id=%d).", current_admin.username, user.username, user.id) return {"detail": f"User {user.username} disabled"} @router.post("/users/{user_id}/enable") def enable_user( user_id: int, _: User = Depends(get_current_admin), db: Session = Depends(get_db), ): user = db.get(User, user_id) if not user: raise HTTPException(status_code=404, detail="User not found") user.is_disabled = False db.commit() return {"detail": f"User {user.username} enabled"} @router.delete("/users/{user_id}", status_code=204) def delete_user( user_id: int, current_admin: User = Depends(get_current_admin), db: Session = Depends(get_db), ): user = db.get(User, user_id) if not user: raise HTTPException(status_code=404, detail="User not found") if user.id == current_admin.id: raise HTTPException(status_code=400, detail="Cannot delete your own account") logger.warning("Admin '%s' deleted user '%s' (id=%d).", current_admin.username, user.username, user.id) db.delete(user) db.commit() @router.post("/users/{user_id}/impersonate", response_model=AuthResponse) def impersonate_user( user_id: int, response: Response, current_admin: User = Depends(get_current_admin), db: Session = Depends(get_db), ): user = db.get(User, user_id) if not user: raise HTTPException(status_code=404, detail="User not found") token = create_access_token(user.id, user.username, user.is_admin, user.timezone, admin_id=current_admin.id) set_auth_cookie(response, token) logger.warning("Admin '%s' (id=%d) is impersonating user '%s' (id=%d).", current_admin.username, current_admin.id, user.username, user.id) return AuthResponse(user=token_to_user_payload(token)) @router.post("/unimpersonate", response_model=AuthResponse) def unimpersonate( response: Response, payload: dict = Depends(get_token_payload), db: Session = Depends(get_db), ): admin_id = payload.get("admin_id") if not admin_id: raise HTTPException(status_code=400, detail="Not in an impersonation session") admin = db.get(User, int(admin_id)) if not admin or admin.is_disabled or not admin.is_admin: raise HTTPException(status_code=403, detail="Original admin account is no longer valid") token = create_access_token(admin.id, admin.username, admin.is_admin, admin.timezone) set_auth_cookie(response, token) logger.warning("Admin '%s' (id=%d) ended impersonation session.", admin.username, admin.id) return AuthResponse(user=token_to_user_payload(token))