"""Cookie-based JWT authentication helpers and FastAPI dependencies."""

import os
from datetime import datetime, timedelta, timezone
from typing import Optional

from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Cookie, Depends, HTTPException, Response, status
from sqlalchemy.orm import Session

from database import get_db
from models import User

# Signing key for every token; a missing JWT_SECRET raises KeyError at import
# time, so the application cannot start without it.
SECRET_KEY = os.environ["JWT_SECRET"]
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_DAYS = 30
COOKIE_NAME = "token"
# Cookies carry the Secure flag unless SECURE_COOKIES is set to something
# other than "true" (case-insensitive).
SECURE_COOKIES = os.environ.get("SECURE_COOKIES", "true").lower() == "true"

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def _credentials_exception() -> HTTPException:
    """Build the 401 error shared by every token-validating dependency.

    A fresh instance is returned on each call so that a traceback or cause
    attached to one raise never leaks into another request.
    """
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
    )


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return True when *plain_password* matches *hashed_password*."""
    return pwd_context.verify(plain_password, hashed_password)


def hash_password(password: str) -> str:
    """Return the hash of *password* produced by the module's CryptContext."""
    return pwd_context.hash(password)


def create_access_token(
    user_id: int,
    username: str,
    is_admin: bool,
    user_timezone: str = "UTC",
    admin_id: Optional[int] = None,
) -> str:
    """Create a signed JWT for the given user.

    The token expires ACCESS_TOKEN_EXPIRE_DAYS days from now (UTC) and carries
    the claims ``sub`` (the user id as a string), ``username``, ``is_admin``,
    ``timezone`` and ``exp``.

    Args:
        user_id: Stored as the string ``sub`` claim.
        username: Stored as the ``username`` claim.
        is_admin: Stored as the ``is_admin`` claim.
        user_timezone: Stored as the ``timezone`` claim.
        admin_id: When not None, stored as the ``admin_id`` claim; otherwise
            the claim is omitted entirely.
            NOTE(review): presumably the id of an admin acting on behalf of
            the user; confirm against callers.

    Returns:
        The encoded token string.
    """
    expire = datetime.now(timezone.utc) + timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS)
    payload = {
        "sub": str(user_id),
        "username": username,
        "is_admin": is_admin,
        "timezone": user_timezone,
        "exp": expire,
    }
    if admin_id is not None:
        payload["admin_id"] = admin_id
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)


def set_auth_cookie(response: Response, token: str) -> None:
    """Attach *token* to *response* as the authentication cookie.

    The cookie is HTTP-only, SameSite=strict, scoped to path "/", and its
    max-age matches the token lifetime (ACCESS_TOKEN_EXPIRE_DAYS in seconds).
    """
    response.set_cookie(
        key=COOKIE_NAME,
        value=token,
        httponly=True,
        secure=SECURE_COOKIES,
        samesite="strict",
        max_age=ACCESS_TOKEN_EXPIRE_DAYS * 24 * 60 * 60,
        path="/",
    )


def clear_auth_cookie(response: Response) -> None:
    """Delete the authentication cookie from *response*.

    The same attributes used by ``set_auth_cookie`` are passed so the deletion
    targets the cookie that was originally set.
    """
    response.delete_cookie(
        key=COOKIE_NAME,
        httponly=True,
        secure=SECURE_COOKIES,
        samesite="strict",
        path="/",
    )


def token_to_user_payload(token: str) -> dict:
    """Decode *token* and return the user-related claims as a dict.

    Returns:
        A dict with the keys ``sub``, ``username``, ``is_admin``, ``timezone``,
        ``exp`` and ``admin_id`` (None when the token has no such claim).

    Raises:
        JWTError: If the token cannot be decoded or verified.
        KeyError: If any claim other than ``admin_id`` is missing.
    """
    payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    return {
        "sub": payload["sub"],
        "username": payload["username"],
        "is_admin": payload["is_admin"],
        "timezone": payload["timezone"],
        "exp": payload["exp"],
        "admin_id": payload.get("admin_id"),
    }


async def get_current_user(
    token: Optional[str] = Cookie(default=None, alias=COOKIE_NAME),
    db: Session = Depends(get_db),
) -> User:
    """FastAPI dependency resolving the auth cookie to a ``User`` row.

    Args:
        token: Value of the authentication cookie, or None when absent.
        db: Database session supplied by ``get_db``.

    Returns:
        The ``User`` whose primary key equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the cookie is missing or empty, the token
            fails to decode, ``sub`` is missing or not an integer string, or
            the user does not exist or is disabled.
    """
    if not token:
        raise _credentials_exception()

    # Keep each try body to the single call that can raise, and chain the
    # cause so logs retain the underlying decoding/parsing failure.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as err:
        raise _credentials_exception() from err

    user_id_str: Optional[str] = payload.get("sub")
    if user_id_str is None:
        raise _credentials_exception()

    try:
        user_id = int(user_id_str)
    except ValueError as err:
        raise _credentials_exception() from err

    user = db.get(User, user_id)
    if user is None or user.is_disabled:
        raise _credentials_exception()
    return user


async def get_current_admin(current_user: User = Depends(get_current_user)) -> User:
    """FastAPI dependency that admits only users flagged as admin.

    The check reads ``is_admin`` from the ``User`` object returned by
    ``get_current_user``, not from the token claims.

    Raises:
        HTTPException: 403 when ``current_user.is_admin`` is falsy.
    """
    if not current_user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin access required",
        )
    return current_user


async def get_token_payload(
    token: Optional[str] = Cookie(default=None, alias=COOKIE_NAME),
) -> dict:
    """FastAPI dependency returning the decoded claims of the auth cookie.

    Unlike ``get_current_user`` this performs no database lookup.

    Raises:
        HTTPException: 401 when the cookie is missing or empty, or the token
            fails to decode.
    """
    if not token:
        raise _credentials_exception()
    try:
        return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as err:
        raise _credentials_exception() from err