Skip to content

Instantly share code, notes, and snippets.

@Tishka17
Created January 8, 2025 14:11
Show Gist options
  • Save Tishka17/240ba752756c6f8c81c274bbf0507fea to your computer and use it in GitHub Desktop.
Save Tishka17/240ba752756c6f8c81c274bbf0507fea to your computer and use it in GitHub Desktop.
open id connect client
import http
import json
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Any
import aiohttp
from adaptix import Retort
@dataclass
class Config:
issuer: str
device_authorization_endpoint: str
token_endpoint: str
jwks_uri: str
@dataclass
class AuthResponse:
device_code: str
user_code: str
verification_uri: str
verification_uri_complete: str
expires_in: float
interval: float
@dataclass
class TokenResponse:
access_token: str
expires_in: float
refresh_expires_in: float
token_type: str
scope: str
refresh_token: str
@dataclass
class StoredToken:
client_id: str
access_token: str
expires_at: datetime
refresh_token: str
refresh_expires_at: datetime
token_type: str
retort = Retort()
logger = logging.getLogger(__name__)
CONFIG_URL_PATH = ".well-known/openid-configuration"
class OpenIDError(Exception):
pass
class RefreshUnavailableError(OpenIDError):
pass
class OpenIDClient:
def __init__(self, client_id: str) -> None:
self._client_id = client_id
self._session = aiohttp.ClientSession()
async def __aenter__(self) -> "OpenIDClient":
return self
async def __aexit__(self, exc_type: type, exc_val: BaseException, exc_tb: Any) -> None:
await self._session.close()
async def close(self) -> None:
await self._session.close()
@property
def client_id(self) -> str:
return self._client_id
async def get_config(self, keycloak_uri: str) -> Config:
keycloak_uri = keycloak_uri.rstrip("/") + "/" + CONFIG_URL_PATH
async with self._session.get(keycloak_uri) as resp:
config_raw = await resp.json()
return retort.load(config_raw, Config)
async def get_auth_data(self, config: Config) -> AuthResponse:
async with self._session.post(
config.device_authorization_endpoint, data={"client_id": self._client_id}
) as resp:
auth_raw = await resp.json()
return retort.load(auth_raw, AuthResponse)
async def get_token(self, config: Config, auth: AuthResponse) -> TokenResponse | None:
async with self._session.post(
config.token_endpoint,
data={
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"client_id": self._client_id,
"device_code": auth.device_code,
},
) as resp:
if resp.status != http.HTTPStatus.OK:
return None
token_raw = await resp.json()
return retort.load(token_raw, TokenResponse)
async def refresh_token(self, config: Config, refresh_token: str) -> TokenResponse:
try:
async with self._session.post(
config.token_endpoint,
data={
"grant_type": "refresh_token",
"client_id": self._client_id,
"refresh_token": refresh_token,
},
) as resp:
if resp.status == http.HTTPStatus.BAD_REQUEST:
error_body = await resp.json()
if error_body.get("error") == "invalid_grant":
logger.debug("Token refresh cannot be done. Received status code: {resp.status}")
raise RefreshUnavailableError
logger.debug("Token refresh failed. Received status code: {resp.status}, body: {error_body}")
raise OpenIDError(f"Cannot refresh token. Received status code: {resp.status}")
elif resp.status != http.HTTPStatus.OK:
logger.debug("Token refresh failed. Received status code: {resp.status}")
raise OpenIDError(f"Cannot refresh token. Received status code: {resp.status}")
token_raw = await resp.json()
except (aiohttp.ClientError, json.JSONDecodeError) as e:
raise OpenIDError("Cannot refresh token") from e
return retort.load(token_raw, TokenResponse)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment