feat(ingest): populate inputDatajobs field in airflow integration (#2606)

This commit is contained in:
Harshal Sheth 2021-05-25 22:47:00 -07:00 committed by GitHub
parent 5562b615e7
commit 958fe8ea83
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 20 additions and 10 deletions

View File

@ -51,7 +51,7 @@ def send_lineage_to_datahub(
)
dag: "DAG" = context["dag"]
task = context["task"]
task: "BaseOperator" = context["task"]
# TODO: capture context
# context dag_run
@ -150,6 +150,10 @@ def send_lineage_to_datahub(
models.DataJobInputOutputClass(
inputDatasets=_entities_to_urn_list(inlets or []),
outputDatasets=_entities_to_urn_list(outlets or []),
inputDatajobs=[
builder.make_data_job_urn_with_flow(flow_urn, task_id)
for task_id in task.upstream_task_ids
],
),
*ownership_aspect,
*tags_aspect,

View File

@ -190,15 +190,21 @@ def test_lineage_backend(mock_emit, inlets, outlets):
with dag:
op1 = DummyOperator(
task_id="task1",
task_id="task1_upstream",
inlets=inlets,
outlets=outlets,
)
op2 = DummyOperator(
task_id="task2",
inlets=inlets,
outlets=outlets,
)
op1 >> op2
ti = TI(task=op1, execution_date=DEFAULT_DATE)
ti = TI(task=op2, execution_date=DEFAULT_DATE)
ctx1 = {
"dag": dag,
"task": op1,
"task": op2,
"ti": ti,
"task_instance": ti,
"execution_date": DEFAULT_DATE,
@ -206,16 +212,16 @@ def test_lineage_backend(mock_emit, inlets, outlets):
}
prep = prepare_lineage(func)
prep(op1, ctx1)
prep(op2, ctx1)
post = apply_lineage(func)
post(op1, ctx1)
post(op2, ctx1)
# Verify that the inlets and outlets are registered and recognized by Airflow correctly,
# or that our lineage backend forces it to.
assert len(op1.inlets) == 1
assert len(op1.outlets) == 1
assert all(map(lambda let: isinstance(let, Dataset), op1.inlets))
assert all(map(lambda let: isinstance(let, Dataset), op1.outlets))
assert len(op2.inlets) == 1
assert len(op2.outlets) == 1
assert all(map(lambda let: isinstance(let, Dataset), op2.inlets))
assert all(map(lambda let: isinstance(let, Dataset), op2.outlets))
# Check that the right things were emitted.
mock_emit.assert_called_once()