Airflow perf test for TaskInstance (TI) creation inside DagRun.verify_integrity. The test runs against a database with no other traffic.
import time
import logging

from airflow.utils.db import create_session
from airflow.utils import timezone
from airflow.models import TaskInstance
from airflow.models.serialized_dag import SerializedDagModel

logger = logging.getLogger(__name__)
out_hdlr = logging.FileHandler('./log.txt')
out_hdlr.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
out_hdlr.setLevel(logging.INFO)
logger.addHandler(out_hdlr)
logger.setLevel(logging.INFO)


def create_tis_in_new_dag_run(dag, execution_date, number_of_tis):
    # Create the first `number_of_tis` task instances of the DAG in a single session,
    # mirroring what DagRun.verify_integrity does, and time the whole operation.
    tasks = list(dag.task_dict.values())[0:number_of_tis]
    t1 = time.time()
    success = True
    try:
        with create_session() as session:
            for task in tasks:
                ti = TaskInstance(task, execution_date)
                session.add(ti)
            session.flush()
    except Exception:
        success = False
    t2 = time.time()
    logger.info('Created %s tis. success?: %s, perf: %s', number_of_tis, success, t2 - t1)
    return t2 - t1, success


def perf_tis_creation(dag):
    # Measure TI creation for increasing batch sizes, pausing between runs.
    perf = {}
    for number_of_tis in [1000, 3000, 5000, 10000, 15000, 20000, 25000]:
        next_run_date = timezone.utcnow()
        duration, success = create_tis_in_new_dag_run(dag, next_run_date, number_of_tis)
        perf[number_of_tis] = (duration, success)
        time.sleep(5)
    return perf


if __name__ == '__main__':
    dag_id = 'a-very-large-dag'
    dm = SerializedDagModel.get(dag_id)
    dag = dm.dag
    perf_tis_creation(dag)
Good stuff @ashb! I remember times when I got even better results by just batching DB writes (telecom billing process). I think there are only two reasons why you should NOT do batching:
- When you care about transactional integrity "per object"
- When you have a risk of deadlocks
I think in this case we have neither the 1) need nor the 2) risk.
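As a rough illustration of that batching idea, here is a minimal sketch of a batched variant of the helper in the gist, assuming the same older-Airflow setup (SQLAlchemy 1.x sessions via create_session); the function name create_tis_batched is hypothetical and not part of the gist:

import time

from airflow.utils.db import create_session
from airflow.models import TaskInstance


def create_tis_batched(dag, execution_date, number_of_tis):
    # Hypothetical batched variant: build all TaskInstance objects up front,
    # then hand them to SQLAlchemy in a single call.
    tasks = list(dag.task_dict.values())[0:number_of_tis]
    tis = [TaskInstance(task, execution_date) for task in tasks]
    t1 = time.time()
    with create_session() as session:
        # bulk_save_objects skips per-object unit-of-work bookkeeping and lets the
        # driver batch the INSERTs, which is where the extra speed-up comes from.
        session.bulk_save_objects(tis)
    return time.time() - t1

Compared with adding each TI individually and flushing, this trades per-object session tracking for a single bulk insert path, which is fine here since the test only cares about overall throughput, not per-row integrity.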
Yeah I was surprised by just how much it sped up!