from datetime import datetime, timedelta, timezone from typing import Any from jose import JWTError, jwt from passlib.context import CryptContext from app.config import get_settings settings = get_settings() pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def hash_password(plain: str) -> str: return pwd_context.hash(plain) def verify_password(plain: str, hashed: str) -> bool: return pwd_context.verify(plain, hashed) def create_access_token(data: dict[str, Any]) -> str: payload = data.copy() expire = datetime.now(timezone.utc) + timedelta(minutes=settings.access_token_expire_minutes) payload.update({"exp": expire, "type": "access"}) return jwt.encode(payload, settings.secret_key, algorithm=settings.algorithm) def create_admin_token(data: dict[str, Any]) -> str: payload = data.copy() expire = datetime.now(timezone.utc) + timedelta(hours=8) payload.update({"exp": expire, "type": "access", "role": "admin"}) return jwt.encode(payload, settings.secret_key, algorithm=settings.algorithm) def create_refresh_token(data: dict[str, Any]) -> str: payload = data.copy() expire = datetime.now(timezone.utc) + timedelta(days=settings.refresh_token_expire_days) payload.update({"exp": expire, "type": "refresh"}) return jwt.encode(payload, settings.secret_key, algorithm=settings.algorithm) def decode_token(token: str) -> dict[str, Any]: try: return jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm]) except JWTError: raise ValueError("Invalid or expired token")