Skip to content

Instantly share code, notes, and snippets.

@tameem92
Created April 29, 2021 12:45
Show Gist options
  • Save tameem92/14547743b03730fb8ef9e0e361d1a6ef to your computer and use it in GitHub Desktop.
Save tameem92/14547743b03730fb8ef9e0e361d1a6ef to your computer and use it in GitHub Desktop.
# Release tag embedded in the generated DAG ids.
VERSION = 'v1.0.3'

# DAG Factory: build each processing DAG once, at module import, so the
# resulting objects can be attached to the main DAG as subdags.
factory = DAGFactory(config.environment)
process_invoices_dag = factory.create(
    'ProcessInvoices',
    config.process_invoices,
).build()
process_messages_dag = factory.create(
    'ProcessMessages',
    config.process_messages,
).build()
def create_main_dag(dag_id, org_config):
    """Build the main DAG that runs both processing subdags.

    Args:
        dag_id: Id assigned to the DAG being created.
        org_config: Parsed per-client configuration. Not read by this
            function at present; kept so existing callers keep working.

    Returns:
        The DAG holding the ``process_invoices`` and ``process_messages``
        SubDagOperator tasks.
    """
    # Use the caller-supplied id. A hard-coded f'main_{VERSION}' would give
    # every DAG produced by this function the same dag_id, so DAGs created
    # for different clients would collide.
    with DAG(
        dag_id=dag_id,
        default_args=default_args,
        schedule_interval=config.controller['schedule'],
        catchup=False,
        template_searchpath=['/home/airflow/gcs/dags'],
    ) as dag:
        # Attach the pre-built DAG objects as subdags under our main dag.
        # Operators created inside the `with DAG(...)` block do not need to
        # be bound to local names.
        # NOTE(review): SubDagOperator is documented to expect the subdag's
        # dag_id to be '<parent_dag_id>.<task_id>' -- confirm the factory
        # built DAGs satisfy this for every parent dag_id.
        SubDagOperator(
            task_id='process_invoices',
            subdag=process_invoices_dag,
        )
        SubDagOperator(
            task_id='process_messages',
            subdag=process_messages_dag,
        )
    return dag
# Generate one main DAG per client from the dynamic config and publish it
# in this module's global namespace under its own id, so that every
# generated DAG is reachable as a module-level name.
for client_config in client_configs:
    parsed_client_config = config.parse_config(client_config)
    main_dag_id = f"{client_config['identifier']}_main_{VERSION}"
    globals()[main_dag_id] = create_main_dag(main_dag_id, parsed_client_config)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment