import airflow_client.client
from airflow_client.client.api import dag_api, dag_run_api
from datetime import datetime
import csv
import sys
from time import sleep

# you will need a different token than this one, see Workspace Settings -> Access Management to create one
token = "eyJhbGcREDACTED-Jjw6GJAp5A"

# you will need a different url than this one, open the Airflow UI for the deployment of interest and get the URL from your browser
deployment_url = "https://clztuyh0a01z701mwxjahsza2.astronomer.run/d1elhp68"

configuration = airflow_client.client.Configuration(
    host=deployment_url + "/api/v1"
)


def get_all_dag_runs(dagrun_api_instance, dag_id, limit=100):
    """
    Get DAG runs for a specific DAG using pagination
    """
    offset = 0
    all_dag_runs = []

    while True:
        print(f" Fetching runs {offset} to {offset+limit} for {dag_id}...", file=sys.stderr)
        sleep(0.25)  # don't hammer the webserver too hard
        dag_runs_page = dagrun_api_instance.get_dag_runs(
            dag_id=dag_id,
            limit=limit,
            offset=offset,
            order_by='-start_date'  # Get most recent first
        )

        if not dag_runs_page.dag_runs:
            break

        all_dag_runs.extend(dag_runs_page.dag_runs)

        if len(dag_runs_page.dag_runs) < limit:
            break

        offset += limit

    return all_dag_runs


# Prepare CSV file
csv_filename = "dag_info.csv"
csv_headers = ["DAG Name", "DAG Run Id", "Schedule", "Owner", "Start Date", "Duration (seconds)", "State"]

print(f"Writing results to {csv_filename}", file=sys.stderr)
with open(csv_filename, 'w', newline='') as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=csv_headers)
    writer.writeheader()

    with airflow_client.client.ApiClient(configuration) as api_client:
        api_client.default_headers['Authorization'] = f'Bearer {token}'
        dag_api_instance = dag_api.DAGApi(api_client)
        dagrun_api_instance = dag_run_api.DAGRunApi(api_client)

        try:
            print("Fetching list of DAGs...", file=sys.stderr)
            dags_response = dag_api_instance.get_dags()
            total_dags = len(dags_response.dags)
            print(f"Found {total_dags} DAGs", file=sys.stderr)

            for index, dag in enumerate(dags_response.dags, 1):
                print(f"\nProcessing DAG {index}/{total_dags}: {dag.dag_id}", file=sys.stderr)

                try:
                    dag_runs = get_all_dag_runs(dagrun_api_instance, dag.dag_id)
                    print(f" Found {len(dag_runs)} runs", file=sys.stderr)

                    for run in dag_runs:
                        duration = "N/A"
                        if run.start_date and run.end_date:
                            duration = (run.end_date - run.start_date).total_seconds()

                        writer.writerow({
                            "DAG Name": dag.dag_id,
                            "DAG Run Id": run.dag_run_id,
                            "Schedule": str(dag.schedule_interval) if dag.schedule_interval else "None",
                            "Owner": ", ".join(dag.owners),
                            "Start Date": run.start_date.isoformat() if run.start_date else "N/A",
                            "Duration (seconds)": duration,
                            "State": run.state
                        })

                except airflow_client.client.ApiException as e:
                    print(f"Error processing DAG {dag.dag_id}: {e}", file=sys.stderr)
                    writer.writerow({
                        "DAG Name": dag.dag_id,
                        "Schedule": str(dag.schedule_interval) if dag.schedule_interval else "None",
                        "Owner": ", ".join(dag.owners),
                        "Start Date": "Error",
                        "Duration (seconds)": "Error",
                        "State": f"Error: {str(e)}"
                    })

        except airflow_client.client.ApiException as e:
            print(f"Failed to fetch DAGs: {e}", file=sys.stderr)
            sys.exit(1)

print(f"\nComplete! Results written to {csv_filename}", file=sys.stderr)
This is not an officially supported method, just a hack I put together.
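
If it helps, here is a small, equally unofficial sketch for sanity-checking the output. It assumes the dag_info.csv produced by the script above is in the current directory, reads it back with csv.DictReader, and prints the five longest runs:

import csv

# Read the CSV written by the script above.
with open("dag_info.csv", newline="") as f:
    rows = list(csv.DictReader(f))

# Keep only rows with a numeric duration (skips the "N/A" and "Error" rows).
timed = [r for r in rows if r["Duration (seconds)"].replace(".", "", 1).isdigit()]
timed.sort(key=lambda r: float(r["Duration (seconds)"]), reverse=True)

# Print the five longest-running DAG runs.
for r in timed[:5]:
    print(r["DAG Name"], r["DAG Run Id"], r["Duration (seconds)"], r["State"])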