Mirror of https://github.com/datahub-project/datahub.git
feat(tableau): ability to force extraction of table/column level lineage from SQL queries (#9838)
This commit is contained in: parent 7a2d61d424, commit e6e5c091ed
@@ -142,6 +142,9 @@ from datahub.metadata.schema_classes import (
    SubTypesClass,
    ViewPropertiesClass,
)
from datahub.sql_parsing.sql_parsing_result_utils import (
    transform_parsing_result_to_in_tables_schemas,
)
from datahub.sql_parsing.sqlglot_lineage import (
    ColumnLineageInfo,
    SqlParsingResult,
@@ -375,6 +378,17 @@ class TableauConfig(
        description="[Experimental] Whether to extract lineage from unsupported custom sql queries using SQL parsing",
    )

    force_extraction_of_lineage_from_custom_sql_queries: bool = Field(
        default=False,
        description="[Experimental] Force extraction of lineage from custom sql queries using SQL parsing, ignoring Tableau metadata",
    )

    sql_parsing_disable_schema_awareness: bool = Field(
        default=False,
        description="[Experimental] Ignore pre-ingested table schemas during parsing of SQL queries "
        "(allows working around ingestion errors when the pre-ingested schema and the queries are out of sync)",
    )

    # pre=True because we want to make some decisions before pydantic initializes the configuration to default values
    @root_validator(pre=True)
    def projects_backward_compatibility(cls, values: Dict) -> Dict:
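For reference, a minimal sketch of how the new options above could be combined in the source part of a Tableau ingestion config (only the option names come from the Field definitions above; the values and the surrounding dict are illustrative):

# Hedged sketch: the relevant fragment of a Tableau source configuration.
tableau_source_config = {
    # always derive lineage with DataHub's sqlglot parser, ignoring Tableau's own metadata
    "force_extraction_of_lineage_from_custom_sql_queries": True,
    # keep the existing fallback for unsupported custom SQL enabled
    "extract_lineage_from_unsupported_custom_sql_queries": True,
    # only set to True when pre-ingested schemas and queries are out of sync
    "sql_parsing_disable_schema_awareness": False,
}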
@@ -432,21 +446,43 @@ class DatabaseTable:
    """

    urn: str
    id: str
    num_cols: Optional[int]
    id: Optional[
        str
    ] = None  # is not None only for tables that came from Tableau metadata
    num_cols: Optional[int] = None

    paths: Set[str]  # maintains all browse paths encountered for this table
    paths: Optional[
        Set[str]
    ] = None  # maintains all browse paths encountered for this table

    parsed_columns: Optional[
        Set[str]
    ] = None  # maintains all columns encountered for this table while parsing SQL queries

    def update_table(
        self, id: str, num_tbl_cols: Optional[int], path: Optional[str]
        self,
        id: Optional[str] = None,
        num_tbl_cols: Optional[int] = None,
        path: Optional[str] = None,
        parsed_columns: Optional[Set[str]] = None,
    ) -> None:
        if path and path not in self.paths:
            self.paths.add(path)
        if path:
            if self.paths:
                self.paths.add(path)
            else:
                self.paths = {path}

        # the new instance of the table has column information, so prefer its id.
        if not self.num_cols and num_tbl_cols:
            self.id = id
            self.num_cols = num_tbl_cols

        if parsed_columns:
            if self.parsed_columns:
                self.parsed_columns.update(parsed_columns)
            else:
                self.parsed_columns = parsed_columns


class TableauSourceReport(StaleEntityRemovalSourceReport):
    get_all_datasources_query_failed: bool = False
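The merge semantics of the reworked DatabaseTable are easiest to see in a small sketch (hypothetical usage based only on the dataclass shown above; the urn and values are made up):

# Hedged sketch: a table first seen while parsing SQL, later enriched from Tableau metadata.
table = DatabaseTable(urn="urn:li:dataset:(urn:li:dataPlatform:postgres,db.public.orders,PROD)")

# Columns discovered by the SQL parser; no Tableau id or browse path is known yet.
table.update_table(parsed_columns={"order_id", "amount"})

# Later, the same table shows up in Tableau metadata with an id, a column count and a path.
table.update_table(id="tableau-table-id", num_tbl_cols=5, path="Project/Workbook")

assert table.id == "tableau-table-id"
assert table.num_cols == 5
assert table.paths == {"Project/Workbook"}
assert table.parsed_columns == {"order_id", "amount"}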
@@ -1137,9 +1173,16 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
                and upstream_table_id in table_id_to_urn.keys()
            ):
                parent_dataset_urn = table_id_to_urn[upstream_table_id]
                if self.is_snowflake_urn(parent_dataset_urn):
                if (
                    self.is_snowflake_urn(parent_dataset_urn)
                    and not self.config.ingest_tables_external
                ):
                    # This is required for column level lineage to work correctly as
                    # the DataHub Snowflake source lowercases all field names in the schema.
                    #
                    # It should not be done if Snowflake tables are not pre-ingested but
                    # parsed from SQL queries or ingested from Tableau metadata (in that case
                    # it just breaks case-sensitive table level lineage)
                    name = name.lower()
                input_columns.append(
                    builder.make_schema_field_urn(
@@ -1299,6 +1342,7 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
                custom_sql_filter,
            )
        )

        unique_custom_sql = get_unique_custom_sql(custom_sql_connection)

        for csql in unique_custom_sql:
@@ -1364,33 +1408,46 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):

            project = self._get_project_browse_path_name(datasource)

            tables = csql.get(c.TABLES, [])
            # if condition is needed as graphQL returns "columns": None
            columns: List[Dict[Any, Any]] = (
                cast(List[Dict[Any, Any]], csql.get(c.COLUMNS))
                if c.COLUMNS in csql and csql.get(c.COLUMNS) is not None
                else []
            )

            if tables:
                # lineage from custom sql -> datasets/tables #
                yield from self._create_lineage_to_upstream_tables(
                    csql_urn, tables, datasource
                )
            elif self.config.extract_lineage_from_unsupported_custom_sql_queries:
                logger.debug("Extracting TLL & CLL from custom sql")
                # custom sql tables may contain unsupported sql, causing incomplete lineage
                # we extract the lineage from the raw queries
            # The Tableau SQL parser is much worse than our sqlglot-based parser,
            # so relying on metadata parsed by Tableau from SQL queries can be
            # less accurate. This option allows us to ignore Tableau's parser and
            # only use our own.
            if self.config.force_extraction_of_lineage_from_custom_sql_queries:
                logger.debug("Extracting TLL & CLL from custom sql (forced)")
                yield from self._create_lineage_from_unsupported_csql(
                    csql_urn, csql
                    csql_urn, csql, columns
                )
            else:
                tables = csql.get(c.TABLES, [])

                if tables:
                    # lineage from custom sql -> datasets/tables #
                    yield from self._create_lineage_to_upstream_tables(
                        csql_urn, tables, datasource
                    )
                elif (
                    self.config.extract_lineage_from_unsupported_custom_sql_queries
                ):
                    logger.debug("Extracting TLL & CLL from custom sql")
                    # custom sql tables may contain unsupported sql, causing incomplete lineage
                    # we extract the lineage from the raw queries
                    yield from self._create_lineage_from_unsupported_csql(
                        csql_urn, csql, columns
                    )

            # Schema Metadata
            # if condition is needed as graphQL returns "columns": None
            columns: List[Dict[Any, Any]] = (
                cast(List[Dict[Any, Any]], csql.get(c.COLUMNS))
                if c.COLUMNS in csql and csql.get(c.COLUMNS) is not None
                else []
            )
            schema_metadata = self.get_schema_metadata_for_custom_sql(columns)
            if schema_metadata is not None:
                dataset_snapshot.aspects.append(schema_metadata)

            # Browse path

            if project and datasource_name:
                browse_paths = BrowsePathsClass(
                    paths=[f"{self.dataset_browse_prefix}/{project}/{datasource_name}"]
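To make the branching above easier to follow, a condensed summary (a sketch only; parse_with_sqlglot and lineage_from_tableau_tables are hypothetical labels, not functions from this patch):

# Hedged summary of the per-custom-SQL lineage decision implemented above.
def choose_lineage_strategy(config, csql_tables):
    if config.force_extraction_of_lineage_from_custom_sql_queries:
        return "parse_with_sqlglot"           # always ignore Tableau's own SQL parsing
    if csql_tables:
        return "lineage_from_tableau_tables"  # trust the tables Tableau already resolved
    if config.extract_lineage_from_unsupported_custom_sql_queries:
        return "parse_with_sqlglot"           # fall back to SQL parsing
    return "no_lineage"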
@@ -1585,6 +1642,33 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
            aspect=upstream_lineage,
        )

    @staticmethod
    def _clean_tableau_query_parameters(query: str) -> str:
        if not query:
            return query

        #
        # It replaces all of the following occurrences with 1,
        # which is enough to fix the syntax of the SQL query
        # and make the sqlglot parser happy:
        #
        # <[Parameters].[SomeParameterName]>
        # <Parameters.[SomeParameterName]>
        # <[Parameters].SomeParameterName>
        # <[Parameters].SomeParameter Name>
        # <Parameters.SomeParameterName>
        #
        # Afterwards, it unescapes (Tableau escapes these):
        # >> to >
        # << to <
        #
        return (
            re.sub(r"\<\[?[Pp]arameters\]?\.(\[[^\]]+\]|[^\>]+)\>", "1", query)
            .replace("<<", "<")
            .replace(">>", ">")
            .replace("\n\n", "\n")
        )
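A quick before/after illustration of the cleanup above (the query is just an example; the expected output matches the unit tests added later in this patch):

raw = 'select * from t where c1 << 135 and c2 = <Parameters.[My Param]>'
cleaned = TableauSource._clean_tableau_query_parameters(raw)
assert cleaned == "select * from t where c1 < 135 and c2 = 1"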
    def parse_custom_sql(
        self,
        datasource: dict,
@@ -1604,9 +1688,15 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
            ]
        ],
    ) -> Optional["SqlParsingResult"]:
        database_info = datasource.get(c.DATABASE) or {}
        database_info = datasource.get(c.DATABASE) or {
            c.NAME: c.UNKNOWN.lower(),
            c.CONNECTION_TYPE: "databricks",
        }

        if datasource.get(c.IS_UNSUPPORTED_CUSTOM_SQL) in (None, False):
        if (
            datasource.get(c.IS_UNSUPPORTED_CUSTOM_SQL) in (None, False)
            and not self.config.force_extraction_of_lineage_from_custom_sql_queries
        ):
            logger.debug(f"datasource {datasource_urn} is not created from custom sql")
            return None

@@ -1622,6 +1712,7 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
                f"raw sql query is not available for datasource {datasource_urn}"
            )
            return None
        query = self._clean_tableau_query_parameters(query)

        logger.debug(f"Parsing sql={query}")

@@ -1647,10 +1738,33 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
            platform_instance=platform_instance,
            env=env,
            graph=self.ctx.graph,
            schema_aware=not self.config.sql_parsing_disable_schema_awareness,
        )

    def _enrich_database_tables_with_parsed_schemas(
        self, parsing_result: SqlParsingResult
    ) -> None:

        in_tables_schemas: Dict[
            str, Set[str]
        ] = transform_parsing_result_to_in_tables_schemas(parsing_result)

        if not in_tables_schemas:
            logger.info("Unable to extract table schema from parsing result")
            return

        for table_urn, columns in in_tables_schemas.items():
            if table_urn in self.database_tables:
                self.database_tables[table_urn].update_table(
                    table_urn, parsed_columns=columns
                )
            else:
                self.database_tables[table_urn] = DatabaseTable(
                    urn=table_urn, parsed_columns=columns
                )

    def _create_lineage_from_unsupported_csql(
        self, csql_urn: str, csql: dict
        self, csql_urn: str, csql: dict, out_columns: List[Dict[Any, Any]]
    ) -> Iterable[MetadataWorkUnit]:
        parsed_result = self.parse_custom_sql(
            datasource=csql,
@@ -1667,6 +1781,8 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
            )
            return

        self._enrich_database_tables_with_parsed_schemas(parsed_result)

        upstream_tables = make_upstream_class(parsed_result)

        logger.debug(f"Upstream tables = {upstream_tables}")
@@ -1675,7 +1791,7 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
        if self.config.extract_column_level_lineage:
            logger.info("Extracting CLL from custom sql")
            fine_grained_lineages = make_fine_grained_lineage_class(
                parsed_result, csql_urn
                parsed_result, csql_urn, out_columns
            )

        upstream_lineage = UpstreamLineage(
@@ -1811,7 +1927,11 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
        dataset_snapshot.aspects.append(dataset_props)

        # Upstream Tables
        if datasource.get(c.UPSTREAM_TABLES) or datasource.get(c.UPSTREAM_DATA_SOURCES):
        if (
            datasource.get(c.UPSTREAM_TABLES)
            or datasource.get(c.UPSTREAM_DATA_SOURCES)
            or datasource.get(c.FIELDS)
        ):
            # datasource -> db table relations
            (
                upstream_tables,
@@ -1906,32 +2026,58 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
            yield from self.emit_datasource(datasource)

    def emit_upstream_tables(self) -> Iterable[MetadataWorkUnit]:
        database_table_id_to_urn_map: Dict[str, str] = dict()
        tableau_database_table_id_to_urn_map: Dict[str, str] = dict()
        for urn, tbl in self.database_tables.items():
            database_table_id_to_urn_map[tbl.id] = urn
        tables_filter = (
            f"{c.ID_WITH_IN}: {json.dumps(list(database_table_id_to_urn_map.keys()))}"
        )
            # only tables that came from Tableau metadata have an id
            if tbl.id:
                tableau_database_table_id_to_urn_map[tbl.id] = urn

        for table in self.get_connection_objects(
        tables_filter = f"{c.ID_WITH_IN}: {json.dumps(list(tableau_database_table_id_to_urn_map.keys()))}"

        # Emitting tables that came from Tableau metadata
        for tableau_table in self.get_connection_objects(
            database_tables_graphql_query,
            c.DATABASE_TABLES_CONNECTION,
            tables_filter,
        ):
            yield from self.emit_table(table, database_table_id_to_urn_map)
            database_table = self.database_tables[
                tableau_database_table_id_to_urn_map[tableau_table[c.ID]]
            ]
            tableau_columns = tableau_table.get(c.COLUMNS, [])
            is_embedded = tableau_table.get(c.IS_EMBEDDED) or False
            if not is_embedded and not self.config.ingest_tables_external:
                logger.debug(
                    f"Skipping external table {database_table.urn} as ingest_tables_external is set to False"
                )
                continue

            yield from self.emit_table(database_table, tableau_columns)

        # Emitting tables that were purely parsed from SQL queries
        for database_table in self.database_tables.values():
            # only tables that were purely parsed from SQL queries lack an id
            if database_table.id:
                logger.debug(
                    f"Skipping table {database_table.urn}; it should have already been ingested from Tableau metadata"
                )
                continue

            if not self.config.ingest_tables_external:
                logger.debug(
                    f"Skipping external table {database_table.urn} as ingest_tables_external is set to False"
                )
                continue

            yield from self.emit_table(database_table, None)

    def emit_table(
        self, table: dict, database_table_id_to_urn_map: Dict[str, str]
        self,
        database_table: DatabaseTable,
        tableau_columns: Optional[List[Dict[str, Any]]],
    ) -> Iterable[MetadataWorkUnit]:
        database_table = self.database_tables[database_table_id_to_urn_map[table[c.ID]]]
        columns = table.get(c.COLUMNS, [])
        is_embedded = table.get(c.IS_EMBEDDED) or False
        if not is_embedded and not self.config.ingest_tables_external:
            logger.debug(
                f"Skipping external table {database_table.urn} as ingest_tables_external is set to False"
            )
            return

        logger.debug(
            f"Emitting external table {database_table} tableau_columns {tableau_columns}"
        )
        dataset_snapshot = DatasetSnapshot(
            urn=database_table.urn,
            aspects=[],
@@ -1948,19 +2094,25 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
        else:
            logger.debug(f"Browse path not set for table {database_table.urn}")

        schema_metadata = self.get_schema_metadata_for_table(columns or [])
        schema_metadata = self.get_schema_metadata_for_table(
            tableau_columns, database_table.parsed_columns
        )
        if schema_metadata is not None:
            dataset_snapshot.aspects.append(schema_metadata)

        yield self.get_metadata_change_event(dataset_snapshot)

    def get_schema_metadata_for_table(
        self, columns: List[dict]
        self,
        tableau_columns: Optional[List[Dict[str, Any]]],
        parsed_columns: Optional[Set[str]] = None,
    ) -> Optional[SchemaMetadata]:
        schema_metadata: Optional[SchemaMetadata] = None
        if columns:
            fields = []
            for field in columns:

        fields = []

        if tableau_columns:
            for field in tableau_columns:
                if field.get(c.NAME) is None:
                    self.report.num_table_field_skipped_no_name += 1
                    logger.warning(
@@ -1979,6 +2131,24 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):

                fields.append(schema_field)

        if parsed_columns:
            remaining_columns = (
                parsed_columns.difference(map(lambda x: x.get(c.NAME), tableau_columns))
                if tableau_columns
                else parsed_columns
            )
            remaining_schema_fields = [
                SchemaField(
                    fieldPath=col,
                    type=SchemaFieldDataType(type=NullTypeClass()),
                    description="",
                    nativeDataType=c.UNKNOWN,
                )
                for col in remaining_columns
            ]
            fields.extend(remaining_schema_fields)

        if fields:
            schema_metadata = SchemaMetadata(
                schemaName="test",
                platform=f"urn:li:dataPlatform:{self.platform}",
@@ -1988,6 +2158,10 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
                platformSchema=OtherSchema(rawSchema=""),
            )

        # TODO: optionally add logic that will look up the current table schema from DataHub
        # and merge it with what was inferred during the current run; this would allow incrementally
        # ingesting different Tableau projects that share the same tables

        return schema_metadata

    def get_sheetwise_upstream_datasources(self, sheet: dict) -> set:

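A small worked example of how get_schema_metadata_for_table combines the two column sources (a sketch based only on the logic above; it assumes c.NAME resolves to the "name" key, and the column names are made up):

# Tableau metadata reports columns "id" and "name"; SQL parsing additionally saw "amount".
tableau_columns = [{"name": "id"}, {"name": "name"}]
parsed_columns = {"id", "amount"}

# Only parsed columns that Tableau did not already report are appended,
# typed as NullTypeClass with nativeDataType "UNKNOWN".
remaining_columns = parsed_columns.difference(col["name"] for col in tableau_columns)
assert remaining_columns == {"amount"}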
@@ -2,7 +2,7 @@ import html
import logging
from dataclasses import dataclass
from functools import lru_cache
from typing import Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple

from pydantic.fields import Field
from tableauserverclient import Server
@@ -762,8 +762,19 @@ def make_upstream_class(


def make_fine_grained_lineage_class(
    parsed_result: Optional[SqlParsingResult], dataset_urn: str
    parsed_result: Optional[SqlParsingResult],
    dataset_urn: str,
    out_columns: List[Dict[Any, Any]],
) -> List[FineGrainedLineage]:
    # 1) fine grained lineage links are case sensitive
    # 2) parsed out columns are always lowercased
    # 3) corresponding Custom SQL output columns can be in any case (lower/upper/mixed)
    #
    # we need a map between (2) and (3) that is used when building column level lineage links (see below)
    out_columns_map = {
        col.get(c.NAME, "").lower(): col.get(c.NAME, "") for col in out_columns
    }

    fine_grained_lineages: List[FineGrainedLineage] = []

    if parsed_result is None:
@@ -775,7 +786,15 @@ def make_fine_grained_lineage_class(

    for cll_info in cll:
        downstream = (
            [builder.make_schema_field_urn(dataset_urn, cll_info.downstream.column)]
            [
                builder.make_schema_field_urn(
                    dataset_urn,
                    out_columns_map.get(
                        cll_info.downstream.column.lower(),
                        cll_info.downstream.column,
                    ),
                )
            ]
            if cll_info.downstream is not None
            and cll_info.downstream.column is not None
            else []
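The effect of out_columns_map is easiest to see on a tiny example (a sketch; the column names are made up and c.NAME is assumed to resolve to the "name" key):

# Tableau reports the Custom SQL output columns with their original casing...
out_columns = [{"name": "CustomerName"}, {"name": "ORDER_ID"}]
out_columns_map = {col["name"].lower(): col["name"] for col in out_columns}

# ...while the sqlglot parser reports downstream columns lowercased.
parsed_downstream = "customername"

# The lookup restores Tableau's casing so the schema field urn matches the emitted schema.
assert out_columns_map.get(parsed_downstream, parsed_downstream) == "CustomerName"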
@@ -0,0 +1,23 @@
from typing import Dict, Set

from datahub.sql_parsing.sqlglot_lineage import SqlParsingResult, Urn


def transform_parsing_result_to_in_tables_schemas(
    parsing_result: SqlParsingResult,
) -> Dict[Urn, Set[str]]:
    table_urn_to_schema_map: Dict[str, Set[str]] = (
        {it: set() for it in parsing_result.in_tables}
        if parsing_result.in_tables
        else {}
    )

    if parsing_result.column_lineage:
        for cli in parsing_result.column_lineage:
            for upstream in cli.upstreams:
                if upstream.table in table_urn_to_schema_map:
                    table_urn_to_schema_map[upstream.table].add(upstream.column)
                else:
                    table_urn_to_schema_map[upstream.table] = {upstream.column}

    return table_urn_to_schema_map
@@ -529,6 +529,9 @@ def _column_level_lineage(  # noqa: C901

            # Parse the column name out of the node name.
            # Sqlglot calls .sql(), so we have to do the inverse.
            if node.name == "*":
                continue

            normalized_col = sqlglot.parse_one(node.name).this.name
            if node.subfield:
                normalized_col = f"{normalized_col}.{node.subfield}"
@@ -834,6 +837,7 @@ def _sqlglot_lineage_inner(
    # Fetch schema info for the relevant tables.
    table_name_urn_mapping: Dict[_TableName, str] = {}
    table_name_schema_mapping: Dict[_TableName, SchemaInfo] = {}

    for table in tables | modified:
        # For select statements, qualification will be a no-op. For other statements, this
        # is where the qualification actually happens.
@@ -1016,8 +1020,9 @@ def create_lineage_sql_parsed_result(
    env: str,
    default_schema: Optional[str] = None,
    graph: Optional[DataHubGraph] = None,
    schema_aware: bool = True,
) -> SqlParsingResult:
    if graph:
    if graph and schema_aware:
        needs_close = False
        schema_resolver = graph._make_schema_resolver(
            platform=platform,

@@ -16,6 +16,11 @@ def _get_dialect_str(platform: str) -> str:
        return "tsql"
    elif platform == "athena":
        return "trino"
    # TODO: define a Salesforce SOQL dialect
    # The temporary workaround is to treat SOQL as the databricks dialect.
    # At least it allows parsing simple SQL queries and building lineage for them.
    elif platform == "salesforce":
        return "databricks"
    elif platform in {"mysql", "mariadb"}:
        # In sqlglot v20+, MySQL is now case-sensitive by default, which is the
        # default behavior on Linux. However, MySQL's default case sensitivity
@@ -31,6 +36,7 @@ def _get_dialect_str(platform: str) -> str:
def get_dialect(platform: DialectOrStr) -> sqlglot.Dialect:
    if isinstance(platform, sqlglot.Dialect):
        return platform

    return sqlglot.Dialect.get_or_raise(_get_dialect_str(platform))

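A minimal illustration of the Salesforce mapping above (a sketch; the query is made up and only uses plain SQL that the databricks dialect accepts):

import sqlglot

# _get_dialect_str("salesforce") maps to "databricks", so Salesforce custom SQL
# ends up being parsed with sqlglot's Databricks dialect, which copes with
# simple SOQL-like SELECT statements.
parsed = sqlglot.parse_one("SELECT Id, Name FROM Account", read="databricks")
print(parsed.sql())  # SELECT Id, Name FROM Account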
@@ -42870,6 +42870,38 @@
    "lastRunId": "no-run-id-provided"
  }
},
{
  "entityType": "dataset",
  "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,demo-custom-323403.bigquery_demo.order_items,PROD)",
  "changeType": "UPSERT",
  "aspectName": "status",
  "aspect": {
    "json": {
      "removed": false
    }
  },
  "systemMetadata": {
    "lastObserved": 1638860400000,
    "runId": "tableau-test",
    "lastRunId": "no-run-id-provided"
  }
},
{
  "entityType": "dataset",
  "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,demo-custom-323403.bigquery_demo.sellers,PROD)",
  "changeType": "UPSERT",
  "aspectName": "status",
  "aspect": {
    "json": {
      "removed": false
    }
  },
  "systemMetadata": {
    "lastObserved": 1638860400000,
    "runId": "tableau-test",
    "lastRunId": "no-run-id-provided"
  }
},
{
  "entityType": "dataset",
  "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:external,sample - superstore%2C %28new%29.xls.orders,PROD)",

@@ -319,6 +319,8 @@ def test_tableau_cll_ingest(pytestconfig, tmp_path, mock_datahub_graph):
    new_pipeline_config: Dict[Any, Any] = {
        **config_source_default,
        "extract_lineage_from_unsupported_custom_sql_queries": True,
        "force_extraction_of_lineage_from_custom_sql_queries": False,
        "sql_parsing_disable_schema_awareness": False,
        "extract_column_level_lineage": True,
    }

@@ -834,6 +836,7 @@ def test_tableau_unsupported_csql(mock_datahub_graph):
                "connectionType": "bigquery",
            },
        },
        out_columns=[],
    )

    mcp = cast(MetadataChangeProposalClass, next(iter(lineage)).metadata)

@@ -0,0 +1,67 @@
from datahub.sql_parsing.sql_parsing_result_utils import (
    transform_parsing_result_to_in_tables_schemas,
)
from datahub.sql_parsing.sqlglot_lineage import (
    ColumnLineageInfo,
    ColumnRef,
    DownstreamColumnRef,
    SqlParsingResult,
)


def test_transform_parsing_result_to_in_tables_schemas__empty_parsing_result():
    parsing_result = SqlParsingResult(in_tables=[], out_tables=[], column_lineage=None)

    in_tables_schema = transform_parsing_result_to_in_tables_schemas(parsing_result)
    assert not in_tables_schema


def test_transform_parsing_result_to_in_tables_schemas__in_tables_only():
    parsing_result = SqlParsingResult(
        in_tables=["table_urn1", "table_urn2", "table_urn3"],
        out_tables=[],
        column_lineage=None,
    )

    in_tables_schema = transform_parsing_result_to_in_tables_schemas(parsing_result)
    assert in_tables_schema == {
        "table_urn1": set(),
        "table_urn2": set(),
        "table_urn3": set(),
    }


def test_transform_parsing_result_to_in_tables_schemas__in_tables_and_column_lineage():
    parsing_result = SqlParsingResult(
        in_tables=["table_urn1", "table_urn2", "table_urn3"],
        out_tables=[],
        column_lineage=[
            ColumnLineageInfo(
                downstream=DownstreamColumnRef(column="out_col1"),
                upstreams=[
                    ColumnRef(table="table_urn1", column="col11"),
                ],
            ),
            ColumnLineageInfo(
                downstream=DownstreamColumnRef(column="out_col2"),
                upstreams=[
                    ColumnRef(table="table_urn2", column="col21"),
                    ColumnRef(table="table_urn2", column="col22"),
                ],
            ),
            ColumnLineageInfo(
                downstream=DownstreamColumnRef(column="out_col3"),
                upstreams=[
                    ColumnRef(table="table_urn1", column="col12"),
                    ColumnRef(table="table_urn2", column="col23"),
                ],
            ),
        ],
    )

    in_tables_schema = transform_parsing_result_to_in_tables_schemas(parsing_result)
    assert in_tables_schema == {
        "table_urn1": {"col11", "col12"},
        "table_urn2": {"col21", "col22", "col23"},
        "table_urn3": set(),
    }
metadata-ingestion/tests/unit/test_tableau_source.py (new file, 123 lines)
@@ -0,0 +1,123 @@
import pytest

from datahub.ingestion.source.tableau import TableauSource


def test_tableau_source_unescapes_lt():
    res = TableauSource._clean_tableau_query_parameters(
        "select * from t where c1 << 135"
    )

    assert res == "select * from t where c1 < 135"


def test_tableau_source_unescapes_gt():
    res = TableauSource._clean_tableau_query_parameters(
        "select * from t where c1 >> 135"
    )

    assert res == "select * from t where c1 > 135"


def test_tableau_source_unescapes_gte():
    res = TableauSource._clean_tableau_query_parameters(
        "select * from t where c1 >>= 135"
    )

    assert res == "select * from t where c1 >= 135"


def test_tableau_source_unescapes_lte():
    res = TableauSource._clean_tableau_query_parameters(
        "select * from t where c1 <<= 135"
    )

    assert res == "select * from t where c1 <= 135"


def test_tableau_source_doesnt_touch_not_escaped():
    res = TableauSource._clean_tableau_query_parameters(
        "select * from t where c1 < 135 and c2 > 15"
    )

    assert res == "select * from t where c1 < 135 and c2 > 15"


TABLEAU_PARAMS = [
    "<Parameters.MyParam>",
    "<Parameters.MyParam_1>",
    "<Parameters.My Param _ 1>",
    "<Parameters.My Param 1 !@\"',.#$%^:;&*()-_+={}|\\ /<>",
    "<[Parameters].MyParam>",
    "<[Parameters].MyParam_1>",
    "<[Parameters].My Param _ 1>",
    "<[Parameters].My Param 1 !@\"',.#$%^:;&*()-_+={}|\\ /<>",
    "<Parameters.[MyParam]>",
    "<Parameters.[MyParam_1]>",
    "<Parameters.[My Param _ 1]>",
    "<Parameters.[My Param 1 !@\"',.#$%^:;&*()-_+={}|\\ /<]>",
    "<[Parameters].[MyParam]>",
    "<[Parameters].[MyParam_1]>",
    "<[Parameters].[My Param _ 1]>",
    "<[Parameters].[My Param 1 !@\"',.#$%^:;&*()-_+={}|\\ /<]>",
    "<Parameters.[My Param 1 !@\"',.#$%^:;&*()-_+={}|\\ /<>]>",
    "<[Parameters].[My Param 1 !@\"',.#$%^:;&*()-_+={}|\\ /<>]>",
]


@pytest.mark.parametrize("p", TABLEAU_PARAMS)
def test_tableau_source_cleanups_tableau_parameters_in_equi_predicates(p):
    assert (
        TableauSource._clean_tableau_query_parameters(
            f"select * from t where c1 = {p} and c2 = {p} and c3 = 7"
        )
        == "select * from t where c1 = 1 and c2 = 1 and c3 = 7"
    )


@pytest.mark.parametrize("p", TABLEAU_PARAMS)
def test_tableau_source_cleanups_tableau_parameters_in_lt_gt_predicates(p):
    assert (
        TableauSource._clean_tableau_query_parameters(
            f"select * from t where c1 << {p} and c2<<{p} and c3 >> {p} and c4>>{p} or {p} >> c1 and {p}>>c2 and {p} << c3 and {p}<<c4"
        )
        == "select * from t where c1 < 1 and c2<1 and c3 > 1 and c4>1 or 1 > c1 and 1>c2 and 1 < c3 and 1<c4"
    )


@pytest.mark.parametrize("p", TABLEAU_PARAMS)
def test_tableau_source_cleanups_tableau_parameters_in_lte_gte_predicates(p):
    assert (
        TableauSource._clean_tableau_query_parameters(
            f"select * from t where c1 <<= {p} and c2<<={p} and c3 >>= {p} and c4>>={p} or {p} >>= c1 and {p}>>=c2 and {p} <<= c3 and {p}<<=c4"
        )
        == "select * from t where c1 <= 1 and c2<=1 and c3 >= 1 and c4>=1 or 1 >= c1 and 1>=c2 and 1 <= c3 and 1<=c4"
    )


@pytest.mark.parametrize("p", TABLEAU_PARAMS)
def test_tableau_source_cleanups_tableau_parameters_in_join_predicate(p):
    assert (
        TableauSource._clean_tableau_query_parameters(
            f"select * from t1 inner join t2 on t1.id = t2.id and t2.c21 = {p} and t1.c11 = 123 + {p}"
        )
        == "select * from t1 inner join t2 on t1.id = t2.id and t2.c21 = 1 and t1.c11 = 123 + 1"
    )


@pytest.mark.parametrize("p", TABLEAU_PARAMS)
def test_tableau_source_cleanups_tableau_parameters_in_complex_expressions(p):
    assert (
        TableauSource._clean_tableau_query_parameters(
            f"select myudf1(c1, {p}, c2) / myudf2({p}) > ({p} + 3 * {p} * c5) * {p} - c4"
        )
        == "select myudf1(c1, 1, c2) / myudf2(1) > (1 + 3 * 1 * c5) * 1 - c4"
    )


@pytest.mark.parametrize("p", TABLEAU_PARAMS)
def test_tableau_source_cleanups_tableau_parameters_in_udfs(p):
    assert (
        TableauSource._clean_tableau_query_parameters(f"select myudf({p}) from t")
        == "select myudf(1) from t"
    )