Created
January 8, 2025 14:11
-
-
Save Tishka17/240ba752756c6f8c81c274bbf0507fea to your computer and use it in GitHub Desktop.
open id connect client
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import http | |
import json | |
import logging | |
from dataclasses import dataclass | |
from datetime import datetime | |
from typing import Any | |
import aiohttp | |
from adaptix import Retort | |
@dataclass | |
class Config: | |
issuer: str | |
device_authorization_endpoint: str | |
token_endpoint: str | |
jwks_uri: str | |
@dataclass | |
class AuthResponse: | |
device_code: str | |
user_code: str | |
verification_uri: str | |
verification_uri_complete: str | |
expires_in: float | |
interval: float | |
@dataclass | |
class TokenResponse: | |
access_token: str | |
expires_in: float | |
refresh_expires_in: float | |
token_type: str | |
scope: str | |
refresh_token: str | |
@dataclass | |
class StoredToken: | |
client_id: str | |
access_token: str | |
expires_at: datetime | |
refresh_token: str | |
refresh_expires_at: datetime | |
token_type: str | |
retort = Retort() | |
logger = logging.getLogger(__name__) | |
CONFIG_URL_PATH = ".well-known/openid-configuration" | |
class OpenIDError(Exception): | |
pass | |
class RefreshUnavailableError(OpenIDError): | |
pass | |
class OpenIDClient: | |
def __init__(self, client_id: str) -> None: | |
self._client_id = client_id | |
self._session = aiohttp.ClientSession() | |
async def __aenter__(self) -> "OpenIDClient": | |
return self | |
async def __aexit__(self, exc_type: type, exc_val: BaseException, exc_tb: Any) -> None: | |
await self._session.close() | |
async def close(self) -> None: | |
await self._session.close() | |
@property | |
def client_id(self) -> str: | |
return self._client_id | |
async def get_config(self, keycloak_uri: str) -> Config: | |
keycloak_uri = keycloak_uri.rstrip("/") + "/" + CONFIG_URL_PATH | |
async with self._session.get(keycloak_uri) as resp: | |
config_raw = await resp.json() | |
return retort.load(config_raw, Config) | |
async def get_auth_data(self, config: Config) -> AuthResponse: | |
async with self._session.post( | |
config.device_authorization_endpoint, data={"client_id": self._client_id} | |
) as resp: | |
auth_raw = await resp.json() | |
return retort.load(auth_raw, AuthResponse) | |
async def get_token(self, config: Config, auth: AuthResponse) -> TokenResponse | None: | |
async with self._session.post( | |
config.token_endpoint, | |
data={ | |
"grant_type": "urn:ietf:params:oauth:grant-type:device_code", | |
"client_id": self._client_id, | |
"device_code": auth.device_code, | |
}, | |
) as resp: | |
if resp.status != http.HTTPStatus.OK: | |
return None | |
token_raw = await resp.json() | |
return retort.load(token_raw, TokenResponse) | |
async def refresh_token(self, config: Config, refresh_token: str) -> TokenResponse: | |
try: | |
async with self._session.post( | |
config.token_endpoint, | |
data={ | |
"grant_type": "refresh_token", | |
"client_id": self._client_id, | |
"refresh_token": refresh_token, | |
}, | |
) as resp: | |
if resp.status == http.HTTPStatus.BAD_REQUEST: | |
error_body = await resp.json() | |
if error_body.get("error") == "invalid_grant": | |
logger.debug("Token refresh cannot be done. Received status code: {resp.status}") | |
raise RefreshUnavailableError | |
logger.debug("Token refresh failed. Received status code: {resp.status}, body: {error_body}") | |
raise OpenIDError(f"Cannot refresh token. Received status code: {resp.status}") | |
elif resp.status != http.HTTPStatus.OK: | |
logger.debug("Token refresh failed. Received status code: {resp.status}") | |
raise OpenIDError(f"Cannot refresh token. Received status code: {resp.status}") | |
token_raw = await resp.json() | |
except (aiohttp.ClientError, json.JSONDecodeError) as e: | |
raise OpenIDError("Cannot refresh token") from e | |
return retort.load(token_raw, TokenResponse) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment