Add admin account with user management endpoints

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-24 20:45:19 -07:00
parent 7994cc5ff2
commit 48a15c54f6
10 changed files with 238 additions and 5 deletions

View File

@@ -6,6 +6,8 @@ class Settings(BaseSettings):
secret_key: str
access_token_expire_minutes: int = 480
algorithm: str = "HS256"
admin_username: str
admin_password: str
class Config:
env_file = ".env"

View File

@@ -31,5 +31,13 @@ async def get_current_user(
user = result.scalar_one_or_none()
if user is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")
if user.is_disabled:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Account disabled")
return user
async def get_current_admin(current_user=Depends(get_current_user)):
if not current_user.is_admin:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")
return current_user

View File

@@ -2,14 +2,37 @@ from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy import select
from app.database import init_db
from app.routers import auth, users, entries, public
from app.config import settings
from app.database import init_db, AsyncSessionLocal
from app.routers import auth, users, entries, public, admin
async def _seed_admin() -> None:
from app.models.user import User
from app.utils.security import hash_password
async with AsyncSessionLocal() as db:
result = await db.execute(select(User).where(User.email == settings.admin_username))
user = result.scalar_one_or_none()
if user is None:
db.add(User(
email=settings.admin_username,
password_hash=hash_password(settings.admin_password),
display_name="Admin",
is_admin=True,
))
else:
user.password_hash = hash_password(settings.admin_password)
user.is_admin = True
await db.commit()
@asynccontextmanager
async def lifespan(app: FastAPI):
await init_db()
await _seed_admin()
yield
@@ -26,3 +49,4 @@ app.include_router(auth.router)
app.include_router(users.router)
app.include_router(entries.router)
app.include_router(public.router)
app.include_router(admin.router)

View File

@@ -1,6 +1,6 @@
from datetime import datetime
from typing import Optional
from sqlalchemy import String, DateTime, func
from sqlalchemy import String, DateTime, Boolean, func
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base
@@ -14,6 +14,8 @@ class User(Base):
password_hash: Mapped[str] = mapped_column(String(255), nullable=False)
display_name: Mapped[Optional[str]] = mapped_column(String(100))
timezone: Mapped[str] = mapped_column(String(50), default="UTC")
is_admin: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
is_disabled: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())
entries: Mapped[list["Entry"]] = relationship("Entry", back_populates="user", cascade="all, delete-orphan")

View File

@@ -0,0 +1,146 @@
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.dependencies import get_db, get_current_admin, bearer_scheme
from app.models.user import User
from app.schemas.user import AdminUserCreate, AdminPasswordReset, AdminUserResponse, Token
from app.utils.security import hash_password, create_token
router = APIRouter(prefix="/api/admin", tags=["admin"])
@router.get("/users", response_model=list[AdminUserResponse])
async def list_users(
_: User = Depends(get_current_admin),
db: AsyncSession = Depends(get_db),
):
result = await db.execute(select(User).order_by(User.created_at))
return result.scalars().all()
@router.post("/users", response_model=AdminUserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
body: AdminUserCreate,
_: User = Depends(get_current_admin),
db: AsyncSession = Depends(get_db),
):
result = await db.execute(select(User).where(User.email == body.email))
if result.scalar_one_or_none():
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Email already registered")
user = User(
email=body.email,
password_hash=hash_password(body.password),
display_name=body.display_name or body.email.split("@")[0],
is_admin=False,
)
async with db.begin():
db.add(user)
await db.refresh(user)
return user
@router.post("/users/{user_id}/reset-password", status_code=status.HTTP_204_NO_CONTENT)
async def reset_password(
user_id: int,
body: AdminPasswordReset,
_: User = Depends(get_current_admin),
db: AsyncSession = Depends(get_db),
):
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
async with db.begin():
user.password_hash = hash_password(body.new_password)
@router.post("/users/{user_id}/disable", status_code=status.HTTP_204_NO_CONTENT)
async def disable_user(
user_id: int,
current_admin: User = Depends(get_current_admin),
db: AsyncSession = Depends(get_db),
):
if user_id == current_admin.id:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot disable your own account")
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
async with db.begin():
user.is_disabled = True
@router.post("/users/{user_id}/enable", status_code=status.HTTP_204_NO_CONTENT)
async def enable_user(
user_id: int,
_: User = Depends(get_current_admin),
db: AsyncSession = Depends(get_db),
):
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
async with db.begin():
user.is_disabled = False
@router.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(
user_id: int,
current_admin: User = Depends(get_current_admin),
db: AsyncSession = Depends(get_db),
):
if user_id == current_admin.id:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot delete your own account")
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
async with db.begin():
await db.delete(user)
@router.post("/users/{user_id}/impersonate", response_model=Token)
async def impersonate_user(
user_id: int,
current_admin: User = Depends(get_current_admin),
db: AsyncSession = Depends(get_db),
):
if user_id == current_admin.id:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot impersonate yourself")
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
return Token(access_token=create_token(user.id, admin_id=current_admin.id))
@router.post("/unimpersonate", response_model=Token)
async def unimpersonate(
credentials=Depends(bearer_scheme),
db: AsyncSession = Depends(get_db),
):
from app.utils.security import decode_token_full
token = credentials.credentials
payload = decode_token_full(token)
admin_id = payload.get("admin_id") if payload else None
if not admin_id:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Not an impersonation token")
result = await db.execute(select(User).where(User.id == admin_id))
admin = result.scalar_one_or_none()
if not admin or not admin.is_admin:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin not found")
return Token(access_token=create_token(admin.id))

View File

@@ -37,3 +37,25 @@ class Token(BaseModel):
class LoginRequest(BaseModel):
email: EmailStr
password: str
class AdminUserCreate(BaseModel):
email: EmailStr
password: str
display_name: Optional[str] = None
class AdminPasswordReset(BaseModel):
new_password: str
class AdminUserResponse(BaseModel):
id: int
email: str
display_name: Optional[str]
timezone: str
is_admin: bool
is_disabled: bool
created_at: datetime
model_config = {"from_attributes": True}

View File

@@ -17,9 +17,11 @@ def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
def create_token(user_id: int) -> str:
def create_token(user_id: int, admin_id: Optional[int] = None) -> str:
expire = datetime.now(timezone.utc) + timedelta(minutes=settings.access_token_expire_minutes)
payload = {"sub": str(user_id), "exp": expire}
payload: dict = {"sub": str(user_id), "exp": expire}
if admin_id is not None:
payload["admin_id"] = admin_id
return jwt.encode(payload, settings.secret_key, algorithm=settings.algorithm)
@@ -32,3 +34,10 @@ def decode_token(token: str) -> Optional[int]:
return int(user_id)
except JWTError:
return None
def decode_token_full(token: str) -> Optional[dict]:
try:
return jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
except JWTError:
return None