Created
January 28, 2021 14:31
-
-
Save f1nality/a823a4b7d447c25019fc02b1d63144cb to your computer and use it in GitHub Desktop.
Jet Bridge example for JWT auth
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import gzip | |
import json | |
from hashlib import sha256 | |
import jwt | |
from jwt import PyJWTError | |
# PEM-encoded public key handed to jwt.decode() in check_token to verify the
# RS256 signature of incoming user tokens. Written as one adjacent string
# literal per PEM line; the concatenated value (including the trailing
# newline) is unchanged.
JWT_VERIFY_KEY = (
    '-----BEGIN PUBLIC KEY-----\n'
    'MIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEAyfJablscZmsh7eHisWHi\n'
    '/x2gJUjc9juL4VLkEpp6PH9Ah+wBtGmPF9nfoVlQsgH9hra63POiaLUzIwW/ewOl\n'
    'jyPF0FJngsxreCxNs8lmd/WNXrcnazknFItkFFeXJuMTfoBPQGiZOmVYb14jmvkc\n'
    '9vMmeXYjrbT+95SVayN3E6DzLoHDhny4Mka1OsxvIP5s77s0dOo68TzoEfBVeuto\n'
    'I/dopG86DVu4wYVtYPITzJ4z47OFVPKCyYVyy5aR3+DUnmdK7xTRVr+iWmHpcr7e\n'
    'hoeVcL4CqAILZ0gd54kQmnHbg7Bu6x8JtQkiLU5TQvWzjiN00io4eydvIAkQTAaR\n'
    'mdd32O1vJbSHmLyCR2tEW/uV7P25naPUlkApxuLzh5C21S0XJxNJ/P07KSMymt5U\n'
    '1lWqt4CInpjAwMI8qs9MkEwJev5+yumxqIrDKcQLMR3TBLJZIb+rL1teCLOW28qB\n'
    'L6VSKhfKRIaXUdLpRwAcSuXraTzwa9oCCZa19tw3uizMeMFrCrv43YbyOsS9h7JQ\n'
    '8ixj/a1R/ud0fCrhXWUl7nKlz0b15koILLG1Ts+MUTmIaEnHTVEY74CfJVq7waw9\n'
    'x2kyzSzbsmMXvFkrVzTmyImTN631+gatU+npJ3vtcD9SooEZLOCLa4pb+DIsv9P1\n'
    'EeIEAh1VZC7s2qsQZsiYTG0CAwEAAQ==\n'
    '-----END PUBLIC KEY-----\n'
)
def get_sha256_hash(value):
    """Return the hexadecimal SHA-256 digest of a text value.

    Args:
        value: String to hash; it is encoded as UTF-8 before hashing.

    Returns:
        The digest as a lowercase hexadecimal string.
    """
    return sha256(value.encode('utf-8')).hexdigest()
def decompress_data(value):
    """Decode a base64-encoded, gzip-compressed JSON document.

    Args:
        value: Base64 input wrapping gzip-compressed, UTF-8 encoded JSON.

    Returns:
        The deserialized JSON value.

    Errors raised by any of the decoding steps (base64, gzip, UTF-8, JSON)
    propagate to the caller unchanged.
    """
    # Named `compressed` rather than `bytes` so the builtin is not shadowed.
    compressed = base64.b64decode(value)
    raw = gzip.decompress(compressed)
    return json.loads(raw.decode('utf-8'))
def has_permissions(user_permissions, project_token, permission_type, permission_object, permission_actions):
    """Decide whether a user's project permissions allow the requested actions.

    Args:
        user_permissions: Mapping describing the user's access to one project.
            Keys read here: 'owner', 'super_group', 'read_only' and
            'permissions' (a blob decoded with decompress_data into a list of
            grant dicts).
        project_token: Project token string. Dashes are removed and it is
            lowercased, then SHA-256 hashed for comparison with each grant's
            'resource_token_hash'.
        permission_type: Kind of permission requested; 'model' gets special
            matching rules (see below).
        permission_object: Name of the object the permission applies to.
        permission_actions: Requested actions, iterated one character at a
            time (presumably codes such as 'r'/'w'/'d' -- only 'r' is visible
            in this file; confirm against the caller).

    Returns:
        True if access is allowed, False otherwise.
    """
    # Owners and super-group members are allowed everything.
    if user_permissions.get('owner') or user_permissions.get('super_group'):
        return True

    # Read-only users may only read models. This is decided before the grant
    # list is decompressed because the grants are not consulted on this path.
    if user_permissions.get('read_only'):
        return permission_type == 'model' and all(action == 'r' for action in permission_actions)

    # Without any grants nothing can match.
    if 'permissions' not in user_permissions:
        return False

    permissions = decompress_data(user_permissions['permissions'])
    token_hash = get_sha256_hash(project_token.replace('-', '').lower())

    for item in permissions:
        item_type = item.get('permission_type', '')
        item_object = item.get('permission_object', '')
        item_actions = item.get('permission_actions', '')

        if item_type != permission_type:
            continue

        if permission_type == 'model':
            # A grant bound to a specific resource only applies to the project
            # whose token hashes to the same value.
            resource_token_hash = item.get('resource_token_hash', '')
            if resource_token_hash and resource_token_hash != token_hash:
                continue
            # Model grants are compared on the part after the first '.'.
            item_object = item_object.split('.', 1)[-1]

        if item_object != permission_object:
            continue

        # The first matching grant decides. Every requested action must be
        # granted individually; a plain substring test would wrongly reject
        # e.g. 'rd' against a grant of 'rwd'.
        return all(action in item_actions for action in permission_actions)

    return False
# user_token has the form "<scheme> <token>",
# e.g. "JWT eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJ...."
def check_token(project_unique_name, project_token, user_token, permission_type, permission_object, permission_actions):
    """Verify a user's JWT and check the permissions it carries for a project.

    Args:
        project_unique_name: Key used to look up this project's entry in the
            token's 'projects' claim.
        project_token: Project token, forwarded to has_permissions.
        user_token: Authorization value of the form "<scheme> <token>".
        permission_type: Forwarded to has_permissions.
        permission_object: Forwarded to has_permissions.
        permission_actions: Forwarded to has_permissions.

    Returns:
        False if the token is missing or malformed, fails RS256 verification
        against JWT_VERIFY_KEY, or carries no entry for the project;
        otherwise the result of has_permissions for that entry.
    """
    if not user_token:
        return False

    # Split "<scheme> <token>" on the first space. The receiver and argument
    # must be this way round: ' '.split(user_token) splits the single-space
    # string instead and cannot be unpacked into two values.
    # The scheme itself is not checked; the signature check below is what
    # establishes trust in the token.
    _token_type, separator, token_value = user_token.strip().partition(' ')
    token_value = token_value.strip()
    if not separator or not token_value:
        return False

    try:
        result = jwt.decode(token_value, key=JWT_VERIFY_KEY, algorithms=['RS256'])
    except PyJWTError:
        return False

    # `or {}` also covers a 'projects' claim that is present but null.
    user_permissions = (result.get('projects') or {}).get(project_unique_name)
    if user_permissions is None:
        return False

    return has_permissions(user_permissions, project_token, permission_type, permission_object, permission_actions)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment