mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-08-09 01:28:11 +00:00
parent
a2894f7990
commit
d0984c5a43
@ -24,6 +24,7 @@ from metadata.utils.logger import ingestion_logger
|
|||||||
|
|
||||||
# Prevent sqllineage from modifying the logger config
|
# Prevent sqllineage from modifying the logger config
|
||||||
# Disable the DictConfigurator.configure method while importing LineageRunner
|
# Disable the DictConfigurator.configure method while importing LineageRunner
|
||||||
|
# pylint: disable=wrong-import-position
|
||||||
configure = DictConfigurator.configure
|
configure = DictConfigurator.configure
|
||||||
DictConfigurator.configure = lambda _: None
|
DictConfigurator.configure = lambda _: None
|
||||||
from sqllineage.core import models
|
from sqllineage.core import models
|
||||||
@ -36,7 +37,9 @@ DictConfigurator.configure = configure
|
|||||||
logger = ingestion_logger()
|
logger = ingestion_logger()
|
||||||
|
|
||||||
|
|
||||||
def get_involved_tables_from_parser(parser: LineageRunner) -> List[models.Table]:
|
def get_involved_tables_from_parser(
|
||||||
|
parser: LineageRunner,
|
||||||
|
) -> Optional[List[models.Table]]:
|
||||||
"""
|
"""
|
||||||
Use the LineageRunner parser and combine
|
Use the LineageRunner parser and combine
|
||||||
source and intermediate tables into
|
source and intermediate tables into
|
||||||
@ -56,6 +59,7 @@ def get_involved_tables_from_parser(parser: LineageRunner) -> List[models.Table]
|
|||||||
logger.warning(
|
logger.warning(
|
||||||
f"Cannot extract source table information from query [{parser._sql}]: {exc}" # pylint: disable=protected-access
|
f"Cannot extract source table information from query [{parser._sql}]: {exc}" # pylint: disable=protected-access
|
||||||
)
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def get_clean_parser_table_list(tables: List[models.Table]) -> List[str]:
|
def get_clean_parser_table_list(tables: List[models.Table]) -> List[str]:
|
||||||
|
@ -35,7 +35,9 @@ LRU_CACHE_SIZE = 4096
|
|||||||
def split_raw_table_name(database: str, raw_name: str) -> dict:
|
def split_raw_table_name(database: str, raw_name: str) -> dict:
|
||||||
database_schema = None
|
database_schema = None
|
||||||
if "." in raw_name:
|
if "." in raw_name:
|
||||||
database_schema, table = fqn.split(raw_name)[-2:]
|
database_schema, table = fqn.split(raw_name)[
|
||||||
|
-2:
|
||||||
|
] # pylint: disable=unbalanced-tuple-unpacking
|
||||||
if database_schema == "<default>":
|
if database_schema == "<default>":
|
||||||
database_schema = None
|
database_schema = None
|
||||||
return {"database": database, "database_schema": database_schema, "table": table}
|
return {"database": database, "database_schema": database_schema, "table": table}
|
||||||
@ -46,11 +48,13 @@ def get_column_fqn(table_entity: Table, column: str) -> Optional[str]:
|
|||||||
Get fqn of column if exist in table entity
|
Get fqn of column if exist in table entity
|
||||||
"""
|
"""
|
||||||
if not table_entity:
|
if not table_entity:
|
||||||
return
|
return None
|
||||||
for tbl_column in table_entity.columns:
|
for tbl_column in table_entity.columns:
|
||||||
if column.lower() == tbl_column.name.__root__.lower():
|
if column.lower() == tbl_column.name.__root__.lower():
|
||||||
return tbl_column.fullyQualifiedName.__root__
|
return tbl_column.fullyQualifiedName.__root__
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
search_cache = LRUCache(LRU_CACHE_SIZE)
|
search_cache = LRUCache(LRU_CACHE_SIZE)
|
||||||
|
|
||||||
@ -70,29 +74,29 @@ def search_table_entities(
|
|||||||
search_tuple = (service_name, database, database_schema, table)
|
search_tuple = (service_name, database, database_schema, table)
|
||||||
if search_tuple in search_cache:
|
if search_tuple in search_cache:
|
||||||
return search_cache.get(search_tuple)
|
return search_cache.get(search_tuple)
|
||||||
else:
|
try:
|
||||||
try:
|
table_fqns = fqn.build(
|
||||||
table_fqns = fqn.build(
|
metadata,
|
||||||
metadata,
|
entity_type=Table,
|
||||||
entity_type=Table,
|
service_name=service_name,
|
||||||
service_name=service_name,
|
database_name=database,
|
||||||
database_name=database,
|
schema_name=database_schema,
|
||||||
schema_name=database_schema,
|
table_name=table,
|
||||||
table_name=table,
|
fetch_multiple_entities=True,
|
||||||
fetch_multiple_entities=True,
|
)
|
||||||
)
|
table_entities: Optional[List[Table]] = []
|
||||||
table_entities: Optional[List[Table]] = []
|
for table_fqn in table_fqns or []:
|
||||||
for table_fqn in table_fqns or []:
|
table_entity: Table = metadata.get_by_name(Table, fqn=table_fqn)
|
||||||
table_entity: Table = metadata.get_by_name(Table, fqn=table_fqn)
|
if table_entity:
|
||||||
if table_entity:
|
table_entities.append(table_entity)
|
||||||
table_entities.append(table_entity)
|
search_cache.put(search_tuple, table_entities)
|
||||||
search_cache.put(search_tuple, table_entities)
|
return table_entities
|
||||||
return table_entities
|
except Exception as exc:
|
||||||
except Exception as exc:
|
logger.debug(traceback.format_exc())
|
||||||
logger.debug(traceback.format_exc())
|
logger.error(
|
||||||
logger.error(
|
f"Error searching for table entities for service [{service_name}]: {exc}"
|
||||||
f"Error searching for table entities for service [{service_name}]: {exc}"
|
)
|
||||||
)
|
return None
|
||||||
|
|
||||||
|
|
||||||
def get_table_entities_from_query(
|
def get_table_entities_from_query(
|
||||||
@ -101,7 +105,7 @@ def get_table_entities_from_query(
|
|||||||
database_name: str,
|
database_name: str,
|
||||||
database_schema: str,
|
database_schema: str,
|
||||||
table_name: str,
|
table_name: str,
|
||||||
) -> List[Table]:
|
) -> Optional[List[Table]]:
|
||||||
"""
|
"""
|
||||||
Fetch data from API and ES with a fallback strategy.
|
Fetch data from API and ES with a fallback strategy.
|
||||||
|
|
||||||
@ -148,6 +152,8 @@ def get_table_entities_from_query(
|
|||||||
if table_entities:
|
if table_entities:
|
||||||
return table_entities
|
return table_entities
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def get_column_lineage(
|
def get_column_lineage(
|
||||||
to_entity: Table,
|
to_entity: Table,
|
||||||
@ -156,6 +162,18 @@ def get_column_lineage(
|
|||||||
from_table_raw_name: str,
|
from_table_raw_name: str,
|
||||||
column_lineage_map: dict,
|
column_lineage_map: dict,
|
||||||
) -> List[ColumnLineage]:
|
) -> List[ColumnLineage]:
|
||||||
|
"""Get column lineage
|
||||||
|
|
||||||
|
Args:
|
||||||
|
to_entity (Table): entity to link to
|
||||||
|
from_entity (Table): entity link comes from
|
||||||
|
to_table_raw_name (str): table entity raw name we link to
|
||||||
|
from_table_raw_name (str): table entity raw name we link from
|
||||||
|
column_lineage_map (dict): map of the column lineage
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List[ColumnLineage]
|
||||||
|
"""
|
||||||
column_lineage = []
|
column_lineage = []
|
||||||
if column_lineage_map.get(to_table_raw_name) and column_lineage_map.get(
|
if column_lineage_map.get(to_table_raw_name) and column_lineage_map.get(
|
||||||
to_table_raw_name
|
to_table_raw_name
|
||||||
@ -211,6 +229,7 @@ def _build_table_lineage(
|
|||||||
yield lineage
|
yield lineage
|
||||||
|
|
||||||
|
|
||||||
|
# pylint: disable=too-many-arguments
|
||||||
def _create_lineage_by_table_name(
|
def _create_lineage_by_table_name(
|
||||||
metadata: OpenMetadata,
|
metadata: OpenMetadata,
|
||||||
from_table: str,
|
from_table: str,
|
||||||
@ -261,11 +280,16 @@ def _create_lineage_by_table_name(
|
|||||||
|
|
||||||
|
|
||||||
def populate_column_lineage_map(raw_column_lineage):
|
def populate_column_lineage_map(raw_column_lineage):
|
||||||
|
"""populate column lineage map
|
||||||
|
|
||||||
|
Args:
|
||||||
|
raw_column_lineage (_type_): raw column lineage
|
||||||
|
"""
|
||||||
lineage_map = {}
|
lineage_map = {}
|
||||||
if not raw_column_lineage or len(raw_column_lineage[0]) != 2:
|
if not raw_column_lineage or len(raw_column_lineage[0]) != 2:
|
||||||
return lineage_map
|
return lineage_map
|
||||||
for source, target in raw_column_lineage:
|
for source, target in raw_column_lineage:
|
||||||
for parent in source._parent:
|
for parent in source._parent: # pylint: disable=protected-access
|
||||||
if lineage_map.get(str(target.parent)):
|
if lineage_map.get(str(target.parent)):
|
||||||
ele = lineage_map.get(str(target.parent))
|
ele = lineage_map.get(str(target.parent))
|
||||||
if ele.get(str(parent)):
|
if ele.get(str(parent)):
|
||||||
@ -299,7 +323,9 @@ def get_lineage_by_query(
|
|||||||
# Disable the DictConfigurator.configure method while importing LineageRunner
|
# Disable the DictConfigurator.configure method while importing LineageRunner
|
||||||
configure = DictConfigurator.configure
|
configure = DictConfigurator.configure
|
||||||
DictConfigurator.configure = lambda _: None
|
DictConfigurator.configure = lambda _: None
|
||||||
from sqllineage.runner import LineageRunner
|
from sqllineage.runner import (
|
||||||
|
LineageRunner, # pylint: disable=import-outside-toplevel
|
||||||
|
)
|
||||||
|
|
||||||
# Reverting changes after import is done
|
# Reverting changes after import is done
|
||||||
DictConfigurator.configure = configure
|
DictConfigurator.configure = configure
|
||||||
@ -361,11 +387,29 @@ def get_lineage_via_table_entity(
|
|||||||
service_name: str,
|
service_name: str,
|
||||||
query: str,
|
query: str,
|
||||||
) -> Optional[Iterator[AddLineageRequest]]:
|
) -> Optional[Iterator[AddLineageRequest]]:
|
||||||
|
"""Get lineage from table entity
|
||||||
|
|
||||||
|
Args:
|
||||||
|
metadata (OpenMetadata): OM Server client Object
|
||||||
|
table_entity (Table): table entity
|
||||||
|
database_name (str): name of the database
|
||||||
|
schema_name (str): name of the schema
|
||||||
|
service_name (str): name of the service
|
||||||
|
query (str): query used for lineage
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Optional[Iterator[AddLineageRequest]]
|
||||||
|
|
||||||
|
Yields:
|
||||||
|
Iterator[Optional[Iterator[AddLineageRequest]]]
|
||||||
|
"""
|
||||||
# Prevent sqllineage from modifying the logger config
|
# Prevent sqllineage from modifying the logger config
|
||||||
# Disable the DictConfigurator.configure method while importing LineageRunner
|
# Disable the DictConfigurator.configure method while importing LineageRunner
|
||||||
configure = DictConfigurator.configure
|
configure = DictConfigurator.configure
|
||||||
DictConfigurator.configure = lambda _: None
|
DictConfigurator.configure = lambda _: None
|
||||||
from sqllineage.runner import LineageRunner
|
from sqllineage.runner import (
|
||||||
|
LineageRunner, # pylint: disable=import-outside-toplevel
|
||||||
|
)
|
||||||
|
|
||||||
# Reverting changes after import is done
|
# Reverting changes after import is done
|
||||||
DictConfigurator.configure = configure
|
DictConfigurator.configure = configure
|
||||||
|
Loading…
x
Reference in New Issue
Block a user