""" JWT Authentication middleware for ClaudeTools API. This module provides JWT token creation, verification, and password hashing utilities for securing API endpoints. It uses PyJWT for token handling and passlib with bcrypt for password hashing. """ from datetime import datetime, timedelta, timezone from typing import Optional import jwt from fastapi import Depends, HTTPException, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from passlib.context import CryptContext from api.config import get_settings # Password hashing context using bcrypt # Note: Due to compatibility issues between passlib 1.7.4 and bcrypt 5.0 on Python 3.13, # we use argon2 as the primary scheme. This is actually more secure than bcrypt. # If bcrypt compatibility is restored in future versions, it can be added back. try: pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # Test if bcrypt is working pwd_context.hash("test") except (ValueError, Exception): # Fallback to argon2 if bcrypt has compatibility issues pwd_context = CryptContext(schemes=["argon2"], deprecated="auto") # HTTP Bearer token scheme for FastAPI security = HTTPBearer() # Get application settings settings = get_settings() def hash_password(password: str) -> str: """ Hash a plain text password using bcrypt. Args: password: The plain text password to hash Returns: str: The hashed password Example: ```python hashed = hash_password("my_secure_password") print(hashed) # $2b$12$... ``` """ return pwd_context.hash(password) def verify_password(plain_password: str, hashed_password: str) -> bool: """ Verify a plain text password against a hashed password. Args: plain_password: The plain text password to verify hashed_password: The hashed password to verify against Returns: bool: True if password matches, False otherwise Example: ```python is_valid = verify_password("user_input", stored_hash) if is_valid: print("Password is correct") ``` """ return pwd_context.verify(plain_password, hashed_password) def create_access_token( data: dict, expires_delta: Optional[timedelta] = None ) -> str: """ Create a JWT access token with the provided data. The token includes the provided data plus an expiration time (exp claim). If no expiration delta is provided, uses the default from settings. Args: data: Dictionary of claims to include in the token (e.g., {"sub": "user_id"}) expires_delta: Optional custom expiration time. If None, uses ACCESS_TOKEN_EXPIRE_MINUTES from settings Returns: str: Encoded JWT token Example: ```python token = create_access_token( data={"sub": "user123"}, expires_delta=timedelta(hours=1) ) ``` """ to_encode = data.copy() if expires_delta: expire = datetime.now(timezone.utc) + expires_delta else: expire = datetime.now(timezone.utc) + timedelta( minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES ) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode( to_encode, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM ) return encoded_jwt def verify_token(token: str) -> dict: """ Verify and decode a JWT token. Args: token: The JWT token string to verify Returns: dict: The decoded token payload Raises: HTTPException: If token is invalid or expired with 401 status code Example: ```python try: payload = verify_token(token_string) user_id = payload.get("sub") except HTTPException: print("Invalid token") ``` """ try: payload = jwt.decode( token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM] ) return payload except jwt.ExpiredSignatureError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Token has expired", headers={"WWW-Authenticate": "Bearer"}, ) except jwt.InvalidTokenError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(security), ) -> dict: """ Dependency function to get the current authenticated user from JWT token. This function is used as a FastAPI dependency to protect routes that require authentication. It extracts the token from the Authorization header, verifies it, and returns the token payload containing user information. Args: credentials: HTTP Bearer credentials from the Authorization header Returns: dict: The decoded token payload containing user information (sub, scopes, etc.) Raises: HTTPException: 401 if token is invalid Example: ```python @router.get("/protected") async def protected_route(current_user: dict = Depends(get_current_user)): return {"email": current_user.get("sub"), "scopes": current_user.get("scopes")} ``` """ token = credentials.credentials payload = verify_token(token) # Extract user identifier from token subject claim user_identifier: Optional[str] = payload.get("sub") if user_identifier is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) return payload def get_optional_current_user( credentials: Optional[HTTPAuthorizationCredentials] = Depends( HTTPBearer(auto_error=False) ), ) -> Optional[dict]: """ Dependency function to get the current user if authenticated, None otherwise. This is useful for routes that have optional authentication where behavior changes based on whether a user is logged in or not. Args: credentials: Optional HTTP Bearer credentials from the Authorization header Returns: Optional[dict]: The decoded token payload or None if not authenticated Example: ```python @router.get("/content") async def get_content(user: Optional[dict] = Depends(get_optional_current_user)): if user: return {"content": "Premium content", "email": user.get("sub")} return {"content": "Public content"} ``` """ if credentials is None: return None try: token = credentials.credentials payload = verify_token(token) user_identifier: Optional[str] = payload.get("sub") if user_identifier is None: return None return payload except HTTPException: return None def require_scopes(*required_scopes: str): """ Dependency factory to require specific permission scopes. This function creates a dependency that checks if the authenticated user has all the required permission scopes. Args: *required_scopes: Variable number of scope strings required (e.g., "msp:read", "msp:write") Returns: Callable: A dependency function that validates scopes Raises: HTTPException: 403 if user lacks required scopes Example: ```python @router.post("/admin/action") async def admin_action( current_user: dict = Depends(get_current_user), _: None = Depends(require_scopes("msp:admin")) ): return {"message": "Admin action performed"} ``` """ def check_scopes(current_user: dict = Depends(get_current_user)) -> None: user_scopes = current_user.get("scopes", []) for scope in required_scopes: if scope not in user_scopes: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Missing required permission: {scope}", ) return check_scopes