mirror of
				https://github.com/open-metadata/OpenMetadata.git
				synced 2025-10-25 07:42:40 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			418 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			418 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #  Copyright 2025 Collate
 | |
| #  Licensed under the Collate Community License, Version 1.0 (the "License");
 | |
| #  you may not use this file except in compliance with the License.
 | |
| #  You may obtain a copy of the License at
 | |
| #  https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
 | |
| #  Unless required by applicable law or agreed to in writing, software
 | |
| #  distributed under the License is distributed on an "AS IS" BASIS,
 | |
| #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| #  See the License for the specific language governing permissions and
 | |
| #  limitations under the License.
 | |
| """
 | |
| Test lineage parser to get inlets and outlets information
 | |
| """
 | |
| from datetime import datetime
 | |
| from typing import List, Set
 | |
| 
 | |
| from airflow import DAG
 | |
| from airflow.operators.bash import BashOperator
 | |
| from airflow.serialization.serde import serialize
 | |
| 
 | |
| from metadata.generated.schema.entity.data.container import Container
 | |
| from metadata.generated.schema.entity.data.dashboard import Dashboard
 | |
| from metadata.generated.schema.entity.data.table import Table
 | |
| from metadata.ingestion.source.pipeline.airflow.lineage_parser import (
 | |
|     OMEntity,
 | |
|     XLets,
 | |
|     XLetsMode,
 | |
|     _parse_xlets,
 | |
|     get_xlets_from_dag,
 | |
|     get_xlets_from_operator,
 | |
|     parse_xlets,
 | |
| )
 | |
| 
 | |
| SLEEP = "sleep 1"
 | |
| 
 | |
| 
 | |
| def xlet_fqns(xlet: XLets, xlet_mode: XLetsMode) -> Set[str]:
 | |
|     """Helper method to get a set of FQNs out of the xlet"""
 | |
|     return set(elem.fqn for elem in getattr(xlet, xlet_mode.value))
 | |
| 
 | |
| 
 | |
| def assert_xlets_equals(first: List[XLets], second: List[XLets]):
 | |
|     """
 | |
|     Check that both XLet lists are the same
 | |
| 
 | |
|     Even if they are lists, we don't care about the order.
 | |
| 
 | |
|     Note that we cannot use sets since `OMEntity` is not hashable.
 | |
| 
 | |
|     For this test, we will assume that by having the same FQN, the
 | |
|     entity type will also be the same.
 | |
|     """
 | |
|     assert len(first) == len(second)
 | |
| 
 | |
|     for xlet1 in first:
 | |
|         match = False
 | |
| 
 | |
|         first_inlets = xlet_fqns(xlet1, XLetsMode.INLETS)
 | |
|         first_outlets = xlet_fqns(xlet1, XLetsMode.OUTLETS)
 | |
| 
 | |
|         for xlet2 in second:
 | |
|             second_inlets = xlet_fqns(xlet2, XLetsMode.INLETS)
 | |
|             second_outlets = xlet_fqns(xlet2, XLetsMode.OUTLETS)
 | |
| 
 | |
|             if first_inlets == second_inlets and first_outlets == second_outlets:
 | |
|                 match = True
 | |
|                 break
 | |
| 
 | |
|         assert match
 | |
| 
 | |
| 
 | |
| def test_parse_xlets():
 | |
|     """
 | |
|     Handle the shape validation of inlets and outlets, e.g.,
 | |
|     [{
 | |
|         "tables": ["A"],
 | |
|         "more_tables": ["X"]
 | |
|     }],
 | |
|     """
 | |
|     raw_xlet = [{"tables": ["A"], "more_tables": ["X"]}]
 | |
|     assert parse_xlets(raw_xlet) == (
 | |
|         {
 | |
|             "tables": [OMEntity(entity=Table, fqn="A")],
 | |
|             "more_tables": [OMEntity(entity=Table, fqn="X")],
 | |
|         }
 | |
|     )
 | |
| 
 | |
|     raw_xlet_without_list = [{"tables": ["A"], "more_tables": "random"}]
 | |
|     assert parse_xlets(raw_xlet_without_list) == (
 | |
|         {
 | |
|             "tables": [OMEntity(entity=Table, fqn="A")],
 | |
|         }
 | |
|     )
 | |
| 
 | |
|     not_an_xlet_list = {"tables": ["A"]}
 | |
|     assert parse_xlets(not_an_xlet_list) is None
 | |
| 
 | |
| 
 | |
| def test_get_xlets_from_operator():
 | |
|     """
 | |
|     Check how to get xlet data from operators
 | |
|     """
 | |
|     operator = BashOperator(
 | |
|         task_id="print_date",
 | |
|         bash_command="date",
 | |
|         outlets={"tables": ["A"]},
 | |
|     )
 | |
| 
 | |
|     assert get_xlets_from_operator(operator, XLetsMode.INLETS) is None
 | |
|     # But the outlets are parsed correctly
 | |
|     assert get_xlets_from_operator(operator, xlet_mode=XLetsMode.OUTLETS) == (
 | |
|         {"tables": [OMEntity(entity=Table, fqn="A")]}
 | |
|     )
 | |
| 
 | |
|     operator = BashOperator(
 | |
|         task_id="print_date",
 | |
|         bash_command="date",
 | |
|         inlets={"tables": ["A"], "more_tables": ["X"]},
 | |
|     )
 | |
| 
 | |
|     assert get_xlets_from_operator(operator, xlet_mode=XLetsMode.INLETS) == (
 | |
|         {
 | |
|             "tables": [OMEntity(entity=Table, fqn="A")],
 | |
|             "more_tables": [OMEntity(entity=Table, fqn="X")],
 | |
|         }
 | |
|     )
 | |
|     assert get_xlets_from_operator(operator, xlet_mode=XLetsMode.OUTLETS) is None
 | |
| 
 | |
| 
 | |
| def test_get_string_xlets_from_dag():
 | |
|     """
 | |
|     Check that we can properly join the xlet information from
 | |
|     all operators in the DAG
 | |
|     """
 | |
| 
 | |
|     with DAG("test_dag", start_date=datetime(2021, 1, 1)) as dag:
 | |
|         BashOperator(
 | |
|             task_id="print_date",
 | |
|             bash_command="date",
 | |
|             inlets={"tables": ["A"]},
 | |
|         )
 | |
| 
 | |
|         BashOperator(
 | |
|             task_id="sleep",
 | |
|             bash_command=SLEEP,
 | |
|             outlets={"tables": ["B"]},
 | |
|         )
 | |
| 
 | |
|         assert_xlets_equals(
 | |
|             get_xlets_from_dag(dag),
 | |
|             [
 | |
|                 XLets(
 | |
|                     inlets=[OMEntity(entity=Table, fqn="A")],
 | |
|                     outlets=[OMEntity(entity=Table, fqn="B")],
 | |
|                 )
 | |
|             ],
 | |
|         )
 | |
| 
 | |
|     with DAG("test_dag", start_date=datetime(2021, 1, 1)) as dag:
 | |
|         BashOperator(
 | |
|             task_id="print_date",
 | |
|             bash_command="date",
 | |
|             inlets={"tables": ["A", "A"], "this is a bit random": "foo"},
 | |
|         )
 | |
| 
 | |
|         BashOperator(
 | |
|             task_id="sleep",
 | |
|             bash_command=SLEEP,
 | |
|             outlets={"tables": ["B"]},
 | |
|         )
 | |
| 
 | |
|         assert_xlets_equals(
 | |
|             get_xlets_from_dag(dag),
 | |
|             [
 | |
|                 XLets(
 | |
|                     inlets=[OMEntity(entity=Table, fqn="A")],
 | |
|                     outlets=[OMEntity(entity=Table, fqn="B")],
 | |
|                 )
 | |
|             ],
 | |
|         )
 | |
| 
 | |
|     with DAG("test_dag", start_date=datetime(2021, 1, 1)) as dag:
 | |
|         BashOperator(
 | |
|             task_id="print_date",
 | |
|             bash_command="date",
 | |
|             inlets={
 | |
|                 "tables": ["A", "A"],
 | |
|                 "more_tables": ["X", "Y"],
 | |
|                 "this is a bit random": "foo",
 | |
|             },
 | |
|         )
 | |
| 
 | |
|         BashOperator(
 | |
|             task_id="sleep",
 | |
|             bash_command=SLEEP,
 | |
|             outlets={
 | |
|                 "tables": ["B"],
 | |
|                 "more_tables": ["Z"],
 | |
|             },
 | |
|         )
 | |
| 
 | |
|         assert_xlets_equals(
 | |
|             get_xlets_from_dag(dag),
 | |
|             [
 | |
|                 XLets(
 | |
|                     inlets=[OMEntity(entity=Table, fqn="A")],
 | |
|                     outlets=[OMEntity(entity=Table, fqn="B")],
 | |
|                 ),
 | |
|                 XLets(
 | |
|                     inlets=[
 | |
|                         OMEntity(entity=Table, fqn="X"),
 | |
|                         OMEntity(entity=Table, fqn="Y"),
 | |
|                     ],
 | |
|                     outlets=[OMEntity(entity=Table, fqn="Z")],
 | |
|                 ),
 | |
|             ],
 | |
|         )
 | |
| 
 | |
|     with DAG("test_dag_cycle", start_date=datetime(2021, 1, 1)) as dag:
 | |
|         BashOperator(
 | |
|             task_id="print_date",
 | |
|             bash_command="date",
 | |
|             inlets={
 | |
|                 "tables": ["A", "B"],
 | |
|             },
 | |
|         )
 | |
| 
 | |
|         BashOperator(
 | |
|             task_id="sleep",
 | |
|             bash_command=SLEEP,
 | |
|             outlets={
 | |
|                 "tables": ["B"],
 | |
|             },
 | |
|         )
 | |
| 
 | |
|         assert_xlets_equals(
 | |
|             get_xlets_from_dag(dag),
 | |
|             [
 | |
|                 XLets(
 | |
|                     inlets=[
 | |
|                         OMEntity(entity=Table, fqn="A"),
 | |
|                         OMEntity(entity=Table, fqn="B"),
 | |
|                     ],
 | |
|                     outlets=[OMEntity(entity=Table, fqn="B")],
 | |
|                 ),
 | |
|             ],
 | |
|         )
 | |
| 
 | |
| 
 | |
| def test_get_dict_xlets_from_dag():
 | |
|     """
 | |
|     We can get the dict-based xlets  for any entity without
 | |
|     annotating directly with OMEntity
 | |
|     """
 | |
|     with DAG("test_dag_cycle", start_date=datetime(2021, 1, 1)) as dag:
 | |
|         BashOperator(
 | |
|             task_id="print_date",
 | |
|             bash_command="date",
 | |
|             inlets=[
 | |
|                 {"entity": "container", "fqn": "C1", "key": "test"},
 | |
|                 {"entity": "container", "fqn": "C2", "key": "test"},
 | |
|             ],
 | |
|         )
 | |
| 
 | |
|         BashOperator(
 | |
|             task_id="sleep",
 | |
|             bash_command=SLEEP,
 | |
|             outlets=[
 | |
|                 {"entity": "table", "fqn": "T", "key": "test"},
 | |
|             ],
 | |
|         )
 | |
| 
 | |
|         assert_xlets_equals(
 | |
|             get_xlets_from_dag(dag),
 | |
|             [
 | |
|                 XLets(
 | |
|                     inlets=[
 | |
|                         OMEntity(entity=Container, fqn="C1", key="test"),
 | |
|                         OMEntity(entity=Container, fqn="C2", key="test"),
 | |
|                     ],
 | |
|                     outlets=[OMEntity(entity=Table, fqn="T", key="test")],
 | |
|                 ),
 | |
|             ],
 | |
|         )
 | |
| 
 | |
| 
 | |
| def test_get_attrs_xlets_from_dag():
 | |
|     """
 | |
|     Check that we can properly join the xlet information from
 | |
|     all operators in the DAG
 | |
|     """
 | |
|     with DAG("test_dag", start_date=datetime(2021, 1, 1)) as dag:
 | |
|         BashOperator(
 | |
|             task_id="print_date",
 | |
|             bash_command="date",
 | |
|             inlets=[
 | |
|                 OMEntity(entity=Table, fqn="A"),
 | |
|                 OMEntity(entity=Table, fqn="B"),
 | |
|             ],
 | |
|         )
 | |
| 
 | |
|         BashOperator(
 | |
|             task_id="sleep",
 | |
|             bash_command=SLEEP,
 | |
|             outlets=[OMEntity(entity=Table, fqn="C")],
 | |
|         )
 | |
| 
 | |
|         BashOperator(
 | |
|             task_id="sleep2",
 | |
|             bash_command=SLEEP,
 | |
|             outlets=[OMEntity(entity=Container, fqn="D")],
 | |
|         )
 | |
| 
 | |
|         assert_xlets_equals(
 | |
|             get_xlets_from_dag(dag),
 | |
|             [
 | |
|                 XLets(
 | |
|                     inlets=[
 | |
|                         OMEntity(entity=Table, fqn="A"),
 | |
|                         OMEntity(entity=Table, fqn="B"),
 | |
|                     ],
 | |
|                     outlets=[
 | |
|                         OMEntity(entity=Table, fqn="C"),
 | |
|                         OMEntity(entity=Container, fqn="D"),
 | |
|                     ],
 | |
|                 )
 | |
|             ],
 | |
|         )
 | |
| 
 | |
| 
 | |
| def test_om_entity_serializer():
 | |
|     """To ensure the serialized DAGs will have the right shape"""
 | |
|     om_entity = OMEntity(
 | |
|         entity=Table,
 | |
|         fqn="FQN",
 | |
|         key="test",
 | |
|     )
 | |
|     assert str(om_entity) == (
 | |
|         '{"entity": "metadata.generated.schema.entity.data.table.Table", "fqn": "FQN", "key": "test"}'
 | |
|     )
 | |
| 
 | |
|     om_entity = OMEntity(
 | |
|         entity=Container,
 | |
|         fqn="FQN",
 | |
|         key="test",
 | |
|     )
 | |
|     assert str(om_entity) == (
 | |
|         '{"entity": "metadata.generated.schema.entity.data.container.Container", "fqn": "FQN", "key": "test"}'
 | |
|     )
 | |
| 
 | |
| 
 | |
| def test_str_deserializer():
 | |
|     """
 | |
|     Once a DAG is serialized, the xlet info will be stored as:
 | |
|     ```
 | |
|     ['{"entity": "metadata.generated.schema.entity.data.table.Table", "fqn": "FQN", "key": "test"}']
 | |
|     ```
 | |
|     based on our custom serialization logic.
 | |
| 
 | |
|     Validate the deserialization process.
 | |
|     """
 | |
|     assert _parse_xlets("random") is None
 | |
| 
 | |
|     assert _parse_xlets(
 | |
|         '{"entity": "metadata.generated.schema.entity.data.table.Table", "fqn": "FQN", "key": "test"}'
 | |
|     ) == (
 | |
|         {
 | |
|             "test": [
 | |
|                 OMEntity(
 | |
|                     entity=Table,
 | |
|                     fqn="FQN",
 | |
|                     key="test",
 | |
|                 )
 | |
|             ]
 | |
|         }
 | |
|     )
 | |
| 
 | |
|     assert _parse_xlets(
 | |
|         '{"entity": "metadata.generated.schema.entity.data.container.Container", "fqn": "FQN", "key": "test"}'
 | |
|     ) == (
 | |
|         {
 | |
|             "test": [
 | |
|                 OMEntity(
 | |
|                     entity=Container,
 | |
|                     fqn="FQN",
 | |
|                     key="test",
 | |
|                 )
 | |
|             ]
 | |
|         }
 | |
|     )
 | |
| 
 | |
|     assert _parse_xlets(
 | |
|         '{"entity": "metadata.generated.schema.entity.data.dashboard.Dashboard", "fqn": "FQN", "key": "test"}'
 | |
|     ) == (
 | |
|         {
 | |
|             "test": [
 | |
|                 OMEntity(
 | |
|                     entity=Dashboard,
 | |
|                     fqn="FQN",
 | |
|                     key="test",
 | |
|                 )
 | |
|             ]
 | |
|         }
 | |
|     )
 | |
| 
 | |
| 
 | |
| def test_airflow_serializer():
 | |
|     """It should be able to serialize our models"""
 | |
|     om_entity = OMEntity(
 | |
|         entity=Table,
 | |
|         fqn="FQN",
 | |
|         key="test",
 | |
|     )
 | |
| 
 | |
|     assert serialize(om_entity).get("__data__") == (
 | |
|         '{"entity": "metadata.generated.schema.entity.data.table.Table", "fqn": "FQN", "key": "test"}'
 | |
|     )
 | 
