Dumping Dag Runs from Astronomer to CSV
deployment2csv.py:

import airflow_client
from airflow_client.client.api import dag_api, dag_run_api
from datetime import datetime
import csv
import sys
from time import sleep

# you will need a different token than this one, see Workspace Settings -> Access Management to create one
token = "eyJhbGcREDACTED-Jjw6GJAp5A"

# you will need a different url than this one, open the Airflow UI for the deployment of interest and get the URL from your browser
deployment_url = "https://clztuyh0a01z701mwxjahsza2.astronomer.run/d1elhp68"

configuration = airflow_client.client.Configuration(
    host=deployment_url + "/api/v1"
)


def get_all_dag_runs(dagrun_api_instance, dag_id, limit=100):
    """
    Get DAG runs for a specific DAG using pagination.
    """
    offset = 0
    all_dag_runs = []

    while True:
        print(f"  Fetching runs {offset} to {offset+limit} for {dag_id}...", file=sys.stderr)
        sleep(0.25)  # don't hammer the webserver too hard
        dag_runs_page = dagrun_api_instance.get_dag_runs(
            dag_id=dag_id,
            limit=limit,
            offset=offset,
            order_by='-start_date'  # get most recent first
        )

        if not dag_runs_page.dag_runs:
            break

        all_dag_runs.extend(dag_runs_page.dag_runs)

        if len(dag_runs_page.dag_runs) < limit:
            break

        offset += limit

    return all_dag_runs


# Prepare CSV file
csv_filename = "dag_info.csv"
csv_headers = ["DAG Name", "DAG Run Id", "Schedule", "Owner", "Start Date", "Duration (seconds)", "State"]

print(f"Writing results to {csv_filename}", file=sys.stderr)

with open(csv_filename, 'w', newline='') as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=csv_headers)
    writer.writeheader()

    with airflow_client.client.ApiClient(configuration) as api_client:
        api_client.default_headers['Authorization'] = f'Bearer {token}'
        dag_api_instance = dag_api.DAGApi(api_client)
        dagrun_api_instance = dag_run_api.DAGRunApi(api_client)

        try:
            print("Fetching list of DAGs...", file=sys.stderr)
            dags_response = dag_api_instance.get_dags()
            total_dags = len(dags_response.dags)
            print(f"Found {total_dags} DAGs", file=sys.stderr)

            for index, dag in enumerate(dags_response.dags, 1):
                print(f"\nProcessing DAG {index}/{total_dags}: {dag.dag_id}", file=sys.stderr)

                try:
                    dag_runs = get_all_dag_runs(dagrun_api_instance, dag.dag_id)
                    print(f"  Found {len(dag_runs)} runs", file=sys.stderr)

                    for run in dag_runs:
                        duration = "N/A"
                        if run.start_date and run.end_date:
                            duration = (run.end_date - run.start_date).total_seconds()

                        writer.writerow({
                            "DAG Name": dag.dag_id,
                            "DAG Run Id": run.dag_run_id,
                            "Schedule": str(dag.schedule_interval) if dag.schedule_interval else "None",
                            "Owner": ", ".join(dag.owners),
                            "Start Date": run.start_date.isoformat() if run.start_date else "N/A",
                            "Duration (seconds)": duration,
                            "State": run.state
                        })

                except airflow_client.client.ApiException as e:
                    print(f"Error processing DAG {dag.dag_id}: {e}", file=sys.stderr)
                    writer.writerow({
                        "DAG Name": dag.dag_id,
                        "Schedule": str(dag.schedule_interval) if dag.schedule_interval else "None",
                        "Owner": ", ".join(dag.owners),
                        "Start Date": "Error",
                        "Duration (seconds)": "Error",
                        "State": f"Error: {str(e)}"
                    })

        except airflow_client.client.ApiException as e:
            print(f"Failed to fetch DAGs: {e}", file=sys.stderr)
            sys.exit(1)

print(f"\nComplete! Results written to {csv_filename}", file=sys.stderr)
requirements.txt:

apache-airflow-client==2.10.0
python-dateutil==2.9.0.post0
six==1.17.0
urllib3==2.3.0
$ pip install apache-airflow-client
Collecting apache-airflow-client
Using cached apache_airflow_client-2.10.0-py3-none-any.whl (1.5 MB)
Collecting urllib3>=1.25.3
Using cached urllib3-2.3.0-py3-none-any.whl (128 kB)
Collecting python-dateutil
Using cached python_dateutil-2.9.0.post0-py2.py3-none-any.whl (229 kB)
Collecting six>=1.5
Using cached six-1.17.0-py2.py3-none-any.whl (11 kB)
Installing collected packages: six, urllib3, python-dateutil, apache-airflow-client
Successfully installed apache-airflow-client-2.10.0 python-dateutil-2.9.0.post0 six-1.17.0 urllib3-2.3.0
$ python deployment2csv.py
Writing results to dag_info.csv
Fetching list of DAGs...
Found 100 DAGs
Processing DAG 1/100: generated_dag_001
Fetching runs 0 to 100 for generated_dag_001...
Fetching runs 100 to 200 for generated_dag_001...
Fetching runs 200 to 300 for generated_dag_001...
Fetching runs 300 to 400 for generated_dag_001...
Processing DAG 2/100: generated_dag_002
Fetching runs 0 to 100 for generated_dag_002...
...some output omitted...
Processing DAG 100/100: generated_dag_100
Fetching runs 0 to 100 for generated_dag_100...
Complete! Results written to dag_info.csv
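
Once dag_info.csv exists, the standard library is enough for a quick summary. This is just an illustrative sketch, not part of the gist; the column names come from the csv_headers list in the script, and counting runs per state is one example of what you might do with the output.

import csv
from collections import Counter

# tally DAG runs by state using the columns written by deployment2csv.py
states = Counter()
with open("dag_info.csv", newline="") as f:
    for row in csv.DictReader(f):
        states[row["State"]] += 1

for state, count in states.most_common():
    print(f"{state}: {count}")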
@MatrixManAtYrService (author) commented:
This is not an officially supported method, just a hack I put together.
