Files
bourbonacci/backend/app/routers/admin.py
derekc f1b82baebd Overhaul nav, fix DB transaction bugs, add admin UI
- Replace nav user area with display name (non-clickable), gear settings
  modal, admin button (admins only), and logout button
- Settings modal handles display name, timezone, and password change
- Add admin.html + admin.js: user table with reset PW, disable/enable,
  login-as (impersonation), and delete; return-to-admin flow in nav
- Add is_admin to UserResponse so frontend can gate the Admin button
- Fix all db.begin() bugs in admin.py and users.py (transaction already
  active from get_current_user query; use commit() directly instead)
- Add email-validator and pin bcrypt==4.0.1 for passlib compatibility
- Add escHtml() to api.js and admin API namespace
- Group nav brand + links in nav-left for left-aligned layout

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 21:09:38 -07:00

147 lines
5.1 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.dependencies import get_db, get_current_admin, bearer_scheme
from app.models.user import User
from app.schemas.user import AdminUserCreate, AdminPasswordReset, AdminUserResponse, Token
from app.utils.security import hash_password, create_token
# Router for this module: every route declared on it is served under the
# "/api/admin" prefix and carries the "admin" tag.
router = APIRouter(prefix="/api/admin", tags=["admin"])
@router.get("/users", response_model=list[AdminUserResponse])
async def list_users(
    _: User = Depends(get_current_admin),
    db: AsyncSession = Depends(get_db),
):
    """Return all users ordered by ``created_at`` (ascending).

    The ``get_current_admin`` dependency is declared on the route; its
    resolved value is not used in the body.
    """
    query = select(User).order_by(User.created_at)
    users = (await db.execute(query)).scalars()
    return users.all()
@router.post("/users", response_model=AdminUserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
    body: AdminUserCreate,
    _: User = Depends(get_current_admin),
    db: AsyncSession = Depends(get_db),
):
    """Create a non-admin user from the submitted email and password.

    The display name falls back to the part of the email before the first
    ``@`` when ``body.display_name`` is empty. The password is stored only as
    the output of ``hash_password``.

    Returns:
        The newly persisted ``User``, refreshed from the database.

    Raises:
        HTTPException: 409 when the email is already registered, whether that
            is detected by the up-front lookup or by the database at commit.
    """
    # Local import keeps this handler self-contained.
    from sqlalchemy.exc import IntegrityError

    existing = await db.execute(select(User).where(User.email == body.email))
    if existing.scalar_one_or_none() is not None:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Email already registered")

    user = User(
        email=body.email,
        password_hash=hash_password(body.password),
        display_name=body.display_name or body.email.split("@")[0],
        is_admin=False,
    )
    db.add(user)
    try:
        await db.commit()
    except IntegrityError as exc:
        # The lookup above and this insert are not atomic: a concurrent request
        # can register the same email in between.
        # NOTE(review): assumes the users table enforces uniqueness on email
        # (model not visible here) - confirm. Without such a constraint this
        # branch is never taken.
        await db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT, detail="Email already registered"
        ) from exc
    await db.refresh(user)
    return user
@router.post("/users/{user_id}/reset-password", status_code=status.HTTP_204_NO_CONTENT)
async def reset_password(
    user_id: int,
    body: AdminPasswordReset,
    _: User = Depends(get_current_admin),
    db: AsyncSession = Depends(get_db),
):
    """Replace the stored password hash of the user identified by ``user_id``.

    The new hash is computed from ``body.new_password`` and committed.

    Raises:
        HTTPException: 404 when no user has that id.
    """
    lookup = await db.execute(select(User).where(User.id == user_id))
    target = lookup.scalars().one_or_none()
    if target is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")

    target.password_hash = hash_password(body.new_password)
    await db.commit()
@router.post("/users/{user_id}/disable", status_code=status.HTTP_204_NO_CONTENT)
async def disable_user(
    user_id: int,
    current_admin: User = Depends(get_current_admin),
    db: AsyncSession = Depends(get_db),
):
    """Set ``is_disabled`` to ``True`` on the user identified by ``user_id``.

    Raises:
        HTTPException: 400 when the target is the calling admin; 404 when no
            user has that id.
    """
    # Self-check runs before any lookup, so an admin can never disable themselves.
    targets_self = user_id == current_admin.id
    if targets_self:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot disable your own account")

    lookup = await db.execute(select(User).where(User.id == user_id))
    target = lookup.scalars().one_or_none()
    if target is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")

    target.is_disabled = True
    await db.commit()
@router.post("/users/{user_id}/enable", status_code=status.HTTP_204_NO_CONTENT)
async def enable_user(
    user_id: int,
    _: User = Depends(get_current_admin),
    db: AsyncSession = Depends(get_db),
):
    """Set ``is_disabled`` to ``False`` on the user identified by ``user_id``.

    Raises:
        HTTPException: 404 when no user has that id.
    """
    lookup = await db.execute(select(User).where(User.id == user_id))
    target = lookup.scalars().one_or_none()
    if target is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")

    target.is_disabled = False
    await db.commit()
@router.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(
    user_id: int,
    current_admin: User = Depends(get_current_admin),
    db: AsyncSession = Depends(get_db),
):
    """Delete the user identified by ``user_id`` and commit.

    Raises:
        HTTPException: 400 when the target is the calling admin; 404 when no
            user has that id.
    """
    # Self-check runs before any lookup, so an admin can never delete themselves.
    targets_self = user_id == current_admin.id
    if targets_self:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot delete your own account")

    lookup = await db.execute(select(User).where(User.id == user_id))
    target = lookup.scalars().one_or_none()
    if target is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")

    await db.delete(target)
    await db.commit()
@router.post("/users/{user_id}/impersonate", response_model=Token)
async def impersonate_user(
    user_id: int,
    current_admin: User = Depends(get_current_admin),
    db: AsyncSession = Depends(get_db),
):
    """Issue a token for ``user_id`` that also carries the calling admin's id.

    The token is built with ``create_token(user.id, admin_id=current_admin.id)``.

    Raises:
        HTTPException: 400 when the target is the calling admin; 404 when no
            user has that id.
    """
    targets_self = user_id == current_admin.id
    if targets_self:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot impersonate yourself")

    lookup = await db.execute(select(User).where(User.id == user_id))
    target = lookup.scalars().one_or_none()
    if target is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")

    # NOTE(review): the target's is_disabled flag is not consulted here -
    # confirm whether impersonating a disabled account is intended.
    token = create_token(target.id, admin_id=current_admin.id)
    return Token(access_token=token)
@router.post("/unimpersonate", response_model=Token)
async def unimpersonate(
    credentials=Depends(bearer_scheme),
    db: AsyncSession = Depends(get_db),
):
    """Exchange an impersonation token for a fresh token for the original admin.

    The bearer token is decoded with ``decode_token_full`` and its
    ``admin_id`` entry identifies the admin to switch back to. That account is
    re-read from the database so the new token reflects its current state.

    Returns:
        A ``Token`` whose ``access_token`` is ``create_token(admin.id)``.

    Raises:
        HTTPException: 400 when the token has no ``admin_id`` (or cannot be
            decoded to a payload); 403 when the referenced account no longer
            exists, is no longer an admin, or has been disabled.
    """
    from app.utils.security import decode_token_full

    payload = decode_token_full(credentials.credentials)
    admin_id = payload.get("admin_id") if payload else None
    if not admin_id:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Not an impersonation token")

    result = await db.execute(select(User).where(User.id == admin_id))
    admin = result.scalar_one_or_none()
    # A disabled admin must not be handed a new admin token, even if the
    # account was disabled only after the impersonation session began.
    if admin is None or not admin.is_admin or admin.is_disabled:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin not found")

    return Token(access_token=create_token(admin.id))