Add Super Admin panel with user impersonation

- New /super-admin/login and /super-admin routes with separate auth
- Super admin can view all registered accounts and impersonate any user
- Impersonation banner shows at top of screen with exit button
- ADMIN_USERNAME and ADMIN_PASSWORD config added to .env and docker-compose.yml
- Fixed auth store: export setToken, clearToken, and setUser so they are
  accessible from superAdmin store
- Updated README with super admin feature, new env vars, and setup notes

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-04 22:30:44 -08:00
parent a8e1b322f1
commit c560055b10
14 changed files with 600 additions and 6 deletions

View File

@@ -26,6 +26,13 @@ def create_access_token(data: dict[str, Any]) -> str:
return jwt.encode(payload, settings.secret_key, algorithm=settings.algorithm)
def create_admin_token(data: dict[str, Any]) -> str:
payload = data.copy()
expire = datetime.now(timezone.utc) + timedelta(hours=8)
payload.update({"exp": expire, "type": "access", "role": "admin"})
return jwt.encode(payload, settings.secret_key, algorithm=settings.algorithm)
def create_refresh_token(data: dict[str, Any]) -> str:
payload = data.copy()
expire = datetime.now(timezone.utc) + timedelta(days=settings.refresh_token_expire_days)

View File

@@ -9,6 +9,8 @@ class Settings(BaseSettings):
access_token_expire_minutes: int = 30
refresh_token_expire_days: int = 30
cors_origins: str = "http://localhost:8054"
admin_username: str = "admin"
admin_password: str = "change_me_admin_password"
@property
def cors_origins_list(self) -> list[str]:

View File

@@ -42,3 +42,17 @@ async def get_current_user(
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")
return user
async def get_admin_user(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(bearer_scheme),
) -> dict:
if not credentials:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated")
try:
payload = decode_token(credentials.credentials)
except ValueError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
if payload.get("type") != "access" or payload.get("role") != "admin":
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")
return payload

View File

@@ -9,7 +9,7 @@ from app.config import get_settings
from app.database import engine
from app.models import Base
from app.routers import auth, users, children, subjects, schedules, sessions, logs, dashboard
from app.routers import morning_routine, break_activity
from app.routers import morning_routine, break_activity, admin
from app.websocket.manager import manager
settings = get_settings()
@@ -66,6 +66,7 @@ app.include_router(logs.router)
app.include_router(morning_routine.router)
app.include_router(break_activity.router)
app.include_router(dashboard.router)
app.include_router(admin.router)
@app.get("/api/health")

View File

@@ -0,0 +1,54 @@
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.auth.jwt import create_admin_token, create_access_token
from app.config import get_settings
from app.dependencies import get_db, get_admin_user
from app.models.user import User
router = APIRouter(prefix="/api/admin", tags=["admin"])
settings = get_settings()
@router.post("/login")
async def admin_login(body: dict):
username = body.get("username", "")
password = body.get("password", "")
if username != settings.admin_username or password != settings.admin_password:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid admin credentials")
token = create_admin_token({"sub": "admin"})
return {"access_token": token, "token_type": "bearer"}
@router.get("/users")
async def list_users(
_: dict = Depends(get_admin_user),
db: AsyncSession = Depends(get_db),
):
result = await db.execute(select(User).order_by(User.id))
users = result.scalars().all()
return [
{
"id": u.id,
"email": u.email,
"full_name": u.full_name,
"is_active": u.is_active,
"created_at": u.created_at,
}
for u in users
]
@router.post("/impersonate/{user_id}")
async def impersonate_user(
user_id: int,
_: dict = Depends(get_admin_user),
db: AsyncSession = Depends(get_db),
):
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
token = create_access_token({"sub": str(user.id)})
return {"access_token": token, "token_type": "bearer"}