# auth/tokens.py
from __future__ import annotations
from datetime import datetime, timedelta
from fastapi import Depends, HTTPException, Query, Request, Response, status
from jose import JWTError, jwt
from .config import (
ALGORITHM,
SECRET_KEY,
TOKEN_TTL_MINUTES,
TOKEN_TTL_SECONDS,
oauth2_scheme,
pwd_ctx,
)
from .models import Token, User
from .roles import role_level
from .users import USERS, _User
def _verify_password(plain: str, hashed: str) -> bool:
return pwd_ctx.verify(plain, hashed)
def _authenticate(username: str, password: str) -> _User | None:
usr = USERS.get(username)
if usr and _verify_password(password, usr.hashed):
return usr
return None
def _create_access_token(
data: dict, ttl_minutes: int = TOKEN_TTL_MINUTES
) -> str:
exp = datetime.utcnow() + timedelta(minutes=ttl_minutes)
to_encode = {**data, "exp": int(exp.timestamp())}
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def _extract_token(
request: Request, bearer_token: str | None, query_token: str | None
) -> str | None:
"""
1. Authorization header (Bearer …)
2. ?token=… (handy for simple GET tests like curl/browser)
3. taranta_jwt cookie (automatic with our login routes)
"""
return bearer_token or query_token or request.cookies.get("taranta_jwt")
[docs]
async def get_current_user(
request: Request,
token: str | None = Depends(oauth2_scheme),
query_token: str | None = Query(default=None, alias="token"),
) -> User:
"""
Shared dependency - accepts header / query / cookie.
"""
exc = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
token_val = _extract_token(request, token, query_token)
if not token_val:
raise exc
try:
payload = jwt.decode(token_val, SECRET_KEY, algorithms=[ALGORITHM])
username: str | None = payload.get("username") or payload.get("sub")
if not username or username not in USERS:
raise exc
except JWTError:
raise exc
u = USERS[username]
return User(
username=u.username,
full_name=u.full_name,
role=u.role,
role_level=role_level(u.role),
groups=u.groups,
)
def _issue_session(response: Response, usr: _User) -> Token:
"""
Common path for both local and MSAL sign-ins.
"""
payload = {
"sub": usr.username,
"username": usr.username,
"role": usr.role,
"role_level": role_level(usr.role),
"groups": usr.groups,
}
token = _create_access_token(payload)
response.set_cookie(
key="taranta_jwt",
value=token,
max_age=TOKEN_TTL_SECONDS,
httponly=True,
samesite="lax",
path="/",
)
return Token(access_token=token, expires_in=TOKEN_TTL_SECONDS)