import logging from fastapi import APIRouter, Depends, HTTPException from sqlalchemy import select from sqlalchemy.orm import Session from database import get_db from models import User from schemas import UserCreate, UserOut, ResetPasswordRequest, TokenResponse from auth import hash_password, create_access_token, get_current_admin router = APIRouter(prefix="/api/admin", tags=["admin"]) logger = logging.getLogger("yolkbook") @router.get("/users", response_model=list[UserOut]) def list_users( _: User = Depends(get_current_admin), db: Session = Depends(get_db), ): return db.scalars(select(User).order_by(User.created_at)).all() @router.post("/users", response_model=UserOut, status_code=201) def create_user( body: UserCreate, _: User = Depends(get_current_admin), db: Session = Depends(get_db), ): existing = db.scalars(select(User).where(User.username == body.username)).first() if existing: raise HTTPException(status_code=409, detail="Username already taken") user = User( username=body.username, hashed_password=hash_password(body.password), is_admin=False, ) db.add(user) db.commit() db.refresh(user) return user @router.post("/users/{user_id}/reset-password") def reset_password( user_id: int, body: ResetPasswordRequest, current_admin: User = Depends(get_current_admin), db: Session = Depends(get_db), ): user = db.get(User, user_id) if not user: raise HTTPException(status_code=404, detail="User not found") user.hashed_password = hash_password(body.new_password) db.commit() logger.warning("Admin '%s' reset password for user '%s' (id=%d).", current_admin.username, user.username, user.id) return {"detail": f"Password reset for {user.username}"} @router.post("/users/{user_id}/disable") def disable_user( user_id: int, current_admin: User = Depends(get_current_admin), db: Session = Depends(get_db), ): user = db.get(User, user_id) if not user: raise HTTPException(status_code=404, detail="User not found") if user.id == current_admin.id: raise HTTPException(status_code=400, detail="Cannot disable your own account") user.is_disabled = True db.commit() logger.warning("Admin '%s' disabled user '%s' (id=%d).", current_admin.username, user.username, user.id) return {"detail": f"User {user.username} disabled"} @router.post("/users/{user_id}/enable") def enable_user( user_id: int, _: User = Depends(get_current_admin), db: Session = Depends(get_db), ): user = db.get(User, user_id) if not user: raise HTTPException(status_code=404, detail="User not found") user.is_disabled = False db.commit() return {"detail": f"User {user.username} enabled"} @router.delete("/users/{user_id}", status_code=204) def delete_user( user_id: int, current_admin: User = Depends(get_current_admin), db: Session = Depends(get_db), ): user = db.get(User, user_id) if not user: raise HTTPException(status_code=404, detail="User not found") if user.id == current_admin.id: raise HTTPException(status_code=400, detail="Cannot delete your own account") logger.warning("Admin '%s' deleted user '%s' (id=%d).", current_admin.username, user.username, user.id) db.delete(user) db.commit() @router.post("/users/{user_id}/impersonate", response_model=TokenResponse) def impersonate_user( user_id: int, current_admin: User = Depends(get_current_admin), db: Session = Depends(get_db), ): user = db.get(User, user_id) if not user: raise HTTPException(status_code=404, detail="User not found") token = create_access_token(user.id, user.username, user.is_admin, user.timezone) logger.warning("Admin '%s' (id=%d) is impersonating user '%s' (id=%d).", current_admin.username, current_admin.id, user.username, user.id) return TokenResponse(access_token=token)