Source code for hostedpi.auth

from datetime import datetime, timedelta
from importlib.metadata import version
from typing import Union

from pydantic import ValidationError
from requests import HTTPError, Session
from structlog import get_logger

from .exc import MythicAuthenticationError
from .models.mythic.responses import AuthResponse
from .settings import Settings

# Version string of the installed "hostedpi" distribution, looked up via
# importlib.metadata; MythicAuth embeds it in the User-Agent header.
hostedpi_version = version("hostedpi")
# Module-level structlog logger used for debug output during authentication.
logger = get_logger()


class MythicAuth:
    """
    This class handles authentication with the Mythic Beasts Hosted Pi API. It manages the access
    token used to authenticate requests to the API, and automatically refreshes it when it expires.

    :type settings: :class:`~hostedpi.settings.Settings` or None
    :param settings:
        The settings used to configure the API. If not provided, defaults to a new instance of
        :class:`~hostedpi.settings.Settings`.

    :type auth_session: :class:`requests.Session` or None
    :param auth_session:
        The session used to make authentication requests. If not provided, defaults to a new
        :class:`requests.Session`.

    :type api_session: :class:`requests.Session` or None
    :param api_session:
        The session used to make API requests. If not provided, defaults to a new
        :class:`requests.Session`. Headers already set on a provided session are kept; only the
        ``User-Agent`` (and later ``Authorization``) headers are set by this class.

    .. warning::
        This is for advanced use only. Most users should not need to interact with the
        authentication system directly, as it is handled automatically by
        :class:`~hostedpi.picloud.PiCloud`.

    :raises pydantic_core.ValidationError:
        If the provided settings are invalid or missing required fields.
    """

    def __init__(
        self,
        *,
        settings: Union[Settings, None] = None,
        auth_session: Union[Session, None] = None,
        api_session: Union[Session, None] = None,
    ):
        if settings is None:
            settings = Settings()
        if auth_session is None:
            auth_session = Session()
        if api_session is None:
            api_session = Session()

        self._settings = settings
        self._token: Union[str, None] = None
        # Start out "already expired" so the first access to ``token`` authenticates.
        self._token_expiry = datetime.now()

        # Update rather than replace the header mapping: replacing it would discard any headers
        # on a caller-supplied session and swap requests' case-insensitive mapping for a plain
        # dict.
        api_session.headers.update(
            {
                "User-Agent": f"python-hostedpi/{hostedpi_version}",
            }
        )

        self._auth_session = auth_session
        self._api_session = api_session

    def __repr__(self):
        return f"<MythicAuth id={self._settings.id}>"

    @property
    def session(self) -> Session:
        """
        The session used to make requests to the Hosted Pi API
        """
        # Reading ``self.token`` refreshes the token if needed, so the header is always current.
        self._api_session.headers["Authorization"] = f"Bearer {self.token}"
        return self._api_session

    @property
    def settings(self) -> Settings:
        """
        The settings used to configure the API
        """
        return self._settings

    @property
    def token(self) -> str:
        """
        The access token used to authenticate requests to the API. The token is automatically
        refreshed when it expires.

        :raises MythicAuthenticationError:
            If the authentication request returns an HTTP error status, or if the response body
            is not valid JSON or does not validate as an ``AuthResponse``.
        """
        # Fast path: a token has been fetched and has not yet passed its expiry time.
        if self._token is not None and datetime.now() <= self._token_expiry:
            return self._token

        data = {"grant_type": "client_credentials"}
        creds = (self.settings.id, self.settings.secret.get_secret_value())
        logger.debug("Authenticating", client_id=self.settings.id)
        response = self._auth_session.post(str(self.settings.auth_url), auth=creds, data=data)

        try:
            response.raise_for_status()
        except HTTPError as exc:
            logger.debug("Failed to authenticate", error=str(exc))
            raise MythicAuthenticationError("Failed to authenticate") from exc

        try:
            body = AuthResponse.model_validate(response.json())
        except (ValidationError, ValueError) as exc:
            # ValueError covers a response body that is not JSON at all (JSON decode errors are
            # ValueError subclasses), so callers only ever see MythicAuthenticationError here.
            logger.debug("Failed to validate auth response", error=str(exc))
            raise MythicAuthenticationError("Failed to validate auth response") from exc

        self._token = body.access_token
        self._token_expiry = datetime.now() + timedelta(seconds=body.expires_in)
        logger.debug("Got token", expires_in=body.expires_in, expires_at=self._token_expiry)
        return self._token