121 lines
4.1 KiB
Python
121 lines
4.1 KiB
Python
import logging
|
|
from fastapi import APIRouter, Depends, HTTPException, Request, status
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select, delete
|
|
|
|
logger = logging.getLogger("homeschool.admin")
|
|
|
|
from app.auth.jwt import create_admin_token, create_access_token, hash_password
|
|
from app.config import get_settings
|
|
from app.dependencies import get_db, get_admin_user
|
|
from app.models.user import User
|
|
from app.utils.ntfy import notify
|
|
|
|
router = APIRouter(prefix="/api/admin", tags=["admin"])
|
|
settings = get_settings()
|
|
|
|
|
|
@router.post("/login")
|
|
async def admin_login(body: dict, request: Request):
|
|
username = body.get("username", "")
|
|
password = body.get("password", "")
|
|
if username != settings.admin_username or password != settings.admin_password:
|
|
logger.warning("Failed super-admin login attempt for username=%s", username)
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid admin credentials")
|
|
token = create_admin_token({"sub": "admin"})
|
|
ip = request.headers.get("X-Forwarded-For", request.client.host if request.client else "unknown").split(",")[0].strip()
|
|
ua = request.headers.get("User-Agent", "unknown")
|
|
await notify(
|
|
title="Homeschool Dashboard Super Admin Login",
|
|
message=f"User: {username}\nIP: {ip}\nUA: {ua}",
|
|
priority="high",
|
|
tags=["key"],
|
|
)
|
|
return {"access_token": token, "token_type": "bearer"}
|
|
|
|
|
|
@router.get("/users")
|
|
async def list_users(
|
|
_: dict = Depends(get_admin_user),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
result = await db.execute(select(User).order_by(User.id))
|
|
users = result.scalars().all()
|
|
return [
|
|
{
|
|
"id": u.id,
|
|
"email": u.email,
|
|
"full_name": u.full_name,
|
|
"is_active": u.is_active,
|
|
"created_at": u.created_at,
|
|
"last_active_at": u.last_active_at,
|
|
"timezone": u.timezone,
|
|
}
|
|
for u in users
|
|
]
|
|
|
|
|
|
@router.post("/toggle-active/{user_id}")
|
|
async def toggle_user_active(
|
|
user_id: int,
|
|
_: dict = Depends(get_admin_user),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
result = await db.execute(select(User).where(User.id == user_id))
|
|
user = result.scalar_one_or_none()
|
|
if not user:
|
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
|
|
user.is_active = not user.is_active
|
|
await db.commit()
|
|
return {"id": user.id, "is_active": user.is_active}
|
|
|
|
|
|
@router.delete("/users/{user_id}")
|
|
async def delete_user(
|
|
user_id: int,
|
|
_: dict = Depends(get_admin_user),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
result = await db.execute(select(User).where(User.id == user_id))
|
|
user = result.scalar_one_or_none()
|
|
if not user:
|
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
|
|
await db.execute(delete(User).where(User.id == user_id))
|
|
await db.commit()
|
|
return {"ok": True}
|
|
|
|
|
|
@router.post("/reset-password/{user_id}")
|
|
async def reset_user_password(
|
|
user_id: int,
|
|
body: dict,
|
|
_: dict = Depends(get_admin_user),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
new_password = body.get("new_password", "").strip()
|
|
if not new_password:
|
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="new_password is required")
|
|
result = await db.execute(select(User).where(User.id == user_id))
|
|
user = result.scalar_one_or_none()
|
|
if not user:
|
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
|
|
user.hashed_password = hash_password(new_password)
|
|
user.failed_login_attempts = 0
|
|
user.locked_until = None
|
|
await db.commit()
|
|
return {"ok": True}
|
|
|
|
|
|
@router.post("/impersonate/{user_id}")
|
|
async def impersonate_user(
|
|
user_id: int,
|
|
_: dict = Depends(get_admin_user),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
result = await db.execute(select(User).where(User.id == user_id))
|
|
user = result.scalar_one_or_none()
|
|
if not user:
|
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
|
|
token = create_access_token({"sub": str(user.id)})
|
|
return {"access_token": token, "token_type": "bearer"}
|