"""Super-admin API routes mounted under ``/api/admin``.

Provides the super-admin login endpoint plus user-management endpoints
(list, toggle active, delete, reset password, impersonate). Every endpoint
except ``/login`` is guarded by the ``get_admin_user`` dependency.
"""

import logging
import secrets

from fastapi import APIRouter, Depends, HTTPException, Request, status
from sqlalchemy import delete, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.auth.jwt import create_admin_token, create_access_token, hash_password
from app.config import get_settings
from app.dependencies import get_db, get_admin_user
from app.models.user import User
from app.utils.ntfy import notify

logger = logging.getLogger("homeschool.admin")

router = APIRouter(prefix="/api/admin", tags=["admin"])
settings = get_settings()


def _constant_time_equals(supplied: object, expected: object) -> bool:
    """Return True when *supplied* equals *expected*, compared in constant time.

    Non-string values never match, and an empty configured value never
    matches, so unset credentials cannot be satisfied by an empty or missing
    request field.
    """
    if not isinstance(supplied, str) or not isinstance(expected, str):
        return False
    if not expected:
        return False
    # compare_digest only accepts non-ASCII text as bytes, so encode both sides.
    return secrets.compare_digest(
        supplied.encode("utf-8"), expected.encode("utf-8")
    )


async def _get_user_or_404(db: AsyncSession, user_id: int) -> User:
    """Load the user with primary key *user_id* or raise a 404 HTTPException."""
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
        )
    return user


@router.post("/login")
async def admin_login(body: dict, request: Request):
    """Authenticate the super admin and return a bearer token.

    Reads ``username`` and ``password`` from the JSON body and checks them
    against the configured admin credentials. On success a notification is
    sent with the username, client IP and user agent.

    Raises:
        HTTPException: 401 when the credentials do not match.
    """
    username = body.get("username", "")
    password = body.get("password", "")

    # Evaluate both checks before branching so timing does not reveal which
    # of the two fields was wrong.
    username_ok = _constant_time_equals(username, settings.admin_username)
    password_ok = _constant_time_equals(password, settings.admin_password)
    if not (username_ok and password_ok):
        # %r escapes control characters so a crafted username cannot forge
        # additional log lines.
        logger.warning("Failed super-admin login attempt for username=%r", username)
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid admin credentials",
        )

    token = create_admin_token({"sub": "admin"})

    # Prefer the first X-Forwarded-For entry; fall back to the socket peer.
    fallback_ip = request.client.host if request.client else "unknown"
    forwarded_for = request.headers.get("X-Forwarded-For", fallback_ip)
    ip = forwarded_for.split(",")[0].strip()
    ua = request.headers.get("User-Agent", "unknown")

    await notify(
        title="Homeschool Dashboard Super Admin Login",
        message=f"User: {username}\nIP: {ip}\nUA: {ua}",
        priority="high",
        tags=["key"],
    )
    return {"access_token": token, "token_type": "bearer"}


@router.get("/users")
async def list_users(
    _: dict = Depends(get_admin_user),
    db: AsyncSession = Depends(get_db),
):
    """Return every user, ordered by id, as a list of summary dicts."""
    result = await db.execute(select(User).order_by(User.id))
    users = result.scalars().all()
    return [
        {
            "id": u.id,
            "email": u.email,
            "full_name": u.full_name,
            "is_active": u.is_active,
            "created_at": u.created_at,
            "last_active_at": u.last_active_at,
            "timezone": u.timezone,
        }
        for u in users
    ]


@router.post("/toggle-active/{user_id}")
async def toggle_user_active(
    user_id: int,
    _: dict = Depends(get_admin_user),
    db: AsyncSession = Depends(get_db),
):
    """Flip the ``is_active`` flag of a user and return the new state.

    Raises:
        HTTPException: 404 when no user has the given id.
    """
    user = await _get_user_or_404(db, user_id)
    user.is_active = not user.is_active
    await db.commit()
    return {"id": user.id, "is_active": user.is_active}


@router.delete("/users/{user_id}")
async def delete_user(
    user_id: int,
    _: dict = Depends(get_admin_user),
    db: AsyncSession = Depends(get_db),
):
    """Delete the user with the given id.

    Raises:
        HTTPException: 404 when no user has the given id.
    """
    # Existence check first so a missing user yields 404 instead of a
    # silent no-op delete.
    await _get_user_or_404(db, user_id)
    await db.execute(delete(User).where(User.id == user_id))
    await db.commit()
    return {"ok": True}


@router.post("/reset-password/{user_id}")
async def reset_user_password(
    user_id: int,
    body: dict,
    _: dict = Depends(get_admin_user),
    db: AsyncSession = Depends(get_db),
):
    """Set a new password for a user and clear their login lockout state.

    Reads ``new_password`` from the JSON body (surrounding whitespace is
    stripped), stores its hash, and resets ``failed_login_attempts`` and
    ``locked_until``.

    Raises:
        HTTPException: 400 when ``new_password`` is missing, not a string,
            or blank; 404 when no user has the given id.
    """
    raw_password = body.get("new_password", "")
    # A JSON null or non-string value would otherwise crash on .strip().
    new_password = raw_password.strip() if isinstance(raw_password, str) else ""
    if not new_password:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="new_password is required",
        )

    user = await _get_user_or_404(db, user_id)
    user.hashed_password = hash_password(new_password)
    user.failed_login_attempts = 0
    user.locked_until = None
    await db.commit()
    return {"ok": True}


@router.post("/impersonate/{user_id}")
async def impersonate_user(
    user_id: int,
    _: dict = Depends(get_admin_user),
    db: AsyncSession = Depends(get_db),
):
    """Issue a regular access token for the given user.

    Raises:
        HTTPException: 404 when no user has the given id.
    """
    user = await _get_user_or_404(db, user_id)
    # Leave an audit trail: this hands out a token that acts as another user.
    logger.warning("Super-admin issued impersonation token for user_id=%s", user.id)
    token = create_access_token({"sub": str(user.id)})
    return {"access_token": token, "token_type": "bearer"}