diff --git a/ingestion/src/metadata/ingestion/lineage/sql_lineage.py b/ingestion/src/metadata/ingestion/lineage/sql_lineage.py index 2d8e3cb377d..2bb9070626c 100644 --- a/ingestion/src/metadata/ingestion/lineage/sql_lineage.py +++ b/ingestion/src/metadata/ingestion/lineage/sql_lineage.py @@ -13,7 +13,7 @@ Helper functions to handle SQL lineage operations """ import traceback from logging.config import DictConfigurator -from typing import Any, Iterator, List, Optional +from typing import Any, Iterable, Iterator, List, Optional from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.table import Table @@ -29,8 +29,6 @@ from metadata.utils.logger import utils_logger from metadata.utils.lru_cache import LRUCache logger = utils_logger() -column_lineage_map = {} - LRU_CACHE_SIZE = 4096 @@ -154,6 +152,7 @@ def get_column_lineage( from_entity: Table, to_table_raw_name: str, from_table_raw_name: str, + column_lineage_map: dict, ) -> List[ColumnLineage]: column_lineage = [] if column_lineage_map.get(to_table_raw_name) and column_lineage_map.get( @@ -177,6 +176,7 @@ def _build_table_lineage( from_table_raw_name: str, to_table_raw_name: str, query: str, + column_lineage_map: dict, ) -> Optional[Iterator[AddLineageRequest]]: """ Prepare the lineage request generator @@ -186,6 +186,7 @@ def _build_table_lineage( to_table_raw_name=str(to_table_raw_name), from_entity=from_entity, from_table_raw_name=str(from_table_raw_name), + column_lineage_map=column_lineage_map, ) lineage_details = None if col_lineage: @@ -216,7 +217,8 @@ def _create_lineage_by_table_name( database_name: Optional[str], schema_name: Optional[str], query: str, -) -> Optional[Iterator[AddLineageRequest]]: + column_lineage_map: dict, +) -> Optional[Iterable[AddLineageRequest]]: """ This method is to create a lineage between two tables """ @@ -246,6 +248,7 @@ def _create_lineage_by_table_name( to_table_raw_name=to_table, 
from_table_raw_name=from_table, query=query, + column_lineage_map=column_lineage_map, ) except Exception as err: @@ -296,13 +299,13 @@ def get_lineage_by_query( # Reverting changes after import is done DictConfigurator.configure = configure - column_lineage_map.clear() + column_lineage = {} try: result = LineageRunner(query) raw_column_lineage = result.get_column_lineage() - column_lineage_map.update(populate_column_lineage_map(raw_column_lineage)) + column_lineage.update(populate_column_lineage_map(raw_column_lineage)) for intermediate_table in result.intermediate_tables: for source_table in result.source_tables: @@ -314,6 +317,7 @@ def get_lineage_by_query( database_name=database_name, schema_name=schema_name, query=query, + column_lineage_map=column_lineage, ) for target_table in result.target_tables: yield from _create_lineage_by_table_name( @@ -324,6 +328,7 @@ def get_lineage_by_query( database_name=database_name, schema_name=schema_name, query=query, + column_lineage_map=column_lineage, ) if not result.intermediate_tables: for target_table in result.target_tables: @@ -336,6 +341,7 @@ def get_lineage_by_query( database_name=database_name, schema_name=schema_name, query=query, + column_lineage_map=column_lineage, ) except Exception as err: logger.debug(str(err)) @@ -359,7 +365,8 @@ def get_lineage_via_table_entity( # Reverting changes after import is done DictConfigurator.configure = configure - column_lineage_map.clear() + column_lineage = {} + try: parser = LineageRunner(query) to_table_name = table_entity.name.__root__ @@ -373,6 +380,7 @@ def get_lineage_via_table_entity( database_name=database_name, schema_name=schema_name, query=query, + column_lineage_map=column_lineage, ) or [] except Exception: # pylint: disable=broad-except logger.warn("Failed to create view lineage") diff --git a/ingestion/tests/unit/test_sql_lineage.py b/ingestion/tests/unit/test_sql_lineage.py new file mode 100644 index 00000000000..2f38608a152 --- /dev/null +++ 
b/ingestion/tests/unit/test_sql_lineage.py
@@ -0,0 +1,62 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+sql lineage utils tests
+"""
+from logging.config import DictConfigurator
+from unittest import TestCase
+
+from metadata.ingestion.lineage.sql_lineage import populate_column_lineage_map
+
+# Prevent sqllineage from modifying the logger config:
+# disable DictConfigurator.configure while LineageRunner is being imported.
+# The import must sit between the patch and the restore for the guard to work.
+configure = DictConfigurator.configure
+DictConfigurator.configure = lambda _: None
+try:
+    from sqllineage.runner import LineageRunner  # pylint: disable=wrong-import-position
+finally:
+    # Reverting changes after import is done (even if the import fails)
+    DictConfigurator.configure = configure
+
+
+# One SQL statement per scenario; EXPECTED_LINEAGE_MAP holds the matching result
+# at the same index.
+QUERY = [
+    "CREATE TABLE MYTABLE2 AS SELECT * FROM MYTABLE1;",
+    "CREATE TABLE MYTABLE3 AS SELECT ID, NAME FROM MYTABLE1",
+    "CREATE VIEW MYVIEW2 AS SELECT NAME, CITY FROM MYTABLE1;",
+    "INSERT INTO MYTABLE5 SELECT ID, NAME, CITY FROM MYTABLE1;",
+]
+EXPECTED_LINEAGE_MAP = [
+    {".mytable2": {".mytable1": [("*", "*")]}},
+    {".mytable3": {".mytable1": [("ID", "ID"), ("NAME", "NAME")]}},
+    {".myview2": {".mytable1": [("CITY", "CITY"), ("NAME", "NAME")]}},
+    {
+        ".mytable5": {
+            ".mytable1": [("CITY", "CITY"), ("ID", "ID"), ("NAME", "NAME")]
+        }
+    },
+]
+
+
+class SqlLineageTest(TestCase):
+    """Tests for the column lineage helpers of sql_lineage"""
+
+    def test_populate_column_lineage_map(self):
+        """Every QUERY entry must yield its EXPECTED_LINEAGE_MAP entry"""
+        for query, expected in zip(QUERY, EXPECTED_LINEAGE_MAP):
+            # subTest reports the failing query instead of stopping at the first
+            with self.subTest(query=query):
+                raw_column_lineage = LineageRunner(query).get_column_lineage()
+                lineage_map = populate_column_lineage_map(raw_column_lineage)
+                self.assertEqual(lineage_map, expected)