Created
June 14, 2023 19:00
-
-
Save prydin/1c608fd6fdd8dcdac0985dd868579cdf to your computer and use it in GitHub Desktop.
Simplified vRO API
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from functools import lru_cache | |
import requests | |
import time | |
class AutomationClient: | |
""" | |
Low level Aria Automation/Orchestration client. | |
""" | |
url = "" | |
headers = { | |
"Content-Type": "application/json", | |
"Accept": "application/json" | |
} | |
def __init__(self, host): | |
self.url = f"https://{host}" | |
def get(self, url): | |
result = requests.get(url=self.url + url, headers=self.headers, verify=False) | |
if result.status_code < 200 or result.status_code > 299: | |
raise Exception(f"{result.status_code}: {result.content}") | |
return result.json() | |
def post(self, url, data): | |
result = requests.post(url=self.url + url, json=data, headers=self.headers, verify=False) | |
if result.status_code < 200 or result.status_code > 299: | |
raise Exception(f"{result.status_code}: {result.content}") | |
return result.json() | |
def patch(self, url, data): | |
result = requests.post(url=self.url + url, json=data, headers=self.headers, verify=False) | |
if result.status_code < 200 or result.status_code > 299: | |
raise Exception(f"{result.status_code}: {result.content}") | |
return result.json() | |
def authenticate_password(self, user, password, domain): | |
""" | |
Authenticate using username, password and domain. For on-prem use only. | |
""" | |
login = { | |
"username": user, | |
"password": password, | |
"domain": domain | |
} | |
result = self.post("/csp/gateway/am/api/login?access_token", login) | |
self.authenticate_token(result["refresh_token"]) | |
def authenticate_token(self, token): | |
""" | |
Authenticate using refresh token. For use with on-prem and SaaS. | |
:param token: A valid refresh token | |
:return: A bearer token | |
""" | |
login = {"refreshToken": token} | |
result = self.post("/iaas/api/login", login) | |
self.headers["Authorization"] = "Bearer " + result["token"] | |
def to_dict(values, key="name", value="value"): | |
""" | |
Converts a vRO key-value structure to a simple dict | |
:param values: The structure to convert | |
:param key: Name of the key field | |
:param value: Name of the value field | |
:return: A dict of values keyed by their names | |
""" | |
dict = {} | |
for attr in values: | |
if value in attr: | |
dict[attr[key]] = attr[value] | |
return dict | |
def infer_type(value): | |
""" | |
Infer the vRO type based on the Python type of the value supplied | |
:param value: The value to obtain the type for | |
:return: String representation of the vRO type | |
""" | |
if isinstance(value, (int, float)): | |
return "number" | |
if isinstance(value, str): | |
return "string" | |
if isinstance(value, (tuple, list)): | |
return "Array/Any" | |
if isinstance(value, dict): | |
return "Properties" | |
return "Any" | |
def encode_value(type, value): | |
""" | |
Encode a type and a value into a structure that can be used as an input to vRO | |
:param type: The vRO type name | |
:param value: The value | |
:return: A structure suitable for suppying as an input parameter | |
""" | |
if type == "Any": | |
result = {infer_type(value): {"value": value}} | |
elif type == "Properties": | |
properties = [] | |
result = {"properties": {"property": properties}} | |
for k, v in value.items(): | |
properties.append({"key": k, "value": encode_value(infer_type(v), v)}) | |
elif type.startswith("Array/"): | |
element_type = type[6:] | |
elements = [] | |
result = {"array": {"elements": elements}} | |
for v in value: | |
elements.append(encode_value(element_type, v)) | |
else: | |
result = {type: {"value": value}} | |
return result | |
def decode_value(node): | |
""" | |
Decodes an output value structure from vRO to a simple calue | |
:param node: The value node from the workflow output parameter | |
:return: A simple value | |
""" | |
v = node.get("number", node.get("string", None)) | |
if v: | |
return v["value"] | |
v = node.get("array", None) | |
if v: | |
array = [] | |
for element in v["elements"]: | |
array.append(decode_value(element)) | |
return array | |
v = node.get("properties", None) | |
if v: | |
props = {} | |
for property in v["property"]: | |
props[property["key"]] = decode_value(property["value"]) | |
return props | |
v = node.get("any", None) | |
if v: | |
return decode_value(v) | |
raise Exception("Don't know how to decode output value: " + str(node)) | |
class WorkflowExecution: | |
""" | |
Represents a running or completed execution of a workflow | |
""" | |
id = "" | |
workflow_id = "" | |
outputs = {} | |
state = "unknown" | |
client = None | |
def __init__(self, client, id, workflow_id, state, outputs=[]): | |
self.client = client | |
self.id = id | |
self.workflow_id = workflow_id | |
self.state = state | |
self.outputs = outputs | |
def wait_for_completion(self): | |
""" | |
Waits for a workflow to complete and returns its status and output parameters | |
:param execution: The execution object of the workflow run to wait for | |
:return: An object representing the status and output paramerers | |
""" | |
delay = 0.5 | |
factor = 1.1 | |
while True: | |
response = self.client.get( | |
"/vco/api/workflows/%s/executions/%s/state" % (self.workflow_id, self.id)) | |
state = response["value"] | |
if state != "running": | |
response = self.client.get( | |
"/vco/api/workflows/%s/executions/%s" % (self.workflow_id, self.id)) | |
outputs = {} | |
for p in response["output-parameters"]: | |
outputs[p["name"]] = decode_value(p["value"]) | |
return WorkflowExecution(self.client, self.id, self.workflow_id, state, outputs) | |
time.sleep(delay) | |
delay *= factor | |
class API: | |
""" | |
A simplified API for launching vRO workflows | |
""" | |
def __init__(self, client): | |
self.client = client | |
@lru_cache(maxsize=1000) | |
def get_workflow_metadata(self, id): | |
""" | |
Returns the metadata for a workflow | |
:param id: The ID of the workflow | |
:return: The metadata | |
""" | |
return self.client.get("/vco/api/workflows/%s/content" % id) | |
@lru_cache(maxsize=1000) | |
def get_workflow_by_name(self, name): | |
""" | |
Returns a workflow by its name | |
:param name: The name of the workflow | |
:return: A workflow object | |
""" | |
response = self.client.get("/vco/api/workflows?conditions=name~%s" % name) | |
workflows = response["link"] | |
if len(workflows) == 0: | |
raise Exception("Workflow not found") | |
if len(workflows) > 1: | |
raise Exception("More than one workflow found") | |
return workflows[0] | |
def execute_by_id_asynch(self, id, inputs={}): | |
""" | |
Starts a workflow without waiting for it to complete | |
:param id: The ID of the workflow | |
:param inputs: A dict of name/values representing the workflow inputs | |
:return: An object representing the workflow execution | |
""" | |
metadata = self.get_workflow_metadata(id) | |
input_types = to_dict(metadata["input"]["param"], value="type") | |
payload = [] | |
for key, value in inputs.items(): | |
type = input_types.get(key, None) | |
if type is None: | |
raise Exception("Parameter '%s' is not defined in the workflow" % key) | |
parameter = { | |
"name": key, | |
"scope": "local", | |
"type": type, | |
"value": encode_value(type, value) | |
} | |
payload.append(parameter) | |
response = self.client.post("/vco/api/workflows/%s/executions" % id, {"parameters": payload}) | |
return WorkflowExecution(self.client, response["id"], id, response["state"]) | |
def execute_by_id(self, id, inputs={}): | |
""" | |
Starts a workflow and waits for it to complete | |
:param id: The ID of the workflow | |
:param inputs: A dict of name/values representing the workflow inputs | |
:return: An object representing the workflow execution | |
""" | |
e = self.execute_by_id_asynch(id, inputs) | |
return e.wait_for_completion() | |
def execute_by_name_asynch(self, name, inputs): | |
""" | |
Starts a workflow without waiting for it to complete | |
:param name: The name of the workflow | |
:param inputs: A dict of name/values representing the workflow inputs | |
:return: An object representing the workflow execution | |
""" | |
workflow = self.get_workflow_by_name(name) | |
return self.execute_by_id_asynch(to_dict(workflow["attributes"])["id"], inputs) | |
def execute_by_name(self, name, inputs): | |
""" | |
Starts a workflow and waits for it to complete | |
:param name: The name of the workflow | |
:param inputs: A dict of name/values representing the workflow inputs | |
:return: An object representing the workflow execution | |
""" | |
e = self.execute_by_name_asynch(name, inputs) | |
return e.wait_for_completion() | |
client = AutomationClient("test.vmware.com") | |
client.authenticate_password("fritz", "****", "my.domain") | |
api = API(client) | |
e = api.execute_by_name("APITest", { | |
"string": "string", | |
"number": 42, | |
"string_array": ["foo", "bar"], | |
"properties": {"foo": "bar", "bar": "foo"}, | |
"any": "any"}) | |
print(e.outputs) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment