# Source code for auth.tokens

# auth/tokens.py
from __future__ import annotations

from datetime import datetime, timedelta, timezone

from fastapi import Depends, HTTPException, Query, Request, Response, status
from jose import JWTError, jwt

from .config import (
    ALGORITHM,
    SECRET_KEY,
    TOKEN_TTL_MINUTES,
    TOKEN_TTL_SECONDS,
    oauth2_scheme,
    pwd_ctx,
)
from .models import Token, User
from .roles import role_level
from .users import USERS, _User


def _verify_password(plain: str, hashed: str) -> bool:
    """Check a plaintext password against a stored hash.

    Delegates to ``pwd_ctx.verify`` and returns its result unchanged.
    """
    matches = pwd_ctx.verify(plain, hashed)
    return matches


def _authenticate(username: str, password: str) -> _User | None:
    """Return the ``USERS`` entry for *username* if *password* matches.

    Returns ``None`` when there is no (truthy) entry for *username* or when
    the password does not verify against the entry's ``hashed`` value.
    """
    candidate = USERS.get(username)
    if not candidate:
        return None
    if not _verify_password(password, candidate.hashed):
        return None
    return candidate


def _create_access_token(
    data: dict, ttl_minutes: int = TOKEN_TTL_MINUTES
) -> str:
    """Encode *data* as a signed JWT that expires after *ttl_minutes*.

    Args:
        data: Claims to embed. The dict is copied, not mutated; any ``exp``
            key it contains is overwritten.
        ttl_minutes: Token lifetime in minutes.

    Returns:
        The result of ``jwt.encode`` using ``SECRET_KEY`` and ``ALGORITHM``.
    """
    # Use an aware UTC datetime. ``datetime.utcnow()`` returns a naive value
    # and ``.timestamp()`` interprets naive values as *local* time, which
    # skews ``exp`` by the host's UTC offset on any non-UTC machine.
    expires_at = datetime.now(timezone.utc) + timedelta(minutes=ttl_minutes)
    claims = {**data, "exp": int(expires_at.timestamp())}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)


def _extract_token(
    request: Request, bearer_token: str | None, query_token: str | None
) -> str | None:
    """
    Return the first non-empty token, checked in this order:

    1. Authorization header  (Bearer …)
    2. ?token=…              (handy for simple GET tests like curl/browser)
    3. taranta_jwt cookie    (automatic with our login routes)

    The cookie lookup result is returned as-is, so the value is ``None``
    when no source supplies a token.
    """
    if bearer_token:
        return bearer_token
    if query_token:
        return query_token
    return request.cookies.get("taranta_jwt")


async def get_current_user(
    request: Request,
    token: str | None = Depends(oauth2_scheme),
    query_token: str | None = Query(default=None, alias="token"),
) -> User:
    """
    Shared dependency - accepts header / query / cookie.

    Picks a token via ``_extract_token``, decodes it with ``SECRET_KEY`` and
    ``ALGORITHM``, and maps its ``username`` claim (falling back to ``sub``)
    to an entry in ``USERS``.

    Returns:
        A ``User`` built from the matching ``USERS`` entry, with
        ``role_level`` computed from the entry's role.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            no token is supplied, the token fails to decode, or it does not
            name a known user.
    """
    exc = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    token_val = _extract_token(request, token, query_token)
    if not token_val:
        raise exc

    # Keep the try body to the single call that can raise JWTError.
    try:
        payload = jwt.decode(token_val, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as err:
        # Chain the cause so the decode failure stays visible in tracebacks.
        raise exc from err

    username = payload.get("username") or payload.get("sub")
    # A correctly signed token can still carry a non-string claim; reject it
    # here rather than letting an unhashable value blow up the dict lookup.
    if not isinstance(username, str) or not username:
        raise exc

    u = USERS.get(username)
    if u is None:
        raise exc

    return User(
        username=u.username,
        full_name=u.full_name,
        role=u.role,
        role_level=role_level(u.role),
        groups=u.groups,
    )


def _issue_session(response: Response, usr: _User) -> Token:
    """
    Common path for both local and MSAL sign-ins.

    Builds the token claims from *usr*, sets the resulting token as the
    ``taranta_jwt`` cookie on *response*, and returns it wrapped in a
    ``Token`` together with ``TOKEN_TTL_SECONDS``.
    """
    claims = dict(
        sub=usr.username,
        username=usr.username,
        role=usr.role,
        role_level=role_level(usr.role),
        groups=usr.groups,
    )
    encoded = _create_access_token(claims)

    # Cookie lifetime matches the ``expires_in`` value reported to the caller.
    cookie_options = {
        "key": "taranta_jwt",
        "value": encoded,
        "max_age": TOKEN_TTL_SECONDS,
        "httponly": True,
        "samesite": "lax",
        "path": "/",
    }
    response.set_cookie(**cookie_options)

    return Token(access_token=encoded, expires_in=TOKEN_TTL_SECONDS)