Fix #6203: Refactor LineageRunner use (#9073)

* Refactor LineageRunner use

* Address PR comments

* Address pylint errors

* Fix failing test
This commit is contained in:
Nahuel 2022-11-30 16:02:21 +01:00 committed by GitHub
parent 9c8492c7a3
commit 76773e69de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 349 additions and 412 deletions

View File

@ -50,6 +50,7 @@ base_requirements = {
"croniter~=1.3.0",
"requests-aws4auth==1.1.2",
"pymysql>=1.0.2",
"cached-property==1.5.2",
}

View File

@ -14,12 +14,18 @@ Lineage Parser configuration
import traceback
from collections import defaultdict
from logging.config import DictConfigurator
from typing import Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple
from cached_property import cached_property
from sqlparse.sql import Comparison, Identifier, Statement
from metadata.generated.schema.type.tableUsageCount import TableColumn, TableColumnJoin
from metadata.utils.helpers import find_in_iter, get_formatted_entity_name
from metadata.utils.helpers import (
find_in_iter,
get_formatted_entity_name,
insensitive_match,
insensitive_replace,
)
from metadata.utils.logger import ingestion_logger
# Prevent sqllineage from modifying the logger config
@ -27,7 +33,7 @@ from metadata.utils.logger import ingestion_logger
# pylint: disable=wrong-import-position
configure = DictConfigurator.configure
DictConfigurator.configure = lambda _: None
from sqllineage.core import models
from sqllineage.core.models import Column, Table
from sqllineage.exceptions import SQLLineageException
from sqllineage.runner import LineageRunner
@ -37,214 +43,289 @@ DictConfigurator.configure = configure
logger = ingestion_logger()
def get_involved_tables_from_parser(
parser: LineageRunner,
) -> Optional[List[models.Table]]:
class LineageParser:
"""
Use the LineageRunner parser and combine
source and intermediate tables into
a single set.
:param parser: LineageRunner
:return: List of involved tables
Class that acts like a wrapper for the LineageRunner library usage
"""
try:
parser: LineageRunner
query: str
_clean_query: str
def __init__(self, query: str):
    """
    Store the raw query, compute its cleaned form and build the
    LineageRunner over the CLEANED statement.

    :param query: raw SQL query to analyze
    """
    self.query = query
    self._clean_query = self.clean_raw_query(query)
    # Fix: the whole point of clean_raw_query is to strip constructs the
    # LineageRunner cannot handle (COPY GRANTS, MERGE ... WHEN MATCHED, ...),
    # so the runner must receive the cleaned text, not the raw query.
    # clean_raw_query may return None (discarded COPY statements); fall back
    # to an empty string so lazy parsing never sees a None SQL payload.
    self.parser = LineageRunner(self._clean_query if self._clean_query else "")
@cached_property
def involved_tables(self) -> Optional[List[Table]]:
    """
    Combine the source, intermediate and target tables reported by the
    LineageRunner parser into a single de-duplicated list.

    :return: list of involved tables, or None if parsing failed
    """
    try:
        all_tables = (
            set(self.source_tables)
            | set(self.intermediate_tables)
            | set(self.target_tables)
        )
        return list(all_tables)
    except SQLLineageException as exc:
        logger.debug(traceback.format_exc())
        logger.warning(
            f"Cannot extract source table information from query [{self.query}]: {exc}"
        )
        return None
@cached_property
def intermediate_tables(self) -> List[Table]:
"""
Get a list of intermediate tables
"""
# These are @lazy_property, not properly being picked up by IDEs. Ignore the warning
return list(
set(parser.source_tables)
.union(set(parser.intermediate_tables))
.union(set(parser.target_tables))
return self.retrieve_tables(self.parser.intermediate_tables)
@cached_property
def source_tables(self) -> List[Table]:
    """
    Get a list of source tables
    """
    # parser.source_tables is a sqllineage @lazy_property; IDE warnings here are spurious
    parsed_sources = self.parser.source_tables
    return self.retrieve_tables(parsed_sources)
@cached_property
def target_tables(self) -> List[Table]:
    """
    Get a list of target tables
    """
    # parser.target_tables is a sqllineage @lazy_property; IDE warnings here are spurious
    parsed_targets = self.parser.target_tables
    return self.retrieve_tables(parsed_targets)
@cached_property
def column_lineage(self) -> List[Tuple[Column, Column]]:
    """
    Get a list of tuples of column lineage
    """
    # Delegate directly to sqllineage's column-level lineage extraction
    return self.parser.get_column_lineage()
@cached_property
def clean_table_list(self) -> List[str]:
    """
    Clean the table name if it has <default>.
    :return: clean table names
    """
    raw_names = (str(involved) for involved in self.involved_tables)
    return [get_formatted_entity_name(name) for name in raw_names]
@cached_property
def table_aliases(self) -> Dict[str, str]:
    """
    Prepare a dictionary in the shape of {alias: table_name} from
    the parser tables
    :return: alias dict
    """
    aliases: Dict[str, str] = {}
    for involved in self.involved_tables:
        aliases[involved.alias] = str(involved).replace("<default>.", "")
    return aliases
def get_table_name_from_list(
self,
database_name: Optional[str],
schema_name: Optional[str],
table_name: str,
) -> Optional[str]:
"""
Find the table name (in any format in my come)
from the list using the given ingredients.
:param database_name: db name
:param schema_name: schema name
:param table_name: table name
:return: table name from parser info
"""
tables = self.clean_table_list
table = find_in_iter(element=table_name, container=tables)
if table:
return table
schema_table = find_in_iter(
element=f"{schema_name}.{table_name}", container=tables
)
except SQLLineageException as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Cannot extract source table information from query [{parser._sql}]: {exc}" # pylint: disable=protected-access
if schema_table:
return schema_table
db_schema_table = find_in_iter(
element=f"{database_name}.{schema_name}.{table_name}", container=tables
)
if db_schema_table:
return db_schema_table
logger.debug(f"Cannot find table {db_schema_table} in involved tables")
return None
def get_comparison_elements(
self, identifier: Identifier
) -> Tuple[Optional[str], Optional[str]]:
"""
Return the tuple table_name, column_name from each comparison element
:param identifier: comparison identifier
:return: table name and column name from the identifier
"""
aliases = self.table_aliases
values = identifier.value.split(".")
database_name, schema_name, table_or_alias, column_name = (
[None] * (4 - len(values))
) + values
def get_clean_parser_table_list(tables: List[models.Table]) -> List[str]:
    """
    Clean the table name if it has <default>.
    :param tables: involved tables
    :return: clean table names
    """
    raw_names = (str(parser_table) for parser_table in tables)
    return [get_formatted_entity_name(name) for name in raw_names]
if not table_or_alias or not column_name:
logger.debug(
f"Cannot obtain comparison elements from identifier {identifier}"
)
return None, None
alias_to_table = aliases.get(table_or_alias)
if alias_to_table:
return alias_to_table, column_name
def get_parser_table_aliases(tables: List[models.Table]) -> Dict[str, str]:
    """
    Prepare a dictionary in the shape of {alias: table_name} from
    the parser tables
    :param tables: parser tables
    :return: alias dict
    """
    aliases: Dict[str, str] = {}
    for parser_table in tables:
        aliases[parser_table.alias] = str(parser_table).replace("<default>.", "")
    return aliases
def get_table_name_from_list(
    database_name: Optional[str],
    schema_name: Optional[str],
    table_name: str,
    tables: List[str],
) -> Optional[str]:
    """
    Find the table name (in any format it may come)
    from the list using the given ingredients.

    :param database_name: db name
    :param schema_name: schema name
    :param table_name: table name
    :param tables: Contains all involved tables
    :return: table name from parser info
    """
    # Try the least to most qualified spellings, in the same order as before
    candidates = (
        table_name,
        f"{schema_name}.{table_name}",
        f"{database_name}.{schema_name}.{table_name}",
    )
    for candidate in candidates:
        found = find_in_iter(element=candidate, container=tables)
        if found:
            return found
    # Fix: log the name we actually searched for; the previous message logged
    # a variable that is always falsy/None at this point.
    logger.debug(f"Cannot find table {table_name} in involved tables")
    return None
def get_comparison_elements(
    identifier: Identifier, tables: List[str], aliases: Dict[str, str]
) -> Tuple[Optional[str], Optional[str]]:
    """
    Return the tuple table_name, column_name from each comparison element.

    Either element may be None when the identifier cannot be resolved.
    :param identifier: comparison identifier
    :param tables: involved tables
    :param aliases: table aliases
    :return: table name and column name from the identifier
    """
    values = identifier.value.split(".")
    # Left-pad with None so the 4-way unpacking works for identifiers with
    # fewer dotted parts (e.g. "alias.col" or "schema.table.col")
    database_name, schema_name, table_or_alias, column_name = (
        [None] * (4 - len(values))
    ) + values
    if not table_or_alias or not column_name:
        logger.debug(f"Cannot obtain comparison elements from identifier {identifier}")
        return None, None
    # An alias match takes precedence over a plain table-name lookup
    alias_to_table = aliases.get(table_or_alias)
    if alias_to_table:
        return alias_to_table, column_name
    table_from_list = get_table_name_from_list(
        database_name=database_name,
        schema_name=schema_name,
        table_name=table_or_alias,
        tables=tables,
    )
    if not table_from_list:
        logger.debug(f"Cannot find {table_or_alias} in comparison elements")
        return None, None
    return table_from_list, column_name
def stateful_add_table_joins(
statement_joins: Dict[str, List[TableColumnJoin]],
source: TableColumn,
target: TableColumn,
) -> None:
"""
Update the statement_joins dict with the new table information
:param statement_joins: dict with state info
:param source: source TableColumn
:param target: target TableColumn
"""
if source.table not in statement_joins:
statement_joins[source.table].append(
TableColumnJoin(tableColumn=source, joinedWith=[target])
table_from_list = self.get_table_name_from_list(
database_name=database_name,
schema_name=schema_name,
table_name=table_or_alias,
)
else:
# check if new column from same table
table_columns = [
join_info.tableColumn for join_info in statement_joins[source.table]
]
existing_table_column = find_in_iter(element=source, container=table_columns)
if existing_table_column:
existing_join_info = [
join_info
for join_info in statement_joins[source.table]
if join_info.tableColumn == existing_table_column
][0]
existing_join_info.joinedWith.append(target)
# processing now join column from source table
else:
if not table_from_list:
logger.debug(f"Cannot find {table_or_alias} in comparison elements")
return None, None
return table_from_list, column_name
@staticmethod
def stateful_add_table_joins(
    statement_joins: Dict[str, List[TableColumnJoin]],
    source: TableColumn,
    target: TableColumn,
) -> None:
    """
    Update the statement_joins dict with the new table information
    :param statement_joins: dict with state info
    :param source: source TableColumn
    :param target: target TableColumn
    """
    if source.table not in statement_joins:
        # First join seen for this table
        statement_joins[source.table].append(
            TableColumnJoin(tableColumn=source, joinedWith=[target])
        )
        return
    # Table already tracked: check whether this source column was seen before
    known_columns = [join.tableColumn for join in statement_joins[source.table]]
    already_recorded = find_in_iter(element=source, container=known_columns)
    if already_recorded:
        # Same source column: extend its joinedWith list
        join_entry = next(
            join
            for join in statement_joins[source.table]
            if join.tableColumn == already_recorded
        )
        join_entry.joinedWith.append(target)
    else:
        # New join column from an already-tracked source table
        statement_joins[source.table].append(
            TableColumnJoin(tableColumn=source, joinedWith=[target])
        )
def stateful_add_joins_from_statement(
join_data: Dict[str, List[TableColumnJoin]],
statement: Statement,
tables: List[str],
aliases: Dict[str, str],
) -> None:
"""
Parse a single statement to pick up join information
:param join_data: join data from previous statements
:param statement: Parsed sql statement to process
:param tables: involved tables in the query
:param aliases: table aliases dict
:return: for each table name, list all joins against other tables
"""
# Here we want to get tokens such as `tableA.col1 = tableB.col2`
comparisons = [
sub for sub in statement.get_sublists() if isinstance(sub, Comparison)
]
for comparison in comparisons:
if "." not in comparison.left.value or "." not in comparison.right.value:
logger.debug(f"Ignoring comparison {comparison}")
continue
def stateful_add_joins_from_statement(
self,
join_data: Dict[str, List[TableColumnJoin]],
statement: Statement,
) -> None:
"""
Parse a single statement to pick up join information
:param join_data: join data from previous statements
:param statement: Parsed sql statement to process
:return: for each table name, list all joins against other tables
"""
# Here we want to get tokens such as `tableA.col1 = tableB.col2`
comparisons = [
sub for sub in statement.get_sublists() if isinstance(sub, Comparison)
]
for comparison in comparisons:
if "." not in comparison.left.value or "." not in comparison.right.value:
logger.debug(f"Ignoring comparison {comparison}")
continue
table_left, column_left = get_comparison_elements(
identifier=comparison.left, tables=tables, aliases=aliases
)
table_right, column_right = get_comparison_elements(
identifier=comparison.right, tables=tables, aliases=aliases
table_left, column_left = self.get_comparison_elements(
identifier=comparison.left
)
table_right, column_right = self.get_comparison_elements(
identifier=comparison.right
)
if not table_left or not table_right:
logger.warning(f"Cannot find ingredients from {comparison}")
continue
left_table_column = TableColumn(table=table_left, column=column_left)
right_table_column = TableColumn(table=table_right, column=column_right)
# We just send the info once, from Left -> Right.
# The backend will prepare the symmetric information.
self.stateful_add_table_joins(
join_data, left_table_column, right_table_column
)
@cached_property
def table_joins(self) -> Dict[str, List[TableColumnJoin]]:
    """
    For each table involved in the query, find its joins against any
    other table.
    :return: for each table name, list all joins against other tables
    """
    joins: Dict[str, List[TableColumnJoin]] = defaultdict(list)
    # statements_parsed is a sqllineage @lazy_property; IDE warnings are spurious
    for parsed_statement in self.parser.statements_parsed:
        self.stateful_add_joins_from_statement(joins, statement=parsed_statement)
    return joins
def retrieve_tables(self, tables: List[Any]) -> List[Table]:
    """
    Keep only sqllineage Table instances from the parser results,
    returning an empty list when the cleaned query was discarded.
    """
    if not self._clean_query:
        return []
    return [candidate for candidate in tables if isinstance(candidate, Table)]
@classmethod
def clean_raw_query(cls, raw_query: str) -> Optional[str]:
"""
Given a raw query from any input (e.g., view definition,
query from logs, etc.), perform a cleaning step
before passing it to the LineageRunner
"""
clean_query = insensitive_replace(
raw_str=raw_query,
to_replace=" copy grants ", # snowflake specific
replace_by=" ", # remove it as it does not add any value to lineage
)
if not table_left or not table_right:
logger.warning(f"Cannot find ingredients from {comparison}")
continue
left_table_column = TableColumn(table=table_left, column=column_left)
right_table_column = TableColumn(table=table_right, column=column_right)
# We just send the info once, from Left -> Right.
# The backend will prepare the symmetric information.
stateful_add_table_joins(join_data, left_table_column, right_table_column)
def get_table_joins(
parser: LineageRunner, tables: List[str], aliases: Dict[str, str]
) -> Dict[str, List[TableColumnJoin]]:
"""
For each table involved in the query, find its joins against any
other table.
:param parser: LineageRunner parser
:param tables: involved tables in the query
:param aliases: table aliases dict
:return: for each table name, list all joins against other tables
"""
join_data = defaultdict(list)
for statement in parser.statements_parsed:
stateful_add_joins_from_statement(
join_data, statement=statement, tables=tables, aliases=aliases
clean_query = insensitive_replace(
raw_str=clean_query.strip(),
to_replace="\n", # remove line breaks
replace_by=" ",
)
return join_data
if insensitive_match(clean_query, ".*merge into .*when matched.*"):
clean_query = insensitive_replace(
raw_str=clean_query,
to_replace="when matched.*", # merge into queries specific
replace_by="", # remove it as LineageRunner is not able to perform the lineage
)
# We remove queries of the type 'COPY table FROM path' since they are data manipulation statement and do not
# provide value for user
if insensitive_match(clean_query, "^COPY.*"):
return None
return clean_query.strip()

View File

@ -12,7 +12,6 @@
Helper functions to handle SQL lineage operations
"""
import traceback
from logging.config import DictConfigurator
from typing import Any, Iterable, Iterator, List, Optional
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
@ -23,64 +22,16 @@ from metadata.generated.schema.type.entityLineage import (
LineageDetails,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.utils.helpers import insensitive_match, insensitive_replace
from metadata.utils.logger import utils_logger
from metadata.utils.lru_cache import LRUCache
# Prevent sqllineage from modifying the logger config
# Disable the DictConfigurator.configure method while importing LineageRunner
configure = DictConfigurator.configure
DictConfigurator.configure = lambda _: None
from sqllineage.runner import LineageRunner # pylint: disable=wrong-import-position
# Reverting changes after import is done
DictConfigurator.configure = configure
logger = utils_logger()
LRU_CACHE_SIZE = 4096
def clean_raw_query(raw_query: str) -> str:
    """
    Given a raw query from any input (e.g., view definition,
    query from logs, etc.), perform a cleaning step
    before passing it to the LineageRunner
    """
    # Drop the Snowflake-specific COPY GRANTS clause: it adds nothing to lineage
    sanitized = insensitive_replace(
        raw_str=raw_query,
        to_replace=" copy grants ",
        replace_by=" ",
    )
    # Collapse line breaks into single spaces
    sanitized = insensitive_replace(
        raw_str=sanitized.strip(),
        to_replace="\n",
        replace_by=" ",
    )
    # LineageRunner cannot parse the WHEN MATCHED tail of MERGE INTO statements
    if insensitive_match(sanitized, ".*merge into .*when matched.*"):
        sanitized = insensitive_replace(
            raw_str=sanitized,
            to_replace="when matched.*",
            replace_by="",
        )
    return sanitized.strip()
def split_raw_table_name(database: str, raw_name: str) -> dict:
    """
    Split a possibly schema-qualified table name into its parts.

    :param database: database name to report back
    :param raw_name: table name, optionally prefixed as "schema.table"
    :return: dict with `database`, `database_schema` and `table` keys
    """
    database_schema = None
    # Fix: default to the raw name so an unqualified name (no ".") does not
    # leave `table` unbound and raise NameError on the return line
    table = raw_name
    if "." in raw_name:
        # pylint: disable=unbalanced-tuple-unpacking
        database_schema, table = fqn.split(raw_name)[-2:]
        # pylint: enable=unbalanced-tuple-unpacking
        if database_schema == "<default>":
            database_schema = None
    return {"database": database, "database_schema": database_schema, "table": table}
def get_column_fqn(table_entity: Table, column: str) -> Optional[str]:
"""
Get fqn of column if exist in table entity
@ -378,13 +329,13 @@ def get_lineage_by_query(
try:
logger.debug(f"Running lineage with query: {query}")
result = LineageRunner(clean_raw_query(query))
lineage_parser = LineageParser(query)
raw_column_lineage = result.get_column_lineage()
raw_column_lineage = lineage_parser.column_lineage
column_lineage.update(populate_column_lineage_map(raw_column_lineage))
for intermediate_table in result.intermediate_tables:
for source_table in result.source_tables:
for intermediate_table in lineage_parser.intermediate_tables:
for source_table in lineage_parser.source_tables:
yield from _create_lineage_by_table_name(
metadata,
from_table=str(source_table),
@ -395,7 +346,7 @@ def get_lineage_by_query(
query=query,
column_lineage_map=column_lineage,
)
for target_table in result.target_tables:
for target_table in lineage_parser.target_tables:
yield from _create_lineage_by_table_name(
metadata,
from_table=str(intermediate_table),
@ -406,9 +357,9 @@ def get_lineage_by_query(
query=query,
column_lineage_map=column_lineage,
)
if not result.intermediate_tables:
for target_table in result.target_tables:
for source_table in result.source_tables:
if not lineage_parser.intermediate_tables:
for target_table in lineage_parser.target_tables:
for source_table in lineage_parser.source_tables:
yield from _create_lineage_by_table_name(
metadata,
from_table=str(source_table),
@ -452,10 +403,10 @@ def get_lineage_via_table_entity(
try:
logger.debug(f"Getting lineage via table entity using query: {query}")
parser = LineageRunner(clean_raw_query(query))
lineage_parser = LineageParser(query)
to_table_name = table_entity.name.__root__
for from_table_name in parser.source_tables:
for from_table_name in lineage_parser.source_tables:
yield from _create_lineage_by_table_name(
metadata,
from_table=str(from_table_name),

View File

@ -14,7 +14,6 @@ Query parser implementation
import datetime
import traceback
from logging.config import DictConfigurator
from typing import Optional
from metadata.config.common import ConfigModel
@ -24,22 +23,9 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
from metadata.generated.schema.type.queryParserData import ParsedData, QueryParserData
from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery
from metadata.ingestion.api.processor import Processor, ProcessorStatus
from metadata.ingestion.lineage.parser import (
get_clean_parser_table_list,
get_involved_tables_from_parser,
get_parser_table_aliases,
get_table_joins,
)
from metadata.ingestion.lineage.sql_lineage import clean_raw_query
from metadata.ingestion.lineage.parser import LineageParser
from metadata.utils.logger import ingestion_logger
configure = DictConfigurator.configure
DictConfigurator.configure = lambda _: None
from sqllineage.runner import LineageRunner # pylint: disable=wrong-import-position
# Reverting changes after import is done
DictConfigurator.configure = configure
logger = ingestion_logger()
@ -58,19 +44,14 @@ def parse_sql_statement(record: TableQuery) -> Optional[ParsedData]:
str(record.analysisDate), "%Y-%m-%d %H:%M:%S"
).date()
parser = LineageRunner(clean_raw_query(record.query))
lineage_parser = LineageParser(record.query)
tables = get_involved_tables_from_parser(parser)
if not tables:
if not lineage_parser.involved_tables:
return None
clean_tables = get_clean_parser_table_list(tables)
aliases = get_parser_table_aliases(tables)
return ParsedData(
tables=clean_tables,
joins=get_table_joins(parser=parser, tables=clean_tables, aliases=aliases),
tables=lineage_parser.clean_table_list,
joins=lineage_parser.table_joins,
databaseName=record.databaseName,
databaseSchema=record.databaseSchema,
sql=record.query,

View File

@ -11,7 +11,6 @@
"""Metabase source module"""
import traceback
from logging.config import DictConfigurator
from typing import Iterable, List, Optional
import requests
@ -34,10 +33,8 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.lineage.sql_lineage import (
clean_raw_query,
search_table_entities,
)
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.lineage.sql_lineage import search_table_entities
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
from metadata.ingestion.source.database.common_db_source import SQLSourceStatus
from metadata.utils import fqn
@ -45,12 +42,6 @@ from metadata.utils.filters import filter_by_chart
from metadata.utils.helpers import get_standard_chart_type, replace_special_with
from metadata.utils.logger import ingestion_logger
# Prevent sqllineage from modifying the logger config
# Disable the DictConfigurator.configure method while importing LineageRunner
configure = DictConfigurator.configure
DictConfigurator.configure = lambda _: None
from sqllineage.runner import LineageRunner # pylint: disable=C0413
HEADERS = {"Content-Type": "application/json", "Accept": "*/*"}
logger = ingestion_logger()
@ -241,10 +232,9 @@ class MetabaseSource(DashboardServiceSource):
resp_database = self.req_get(f"/api/database/{chart_details['database_id']}")
if resp_database.status_code == 200:
database = resp_database.json()
query = chart_details["dataset_query"]["native"]["query"]
table_list = LineageRunner(clean_raw_query(query))
for table in table_list.source_tables:
lineage_parser = LineageParser(query)
for table in lineage_parser.source_tables:
database_schema_name, table = fqn.split(str(table))[-2:]
database_schema_name = (
None

View File

@ -11,7 +11,6 @@
"""Mode source module"""
import traceback
from logging.config import DictConfigurator
from typing import Iterable, List, Optional
from metadata.clients import mode_client
@ -33,24 +32,13 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.lineage.sql_lineage import (
clean_raw_query,
search_table_entities,
)
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.lineage.sql_lineage import search_table_entities
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
from metadata.utils import fqn
from metadata.utils.filters import filter_by_chart
from metadata.utils.logger import ingestion_logger
# Prevent sqllineage from modifying the logger config
# Disable the DictConfigurator.configure method while importing LineageRunner
configure = DictConfigurator.configure
DictConfigurator.configure = lambda _: None
from sqllineage.runner import LineageRunner # pylint: disable=C0413
# Reverting changes after import is done
DictConfigurator.configure = configure
logger = ingestion_logger()
@ -140,8 +128,8 @@ class ModeSource(DashboardServiceSource):
data_source = self.data_sources.get(query.get("data_source_id"))
if not data_source:
continue
table_list = LineageRunner(clean_raw_query(query.get("raw_query")))
for table in table_list.source_tables:
lineage_parser = LineageParser(query.get("raw_query"))
for table in lineage_parser.source_tables:
database_schema_name, table = fqn.split(str(table))[-2:]
database_schema_name = (
None

View File

@ -12,7 +12,6 @@
Redash source module
"""
import traceback
from logging.config import DictConfigurator
from typing import Iterable, List, Optional
from metadata.generated.schema.api.data.createChart import CreateChartRequest
@ -33,23 +32,13 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.lineage.sql_lineage import clean_raw_query
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
from metadata.utils import fqn
from metadata.utils.filters import filter_by_chart
from metadata.utils.helpers import get_standard_chart_type
from metadata.utils.logger import ingestion_logger
# Prevent sqllineage from modifying the logger config
# Disable the DictConfigurator.configure method while importing LineageRunner
configure = DictConfigurator.configure
DictConfigurator.configure = lambda _: None
from sqllineage.runner import LineageRunner # pylint: disable=C0413
# Reverting changes after import is done
DictConfigurator.configure = configure
logger = ingestion_logger()
@ -142,28 +131,26 @@ class RedashSource(DashboardServiceSource):
if not visualization:
continue
if visualization.get("query", {}).get("query"):
parser = LineageRunner(
clean_raw_query(visualization["query"]["query"])
)
for table in parser.source_tables:
table_name = str(table)
database_schema_table = fqn.split_table_name(table_name)
from_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=db_service_name,
schema_name=database_schema_table.get("database_schema"),
table_name=database_schema_table.get("table"),
database_name=database_schema_table.get("database"),
)
from_entity = self.metadata.get_by_name(
entity=Table,
fqn=from_fqn,
)
if from_entity and to_entity:
yield self._get_add_lineage_request(
to_entity=to_entity, from_entity=from_entity
lineage_parser = LineageParser(visualization["query"]["query"])
for table in lineage_parser.source_tables:
table_name = str(table)
database_schema_table = fqn.split_table_name(table_name)
from_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=db_service_name,
schema_name=database_schema_table.get("database_schema"),
table_name=database_schema_table.get("table"),
database_name=database_schema_table.get("database"),
)
from_entity = self.metadata.get_by_name(
entity=Table,
fqn=from_fqn,
)
if from_entity and to_entity:
yield self._get_add_lineage_request(
to_entity=to_entity, from_entity=from_entity
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(

View File

@ -124,7 +124,8 @@ class TableauSource(DashboardServiceSource):
"\nSomething went wrong while connecting to Tableau Metadata APIs\n"
"Please check if the Tableau Metadata APIs are enabled for you Tableau instance\n"
"For more information on enabling the Tableau Metadata APIs follow the link below\n"
"https://help.tableau.com/current/api/metadata_api/en-us/docs/meta_api_start.html#enable-the-tableau-metadata-api-for-tableau-server\n" # pylint: disable=line-too-long
"https://help.tableau.com/current/api/metadata_api/en-us/docs/meta_api_start.html"
"#enable-the-tableau-metadata-api-for-tableau-server\n"
)
return super().prepare()

View File

@ -14,7 +14,6 @@ Generic source to build SQL connectors.
import traceback
from abc import ABC
from copy import deepcopy
from logging.config import DictConfigurator
from typing import Iterable, Optional, Tuple
from sqlalchemy.engine import Connection
@ -40,8 +39,8 @@ from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.lineage.sql_lineage import (
clean_raw_query,
get_lineage_by_query,
get_lineage_via_table_entity,
)
@ -399,20 +398,10 @@ class CommonDbSourceService(
schema_name=schema_name,
inspector=self.inspector,
)
# Prevent sqllineage from modifying the logger config
# Disable the DictConfigurator.configure method while importing LineageRunner
configure = DictConfigurator.configure
DictConfigurator.configure = lambda _: None
from sqllineage.runner import ( # pylint: disable=import-outside-toplevel
LineageRunner,
)
# Reverting changes after import is done
DictConfigurator.configure = configure
try:
result = LineageRunner(clean_raw_query(view_definition))
if result.source_tables and result.target_tables:
lineage_parser = LineageParser(view_definition)
if lineage_parser.source_tables and lineage_parser.target_tables:
yield from get_lineage_by_query(
self.metadata,
query=view_definition,

View File

@ -421,8 +421,7 @@ class ProfilerWorkflow(WorkflowStatusMixin):
raise ValueError(
"databaseFilterPattern returned 0 result. At least 1 database must be returned by the filter pattern."
f"\n\t- includes: {self.source_config.databaseFilterPattern.includes if self.source_config.databaseFilterPattern else None}" # pylint: disable=line-too-long
f"\n\t- excludes: {self.source_config.databaseFilterPattern.excludes if self.source_config.databaseFilterPattern else None}"
# pylint: disable=line-too-long
f"\n\t- excludes: {self.source_config.databaseFilterPattern.excludes if self.source_config.databaseFilterPattern else None}" # pylint: disable=line-too-long
)
for database in databases:

View File

@ -13,26 +13,10 @@
Validate query parser logic
"""
# Prevent sqllineage from modifying the logger config
# Disable the DictConfigurator.configure method while importing LineageRunner
from logging.config import DictConfigurator
from unittest import TestCase
from metadata.generated.schema.type.tableUsageCount import TableColumn, TableColumnJoin
from metadata.ingestion.lineage.parser import (
get_clean_parser_table_list,
get_involved_tables_from_parser,
get_parser_table_aliases,
get_table_joins,
)
from metadata.ingestion.lineage.sql_lineage import clean_raw_query
configure = DictConfigurator.configure
DictConfigurator.configure = lambda _: None
from sqllineage.runner import LineageRunner
# Reverting changes after import is done
DictConfigurator.configure = configure
from metadata.ingestion.lineage.parser import LineageParser
class QueryParserTests(TestCase):
@ -58,22 +42,20 @@ class QueryParserTests(TestCase):
WHERE a.col3 = 'abc'
"""
parser = LineageRunner(col_lineage)
parser = LineageParser(col_lineage)
def test_involved_tables(self):
tables = {str(table) for table in get_involved_tables_from_parser(self.parser)}
tables = {str(table) for table in self.parser.involved_tables}
self.assertEqual(
tables, {"db.grault", "db.holis", "<default>.foo", "db.random"}
)
def test_clean_parser_table_list(self):
tables = get_involved_tables_from_parser(self.parser)
clean_tables = set(get_clean_parser_table_list(tables))
clean_tables = set(self.parser.clean_table_list)
self.assertEqual(clean_tables, {"db.grault", "db.holis", "foo", "db.random"})
def test_parser_table_aliases(self):
tables = get_involved_tables_from_parser(self.parser)
aliases = get_parser_table_aliases(tables)
aliases = self.parser.table_aliases
self.assertEqual(
aliases, {"b": "db.grault", "c": "db.holis", "a": "foo", "d": "db.random"}
)
@ -82,14 +64,7 @@ class QueryParserTests(TestCase):
"""
main logic point
"""
tables = get_involved_tables_from_parser(self.parser)
clean_tables = get_clean_parser_table_list(tables)
aliases = get_parser_table_aliases(tables)
joins = get_table_joins(
parser=self.parser, tables=clean_tables, aliases=aliases
)
joins = self.parser.table_joins
self.assertEqual(
joins["foo"],
@ -126,14 +101,9 @@ class QueryParserTests(TestCase):
;
"""
parser = LineageRunner(query)
parser = LineageParser(query)
tables = get_involved_tables_from_parser(parser)
clean_tables = get_clean_parser_table_list(tables)
aliases = get_parser_table_aliases(tables)
joins = get_table_joins(parser=parser, tables=clean_tables, aliases=aliases)
joins = parser.table_joins
self.assertEqual(
joins["testdb.public.users"],
@ -153,23 +123,33 @@ class QueryParserTests(TestCase):
def test_clean_raw_query_copy_grants(self):
"""
Validate query cleaning logic
Validate COPY GRANT query cleaning logic
"""
query = "create or replace view my_view copy grants as select * from my_table"
self.assertEqual(
clean_raw_query(query),
LineageParser.clean_raw_query(query),
"create or replace view my_view as select * from my_table",
)
def test_clean_raw_query_merge_into(self):
"""
Validate query cleaning logic
Validate MERGE INTO query cleaning logic
"""
query = """
/* comment */ merge into table_1 using (select a, b from table_2) when matched update set t.a = 'value'
when not matched then insert (table_1.a, table_2.b) values ('value1', 'value2')
"""
self.assertEqual(
clean_raw_query(query),
LineageParser.clean_raw_query(query),
"/* comment */ merge into table_1 using (select a, b from table_2)",
)
def test_clean_raw_query_copy_from(self):
"""
Validate COPY FROM query cleaning logic
"""
query = "COPY my_schema.my_table FROM 's3://bucket/path/object.csv';"
self.assertEqual(
LineageParser.clean_raw_query(query),
None,
)

View File

@ -12,22 +12,11 @@
"""
sql lineage utils tests
"""
from logging.config import DictConfigurator
from unittest import TestCase
from sqllineage.runner import LineageRunner
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.lineage.sql_lineage import populate_column_lineage_map
# Prevent sqllineage from modifying the logger config
# Disable the DictConfigurator.configure method while importing LineageRunner
configure = DictConfigurator.configure
DictConfigurator.configure = lambda _: None
# Reverting changes after import is done
DictConfigurator.configure = configure
QUERY = [
"CREATE TABLE MYTABLE2 AS SELECT * FROM MYTABLE1;",
"CREATE TABLE MYTABLE3 AS SELECT ID, NAME FROM MYTABLE1",
@ -50,7 +39,7 @@ class SqlLineageTest(TestCase):
def test_populate_column_lineage_map(self):
for i in range(len(QUERY)):
result = LineageRunner(QUERY[i])
raw_column_lineage = result.get_column_lineage()
lineage_parser = LineageParser(QUERY[i])
raw_column_lineage = lineage_parser.column_lineage
lineage_map = populate_column_lineage_map(raw_column_lineage)
self.assertEqual(lineage_map, EXPECTED_LINEAGE_MAP[i])