Last active
May 28, 2021 15:47
-
-
Save nathairtras/6ce0b0294be8c27d672e2ad52e8f2117 to your computer and use it in GitHub Desktop.
Callback to clear Airflow SubDag on retry
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from airflow.models import DagBag | |
def callback_subdag_clear(context): | |
"""Clears a subdag's tasks on retry.""" | |
dag_id = "{}.{}".format( | |
context['dag'].dag_id, | |
context['ti'].task_id, | |
) | |
execution_date = context['execution_date'] | |
sdag = DagBag().get_dag(dag_id) | |
logging.info("Clearing SubDag: {} {}".format(dag_id, execution_date)) | |
sdag.clear( | |
start_date=execution_date, | |
end_date=execution_date, | |
only_failed=False, | |
only_running=False, | |
confirm_prompt=False, | |
include_subdags=False) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
You may want to check out this fork and give it a try, it grabs the subdag variable differently, possibly for the very issue you encountered.
https://gist.github.com/camilomartinez/84c5a8bb41ad687ef0b32369a030cdc0
I haven't used subdags since 2017. They were giving us more headaches than they were worth in 1.7, but a lot has changed since then. Don't have enough time right now to invest in exploring them again.