Last active
February 10, 2024 21:35
-
-
Save hanleybrand/bafaa42d43d1faa93771dae3798c7656 to your computer and use it in GitHub Desktop.
Importing instructure-dap-client and using it programmatically - quickstart example scripts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
from pathlib import Path
import shutil
from urllib.parse import urlparse
from dap.api import DAPClient
from dap.dap_types import Format, IncrementalQuery, SnapshotQuery
import requests

# Download a full snapshot of one CD2 table, saving the result files into a
# per-table folder (downloads/<table>/<table>_job_<job id>/) rather than the
# flat job folder that DAPClient.download_resources() produces.
output_dir_base = Path("downloads")
table_name = 'accounts'
ns = 'canvas'
fmt = Format('csv')
snapshot = SnapshotQuery(format=fmt, filter=None)

with DAPClient() as dc:
    start_job = dc.query_snapshot(ns, table_name, snapshot)
    job_results = dc.await_job(start_job)  # blocks until the job completes
    objects = dc.get_objects(job_results.id)
    resources = dc.get_resources(objects)

    # dc.download_resources(resources, job_results.id, output_dir_base) is
    # omitted and replaced with a copy of that method's code, altered to
    # separate the downloads into table_name folders.
    downloads = []
    # NOTE: renamed from `dir`, which shadowed the builtin of the same name.
    job_dir = output_dir_base / table_name / f"{table_name}_job_{job_results.id}"
    job_dir.mkdir(parents=True, exist_ok=True)
    for resource in resources:
        url = str(resource.url)
        # Last path component of the pre-signed URL is the object file name.
        file_base_name = os.path.basename(urlparse(url).path)
        file_path = job_dir / file_base_name
        with requests.get(url, stream=True) as data, open(file_path, "wb") as file:
            # Fail loudly on an expired/forbidden pre-signed URL instead of
            # silently writing the XML error body to disk.
            data.raise_for_status()
            # save gzip data to file without decompressing
            shutil.copyfileobj(data.raw, file)
        downloads.append(str(file_path))
    print(f"Files from server downloaded to folder: {job_dir}")
    print(downloads)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
from datetime import datetime
from pathlib import Path
from dap.api import DAPClient
from dap.dap_types import Format, IncrementalQuery, SnapshotQuery

# Annotated walkthrough of one full CD2 snapshot request via DAPClient.
#
# Prerequisite: the dap CLI is already configured — the DAP_API_URL and
# DAP_API_KEY environment variables are set as described at
# https://pypi.org/project/instructure-dap-client/ so that shell commands such
# as `dap list` or `dap incremental --namespace canvas --table accounts`
# succeed locally. If that is not the case, set up the CLI first.

# -- configuration --
table_name = 'accounts'
output_dir_base = Path("downloads")  # this can be replaced with an absolute path (e.g. `/users/me/cd2_dl' etc)
print(f"Data downloads will be saved here: {output_dir_base.absolute()}")

# DAPClient moves data between steps with the types declared in dap.dap_types,
# most of which are Python dataclasses. A couple of them (SnapshotQuery or
# IncrementalQuery) are instantiated up front, and every DAPClient method used
# below returns one of them as well. Background reading:
# https://realpython.com/python-data-classes/ and
# https://docs.python.org/3/library/dataclasses.html
namespace = 'canvas'
data_format = Format('csv')  # note: Format is an enum, not a dataclass
snapshot_query = SnapshotQuery(format=data_format, filter=None)

# Under the hood DAPClient wraps python-requests, taking care of the tedious
# parts: authentication, session maintenance, and building the GET and POST
# requests the API needs. Like a requests Session, it should be used as a
# context manager, cf.
# https://requests.readthedocs.io/en/latest/user/advanced/#session-objects
with DAPClient() as dap_client:
    print("Beginning query/retrieval, this may take a few minutes - wait for ALL_CLEAR before evaluating variables")

    # Optional: dap_client.get_tables(namespace) lists every table name, for
    # loops that iterate through all the tables.
    start_job = dap_client.query_snapshot(namespace, table_name, snapshot_query)
    # -> TableJob(id='ea55451d-7515-4f4c-8b42-2c2c373c7540',
    #             status=<JobStatus.Waiting: 'waiting'>,
    #             expires_at=datetime.datetime(2023, 2, 1, 19, 18, 47, tzinfo=datetime.timezone.utc))
    print(f"job id {start_job.id} is now pending")

    job_results = dap_client.await_job(start_job)  # note, await_job implements a wait
    # -> CompleteSnapshotJob(id='ea55451d-7515-4f4c-8b42-2c2c373c7540',
    #                        status=<JobStatus.Complete: 'complete'>,
    #                        expires_at=datetime.datetime(expires_dt),
    #                        objects=[Object(id='self.id/part-00000-....csv.gz')],
    #                        schema_version=1,
    #                        at=datetime.datetime(2023, 1, 31, 16, 30, 37, tzinfo=datetime.timezone.utc))
    print(f"Update on job id {start_job.id} : {job_results.status}")

    # -- sidebar --
    # When automating, persist at least the table name and job_results.at so
    # future incremental jobs can be scheduled from them. The table name is
    # not needed to build the IncrementalQuery itself, but it is needed to
    # know which table a saved previous `.at` timestamp is paired with.
    followup_query = IncrementalQuery(format=data_format, since=job_results.at, filter=None, until=None)
    # -- end sidebar --

    objects = dap_client.get_objects(job_results.id)
    # -> [Object(id='ea55451d-7515-4f4c-8b42-2c2c373c7540/part-00000-....csv.gz')]
    resources = dap_client.get_resources(objects)
    # -> [Resource(url=URL(url='https://...s3.amazonaws.com/output/...csv.gz?X-Amz-...'))]
    #    i.e. pre-signed, time-limited S3 download URLs.
    print(f"beginning download of {[dr.url.url for dr in resources]}")

    downloads = dap_client.download_resources(resources, job_results.id, output_dir_base)
    # Saves the file[s] beneath output_dir_base, e.g.
    # ['/Users/hanley/.../downloads/job_ea55451d-.../part-00000-....csv.gz']
    print(f"files downloaded to {output_dir_base.absolute()}/job_{job_results.id}/ as if you had invoked")
    print(f"\t'dap snapshot --namespace {namespace} --table {table_name} --output-directory {output_dir_base}' \nin a shell")
    print("""if in an interactive shell, inspect the variables
start_job, job_results, objects, resources, downloads""")
    print("\nALL_CLEAR")
    print(f"\n\nNext Incremental job for table {table_name} can use this IncrementalQuery:\n{followup_query}")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
from pathlib import Path
from dap.api import DAPClient
from dap.dap_types import Format, IncrementalQuery, SnapshotQuery

# Smallest possible snapshot workflow: start a snapshot job for one table,
# wait for it to finish, then download the resulting file(s).
output_dir_base = Path("downloads")  # destination root for the downloads
table_name = 'accounts'              # which CD2 table to snapshot
ns = 'canvas'                        # namespace the table lives in
fmt = Format('csv')
snapshot = SnapshotQuery(format=fmt, filter=None)

with DAPClient() as client:
    job = client.query_snapshot(ns, table_name, snapshot)
    finished = client.await_job(job)
    table_objects = client.get_objects(finished.id)
    table_resources = client.get_resources(table_objects)
    downloads = client.download_resources(table_resources, finished.id, output_dir_base)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment