"""FastAPI dependencies for database sessions and bearer-token authentication."""

from typing import AsyncGenerator, Optional

from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.auth.jwt import decode_token
from app.database import AsyncSessionLocal
from app.models.user import User

# auto_error=False: the dependencies below check for missing credentials
# themselves and raise their own 401 with a project-specific detail message.
bearer_scheme = HTTPBearer(auto_error=False)


def _unauthorized(detail: str) -> HTTPException:
    """Build a 401 response carrying the Bearer authentication challenge."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


def _decode_bearer_payload(
    credentials: Optional[HTTPAuthorizationCredentials],
) -> dict:
    """Return the decoded token payload or raise 401.

    Raises:
        HTTPException: 401 "Not authenticated" when no credentials were
            supplied, 401 "Invalid token" when ``decode_token`` raises
            ``ValueError``.
    """
    if not credentials:
        raise _unauthorized("Not authenticated")
    try:
        return decode_token(credentials.credentials)
    except ValueError as exc:
        raise _unauthorized("Invalid token") from exc


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Yield a session from ``AsyncSessionLocal``; the context manager exits
    once the consumer is done with it."""
    async with AsyncSessionLocal() as session:
        yield session


async def get_current_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(bearer_scheme),
    db: AsyncSession = Depends(get_db),
) -> User:
    """Resolve the active ``User`` identified by the bearer access token.

    Args:
        credentials: Bearer credentials extracted by ``bearer_scheme``, or
            ``None`` when the request carried none.
        db: Database session used to look the user up.

    Returns:
        The ``User`` row whose id matches the token's ``sub`` claim and whose
        ``is_active`` flag is true.

    Raises:
        HTTPException: 401 when credentials are missing, the token cannot be
            decoded, its ``type`` is not ``"access"``, ``sub`` is missing or
            not an integer, or no matching active user exists.
    """
    payload = _decode_bearer_payload(credentials)
    if payload.get("type") != "access":
        raise _unauthorized("Wrong token type")

    subject = payload.get("sub")
    if subject is None:
        raise _unauthorized("Invalid token payload")
    try:
        # ``sub`` comes from the token and may not be numeric; without this
        # guard a malformed claim would surface as a 500 instead of a 401.
        user_id = int(subject)
    except (TypeError, ValueError):
        raise _unauthorized("Invalid token payload") from None

    result = await db.execute(
        select(User).where(
            User.id == user_id,
            User.is_active == True,  # noqa: E712 - SQL expression, not a Python bool test
        )
    )
    user = result.scalar_one_or_none()
    if user is None:
        raise _unauthorized("User not found")
    return user


async def get_admin_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(bearer_scheme),
) -> dict:
    """Return the token payload when it is an access token with the admin role.

    Args:
        credentials: Bearer credentials extracted by ``bearer_scheme``, or
            ``None`` when the request carried none.

    Returns:
        The decoded token payload.

    Raises:
        HTTPException: 401 when credentials are missing or the token cannot
            be decoded; 403 when the token's ``type`` is not ``"access"`` or
            its ``role`` is not ``"admin"``.
    """
    # NOTE(review): authorization here relies solely on the token's claims;
    # the user record is not re-checked against the database (existence /
    # is_active). Confirm that this is intentional.
    payload = _decode_bearer_payload(credentials)
    is_access_token = payload.get("type") == "access"
    is_admin = payload.get("role") == "admin"
    if not (is_access_token and is_admin):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin access required",
        )
    return payload