import os from datetime import datetime, timedelta import bcrypt from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from jose import JWTError, jwt from sqlalchemy.orm import Session from database import get_db from models import User SECRET_KEY = os.environ.get("SECRET_KEY", "changeme-please-set-a-real-secret-in-env") ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_DAYS = 30 oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login") def verify_password(plain: str, hashed: str) -> bool: return bcrypt.checkpw(plain.encode(), hashed.encode()) def hash_password(password: str) -> str: return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode() def create_access_token(user_id: int) -> str: expire = datetime.utcnow() + timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS) return jwt.encode({"sub": str(user_id), "exp": expire}, SECRET_KEY, algorithm=ALGORITHM) def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)) -> User: exc = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) user_id = int(payload.get("sub")) except (JWTError, TypeError, ValueError): raise exc user = db.query(User).filter(User.id == user_id).first() if not user: raise exc return user def get_admin_user(current_user: User = Depends(get_current_user)) -> User: if not current_user.is_admin: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required") return current_user