diff --git a/ingestion/src/metadata/ingestion/lineage/parser.py b/ingestion/src/metadata/ingestion/lineage/parser.py index afa5018d320..ecdba0f72b4 100644 --- a/ingestion/src/metadata/ingestion/lineage/parser.py +++ b/ingestion/src/metadata/ingestion/lineage/parser.py @@ -24,6 +24,7 @@ from metadata.utils.logger import ingestion_logger # Prevent sqllineage from modifying the logger config # Disable the DictConfigurator.configure method while importing LineageRunner +# pylint: disable=wrong-import-position configure = DictConfigurator.configure DictConfigurator.configure = lambda _: None from sqllineage.core import models @@ -36,7 +37,9 @@ DictConfigurator.configure = configure logger = ingestion_logger() -def get_involved_tables_from_parser(parser: LineageRunner) -> List[models.Table]: +def get_involved_tables_from_parser( + parser: LineageRunner, +) -> Optional[List[models.Table]]: """ Use the LineageRunner parser and combine source and intermediate tables into @@ -56,6 +59,7 @@ def get_involved_tables_from_parser(parser: LineageRunner) -> List[models.Table] logger.warning( f"Cannot extract source table information from query [{parser._sql}]: {exc}" # pylint: disable=protected-access ) + return None def get_clean_parser_table_list(tables: List[models.Table]) -> List[str]: diff --git a/ingestion/src/metadata/ingestion/lineage/sql_lineage.py b/ingestion/src/metadata/ingestion/lineage/sql_lineage.py index 3d4680ae829..d03ed01cebb 100644 --- a/ingestion/src/metadata/ingestion/lineage/sql_lineage.py +++ b/ingestion/src/metadata/ingestion/lineage/sql_lineage.py @@ -35,7 +35,9 @@ LRU_CACHE_SIZE = 4096 def split_raw_table_name(database: str, raw_name: str) -> dict: database_schema = None if "." in raw_name: - database_schema, table = fqn.split(raw_name)[-2:] + database_schema, table = fqn.split(raw_name)[ + -2: + ] # pylint: disable=unbalanced-tuple-unpacking if database_schema == "": database_schema = None return {"database": database, "database_schema": database_schema, "table": table} @@ -46,11 +48,13 @@ def get_column_fqn(table_entity: Table, column: str) -> Optional[str]: Get fqn of column if exist in table entity """ if not table_entity: - return + return None for tbl_column in table_entity.columns: if column.lower() == tbl_column.name.__root__.lower(): return tbl_column.fullyQualifiedName.__root__ + return None + search_cache = LRUCache(LRU_CACHE_SIZE) @@ -70,29 +74,29 @@ def search_table_entities( search_tuple = (service_name, database, database_schema, table) if search_tuple in search_cache: return search_cache.get(search_tuple) - else: - try: - table_fqns = fqn.build( - metadata, - entity_type=Table, - service_name=service_name, - database_name=database, - schema_name=database_schema, - table_name=table, - fetch_multiple_entities=True, - ) - table_entities: Optional[List[Table]] = [] - for table_fqn in table_fqns or []: - table_entity: Table = metadata.get_by_name(Table, fqn=table_fqn) - if table_entity: - table_entities.append(table_entity) - search_cache.put(search_tuple, table_entities) - return table_entities - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.error( - f"Error searching for table entities for service [{service_name}]: {exc}" - ) + try: + table_fqns = fqn.build( + metadata, + entity_type=Table, + service_name=service_name, + database_name=database, + schema_name=database_schema, + table_name=table, + fetch_multiple_entities=True, + ) + table_entities: Optional[List[Table]] = [] + for table_fqn in table_fqns or []: + table_entity: Table = metadata.get_by_name(Table, fqn=table_fqn) + if table_entity: + table_entities.append(table_entity) + search_cache.put(search_tuple, table_entities) + return table_entities + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.error( + f"Error searching for table entities for service [{service_name}]: {exc}" + ) + return None def get_table_entities_from_query( @@ -101,7 +105,7 @@ def get_table_entities_from_query( database_name: str, database_schema: str, table_name: str, -) -> List[Table]: +) -> Optional[List[Table]]: """ Fetch data from API and ES with a fallback strategy. @@ -148,6 +152,8 @@ def get_table_entities_from_query( if table_entities: return table_entities + return None + def get_column_lineage( to_entity: Table, @@ -156,6 +162,18 @@ def get_column_lineage( from_table_raw_name: str, column_lineage_map: dict, ) -> List[ColumnLineage]: + """Get column lineage + + Args: + to_entity (Table): entity to link to + from_entity (Table): entity link comes from + to_table_raw_name (str): table entity raw name we link to + from_table_raw_name (str): table entity raw name we link from + column_lineage_map (dict): map of the column lineage + + Returns: + List[ColumnLineage] + """ column_lineage = [] if column_lineage_map.get(to_table_raw_name) and column_lineage_map.get( to_table_raw_name @@ -211,6 +229,7 @@ def _build_table_lineage( yield lineage +# pylint: disable=too-many-arguments def _create_lineage_by_table_name( metadata: OpenMetadata, from_table: str, @@ -261,11 +280,16 @@ def _create_lineage_by_table_name( def populate_column_lineage_map(raw_column_lineage): + """populate column lineage map + + Args: + raw_column_lineage (_type_): raw column lineage + """ lineage_map = {} if not raw_column_lineage or len(raw_column_lineage[0]) != 2: return lineage_map for source, target in raw_column_lineage: - for parent in source._parent: + for parent in source._parent: # pylint: disable=protected-access if lineage_map.get(str(target.parent)): ele = lineage_map.get(str(target.parent)) if ele.get(str(parent)): @@ -299,7 +323,9 @@ def get_lineage_by_query( # Disable the DictConfigurator.configure method while importing LineageRunner configure = DictConfigurator.configure DictConfigurator.configure = lambda _: None - from sqllineage.runner import LineageRunner + from sqllineage.runner import ( + LineageRunner, # pylint: disable=import-outside-toplevel + ) # Reverting changes after import is done DictConfigurator.configure = configure @@ -361,11 +387,29 @@ def get_lineage_via_table_entity( service_name: str, query: str, ) -> Optional[Iterator[AddLineageRequest]]: + """Get lineage from table entity + + Args: + metadata (OpenMetadata): OM Server client Object + table_entity (Table): table entity + database_name (str): name of the database + schema_name (str): name of the schema + service_name (str): name of the service + query (str): query used for lineage + + Returns: + Optional[Iterator[AddLineageRequest]] + + Yields: + Iterator[Optional[Iterator[AddLineageRequest]]] + """ # Prevent sqllineage from modifying the logger config # Disable the DictConfigurator.configure method while importing LineageRunner configure = DictConfigurator.configure DictConfigurator.configure = lambda _: None - from sqllineage.runner import LineageRunner + from sqllineage.runner import ( + LineageRunner, # pylint: disable=import-outside-toplevel + ) # Reverting changes after import is done DictConfigurator.configure = configure