Files
yolkbook/backend/auth.py
derekc 6d09e40f58 Remove admin token from sessionStorage during impersonation
Embed admin_id claim in impersonation JWTs and add a backend
/api/admin/unimpersonate endpoint that re-issues the admin token
from that claim. The admin token no longer needs to be stored in
sessionStorage, eliminating the risk of token theft via XSS.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-19 23:32:08 -07:00

88 lines
2.7 KiB
Python

import os
from datetime import datetime, timedelta, timezone
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from database import get_db
from models import User
# Signing key for JWTs, read from the JWT_SECRET environment variable.
# Indexing os.environ raises KeyError at import time if the variable is unset.
SECRET_KEY = os.environ["JWT_SECRET"]
# JWT signing algorithm used for both encoding and decoding tokens below.
ALGORITHM = "HS256"
# Token lifetime in days; added to the current UTC time to form the "exp" claim.
ACCESS_TOKEN_EXPIRE_DAYS = 30
# passlib context configured with the bcrypt scheme; used to hash and verify passwords.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# FastAPI dependency that supplies the bearer token string to the auth dependencies below;
# tokenUrl names the login endpoint.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a stored hash.

    Delegates to the module-level ``pwd_context`` and returns its verdict.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def hash_password(password: str) -> str:
    """Hash *password* with the module-level ``pwd_context`` and return the result."""
    hashed = pwd_context.hash(password)
    return hashed
def create_access_token(
    user_id: int,
    username: str,
    is_admin: bool,
    user_timezone: str = "UTC",
    admin_id: Optional[int] = None,
) -> str:
    """Build and sign a JWT for the given user.

    The token carries the claims ``sub`` (the user id as a string),
    ``username``, ``is_admin``, ``timezone`` and ``exp``. The expiry is
    ``ACCESS_TOKEN_EXPIRE_DAYS`` days after the current UTC time.

    When *admin_id* is not ``None`` an additional ``admin_id`` claim is
    embedded in the token.

    Returns:
        The encoded token, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    lifetime = timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS)
    expires_at = datetime.now(timezone.utc) + lifetime

    # Key order is kept stable so the serialized payload does not change.
    claims = {
        "sub": str(user_id),
        "username": username,
        "is_admin": is_admin,
        "timezone": user_timezone,
        "exp": expires_at,
    }
    if admin_id is not None:
        claims["admin_id"] = admin_id

    token = jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
    return token
async def get_token_payload(
    token: str = Depends(oauth2_scheme),
) -> dict:
    """Decode the bearer token and return its claims.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            decoding the token raises ``JWTError``.
    """
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return claims
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db),
) -> User:
    """Resolve the bearer token to an enabled ``User`` row.

    The token is decoded, its ``sub`` claim is parsed as an integer id, and
    the user is loaded with ``db.get``.

    Raises:
        HTTPException: 401 when the token cannot be decoded, the ``sub``
            claim is missing or not an integer string, no user has that id,
            or the user's ``is_disabled`` flag is set.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Step 1: verify the signature / expiry and extract the claims.
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except (JWTError, ValueError):
        raise unauthorized

    # Step 2: the subject claim must be present and parse as an integer id.
    subject: Optional[str] = claims.get("sub")
    if subject is None:
        raise unauthorized
    try:
        user_id = int(subject)
    except ValueError:
        raise unauthorized

    # Step 3: the account must exist and must not be disabled.
    account = db.get(User, user_id)
    if account is not None and not account.is_disabled:
        return account
    raise unauthorized
async def get_current_admin(current_user: User = Depends(get_current_user)) -> User:
    """Return the authenticated user only if they are an admin.

    Raises:
        HTTPException: 403 when ``current_user.is_admin`` is falsy.
    """
    if current_user.is_admin:
        return current_user
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Admin access required",
    )