OpenMetadata/ingestion/tests/unit/airflow/test_lineage_parser.py
Pere Menal-Ferrer ca812852d6
ci/nox-setup-testing (#21377)
* Make pytest use code from src rather than from the installed package

* Fix test_amundsen: missing None

* Update pytest configuration to use importlib mode

* Fix custom_basemodel_validation to check model_fields on type(values) to prevent noisy warnings

* Refactor referencedByQueries validation to use field_validator as per deprecation warning

* Update ColumnJson to use model_rebuild as a replacement for forward reference updates as per deprecation warning

* Move Superset tests to integration tests as they are using testcontainers

* Update coverage source path

* Fix wrong import.

* Add install_dev_env target to Makefile for development dependencies

* Add test-unit as extra in setup.py

* Modify dependencies in dev environment.

* Ignore all airflow tests

* Remove coverage in unit_ingestion_dev_env. Revert coverage source to prevent broken CI.

* Add nox for running unit test

* Fix PowerBI integration test to use pathlib for resource paths instead of os.getcwd to prevent failures when not executed from the right path

* Move test_helpers.py to unit test, as it is not an integration test.

* Remove utils empty folder in integration tests

* Refactor testcontainers configuration to avoid pitfalls with max_tries setting

* Add nox unit testing basic setup

* Add format check session

* Refactor nox-unit and add plugins tests

* Add GHA for py-nox-ci

* Add comment to GHA

* Restore conftest.py file

* Clarify comment

* Simplify function

* Fix matrix strategy and nox mismatch

* Improve python version strategy with nox and GHA

---------

Co-authored-by: Pere Menal <pere.menal@getcollate.io>
2025-05-27 10:56:52 +02:00


# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test lineage parser to get inlets and outlets information
"""
from datetime import datetime
from typing import List, Set

import pytest

try:
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.serialization.serde import serialize
except ImportError:
pytest.skip("Airflow dependencies not installed", allow_module_level=True)

from metadata.generated.schema.entity.data.container import Container
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.table import Table
from metadata.ingestion.source.pipeline.airflow.lineage_parser import (
OMEntity,
XLets,
XLetsMode,
_parse_xlets,
get_xlets_from_dag,
get_xlets_from_operator,
parse_xlets,
)


SLEEP = "sleep 1"


def xlet_fqns(xlet: XLets, xlet_mode: XLetsMode) -> Set[str]:
"""Helper method to get a set of FQNs out of the xlet"""
return set(elem.fqn for elem in getattr(xlet, xlet_mode.value))


def assert_xlets_equals(first: List[XLets], second: List[XLets]):
"""
    Check that both XLets lists are the same.
    Even though they are lists, we don't care about the order.
Note that we cannot use sets since `OMEntity` is not hashable.
For this test, we will assume that by having the same FQN, the
entity type will also be the same.
"""
assert len(first) == len(second)
for xlet1 in first:
match = False
first_inlets = xlet_fqns(xlet1, XLetsMode.INLETS)
first_outlets = xlet_fqns(xlet1, XLetsMode.OUTLETS)
for xlet2 in second:
second_inlets = xlet_fqns(xlet2, XLetsMode.INLETS)
second_outlets = xlet_fqns(xlet2, XLetsMode.OUTLETS)
if first_inlets == second_inlets and first_outlets == second_outlets:
match = True
break
assert match


def test_parse_xlets():
"""
Handle the shape validation of inlets and outlets, e.g.,
[{
"tables": ["A"],
"more_tables": ["X"]
}],
"""
raw_xlet = [{"tables": ["A"], "more_tables": ["X"]}]
assert parse_xlets(raw_xlet) == (
{
"tables": [OMEntity(entity=Table, fqn="A")],
"more_tables": [OMEntity(entity=Table, fqn="X")],
}
)
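
    # Keys whose values are not a list (here, a plain string) are dropped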
raw_xlet_without_list = [{"tables": ["A"], "more_tables": "random"}]
assert parse_xlets(raw_xlet_without_list) == (
{
"tables": [OMEntity(entity=Table, fqn="A")],
}
)
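
    # If the input is not a list of dicts, nothing is parsed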
not_an_xlet_list = {"tables": ["A"]}
assert parse_xlets(not_an_xlet_list) is None


def test_get_xlets_from_operator():
"""
Check how to get xlet data from operators
"""
operator = BashOperator(
task_id="print_date",
bash_command="date",
outlets={"tables": ["A"]},
)
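
    # No inlets were defined on the operator, so there is nothing to pick up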
assert get_xlets_from_operator(operator, XLetsMode.INLETS) is None
# But the outlets are parsed correctly
assert get_xlets_from_operator(operator, xlet_mode=XLetsMode.OUTLETS) == (
{"tables": [OMEntity(entity=Table, fqn="A")]}
)
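
    # Same check, this time with inlets defined instead of outlets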
operator = BashOperator(
task_id="print_date",
bash_command="date",
inlets={"tables": ["A"], "more_tables": ["X"]},
)
assert get_xlets_from_operator(operator, xlet_mode=XLetsMode.INLETS) == (
{
"tables": [OMEntity(entity=Table, fqn="A")],
"more_tables": [OMEntity(entity=Table, fqn="X")],
}
)
assert get_xlets_from_operator(operator, xlet_mode=XLetsMode.OUTLETS) is None


def test_get_string_xlets_from_dag():
"""
Check that we can properly join the xlet information from
all operators in the DAG
"""
with DAG("test_dag", start_date=datetime(2021, 1, 1)) as dag:
BashOperator(
task_id="print_date",
bash_command="date",
inlets={"tables": ["A"]},
)
BashOperator(
task_id="sleep",
bash_command=SLEEP,
outlets={"tables": ["B"]},
)
assert_xlets_equals(
get_xlets_from_dag(dag),
[
XLets(
inlets=[OMEntity(entity=Table, fqn="A")],
outlets=[OMEntity(entity=Table, fqn="B")],
)
],
)
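
    # Duplicated FQNs and keys with non-list values are silently ignored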
with DAG("test_dag", start_date=datetime(2021, 1, 1)) as dag:
BashOperator(
task_id="print_date",
bash_command="date",
inlets={"tables": ["A", "A"], "this is a bit random": "foo"},
)
BashOperator(
task_id="sleep",
bash_command=SLEEP,
outlets={"tables": ["B"]},
)
assert_xlets_equals(
get_xlets_from_dag(dag),
[
XLets(
inlets=[OMEntity(entity=Table, fqn="A")],
outlets=[OMEntity(entity=Table, fqn="B")],
)
],
)
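
    # Each xlet key ("tables", "more_tables") produces its own XLets entry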
with DAG("test_dag", start_date=datetime(2021, 1, 1)) as dag:
BashOperator(
task_id="print_date",
bash_command="date",
inlets={
"tables": ["A", "A"],
"more_tables": ["X", "Y"],
"this is a bit random": "foo",
},
)
BashOperator(
task_id="sleep",
bash_command=SLEEP,
outlets={
"tables": ["B"],
"more_tables": ["Z"],
},
)
assert_xlets_equals(
get_xlets_from_dag(dag),
[
XLets(
inlets=[OMEntity(entity=Table, fqn="A")],
outlets=[OMEntity(entity=Table, fqn="B")],
),
XLets(
inlets=[
OMEntity(entity=Table, fqn="X"),
OMEntity(entity=Table, fqn="Y"),
],
outlets=[OMEntity(entity=Table, fqn="Z")],
),
],
)
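
    # The same table can appear both as an inlet and as an outlet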
with DAG("test_dag_cycle", start_date=datetime(2021, 1, 1)) as dag:
BashOperator(
task_id="print_date",
bash_command="date",
inlets={
"tables": ["A", "B"],
},
)
BashOperator(
task_id="sleep",
bash_command=SLEEP,
outlets={
"tables": ["B"],
},
)
assert_xlets_equals(
get_xlets_from_dag(dag),
[
XLets(
inlets=[
OMEntity(entity=Table, fqn="A"),
OMEntity(entity=Table, fqn="B"),
],
outlets=[OMEntity(entity=Table, fqn="B")],
),
],
)


def test_get_dict_xlets_from_dag():
"""
We can get the dict-based xlets for any entity without
annotating directly with OMEntity
"""
with DAG("test_dag_cycle", start_date=datetime(2021, 1, 1)) as dag:
BashOperator(
task_id="print_date",
bash_command="date",
inlets=[
{"entity": "container", "fqn": "C1", "key": "test"},
{"entity": "container", "fqn": "C2", "key": "test"},
],
)
BashOperator(
task_id="sleep",
bash_command=SLEEP,
outlets=[
{"entity": "table", "fqn": "T", "key": "test"},
],
)
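
    # The "entity" strings are resolved to the matching OpenMetadata classes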
assert_xlets_equals(
get_xlets_from_dag(dag),
[
XLets(
inlets=[
OMEntity(entity=Container, fqn="C1", key="test"),
OMEntity(entity=Container, fqn="C2", key="test"),
],
outlets=[OMEntity(entity=Table, fqn="T", key="test")],
),
],
)


def test_get_attrs_xlets_from_dag():
"""
Check that we can properly join the xlet information from
all operators in the DAG
"""
with DAG("test_dag", start_date=datetime(2021, 1, 1)) as dag:
BashOperator(
task_id="print_date",
bash_command="date",
inlets=[
OMEntity(entity=Table, fqn="A"),
OMEntity(entity=Table, fqn="B"),
],
)
BashOperator(
task_id="sleep",
bash_command=SLEEP,
outlets=[OMEntity(entity=Table, fqn="C")],
)
BashOperator(
task_id="sleep2",
bash_command=SLEEP,
outlets=[OMEntity(entity=Container, fqn="D")],
)
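
    # Outlets coming from different tasks are merged into a single XLets entry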
assert_xlets_equals(
get_xlets_from_dag(dag),
[
XLets(
inlets=[
OMEntity(entity=Table, fqn="A"),
OMEntity(entity=Table, fqn="B"),
],
outlets=[
OMEntity(entity=Table, fqn="C"),
OMEntity(entity=Container, fqn="D"),
],
)
],
)


def test_om_entity_serializer():
"""To ensure the serialized DAGs will have the right shape"""
om_entity = OMEntity(
entity=Table,
fqn="FQN",
key="test",
)
assert str(om_entity) == (
'{"entity": "metadata.generated.schema.entity.data.table.Table", "fqn": "FQN", "key": "test"}'
)
om_entity = OMEntity(
entity=Container,
fqn="FQN",
key="test",
)
assert str(om_entity) == (
'{"entity": "metadata.generated.schema.entity.data.container.Container", "fqn": "FQN", "key": "test"}'
)


def test_str_deserializer():
"""
Once a DAG is serialized, the xlet info will be stored as:
```
['{"entity": "metadata.generated.schema.entity.data.table.Table", "fqn": "FQN", "key": "test"}']
```
based on our custom serialization logic.
Validate the deserialization process.
"""
assert _parse_xlets("random") is None
assert _parse_xlets(
'{"entity": "metadata.generated.schema.entity.data.table.Table", "fqn": "FQN", "key": "test"}'
) == (
{
"test": [
OMEntity(
entity=Table,
fqn="FQN",
key="test",
)
]
}
)
assert _parse_xlets(
'{"entity": "metadata.generated.schema.entity.data.container.Container", "fqn": "FQN", "key": "test"}'
) == (
{
"test": [
OMEntity(
entity=Container,
fqn="FQN",
key="test",
)
]
}
)
assert _parse_xlets(
'{"entity": "metadata.generated.schema.entity.data.dashboard.Dashboard", "fqn": "FQN", "key": "test"}'
) == (
{
"test": [
OMEntity(
entity=Dashboard,
fqn="FQN",
key="test",
)
]
}
)


def test_airflow_serializer():
"""It should be able to serialize our models"""
om_entity = OMEntity(
entity=Table,
fqn="FQN",
key="test",
)
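
    # Airflow's serde keeps the custom-serialized payload under the "__data__" key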
assert serialize(om_entity).get("__data__") == (
'{"entity": "metadata.generated.schema.entity.data.table.Table", "fqn": "FQN", "key": "test"}'
)