Files
homeschool/backend/app/routers/admin.py
derekc 3022bc328b Security hardening: go-live review fixes
- TV tokens upgraded from 4 to 6 digits; Regen Token button in Admin
- Nginx rate limiting on TV dashboard and WebSocket endpoints
- Login lockout after 5 failed attempts (15 min); clears on admin password reset
- HSTS header added; CSP unsafe-inline removed from script-src; CORS restricted to explicit methods/headers
- Dependency CVE fixes: PyJWT 2.12.0, aiomysql 0.3.0, cryptography 46.0.5, python-multipart 0.0.22
- datetime.utcnow() replaced with datetime.now(timezone.utc) throughout
- SQL identifier whitelist for startup migration queries
- README updated: security notes section, lockout docs, token regen, NPM proxy guidance

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-22 00:00:14 -07:00

112 lines
3.7 KiB
Python

import logging
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, delete
logger = logging.getLogger("homeschool.admin")
from app.auth.jwt import create_admin_token, create_access_token, hash_password
from app.config import get_settings
from app.dependencies import get_db, get_admin_user
from app.models.user import User
router = APIRouter(prefix="/api/admin", tags=["admin"])
settings = get_settings()
@router.post("/login")
async def admin_login(body: dict):
username = body.get("username", "")
password = body.get("password", "")
if username != settings.admin_username or password != settings.admin_password:
logger.warning("Failed super-admin login attempt for username=%s", username)
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid admin credentials")
token = create_admin_token({"sub": "admin"})
return {"access_token": token, "token_type": "bearer"}
@router.get("/users")
async def list_users(
_: dict = Depends(get_admin_user),
db: AsyncSession = Depends(get_db),
):
result = await db.execute(select(User).order_by(User.id))
users = result.scalars().all()
return [
{
"id": u.id,
"email": u.email,
"full_name": u.full_name,
"is_active": u.is_active,
"created_at": u.created_at,
"last_active_at": u.last_active_at,
"timezone": u.timezone,
}
for u in users
]
@router.post("/toggle-active/{user_id}")
async def toggle_user_active(
user_id: int,
_: dict = Depends(get_admin_user),
db: AsyncSession = Depends(get_db),
):
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
user.is_active = not user.is_active
await db.commit()
return {"id": user.id, "is_active": user.is_active}
@router.delete("/users/{user_id}")
async def delete_user(
user_id: int,
_: dict = Depends(get_admin_user),
db: AsyncSession = Depends(get_db),
):
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
await db.execute(delete(User).where(User.id == user_id))
await db.commit()
return {"ok": True}
@router.post("/reset-password/{user_id}")
async def reset_user_password(
user_id: int,
body: dict,
_: dict = Depends(get_admin_user),
db: AsyncSession = Depends(get_db),
):
new_password = body.get("new_password", "").strip()
if not new_password:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="new_password is required")
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
user.hashed_password = hash_password(new_password)
user.failed_login_attempts = 0
user.locked_until = None
await db.commit()
return {"ok": True}
@router.post("/impersonate/{user_id}")
async def impersonate_user(
user_id: int,
_: dict = Depends(get_admin_user),
db: AsyncSession = Depends(get_db),
):
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
token = create_access_token({"sub": str(user.id)})
return {"access_token": token, "token_type": "bearer"}