mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-08-18 14:06:59 +00:00
Clean col-level lineage internal logic (#6500)
* Clean col-level lineage internal logic * Fix: The Python checkstyle failed * Fix: SonarCloud Bugfix * Add test * Fix: Add multiple queries * Fix: Based on review comments * Fix: make py_format issue * Fix: pytest issue * Fix: BUG FIX
This commit is contained in:
parent
49033706a8
commit
f3ec8c55a6
@ -13,7 +13,7 @@ Helper functions to handle SQL lineage operations
|
|||||||
"""
|
"""
|
||||||
import traceback
|
import traceback
|
||||||
from logging.config import DictConfigurator
|
from logging.config import DictConfigurator
|
||||||
from typing import Any, Iterator, List, Optional
|
from typing import Any, Iterable, Iterator, List, Optional
|
||||||
|
|
||||||
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
|
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
|
||||||
from metadata.generated.schema.entity.data.table import Table
|
from metadata.generated.schema.entity.data.table import Table
|
||||||
@ -29,8 +29,6 @@ from metadata.utils.logger import utils_logger
|
|||||||
from metadata.utils.lru_cache import LRUCache
|
from metadata.utils.lru_cache import LRUCache
|
||||||
|
|
||||||
logger = utils_logger()
|
logger = utils_logger()
|
||||||
column_lineage_map = {}
|
|
||||||
|
|
||||||
LRU_CACHE_SIZE = 4096
|
LRU_CACHE_SIZE = 4096
|
||||||
|
|
||||||
|
|
||||||
@ -154,6 +152,7 @@ def get_column_lineage(
|
|||||||
from_entity: Table,
|
from_entity: Table,
|
||||||
to_table_raw_name: str,
|
to_table_raw_name: str,
|
||||||
from_table_raw_name: str,
|
from_table_raw_name: str,
|
||||||
|
column_lineage_map: dict,
|
||||||
) -> List[ColumnLineage]:
|
) -> List[ColumnLineage]:
|
||||||
column_lineage = []
|
column_lineage = []
|
||||||
if column_lineage_map.get(to_table_raw_name) and column_lineage_map.get(
|
if column_lineage_map.get(to_table_raw_name) and column_lineage_map.get(
|
||||||
@ -177,6 +176,7 @@ def _build_table_lineage(
|
|||||||
from_table_raw_name: str,
|
from_table_raw_name: str,
|
||||||
to_table_raw_name: str,
|
to_table_raw_name: str,
|
||||||
query: str,
|
query: str,
|
||||||
|
column_lineage_map: dict,
|
||||||
) -> Optional[Iterator[AddLineageRequest]]:
|
) -> Optional[Iterator[AddLineageRequest]]:
|
||||||
"""
|
"""
|
||||||
Prepare the lineage request generator
|
Prepare the lineage request generator
|
||||||
@ -186,6 +186,7 @@ def _build_table_lineage(
|
|||||||
to_table_raw_name=str(to_table_raw_name),
|
to_table_raw_name=str(to_table_raw_name),
|
||||||
from_entity=from_entity,
|
from_entity=from_entity,
|
||||||
from_table_raw_name=str(from_table_raw_name),
|
from_table_raw_name=str(from_table_raw_name),
|
||||||
|
column_lineage_map=column_lineage_map,
|
||||||
)
|
)
|
||||||
lineage_details = None
|
lineage_details = None
|
||||||
if col_lineage:
|
if col_lineage:
|
||||||
@ -216,7 +217,8 @@ def _create_lineage_by_table_name(
|
|||||||
database_name: Optional[str],
|
database_name: Optional[str],
|
||||||
schema_name: Optional[str],
|
schema_name: Optional[str],
|
||||||
query: str,
|
query: str,
|
||||||
) -> Optional[Iterator[AddLineageRequest]]:
|
column_lineage_map: dict,
|
||||||
|
) -> Optional[Iterable[AddLineageRequest]]:
|
||||||
"""
|
"""
|
||||||
This method is to create a lineage between two tables
|
This method is to create a lineage between two tables
|
||||||
"""
|
"""
|
||||||
@ -246,6 +248,7 @@ def _create_lineage_by_table_name(
|
|||||||
to_table_raw_name=to_table,
|
to_table_raw_name=to_table,
|
||||||
from_table_raw_name=from_table,
|
from_table_raw_name=from_table,
|
||||||
query=query,
|
query=query,
|
||||||
|
column_lineage_map=column_lineage_map,
|
||||||
)
|
)
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
@ -296,13 +299,13 @@ def get_lineage_by_query(
|
|||||||
|
|
||||||
# Reverting changes after import is done
|
# Reverting changes after import is done
|
||||||
DictConfigurator.configure = configure
|
DictConfigurator.configure = configure
|
||||||
column_lineage_map.clear()
|
column_lineage = {}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
result = LineageRunner(query)
|
result = LineageRunner(query)
|
||||||
|
|
||||||
raw_column_lineage = result.get_column_lineage()
|
raw_column_lineage = result.get_column_lineage()
|
||||||
column_lineage_map.update(populate_column_lineage_map(raw_column_lineage))
|
column_lineage.update(populate_column_lineage_map(raw_column_lineage))
|
||||||
|
|
||||||
for intermediate_table in result.intermediate_tables:
|
for intermediate_table in result.intermediate_tables:
|
||||||
for source_table in result.source_tables:
|
for source_table in result.source_tables:
|
||||||
@ -314,6 +317,7 @@ def get_lineage_by_query(
|
|||||||
database_name=database_name,
|
database_name=database_name,
|
||||||
schema_name=schema_name,
|
schema_name=schema_name,
|
||||||
query=query,
|
query=query,
|
||||||
|
column_lineage_map=column_lineage,
|
||||||
)
|
)
|
||||||
for target_table in result.target_tables:
|
for target_table in result.target_tables:
|
||||||
yield from _create_lineage_by_table_name(
|
yield from _create_lineage_by_table_name(
|
||||||
@ -324,6 +328,7 @@ def get_lineage_by_query(
|
|||||||
database_name=database_name,
|
database_name=database_name,
|
||||||
schema_name=schema_name,
|
schema_name=schema_name,
|
||||||
query=query,
|
query=query,
|
||||||
|
column_lineage_map=column_lineage,
|
||||||
)
|
)
|
||||||
if not result.intermediate_tables:
|
if not result.intermediate_tables:
|
||||||
for target_table in result.target_tables:
|
for target_table in result.target_tables:
|
||||||
@ -336,6 +341,7 @@ def get_lineage_by_query(
|
|||||||
database_name=database_name,
|
database_name=database_name,
|
||||||
schema_name=schema_name,
|
schema_name=schema_name,
|
||||||
query=query,
|
query=query,
|
||||||
|
column_lineage_map=column_lineage,
|
||||||
)
|
)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
logger.debug(str(err))
|
logger.debug(str(err))
|
||||||
@ -359,7 +365,8 @@ def get_lineage_via_table_entity(
|
|||||||
|
|
||||||
# Reverting changes after import is done
|
# Reverting changes after import is done
|
||||||
DictConfigurator.configure = configure
|
DictConfigurator.configure = configure
|
||||||
column_lineage_map.clear()
|
column_lineage = {}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
parser = LineageRunner(query)
|
parser = LineageRunner(query)
|
||||||
to_table_name = table_entity.name.__root__
|
to_table_name = table_entity.name.__root__
|
||||||
@ -373,6 +380,7 @@ def get_lineage_via_table_entity(
|
|||||||
database_name=database_name,
|
database_name=database_name,
|
||||||
schema_name=schema_name,
|
schema_name=schema_name,
|
||||||
query=query,
|
query=query,
|
||||||
|
column_lineage_map=column_lineage,
|
||||||
) or []
|
) or []
|
||||||
except Exception: # pylint: disable=broad-except
|
except Exception: # pylint: disable=broad-except
|
||||||
logger.warn("Failed to create view lineage")
|
logger.warn("Failed to create view lineage")
|
||||||
|
56
ingestion/tests/unit/test_sql_lineage.py
Normal file
56
ingestion/tests/unit/test_sql_lineage.py
Normal file
@ -0,0 +1,56 @@
|
|||||||
|
# Copyright 2021 Collate
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
"""
|
||||||
|
sql lineage utils tests
|
||||||
|
"""
|
||||||
|
from logging.config import DictConfigurator
|
||||||
|
from unittest import TestCase
|
||||||
|
|
||||||
|
from sqllineage.runner import LineageRunner
|
||||||
|
|
||||||
|
from metadata.ingestion.lineage.sql_lineage import populate_column_lineage_map
|
||||||
|
|
||||||
|
# Prevent sqllineage from modifying the logger config
|
||||||
|
# Disable the DictConfigurator.configure method while importing LineageRunner
|
||||||
|
configure = DictConfigurator.configure
|
||||||
|
DictConfigurator.configure = lambda _: None
|
||||||
|
|
||||||
|
# Reverting changes after import is done
|
||||||
|
DictConfigurator.configure = configure
|
||||||
|
|
||||||
|
|
||||||
|
QUERY = [
|
||||||
|
"CREATE TABLE MYTABLE2 AS SELECT * FROM MYTABLE1;",
|
||||||
|
"CREATE TABLE MYTABLE3 AS SELECT ID, NAME FROM MYTABLE1",
|
||||||
|
"CREATE VIEW MYVIEW2 AS SELECT NAME, CITY FROM MYTABLE1;",
|
||||||
|
"INSERT INTO MYTABLE5 SELECT ID, NAME, CITY FROM MYTABLE1;",
|
||||||
|
]
|
||||||
|
EXPECTED_LINEAGE_MAP = [
|
||||||
|
{"<default>.mytable2": {"<default>.mytable1": [("*", "*")]}},
|
||||||
|
{"<default>.mytable3": {"<default>.mytable1": [("ID", "ID"), ("NAME", "NAME")]}},
|
||||||
|
{"<default>.myview2": {"<default>.mytable1": [("CITY", "CITY"), ("NAME", "NAME")]}},
|
||||||
|
{
|
||||||
|
"<default>.mytable5": {
|
||||||
|
"<default>.mytable1": [("CITY", "CITY"), ("ID", "ID"), ("NAME", "NAME")]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class SqlLineageTest(TestCase):
|
||||||
|
def test_populate_column_lineage_map(self):
|
||||||
|
|
||||||
|
for i in range(len(QUERY)):
|
||||||
|
result = LineageRunner(QUERY[i])
|
||||||
|
raw_column_lineage = result.get_column_lineage()
|
||||||
|
lineage_map = populate_column_lineage_map(raw_column_lineage)
|
||||||
|
self.assertEqual(lineage_map, EXPECTED_LINEAGE_MAP[i])
|
Loading…
x
Reference in New Issue
Block a user