import datetime
import json
import os
from contextlib import contextmanager
from typing import Iterator
from unittest import mock
from unittest.mock import Mock

import airflow.configuration
import airflow.version
import packaging.version
import pytest
from airflow.lineage import apply_lineage, prepare_lineage
from airflow.models import DAG, Connection, DagBag, DagRun, TaskInstance

import datahub.emitter.mce_builder as builder
from datahub_airflow_plugin import get_provider_info
from datahub_airflow_plugin._airflow_shims import (
    AIRFLOW_PATCHED,
    AIRFLOW_VERSION,
    EmptyOperator,
)
from datahub_airflow_plugin.entities import Dataset, Urn
from datahub_airflow_plugin.hooks.datahub import DatahubKafkaHook, DatahubRestHook
from datahub_airflow_plugin.operators.datahub import DatahubEmitterOperator
from tests.utils import PytestConfig

assert AIRFLOW_PATCHED

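# Unit tests for the DataHub Airflow plugin. All emitters are mocked, so these
# tests verify hook/operator wiring and the emitted metadata rather than
# network I/O; no live DataHub, Kafka, or Airflow deployment is needed.
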
# TODO: Remove the default_view="tree" arg. Figure out why default_view is
# being picked up as "grid" and how to fix it.

lineage_mce = builder.make_lineage_mce(
    [
        builder.make_dataset_urn("bigquery", "upstream1"),
        builder.make_dataset_urn("bigquery", "upstream2"),
    ],
    builder.make_dataset_urn("bigquery", "downstream1"),
)

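# Connection fixtures consumed by the hook tests below; the hosts and broker
# addresses are test-only placeholders. For reference, an equivalent
# connection could be registered via the Airflow CLI (a sketch, assuming
# Airflow 2.x and a hypothetical conn id):
#
#   airflow connections add datahub_rest_default \
#       --conn-type datahub_rest --conn-host http://localhost:8080
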
datahub_rest_connection_config = Connection(
    conn_id="datahub_rest_test",
    conn_type="datahub_rest",
    host="http://test_host:8080",
    extra=None,
)

datahub_rest_connection_config_with_timeout = Connection(
    conn_id="datahub_rest_test",
    conn_type="datahub_rest",
    host="http://test_host:8080",
    extra=json.dumps({"timeout_sec": 5}),
)

datahub_kafka_connection_config = Connection(
    conn_id="datahub_kafka_test",
    conn_type="datahub_kafka",
    host="test_broker:9092",
    extra=json.dumps(
        {
            "connection": {
                "producer_config": {},
                "schema_registry_url": "http://localhost:8081",
            }
        }
    ),
)

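# pytest invokes setup_module() once before any test in this module runs;
# loading Airflow's test config keeps the tests independent of the local
# airflow.cfg.
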
def setup_module(module):
    airflow.configuration.conf.load_test_config()


def test_airflow_provider_info():
    assert get_provider_info()


@pytest.mark.filterwarnings("ignore:.*is deprecated.*")
def test_dags_load_with_no_errors(pytestconfig: PytestConfig) -> None:
    airflow_examples_folder = (
        pytestconfig.rootpath / "src/datahub_airflow_plugin/example_dags"
    )

    # Note: the .airflowignore file skips the snowflake DAG.
    dag_bag = DagBag(dag_folder=str(airflow_examples_folder), include_examples=False)

    import_errors = dag_bag.import_errors

    assert len(import_errors) == 0
    assert dag_bag.size() > 0


@contextmanager
def patch_airflow_connection(conn: Connection) -> Iterator[Connection]:
    # The return type should really be ContextManager, but mypy doesn't like that.
    # See https://stackoverflow.com/questions/49733699/python-type-hints-and-context-managers#comment106444758_58349659.
    with mock.patch(
        "datahub_provider.hooks.datahub.BaseHook.get_connection", return_value=conn
    ):
        yield conn


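# The hook tests below patch the underlying emitter classes with autospec
# mocks, so they check that each hook parses its connection and constructs and
# invokes the emitter correctly, without any network traffic.
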
@mock.patch("datahub.emitter.rest_emitter.DataHubRestEmitter", autospec=True)
def test_datahub_rest_hook(mock_emitter):
    with patch_airflow_connection(datahub_rest_connection_config) as config:
        assert config.conn_id
        hook = DatahubRestHook(config.conn_id)
        hook.emit_mces([lineage_mce])

        mock_emitter.assert_called_once_with(config.host, None)
        instance = mock_emitter.return_value
        instance.emit.assert_called_with(lineage_mce)


@mock.patch("datahub.emitter.rest_emitter.DataHubRestEmitter", autospec=True)
def test_datahub_rest_hook_with_timeout(mock_emitter):
    with patch_airflow_connection(
        datahub_rest_connection_config_with_timeout
    ) as config:
        assert config.conn_id
        hook = DatahubRestHook(config.conn_id)
        hook.emit_mces([lineage_mce])

        mock_emitter.assert_called_once_with(config.host, None, timeout_sec=5)
        instance = mock_emitter.return_value
        instance.emit.assert_called_with(lineage_mce)


@mock.patch("datahub.emitter.kafka_emitter.DatahubKafkaEmitter", autospec=True)
def test_datahub_kafka_hook(mock_emitter):
    with patch_airflow_connection(datahub_kafka_connection_config) as config:
        assert config.conn_id
        hook = DatahubKafkaHook(config.conn_id)
        hook.emit_mces([lineage_mce])

        mock_emitter.assert_called_once()
        instance = mock_emitter.return_value
        instance.emit.assert_called()
        instance.flush.assert_called_once()


@mock.patch("datahub_provider.hooks.datahub.DatahubRestHook.emit")
def test_datahub_lineage_operator(mock_emit):
    with patch_airflow_connection(datahub_rest_connection_config) as config:
        assert config.conn_id
        task = DatahubEmitterOperator(
            task_id="emit_lineage",
            datahub_conn_id=config.conn_id,
            mces=[
                builder.make_lineage_mce(
                    [
                        builder.make_dataset_urn("snowflake", "mydb.schema.tableA"),
                        builder.make_dataset_urn("snowflake", "mydb.schema.tableB"),
                    ],
                    builder.make_dataset_urn("snowflake", "mydb.schema.tableC"),
                )
            ],
        )
        task.execute(None)

        mock_emit.assert_called()


@pytest.mark.parametrize(
    "hook",
    [
        DatahubRestHook,
        DatahubKafkaHook,
    ],
)
def test_hook_airflow_ui(hook):
    # Simply ensure that these run without issue. These will also show up
    # in the Airflow UI, where it will be even more clear if something
    # is wrong.
    hook.get_connection_form_widgets()
    hook.get_ui_field_behaviour()


def test_entities():
    assert (
        Dataset("snowflake", "mydb.schema.tableConsumed").urn
        == "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableConsumed,PROD)"
    )

    assert (
        Urn(
            "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableConsumed,PROD)"
        ).urn
        == "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableConsumed,PROD)"
    )

    assert (
        Urn("urn:li:dataJob:(urn:li:dataFlow:(airflow,testDag,PROD),testTask)").urn
        == "urn:li:dataJob:(urn:li:dataFlow:(airflow,testDag,PROD),testTask)"
    )

    assert (
        Urn(
            "urn:li:dataJob:(urn:li:dataFlow:(airflow,platform.testDag,PROD),testTask)"
        ).urn
        == "urn:li:dataJob:(urn:li:dataFlow:(airflow,platform.testDag,PROD),testTask)"
    )

    with pytest.raises(ValueError, match="invalid"):
        Urn("not a URN")

    with pytest.raises(
        ValueError, match="only supports datasets and upstream datajobs"
    ):
        Urn("urn:li:mlModel:(urn:li:dataPlatform:science,scienceModel,PROD)")


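# End-to-end test of the lineage backend: build a two-task DAG, run the
# prepare_lineage/apply_lineage hooks against a fake TaskInstance, and assert
# on the exact sequence of aspects emitted, with and without capturing
# executions as DataProcessInstance runs.
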
@pytest.mark.parametrize(
    ["inlets", "outlets", "capture_executions"],
    [
        pytest.param(
            [
                Dataset("snowflake", "mydb.schema.tableConsumed"),
                Urn("urn:li:dataJob:(urn:li:dataFlow:(airflow,testDag,PROD),testTask)"),
            ],
            [Dataset("snowflake", "mydb.schema.tableProduced")],
            False,
            id="airflow-lineage-no-executions",
        ),
        pytest.param(
            [
                Dataset("snowflake", "mydb.schema.tableConsumed"),
                Urn("urn:li:dataJob:(urn:li:dataFlow:(airflow,testDag,PROD),testTask)"),
            ],
            [Dataset("snowflake", "mydb.schema.tableProduced")],
            True,
            id="airflow-lineage-capture-executions",
        ),
    ],
)
@mock.patch("datahub_provider.hooks.datahub.DatahubRestHook.make_emitter")
def test_lineage_backend(mock_emit, inlets, outlets, capture_executions):
    DEFAULT_DATE = datetime.datetime(2020, 5, 17)
    mock_emitter = Mock()
    mock_emit.return_value = mock_emitter
    # Using autospec on xcom_pull and xcom_push methods fails on Python 3.6.
    with mock.patch.dict(
        os.environ,
        {
            "AIRFLOW__LINEAGE__BACKEND": "datahub_provider.lineage.datahub.DatahubLineageBackend",
            "AIRFLOW__LINEAGE__DATAHUB_CONN_ID": datahub_rest_connection_config.conn_id,
            "AIRFLOW__LINEAGE__DATAHUB_KWARGS": json.dumps(
                {"graceful_exceptions": False, "capture_executions": capture_executions}
            ),
        },
    ), mock.patch("airflow.models.BaseOperator.xcom_pull"), mock.patch(
        "airflow.models.BaseOperator.xcom_push"
    ), patch_airflow_connection(datahub_rest_connection_config):
        func = mock.Mock()
        func.__name__ = "foo"

        dag = DAG(
            dag_id="test_lineage_is_sent_to_backend",
            start_date=DEFAULT_DATE,
            default_view="tree",
        )

        with dag:
            op1 = EmptyOperator(
                task_id="task1_upstream",
                inlets=inlets,
                outlets=outlets,
            )
            op2 = EmptyOperator(
                task_id="task2",
                inlets=inlets,
                outlets=outlets,
            )
            op1 >> op2

        # Airflow < 2.2 requires the execution_date parameter. Newer Airflow
        # versions do not require it, but will attempt to find the associated
        # run_id in the database if execution_date is provided. As such, we
        # must fake the run_id parameter for newer Airflow versions.
        # A type: ignore is needed in the else branch to suppress mypy errors
        # when checked against Airflow < 2.2.
        if AIRFLOW_VERSION < packaging.version.parse("2.2.0"):
            ti = TaskInstance(task=op2, execution_date=DEFAULT_DATE)
            # Ignoring type here because DagRun state is just a string in Airflow 1.
            dag_run = DagRun(
                state="success",  # type: ignore[arg-type]
                run_id=f"scheduled_{DEFAULT_DATE.isoformat()}",
            )
        else:
            from airflow.utils.state import DagRunState

            ti = TaskInstance(task=op2, run_id=f"test_airflow-{DEFAULT_DATE}")  # type: ignore[call-arg]
            dag_run = DagRun(
                state=DagRunState.SUCCESS,
                run_id=f"scheduled_{DEFAULT_DATE.isoformat()}",
            )

        ti.dag_run = dag_run  # type: ignore
        ti.start_date = datetime.datetime.utcnow()
        ti.execution_date = DEFAULT_DATE

        ctx1 = {
            "dag": dag,
            "task": op2,
            "ti": ti,
            "dag_run": dag_run,
            "task_instance": ti,
            "execution_date": DEFAULT_DATE,
            "ts": "2021-04-08T00:54:25.771575+00:00",
        }

        prep = prepare_lineage(func)
        prep(op2, ctx1)
        post = apply_lineage(func)
        post(op2, ctx1)

        # Verify that the inlets and outlets are registered and recognized by
        # Airflow correctly, or that our lineage backend forces them to be.
        assert len(op2.inlets) == 2
        assert len(op2.outlets) == 1
        assert all(isinstance(let, (Dataset, Urn)) for let in op2.inlets)
        assert all(isinstance(let, Dataset) for let in op2.outlets)

        # Check that the right things were emitted. Note: without the
        # parentheses, this would parse as
        # `assert (call_count == 19) if capture_executions else 11`,
        # which always passes when capture_executions is False.
        assert mock_emitter.emit.call_count == (19 if capture_executions else 11)

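        # Expected call order, as asserted below: dataFlow aspects (info,
        # status, ownership, tags), then dataJob aspects, then datasetKey
        # aspects for the inlets/outlets, then dataJob ownership/tags.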
        # TODO: Replace this with a golden file-based comparison.
        assert mock_emitter.method_calls[0].args[0].aspectName == "dataFlowInfo"
        assert (
            mock_emitter.method_calls[0].args[0].entityUrn
            == "urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod)"
        )

        assert mock_emitter.method_calls[1].args[0].aspectName == "status"
        assert (
            mock_emitter.method_calls[1].args[0].entityUrn
            == "urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod)"
        )

        assert mock_emitter.method_calls[2].args[0].aspectName == "ownership"
        assert (
            mock_emitter.method_calls[2].args[0].entityUrn
            == "urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod)"
        )

        assert mock_emitter.method_calls[3].args[0].aspectName == "globalTags"
        assert (
            mock_emitter.method_calls[3].args[0].entityUrn
            == "urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod)"
        )

        assert mock_emitter.method_calls[4].args[0].aspectName == "dataJobInfo"
        assert (
            mock_emitter.method_calls[4].args[0].entityUrn
            == "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task2)"
        )

        assert mock_emitter.method_calls[5].args[0].aspectName == "status"
        assert (
            mock_emitter.method_calls[5].args[0].entityUrn
            == "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task2)"
        )

        assert mock_emitter.method_calls[6].args[0].aspectName == "dataJobInputOutput"
        assert (
            mock_emitter.method_calls[6].args[0].entityUrn
            == "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task2)"
        )
        assert (
            mock_emitter.method_calls[6].args[0].aspect.inputDatajobs[0]
            == "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task1_upstream)"
        )
        assert (
            mock_emitter.method_calls[6].args[0].aspect.inputDatajobs[1]
            == "urn:li:dataJob:(urn:li:dataFlow:(airflow,testDag,PROD),testTask)"
        )
        assert (
            mock_emitter.method_calls[6].args[0].aspect.inputDatasets[0]
            == "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableConsumed,PROD)"
        )
        assert (
            mock_emitter.method_calls[6].args[0].aspect.outputDatasets[0]
            == "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableProduced,PROD)"
        )

        assert mock_emitter.method_calls[7].args[0].aspectName == "datasetKey"
        assert (
            mock_emitter.method_calls[7].args[0].entityUrn
            == "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableConsumed,PROD)"
        )

        assert mock_emitter.method_calls[8].args[0].aspectName == "datasetKey"
        assert (
            mock_emitter.method_calls[8].args[0].entityUrn
            == "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableProduced,PROD)"
        )

        assert mock_emitter.method_calls[9].args[0].aspectName == "ownership"
        assert (
            mock_emitter.method_calls[9].args[0].entityUrn
            == "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task2)"
        )

        assert mock_emitter.method_calls[10].args[0].aspectName == "globalTags"
        assert (
            mock_emitter.method_calls[10].args[0].entityUrn
            == "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task2)"
        )

        if capture_executions:
            assert (
                mock_emitter.method_calls[11].args[0].aspectName
                == "dataProcessInstanceProperties"
            )
            assert (
                mock_emitter.method_calls[11].args[0].entityUrn
                == "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29"
            )

            assert (
                mock_emitter.method_calls[12].args[0].aspectName
                == "dataProcessInstanceRelationships"
            )
            assert (
                mock_emitter.method_calls[12].args[0].entityUrn
                == "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29"
            )
            assert (
                mock_emitter.method_calls[13].args[0].aspectName
                == "dataProcessInstanceInput"
            )
            assert (
                mock_emitter.method_calls[13].args[0].entityUrn
                == "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29"
            )
            assert (
                mock_emitter.method_calls[14].args[0].aspectName
                == "dataProcessInstanceOutput"
            )
            assert (
                mock_emitter.method_calls[14].args[0].entityUrn
                == "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29"
            )
            assert mock_emitter.method_calls[15].args[0].aspectName == "datasetKey"
            assert (
                mock_emitter.method_calls[15].args[0].entityUrn
                == "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableConsumed,PROD)"
            )
            assert mock_emitter.method_calls[16].args[0].aspectName == "datasetKey"
            assert (
                mock_emitter.method_calls[16].args[0].entityUrn
                == "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableProduced,PROD)"
            )
            assert (
                mock_emitter.method_calls[17].args[0].aspectName
                == "dataProcessInstanceRunEvent"
            )
            assert (
                mock_emitter.method_calls[17].args[0].entityUrn
                == "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29"
            )
            assert (
                mock_emitter.method_calls[18].args[0].aspectName
                == "dataProcessInstanceRunEvent"
            )
            assert (
                mock_emitter.method_calls[18].args[0].entityUrn
                == "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29"
            )