diff --git a/ingestion/setup.py b/ingestion/setup.py index 228a76e73ca..3e076306165 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -50,6 +50,7 @@ base_requirements = { "croniter~=1.3.0", "requests-aws4auth==1.1.2", "pymysql>=1.0.2", + "cached-property==1.5.2", } diff --git a/ingestion/src/metadata/ingestion/lineage/parser.py b/ingestion/src/metadata/ingestion/lineage/parser.py index 3dd25e0222f..3a1b0d43e05 100644 --- a/ingestion/src/metadata/ingestion/lineage/parser.py +++ b/ingestion/src/metadata/ingestion/lineage/parser.py @@ -14,12 +14,18 @@ Lineage Parser configuration import traceback from collections import defaultdict from logging.config import DictConfigurator -from typing import Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple +from cached_property import cached_property from sqlparse.sql import Comparison, Identifier, Statement from metadata.generated.schema.type.tableUsageCount import TableColumn, TableColumnJoin -from metadata.utils.helpers import find_in_iter, get_formatted_entity_name +from metadata.utils.helpers import ( + find_in_iter, + get_formatted_entity_name, + insensitive_match, + insensitive_replace, +) from metadata.utils.logger import ingestion_logger # Prevent sqllineage from modifying the logger config @@ -27,7 +33,7 @@ from metadata.utils.logger import ingestion_logger # pylint: disable=wrong-import-position configure = DictConfigurator.configure DictConfigurator.configure = lambda _: None -from sqllineage.core import models +from sqllineage.core.models import Column, Table from sqllineage.exceptions import SQLLineageException from sqllineage.runner import LineageRunner @@ -37,214 +43,289 @@ DictConfigurator.configure = configure logger = ingestion_logger() -def get_involved_tables_from_parser( - parser: LineageRunner, -) -> Optional[List[models.Table]]: +class LineageParser: """ - Use the LineageRunner parser and combine - source and intermediate tables into - a single set. - :param parser: LineageRunner - :return: List of involved tables + Class that acts like a wrapper for the LineageRunner library usage """ - try: + + parser: LineageRunner + query: str + _clean_query: str + + def __init__(self, query: str): + self.query = query + self._clean_query = self.clean_raw_query(query) + self.parser = LineageRunner(query) + + @cached_property + def involved_tables(self) -> Optional[List[Table]]: + """ + Use the LineageRunner parser and combine + source and intermediate tables into + a single set. + :return: List of involved tables + """ + try: + return list( + set(self.source_tables) + .union(set(self.intermediate_tables)) + .union(set(self.target_tables)) + ) + except SQLLineageException as exc: + logger.debug(traceback.format_exc()) + logger.warning( + f"Cannot extract source table information from query [{self.query}]: {exc}" + ) + return None + + @cached_property + def intermediate_tables(self) -> List[Table]: + """ + Get a list of intermediate tables + """ # These are @lazy_property, not properly being picked up by IDEs. Ignore the warning - return list( - set(parser.source_tables) - .union(set(parser.intermediate_tables)) - .union(set(parser.target_tables)) + return self.retrieve_tables(self.parser.intermediate_tables) + + @cached_property + def source_tables(self) -> List[Table]: + """ + Get a list of source tables + """ + # These are @lazy_property, not properly being picked up by IDEs. Ignore the warning + return self.retrieve_tables(self.parser.source_tables) + + @cached_property + def target_tables(self) -> List[Table]: + """ + Get a list of target tables + """ + # These are @lazy_property, not properly being picked up by IDEs. Ignore the warning + return self.retrieve_tables(self.parser.target_tables) + + @cached_property + def column_lineage(self) -> List[Tuple[Column, Column]]: + """ + Get a list of tuples of column lineage + """ + return self.parser.get_column_lineage() + + @cached_property + def clean_table_list(self) -> List[str]: + """ + Clean the table name if it has . + :return: clean table names + """ + return [get_formatted_entity_name(str(table)) for table in self.involved_tables] + + @cached_property + def table_aliases(self) -> Dict[str, str]: + """ + Prepare a dictionary in the shape of {alias: table_name} from + the parser tables + :return: alias dict + """ + return { + table.alias: str(table).replace(".", "") + for table in self.involved_tables + } + + def get_table_name_from_list( + self, + database_name: Optional[str], + schema_name: Optional[str], + table_name: str, + ) -> Optional[str]: + """ + Find the table name (in any format in my come) + from the list using the given ingredients. + :param database_name: db name + :param schema_name: schema name + :param table_name: table name + :return: table name from parser info + """ + tables = self.clean_table_list + table = find_in_iter(element=table_name, container=tables) + if table: + return table + + schema_table = find_in_iter( + element=f"{schema_name}.{table_name}", container=tables ) - except SQLLineageException as exc: - logger.debug(traceback.format_exc()) - logger.warning( - f"Cannot extract source table information from query [{parser._sql}]: {exc}" # pylint: disable=protected-access + if schema_table: + return schema_table + + db_schema_table = find_in_iter( + element=f"{database_name}.{schema_name}.{table_name}", container=tables ) + if db_schema_table: + return db_schema_table + + logger.debug(f"Cannot find table {db_schema_table} in involved tables") return None + def get_comparison_elements( + self, identifier: Identifier + ) -> Tuple[Optional[str], Optional[str]]: + """ + Return the tuple table_name, column_name from each comparison element + :param identifier: comparison identifier + :return: table name and column name from the identifier + """ + aliases = self.table_aliases + values = identifier.value.split(".") + database_name, schema_name, table_or_alias, column_name = ( + [None] * (4 - len(values)) + ) + values -def get_clean_parser_table_list(tables: List[models.Table]) -> List[str]: - """ - Clean the table name if it has . - :param tables: involved tables - :return: clean table names - """ - return [get_formatted_entity_name(str(table)) for table in tables] + if not table_or_alias or not column_name: + logger.debug( + f"Cannot obtain comparison elements from identifier {identifier}" + ) + return None, None + alias_to_table = aliases.get(table_or_alias) + if alias_to_table: + return alias_to_table, column_name -def get_parser_table_aliases(tables: List[models.Table]) -> Dict[str, str]: - """ - Prepare a dictionary in the shape of {alias: table_name} from - the parser tables - :param tables: parser tables - :return: alias dict - """ - return {table.alias: str(table).replace(".", "") for table in tables} - - -def get_table_name_from_list( - database_name: Optional[str], - schema_name: Optional[str], - table_name: str, - tables: List[str], -) -> Optional[str]: - """ - Find the table name (in any format in my come) - from the list using the given ingredients. - :param database_name: db name - :param schema_name: schema name - :param table_name: table name - :param tables: Contains all involved tables - :return: table name from parser info - """ - table = find_in_iter(element=table_name, container=tables) - if table: - return table - - schema_table = find_in_iter(element=f"{schema_name}.{table_name}", container=tables) - if schema_table: - return schema_table - - db_schema_table = find_in_iter( - element=f"{database_name}.{schema_name}.{table_name}", container=tables - ) - if db_schema_table: - return db_schema_table - - logger.debug(f"Cannot find table {db_schema_table} in involved tables") - return None - - -def get_comparison_elements( - identifier: Identifier, tables: List[str], aliases: Dict[str, str] -) -> Optional[Tuple[str, str]]: - """ - Return the tuple table_name, column_name from each comparison element - :param identifier: comparison identifier - :param tables: involved tables - :param aliases: table aliases - :return: table name and column name from the identifier - """ - values = identifier.value.split(".") - database_name, schema_name, table_or_alias, column_name = ( - [None] * (4 - len(values)) - ) + values - - if not table_or_alias or not column_name: - logger.debug(f"Cannot obtain comparison elements from identifier {identifier}") - return None, None - - alias_to_table = aliases.get(table_or_alias) - if alias_to_table: - return alias_to_table, column_name - - table_from_list = get_table_name_from_list( - database_name=database_name, - schema_name=schema_name, - table_name=table_or_alias, - tables=tables, - ) - - if not table_from_list: - logger.debug(f"Cannot find {table_or_alias} in comparison elements") - return None, None - - return table_from_list, column_name - - -def stateful_add_table_joins( - statement_joins: Dict[str, List[TableColumnJoin]], - source: TableColumn, - target: TableColumn, -) -> None: - """ - Update the statement_joins dict with the new table information - :param statement_joins: dict with state info - :param source: source TableColumn - :param target: target TableColumn - """ - - if source.table not in statement_joins: - statement_joins[source.table].append( - TableColumnJoin(tableColumn=source, joinedWith=[target]) + table_from_list = self.get_table_name_from_list( + database_name=database_name, + schema_name=schema_name, + table_name=table_or_alias, ) - else: - # check if new column from same table - table_columns = [ - join_info.tableColumn for join_info in statement_joins[source.table] - ] - existing_table_column = find_in_iter(element=source, container=table_columns) - if existing_table_column: - existing_join_info = [ - join_info - for join_info in statement_joins[source.table] - if join_info.tableColumn == existing_table_column - ][0] - existing_join_info.joinedWith.append(target) - # processing now join column from source table - else: + if not table_from_list: + logger.debug(f"Cannot find {table_or_alias} in comparison elements") + return None, None + + return table_from_list, column_name + + @staticmethod + def stateful_add_table_joins( + statement_joins: Dict[str, List[TableColumnJoin]], + source: TableColumn, + target: TableColumn, + ) -> None: + """ + Update the statement_joins dict with the new table information + :param statement_joins: dict with state info + :param source: source TableColumn + :param target: target TableColumn + """ + + if source.table not in statement_joins: statement_joins[source.table].append( TableColumnJoin(tableColumn=source, joinedWith=[target]) ) + else: + # check if new column from same table + table_columns = [ + join_info.tableColumn for join_info in statement_joins[source.table] + ] + existing_table_column = find_in_iter( + element=source, container=table_columns + ) + if existing_table_column: + existing_join_info = [ + join_info + for join_info in statement_joins[source.table] + if join_info.tableColumn == existing_table_column + ][0] + existing_join_info.joinedWith.append(target) + # processing now join column from source table + else: + statement_joins[source.table].append( + TableColumnJoin(tableColumn=source, joinedWith=[target]) + ) -def stateful_add_joins_from_statement( - join_data: Dict[str, List[TableColumnJoin]], - statement: Statement, - tables: List[str], - aliases: Dict[str, str], -) -> None: - """ - Parse a single statement to pick up join information - :param join_data: join data from previous statements - :param statement: Parsed sql statement to process - :param tables: involved tables in the query - :param aliases: table aliases dict - :return: for each table name, list all joins against other tables - """ - # Here we want to get tokens such as `tableA.col1 = tableB.col2` - comparisons = [ - sub for sub in statement.get_sublists() if isinstance(sub, Comparison) - ] - for comparison in comparisons: - if "." not in comparison.left.value or "." not in comparison.right.value: - logger.debug(f"Ignoring comparison {comparison}") - continue + def stateful_add_joins_from_statement( + self, + join_data: Dict[str, List[TableColumnJoin]], + statement: Statement, + ) -> None: + """ + Parse a single statement to pick up join information + :param join_data: join data from previous statements + :param statement: Parsed sql statement to process + :return: for each table name, list all joins against other tables + """ + # Here we want to get tokens such as `tableA.col1 = tableB.col2` + comparisons = [ + sub for sub in statement.get_sublists() if isinstance(sub, Comparison) + ] + for comparison in comparisons: + if "." not in comparison.left.value or "." not in comparison.right.value: + logger.debug(f"Ignoring comparison {comparison}") + continue - table_left, column_left = get_comparison_elements( - identifier=comparison.left, tables=tables, aliases=aliases - ) - table_right, column_right = get_comparison_elements( - identifier=comparison.right, tables=tables, aliases=aliases + table_left, column_left = self.get_comparison_elements( + identifier=comparison.left + ) + table_right, column_right = self.get_comparison_elements( + identifier=comparison.right + ) + + if not table_left or not table_right: + logger.warning(f"Cannot find ingredients from {comparison}") + continue + + left_table_column = TableColumn(table=table_left, column=column_left) + right_table_column = TableColumn(table=table_right, column=column_right) + + # We just send the info once, from Left -> Right. + # The backend will prepare the symmetric information. + self.stateful_add_table_joins( + join_data, left_table_column, right_table_column + ) + + @cached_property + def table_joins(self) -> Dict[str, List[TableColumnJoin]]: + """ + For each table involved in the query, find its joins against any + other table. + :return: for each table name, list all joins against other tables + """ + join_data = defaultdict(list) + # These are @lazy_property, not properly being picked up by IDEs. Ignore the warning + for statement in self.parser.statements_parsed: + self.stateful_add_joins_from_statement(join_data, statement=statement) + + return join_data + + def retrieve_tables(self, tables: List[Any]) -> List[Table]: + if not self._clean_query: + return [] + return [table for table in tables if isinstance(table, Table)] + + @classmethod + def clean_raw_query(cls, raw_query: str) -> Optional[str]: + """ + Given a raw query from any input (e.g., view definition, + query from logs, etc.), perform a cleaning step + before passing it to the LineageRunner + """ + clean_query = insensitive_replace( + raw_str=raw_query, + to_replace=" copy grants ", # snowflake specific + replace_by=" ", # remove it as it does not add any value to lineage ) - if not table_left or not table_right: - logger.warning(f"Cannot find ingredients from {comparison}") - continue - - left_table_column = TableColumn(table=table_left, column=column_left) - right_table_column = TableColumn(table=table_right, column=column_right) - - # We just send the info once, from Left -> Right. - # The backend will prepare the symmetric information. - stateful_add_table_joins(join_data, left_table_column, right_table_column) - - -def get_table_joins( - parser: LineageRunner, tables: List[str], aliases: Dict[str, str] -) -> Dict[str, List[TableColumnJoin]]: - """ - For each table involved in the query, find its joins against any - other table. - :param parser: LineageRunner parser - :param tables: involved tables in the query - :param aliases: table aliases dict - :return: for each table name, list all joins against other tables - """ - join_data = defaultdict(list) - for statement in parser.statements_parsed: - stateful_add_joins_from_statement( - join_data, statement=statement, tables=tables, aliases=aliases + clean_query = insensitive_replace( + raw_str=clean_query.strip(), + to_replace="\n", # remove line breaks + replace_by=" ", ) - return join_data + if insensitive_match(clean_query, ".*merge into .*when matched.*"): + clean_query = insensitive_replace( + raw_str=clean_query, + to_replace="when matched.*", # merge into queries specific + replace_by="", # remove it as LineageRunner is not able to perform the lineage + ) + + # We remove queries of the type 'COPY table FROM path' since they are data manipulation statement and do not + # provide value for user + if insensitive_match(clean_query, "^COPY.*"): + return None + + return clean_query.strip() diff --git a/ingestion/src/metadata/ingestion/lineage/sql_lineage.py b/ingestion/src/metadata/ingestion/lineage/sql_lineage.py index 030e19275cf..3ec22a2cdf5 100644 --- a/ingestion/src/metadata/ingestion/lineage/sql_lineage.py +++ b/ingestion/src/metadata/ingestion/lineage/sql_lineage.py @@ -12,7 +12,6 @@ Helper functions to handle SQL lineage operations """ import traceback -from logging.config import DictConfigurator from typing import Any, Iterable, Iterator, List, Optional from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest @@ -23,64 +22,16 @@ from metadata.generated.schema.type.entityLineage import ( LineageDetails, ) from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.lineage.parser import LineageParser from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils import fqn -from metadata.utils.helpers import insensitive_match, insensitive_replace from metadata.utils.logger import utils_logger from metadata.utils.lru_cache import LRUCache -# Prevent sqllineage from modifying the logger config -# Disable the DictConfigurator.configure method while importing LineageRunner -configure = DictConfigurator.configure -DictConfigurator.configure = lambda _: None -from sqllineage.runner import LineageRunner # pylint: disable=wrong-import-position - -# Reverting changes after import is done -DictConfigurator.configure = configure - logger = utils_logger() LRU_CACHE_SIZE = 4096 -def clean_raw_query(raw_query: str) -> str: - """ - Given a raw query from any input (e.g., view definition, - query from logs, etc.), perform a cleaning step - before passing it to the LineageRunner - """ - clean_query = insensitive_replace( - raw_str=raw_query, - to_replace=" copy grants ", # snowflake specific - replace_by=" ", # remove it as it does not add any value to lineage - ) - - clean_query = insensitive_replace( - raw_str=clean_query.strip(), - to_replace="\n", # remove line breaks - replace_by=" ", - ) - - if insensitive_match(clean_query, ".*merge into .*when matched.*"): - clean_query = insensitive_replace( - raw_str=clean_query, - to_replace="when matched.*", # merge into queries specific - replace_by="", # remove it as LineageRunner is not able to perform the lineage - ) - - return clean_query.strip() - - -def split_raw_table_name(database: str, raw_name: str) -> dict: - database_schema = None - if "." in raw_name: - # pylint: disable=unbalanced-tuple-unpacking - database_schema, table = fqn.split(raw_name)[-2:] - # pylint: enable=unbalanced-tuple-unpacking - if database_schema == "": - database_schema = None - return {"database": database, "database_schema": database_schema, "table": table} - - def get_column_fqn(table_entity: Table, column: str) -> Optional[str]: """ Get fqn of column if exist in table entity @@ -378,13 +329,13 @@ def get_lineage_by_query( try: logger.debug(f"Running lineage with query: {query}") - result = LineageRunner(clean_raw_query(query)) + lineage_parser = LineageParser(query) - raw_column_lineage = result.get_column_lineage() + raw_column_lineage = lineage_parser.column_lineage column_lineage.update(populate_column_lineage_map(raw_column_lineage)) - for intermediate_table in result.intermediate_tables: - for source_table in result.source_tables: + for intermediate_table in lineage_parser.intermediate_tables: + for source_table in lineage_parser.source_tables: yield from _create_lineage_by_table_name( metadata, from_table=str(source_table), @@ -395,7 +346,7 @@ def get_lineage_by_query( query=query, column_lineage_map=column_lineage, ) - for target_table in result.target_tables: + for target_table in lineage_parser.target_tables: yield from _create_lineage_by_table_name( metadata, from_table=str(intermediate_table), @@ -406,9 +357,9 @@ def get_lineage_by_query( query=query, column_lineage_map=column_lineage, ) - if not result.intermediate_tables: - for target_table in result.target_tables: - for source_table in result.source_tables: + if not lineage_parser.intermediate_tables: + for target_table in lineage_parser.target_tables: + for source_table in lineage_parser.source_tables: yield from _create_lineage_by_table_name( metadata, from_table=str(source_table), @@ -452,10 +403,10 @@ def get_lineage_via_table_entity( try: logger.debug(f"Getting lineage via table entity using query: {query}") - parser = LineageRunner(clean_raw_query(query)) + lineage_parser = LineageParser(query) to_table_name = table_entity.name.__root__ - for from_table_name in parser.source_tables: + for from_table_name in lineage_parser.source_tables: yield from _create_lineage_by_table_name( metadata, from_table=str(from_table_name), diff --git a/ingestion/src/metadata/ingestion/processor/query_parser.py b/ingestion/src/metadata/ingestion/processor/query_parser.py index ecbba14008b..97bfe147761 100644 --- a/ingestion/src/metadata/ingestion/processor/query_parser.py +++ b/ingestion/src/metadata/ingestion/processor/query_parser.py @@ -14,7 +14,6 @@ Query parser implementation import datetime import traceback -from logging.config import DictConfigurator from typing import Optional from metadata.config.common import ConfigModel @@ -24,22 +23,9 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata from metadata.generated.schema.type.queryParserData import ParsedData, QueryParserData from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery from metadata.ingestion.api.processor import Processor, ProcessorStatus -from metadata.ingestion.lineage.parser import ( - get_clean_parser_table_list, - get_involved_tables_from_parser, - get_parser_table_aliases, - get_table_joins, -) -from metadata.ingestion.lineage.sql_lineage import clean_raw_query +from metadata.ingestion.lineage.parser import LineageParser from metadata.utils.logger import ingestion_logger -configure = DictConfigurator.configure -DictConfigurator.configure = lambda _: None -from sqllineage.runner import LineageRunner # pylint: disable=wrong-import-position - -# Reverting changes after import is done -DictConfigurator.configure = configure - logger = ingestion_logger() @@ -58,19 +44,14 @@ def parse_sql_statement(record: TableQuery) -> Optional[ParsedData]: str(record.analysisDate), "%Y-%m-%d %H:%M:%S" ).date() - parser = LineageRunner(clean_raw_query(record.query)) + lineage_parser = LineageParser(record.query) - tables = get_involved_tables_from_parser(parser) - - if not tables: + if not lineage_parser.involved_tables: return None - clean_tables = get_clean_parser_table_list(tables) - aliases = get_parser_table_aliases(tables) - return ParsedData( - tables=clean_tables, - joins=get_table_joins(parser=parser, tables=clean_tables, aliases=aliases), + tables=lineage_parser.clean_table_list, + joins=lineage_parser.table_joins, databaseName=record.databaseName, databaseSchema=record.databaseSchema, sql=record.query, diff --git a/ingestion/src/metadata/ingestion/source/dashboard/metabase.py b/ingestion/src/metadata/ingestion/source/dashboard/metabase.py index 61ec0eb7685..7e0021cc310 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/metabase.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/metabase.py @@ -11,7 +11,6 @@ """Metabase source module""" import traceback -from logging.config import DictConfigurator from typing import Iterable, List, Optional import requests @@ -34,10 +33,8 @@ from metadata.generated.schema.metadataIngestion.workflow import ( ) from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.source import InvalidSourceException -from metadata.ingestion.lineage.sql_lineage import ( - clean_raw_query, - search_table_entities, -) +from metadata.ingestion.lineage.parser import LineageParser +from metadata.ingestion.lineage.sql_lineage import search_table_entities from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource from metadata.ingestion.source.database.common_db_source import SQLSourceStatus from metadata.utils import fqn @@ -45,12 +42,6 @@ from metadata.utils.filters import filter_by_chart from metadata.utils.helpers import get_standard_chart_type, replace_special_with from metadata.utils.logger import ingestion_logger -# Prevent sqllineage from modifying the logger config -# Disable the DictConfigurator.configure method while importing LineageRunner -configure = DictConfigurator.configure -DictConfigurator.configure = lambda _: None -from sqllineage.runner import LineageRunner # pylint: disable=C0413 - HEADERS = {"Content-Type": "application/json", "Accept": "*/*"} logger = ingestion_logger() @@ -241,10 +232,9 @@ class MetabaseSource(DashboardServiceSource): resp_database = self.req_get(f"/api/database/{chart_details['database_id']}") if resp_database.status_code == 200: database = resp_database.json() - query = chart_details["dataset_query"]["native"]["query"] - table_list = LineageRunner(clean_raw_query(query)) - for table in table_list.source_tables: + lineage_parser = LineageParser(query) + for table in lineage_parser.source_tables: database_schema_name, table = fqn.split(str(table))[-2:] database_schema_name = ( None diff --git a/ingestion/src/metadata/ingestion/source/dashboard/mode.py b/ingestion/src/metadata/ingestion/source/dashboard/mode.py index 73c95a95e2d..e5f2de26831 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/mode.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/mode.py @@ -11,7 +11,6 @@ """Mode source module""" import traceback -from logging.config import DictConfigurator from typing import Iterable, List, Optional from metadata.clients import mode_client @@ -33,24 +32,13 @@ from metadata.generated.schema.metadataIngestion.workflow import ( ) from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.source import InvalidSourceException -from metadata.ingestion.lineage.sql_lineage import ( - clean_raw_query, - search_table_entities, -) +from metadata.ingestion.lineage.parser import LineageParser +from metadata.ingestion.lineage.sql_lineage import search_table_entities from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource from metadata.utils import fqn from metadata.utils.filters import filter_by_chart from metadata.utils.logger import ingestion_logger -# Prevent sqllineage from modifying the logger config -# Disable the DictConfigurator.configure method while importing LineageRunner -configure = DictConfigurator.configure -DictConfigurator.configure = lambda _: None -from sqllineage.runner import LineageRunner # pylint: disable=C0413 - -# Reverting changes after import is done -DictConfigurator.configure = configure - logger = ingestion_logger() @@ -140,8 +128,8 @@ class ModeSource(DashboardServiceSource): data_source = self.data_sources.get(query.get("data_source_id")) if not data_source: continue - table_list = LineageRunner(clean_raw_query(query.get("raw_query"))) - for table in table_list.source_tables: + lineage_parser = LineageParser(query.get("raw_query")) + for table in lineage_parser.source_tables: database_schema_name, table = fqn.split(str(table))[-2:] database_schema_name = ( None diff --git a/ingestion/src/metadata/ingestion/source/dashboard/redash.py b/ingestion/src/metadata/ingestion/source/dashboard/redash.py index 3b28851d3d4..22ef3a81d0d 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/redash.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/redash.py @@ -12,7 +12,6 @@ Redash source module """ import traceback -from logging.config import DictConfigurator from typing import Iterable, List, Optional from metadata.generated.schema.api.data.createChart import CreateChartRequest @@ -33,23 +32,13 @@ from metadata.generated.schema.metadataIngestion.workflow import ( ) from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.source import InvalidSourceException -from metadata.ingestion.lineage.sql_lineage import clean_raw_query +from metadata.ingestion.lineage.parser import LineageParser from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource from metadata.utils import fqn from metadata.utils.filters import filter_by_chart from metadata.utils.helpers import get_standard_chart_type from metadata.utils.logger import ingestion_logger -# Prevent sqllineage from modifying the logger config -# Disable the DictConfigurator.configure method while importing LineageRunner -configure = DictConfigurator.configure -DictConfigurator.configure = lambda _: None -from sqllineage.runner import LineageRunner # pylint: disable=C0413 - -# Reverting changes after import is done -DictConfigurator.configure = configure - - logger = ingestion_logger() @@ -142,28 +131,26 @@ class RedashSource(DashboardServiceSource): if not visualization: continue if visualization.get("query", {}).get("query"): - parser = LineageRunner( - clean_raw_query(visualization["query"]["query"]) - ) - for table in parser.source_tables: - table_name = str(table) - database_schema_table = fqn.split_table_name(table_name) - from_fqn = fqn.build( - self.metadata, - entity_type=Table, - service_name=db_service_name, - schema_name=database_schema_table.get("database_schema"), - table_name=database_schema_table.get("table"), - database_name=database_schema_table.get("database"), - ) - from_entity = self.metadata.get_by_name( - entity=Table, - fqn=from_fqn, - ) - if from_entity and to_entity: - yield self._get_add_lineage_request( - to_entity=to_entity, from_entity=from_entity + lineage_parser = LineageParser(visualization["query"]["query"]) + for table in lineage_parser.source_tables: + table_name = str(table) + database_schema_table = fqn.split_table_name(table_name) + from_fqn = fqn.build( + self.metadata, + entity_type=Table, + service_name=db_service_name, + schema_name=database_schema_table.get("database_schema"), + table_name=database_schema_table.get("table"), + database_name=database_schema_table.get("database"), ) + from_entity = self.metadata.get_by_name( + entity=Table, + fqn=from_fqn, + ) + if from_entity and to_entity: + yield self._get_add_lineage_request( + to_entity=to_entity, from_entity=from_entity + ) except Exception as exc: logger.debug(traceback.format_exc()) logger.warning( diff --git a/ingestion/src/metadata/ingestion/source/dashboard/tableau.py b/ingestion/src/metadata/ingestion/source/dashboard/tableau.py index 1341f15a729..5b4c29a0d27 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/tableau.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/tableau.py @@ -124,7 +124,8 @@ class TableauSource(DashboardServiceSource): "\nSomething went wrong while connecting to Tableau Metadata APIs\n" "Please check if the Tableau Metadata APIs are enabled for you Tableau instance\n" "For more information on enabling the Tableau Metadata APIs follow the link below\n" - "https://help.tableau.com/current/api/metadata_api/en-us/docs/meta_api_start.html#enable-the-tableau-metadata-api-for-tableau-server\n" # pylint: disable=line-too-long + "https://help.tableau.com/current/api/metadata_api/en-us/docs/meta_api_start.html" + "#enable-the-tableau-metadata-api-for-tableau-server\n" ) return super().prepare() diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py index 20a601bb890..8a576bc0a8e 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py @@ -14,7 +14,6 @@ Generic source to build SQL connectors. import traceback from abc import ABC from copy import deepcopy -from logging.config import DictConfigurator from typing import Iterable, Optional, Tuple from sqlalchemy.engine import Connection @@ -40,8 +39,8 @@ from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.lineage.parser import LineageParser from metadata.ingestion.lineage.sql_lineage import ( - clean_raw_query, get_lineage_by_query, get_lineage_via_table_entity, ) @@ -399,20 +398,10 @@ class CommonDbSourceService( schema_name=schema_name, inspector=self.inspector, ) - # Prevent sqllineage from modifying the logger config - # Disable the DictConfigurator.configure method while importing LineageRunner - configure = DictConfigurator.configure - DictConfigurator.configure = lambda _: None - from sqllineage.runner import ( # pylint: disable=import-outside-toplevel - LineageRunner, - ) - - # Reverting changes after import is done - DictConfigurator.configure = configure try: - result = LineageRunner(clean_raw_query(view_definition)) - if result.source_tables and result.target_tables: + lineage_parser = LineageParser(view_definition) + if lineage_parser.source_tables and lineage_parser.target_tables: yield from get_lineage_by_query( self.metadata, query=view_definition, diff --git a/ingestion/src/metadata/orm_profiler/api/workflow.py b/ingestion/src/metadata/orm_profiler/api/workflow.py index 8d21a91bc34..032ad477758 100644 --- a/ingestion/src/metadata/orm_profiler/api/workflow.py +++ b/ingestion/src/metadata/orm_profiler/api/workflow.py @@ -421,8 +421,7 @@ class ProfilerWorkflow(WorkflowStatusMixin): raise ValueError( "databaseFilterPattern returned 0 result. At least 1 database must be returned by the filter pattern." f"\n\t- includes: {self.source_config.databaseFilterPattern.includes if self.source_config.databaseFilterPattern else None}" # pylint: disable=line-too-long - f"\n\t- excludes: {self.source_config.databaseFilterPattern.excludes if self.source_config.databaseFilterPattern else None}" - # pylint: disable=line-too-long + f"\n\t- excludes: {self.source_config.databaseFilterPattern.excludes if self.source_config.databaseFilterPattern else None}" # pylint: disable=line-too-long ) for database in databases: diff --git a/ingestion/tests/unit/test_query_parser.py b/ingestion/tests/unit/test_query_parser.py index 9620242e9f3..7d5798c6d6a 100644 --- a/ingestion/tests/unit/test_query_parser.py +++ b/ingestion/tests/unit/test_query_parser.py @@ -13,26 +13,10 @@ Validate query parser logic """ -# Prevent sqllineage from modifying the logger config -# Disable the DictConfigurator.configure method while importing LineageRunner -from logging.config import DictConfigurator from unittest import TestCase from metadata.generated.schema.type.tableUsageCount import TableColumn, TableColumnJoin -from metadata.ingestion.lineage.parser import ( - get_clean_parser_table_list, - get_involved_tables_from_parser, - get_parser_table_aliases, - get_table_joins, -) -from metadata.ingestion.lineage.sql_lineage import clean_raw_query - -configure = DictConfigurator.configure -DictConfigurator.configure = lambda _: None -from sqllineage.runner import LineageRunner - -# Reverting changes after import is done -DictConfigurator.configure = configure +from metadata.ingestion.lineage.parser import LineageParser class QueryParserTests(TestCase): @@ -58,22 +42,20 @@ class QueryParserTests(TestCase): WHERE a.col3 = 'abc' """ - parser = LineageRunner(col_lineage) + parser = LineageParser(col_lineage) def test_involved_tables(self): - tables = {str(table) for table in get_involved_tables_from_parser(self.parser)} + tables = {str(table) for table in self.parser.involved_tables} self.assertEqual( tables, {"db.grault", "db.holis", ".foo", "db.random"} ) def test_clean_parser_table_list(self): - tables = get_involved_tables_from_parser(self.parser) - clean_tables = set(get_clean_parser_table_list(tables)) + clean_tables = set(self.parser.clean_table_list) self.assertEqual(clean_tables, {"db.grault", "db.holis", "foo", "db.random"}) def test_parser_table_aliases(self): - tables = get_involved_tables_from_parser(self.parser) - aliases = get_parser_table_aliases(tables) + aliases = self.parser.table_aliases self.assertEqual( aliases, {"b": "db.grault", "c": "db.holis", "a": "foo", "d": "db.random"} ) @@ -82,14 +64,7 @@ class QueryParserTests(TestCase): """ main logic point """ - tables = get_involved_tables_from_parser(self.parser) - - clean_tables = get_clean_parser_table_list(tables) - aliases = get_parser_table_aliases(tables) - - joins = get_table_joins( - parser=self.parser, tables=clean_tables, aliases=aliases - ) + joins = self.parser.table_joins self.assertEqual( joins["foo"], @@ -126,14 +101,9 @@ class QueryParserTests(TestCase): ; """ - parser = LineageRunner(query) + parser = LineageParser(query) - tables = get_involved_tables_from_parser(parser) - - clean_tables = get_clean_parser_table_list(tables) - aliases = get_parser_table_aliases(tables) - - joins = get_table_joins(parser=parser, tables=clean_tables, aliases=aliases) + joins = parser.table_joins self.assertEqual( joins["testdb.public.users"], @@ -153,23 +123,33 @@ class QueryParserTests(TestCase): def test_clean_raw_query_copy_grants(self): """ - Validate query cleaning logic + Validate COPY GRANT query cleaning logic """ query = "create or replace view my_view copy grants as select * from my_table" self.assertEqual( - clean_raw_query(query), + LineageParser.clean_raw_query(query), "create or replace view my_view as select * from my_table", ) def test_clean_raw_query_merge_into(self): """ - Validate query cleaning logic + Validate MERGE INTO query cleaning logic """ query = """ /* comment */ merge into table_1 using (select a, b from table_2) when matched update set t.a = 'value' when not matched then insert (table_1.a, table_2.b) values ('value1', 'value2') """ self.assertEqual( - clean_raw_query(query), + LineageParser.clean_raw_query(query), "/* comment */ merge into table_1 using (select a, b from table_2)", ) + + def test_clean_raw_query_copy_from(self): + """ + Validate COPY FROM query cleaning logic + """ + query = "COPY my_schema.my_table FROM 's3://bucket/path/object.csv';" + self.assertEqual( + LineageParser.clean_raw_query(query), + None, + ) diff --git a/ingestion/tests/unit/test_sql_lineage.py b/ingestion/tests/unit/test_sql_lineage.py index 2f38608a152..f5225374990 100644 --- a/ingestion/tests/unit/test_sql_lineage.py +++ b/ingestion/tests/unit/test_sql_lineage.py @@ -12,22 +12,11 @@ """ sql lineage utils tests """ -from logging.config import DictConfigurator from unittest import TestCase -from sqllineage.runner import LineageRunner - +from metadata.ingestion.lineage.parser import LineageParser from metadata.ingestion.lineage.sql_lineage import populate_column_lineage_map -# Prevent sqllineage from modifying the logger config -# Disable the DictConfigurator.configure method while importing LineageRunner -configure = DictConfigurator.configure -DictConfigurator.configure = lambda _: None - -# Reverting changes after import is done -DictConfigurator.configure = configure - - QUERY = [ "CREATE TABLE MYTABLE2 AS SELECT * FROM MYTABLE1;", "CREATE TABLE MYTABLE3 AS SELECT ID, NAME FROM MYTABLE1", @@ -50,7 +39,7 @@ class SqlLineageTest(TestCase): def test_populate_column_lineage_map(self): for i in range(len(QUERY)): - result = LineageRunner(QUERY[i]) - raw_column_lineage = result.get_column_lineage() + lineage_parser = LineageParser(QUERY[i]) + raw_column_lineage = lineage_parser.column_lineage lineage_map = populate_column_lineage_map(raw_column_lineage) self.assertEqual(lineage_map, EXPECTED_LINEAGE_MAP[i])