From 958fe8ea8394da5114ba9205c893bd0fcb20511b Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 25 May 2021 22:47:00 -0700 Subject: [PATCH] feat(ingest): populate inputDatajobs field in airflow integration (#2606) --- .../src/datahub_provider/_lineage_core.py | 6 ++++- metadata-ingestion/tests/unit/test_airflow.py | 24 ++++++++++++------- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/metadata-ingestion/src/datahub_provider/_lineage_core.py b/metadata-ingestion/src/datahub_provider/_lineage_core.py index f598997617..239222d0e1 100644 --- a/metadata-ingestion/src/datahub_provider/_lineage_core.py +++ b/metadata-ingestion/src/datahub_provider/_lineage_core.py @@ -51,7 +51,7 @@ def send_lineage_to_datahub( ) dag: "DAG" = context["dag"] - task = context["task"] + task: "BaseOperator" = context["task"] # TODO: capture context # context dag_run @@ -150,6 +150,10 @@ def send_lineage_to_datahub( models.DataJobInputOutputClass( inputDatasets=_entities_to_urn_list(inlets or []), outputDatasets=_entities_to_urn_list(outlets or []), + inputDatajobs=[ + builder.make_data_job_urn_with_flow(flow_urn, task_id) + for task_id in task.upstream_task_ids + ], ), *ownership_aspect, *tags_aspect, diff --git a/metadata-ingestion/tests/unit/test_airflow.py b/metadata-ingestion/tests/unit/test_airflow.py index 9b1b7bd29f..6e8f68dc8a 100644 --- a/metadata-ingestion/tests/unit/test_airflow.py +++ b/metadata-ingestion/tests/unit/test_airflow.py @@ -190,15 +190,21 @@ def test_lineage_backend(mock_emit, inlets, outlets): with dag: op1 = DummyOperator( - task_id="task1", + task_id="task1_upstream", inlets=inlets, outlets=outlets, ) + op2 = DummyOperator( + task_id="task2", + inlets=inlets, + outlets=outlets, + ) + op1 >> op2 - ti = TI(task=op1, execution_date=DEFAULT_DATE) + ti = TI(task=op2, execution_date=DEFAULT_DATE) ctx1 = { "dag": dag, - "task": op1, + "task": op2, "ti": ti, "task_instance": ti, "execution_date": DEFAULT_DATE, @@ -206,16 +212,16 @@ def test_lineage_backend(mock_emit, inlets, outlets): } prep = prepare_lineage(func) - prep(op1, ctx1) + prep(op2, ctx1) post = apply_lineage(func) - post(op1, ctx1) + post(op2, ctx1) # Verify that the inlets and outlets are registered and recognized by Airflow correctly, # or that our lineage backend forces it to. - assert len(op1.inlets) == 1 - assert len(op1.outlets) == 1 - assert all(map(lambda let: isinstance(let, Dataset), op1.inlets)) - assert all(map(lambda let: isinstance(let, Dataset), op1.outlets)) + assert len(op2.inlets) == 1 + assert len(op2.outlets) == 1 + assert all(map(lambda let: isinstance(let, Dataset), op2.inlets)) + assert all(map(lambda let: isinstance(let, Dataset), op2.outlets)) # Check that the right things were emitted. mock_emit.assert_called_once()