feat(ingest/tableau): Allow parsing of database name from fullName (#8981)

Andrew Sikowitz 2023-10-10 17:34:06 -04:00 committed by GitHub
parent 6ecdeda5ff
commit 1a72fa499c
3 changed files with 151 additions and 119 deletions
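In short: upstream table references are now derived from Tableau's fullName attribute, so the database name can be recovered alongside schema and table instead of schema only. A minimal sketch of the parsing rule, mirroring the new parse_full_name helper further down in the diff; the bracketed example values are made up:

from typing import List, Optional

def parse_full_name(full_name: Optional[str]) -> Optional[List[str]]:
    # Bracketed fullName values from the Tableau metadata API are split into parts;
    # anything unbracketed is not parsed and the table's raw fields are used instead.
    if full_name is None or not full_name.startswith("["):
        return None
    return full_name.replace("[", "").replace("]", "").split(".")

assert parse_full_name("[demo_db].[public].[orders]") == ["demo_db", "public", "orders"]
assert parse_full_name("[public].[orders]") == ["public", "orders"]
assert parse_full_name("orders") is None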


@ -77,6 +77,7 @@ from datahub.ingestion.source.tableau_common import (
FIELD_TYPE_MAPPING,
MetadataQueryException,
TableauLineageOverrides,
TableauUpstreamReference,
clean_query,
custom_sql_graphql_query,
dashboard_graphql_query,
@ -85,7 +86,6 @@ from datahub.ingestion.source.tableau_common import (
get_overridden_info,
get_unique_custom_sql,
make_fine_grained_lineage_class,
- make_table_urn,
make_upstream_class,
published_datasource_graphql_query,
query_metadata,
@ -271,7 +271,7 @@ class TableauConfig(
"You can change this if your Tableau projects contain slashes in their names, and you'd like to filter by project.",
)
- default_schema_map: dict = Field(
default_schema_map: Dict[str, str] = Field(
default={}, description="Default schema to use when schema is not found."
)
ingest_tags: Optional[bool] = Field(
@ -997,41 +997,16 @@ class TableauSource(StatefulIngestionSourceBase):
)
continue
- schema = table.get(tableau_constant.SCHEMA) or ""
- table_name = table.get(tableau_constant.NAME) or ""
- full_name = table.get(tableau_constant.FULL_NAME) or ""
- upstream_db = (
- table[tableau_constant.DATABASE][tableau_constant.NAME]
- if table.get(tableau_constant.DATABASE)
- and table[tableau_constant.DATABASE].get(tableau_constant.NAME)
- else ""
- )
- logger.debug(
- "Processing Table with Connection Type: {0} and id {1}".format(
- table.get(tableau_constant.CONNECTION_TYPE) or "",
- table.get(tableau_constant.ID) or "",
try:
ref = TableauUpstreamReference.create(
table, default_schema_map=self.config.default_schema_map
)
- )
- schema = self._get_schema(schema, upstream_db, full_name)
- # if the schema is included within the table name we omit it
- if (
- schema
- and table_name
- and full_name
- and table_name == full_name
- and schema in table_name
- ):
- logger.debug(
- f"Omitting schema for upstream table {table[tableau_constant.ID]}, schema included in table name"
- )
- schema = ""
except Exception as e:
logger.info(f"Failed to generate upstream reference for {table}: {e}")
continue
- table_urn = make_table_urn(
table_urn = ref.make_dataset_urn(
self.config.env,
- upstream_db,
- table.get(tableau_constant.CONNECTION_TYPE) or "",
- schema,
- table_name,
self.config.platform_instance_map,
self.config.lineage_overrides,
)
@ -1052,7 +1027,7 @@ class TableauSource(StatefulIngestionSourceBase):
urn=table_urn,
id=table[tableau_constant.ID],
num_cols=num_tbl_cols,
- paths=set([table_path]) if table_path else set(),
paths={table_path} if table_path else set(),
)
else:
self.database_tables[table_urn].update_table(
@ -2462,35 +2437,6 @@ class TableauSource(StatefulIngestionSourceBase):
is_embedded_ds=True,
)
- @lru_cache(maxsize=None)
- def _get_schema(self, schema_provided: str, database: str, fullName: str) -> str:
- # For some databases, the schema attribute in tableau api does not return
- # correct schema name for the table. For more information, see
- # https://help.tableau.com/current/api/metadata_api/en-us/docs/meta_api_model.html#schema_attribute.
- # Hence we extract schema from fullName whenever fullName is available
- schema = self._extract_schema_from_fullName(fullName) if fullName else ""
- if not schema:
- schema = schema_provided
- elif schema != schema_provided:
- logger.debug(
- "Correcting schema, provided {0}, corrected {1}".format(
- schema_provided, schema
- )
- )
- if not schema and database in self.config.default_schema_map:
- schema = self.config.default_schema_map[database]
- return schema
- @lru_cache(maxsize=None)
- def _extract_schema_from_fullName(self, fullName: str) -> str:
- # fullName is observed to be in format [schemaName].[tableName]
- # OR simply tableName OR [tableName]
- if fullName.startswith("[") and "].[" in fullName:
- return fullName[1 : fullName.index("]")]
- return ""
@lru_cache(maxsize=None)
def get_last_modified(
self, creator: Optional[str], created_at: bytes, updated_at: bytes

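For context on the TableauSource change above: TableauUpstreamReference.create now owns the schema and database resolution that _get_schema and _extract_schema_from_fullName used to do. A minimal sketch of the fallback to default_schema_map when fullName carries no schema; the table dict and all its values are hypothetical, and the keys assume the usual Tableau metadata API field names behind tableau_constant:

from datahub.ingestion.source.tableau_common import TableauUpstreamReference

table = {
    "id": "table-id-1",
    "name": "orders",
    "fullName": "[orders]",            # no schema or database embedded in fullName
    "schema": "",                      # Tableau reported no schema
    "database": {"name": "demo_db"},
    "connectionType": "postgres",
}

ref = TableauUpstreamReference.create(table, default_schema_map={"demo_db": "public"})
# ref.database == "demo_db", ref.schema == "public" (filled from default_schema_map),
# ref.table == "orders", ref.connection_type == "postgres"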

@ -1,4 +1,6 @@
import html
import logging
from dataclasses import dataclass
from functools import lru_cache
from typing import Dict, List, Optional, Tuple
@ -6,6 +8,7 @@ from pydantic.fields import Field
import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel
from datahub.ingestion.source import tableau_constant as tc
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
DatasetLineageType,
FineGrainedLineage,
@ -31,6 +34,8 @@ from datahub.metadata.schema_classes import (
)
from datahub.utilities.sqlglot_lineage import ColumnLineageInfo, SqlParsingResult
logger = logging.getLogger(__name__)
class TableauLineageOverrides(ConfigModel):
platform_override_map: Optional[Dict[str, str]] = Field(
@ -537,12 +542,12 @@ def get_fully_qualified_table_name(
platform: str,
upstream_db: str,
schema: str,
- full_name: str,
table_name: str,
) -> str:
if platform == "athena":
upstream_db = ""
database_name = f"{upstream_db}." if upstream_db else ""
- final_name = full_name.replace("[", "").replace("]", "")
final_name = table_name.replace("[", "").replace("]", "")
schema_name = f"{schema}." if schema else ""
@ -573,17 +578,123 @@ def get_fully_qualified_table_name(
return fully_qualified_table_name
- def get_platform_instance(
- platform: str, platform_instance_map: Optional[Dict[str, str]]
- ) -> Optional[str]:
- if platform_instance_map is not None and platform in platform_instance_map.keys():
- return platform_instance_map[platform]
@dataclass
class TableauUpstreamReference:
database: Optional[str]
schema: Optional[str]
table: str
- return None
connection_type: str
@classmethod
def create(
cls, d: dict, default_schema_map: Optional[Dict[str, str]] = None
) -> "TableauUpstreamReference":
# Values directly from `table` object from Tableau
database = t_database = d.get(tc.DATABASE, {}).get(tc.NAME)
schema = t_schema = d.get(tc.SCHEMA)
table = t_table = d.get(tc.NAME) or ""
t_full_name = d.get(tc.FULL_NAME)
t_connection_type = d[tc.CONNECTION_TYPE] # required to generate urn
t_id = d[tc.ID]
parsed_full_name = cls.parse_full_name(t_full_name)
if parsed_full_name and len(parsed_full_name) == 3:
database, schema, table = parsed_full_name
elif parsed_full_name and len(parsed_full_name) == 2:
schema, table = parsed_full_name
else:
logger.debug(
f"Upstream urn generation ({t_id}):"
f" Did not parse full name {t_full_name}: unexpected number of values",
)
if not schema and default_schema_map and database in default_schema_map:
schema = default_schema_map[database]
if database != t_database:
logger.debug(
f"Upstream urn generation ({t_id}):"
f" replacing database {t_database} with {database} from full name {t_full_name}"
)
if schema != t_schema:
logger.debug(
f"Upstream urn generation ({t_id}):"
f" replacing schema {t_schema} with {schema} from full name {t_full_name}"
)
if table != t_table:
logger.debug(
f"Upstream urn generation ({t_id}):"
f" replacing table {t_table} with {table} from full name {t_full_name}"
)
# TODO: See if we can remove this -- made for redshift
if (
schema
and t_table
and t_full_name
and t_table == t_full_name
and schema in t_table
):
logger.debug(
f"Omitting schema for upstream table {t_id}, schema included in table name"
)
schema = ""
return cls(
database=database,
schema=schema,
table=table,
connection_type=t_connection_type,
)
@staticmethod
def parse_full_name(full_name: Optional[str]) -> Optional[List[str]]:
# fullName is observed to be in formats:
# [database].[schema].[table]
# [schema].[table]
# [table]
# table
# schema
# TODO: Validate the startswith check. Currently required for our integration tests
if full_name is None or not full_name.startswith("["):
return None
return full_name.replace("[", "").replace("]", "").split(".")
def make_dataset_urn(
self,
env: str,
platform_instance_map: Optional[Dict[str, str]],
lineage_overrides: Optional[TableauLineageOverrides] = None,
) -> str:
(
upstream_db,
platform_instance,
platform,
original_platform,
) = get_overridden_info(
connection_type=self.connection_type,
upstream_db=self.database,
lineage_overrides=lineage_overrides,
platform_instance_map=platform_instance_map,
)
table_name = get_fully_qualified_table_name(
original_platform,
upstream_db or "",
self.schema,
self.table,
)
return builder.make_dataset_urn_with_platform_instance(
platform, table_name, platform_instance, env
)
def get_overridden_info(
- connection_type: str,
connection_type: Optional[str],
upstream_db: Optional[str],
platform_instance_map: Optional[Dict[str, str]],
lineage_overrides: Optional[TableauLineageOverrides] = None,
@ -605,7 +716,9 @@ def get_overridden_info(
):
upstream_db = lineage_overrides.database_override_map[upstream_db]
- platform_instance = get_platform_instance(original_platform, platform_instance_map)
platform_instance = (
platform_instance_map.get(original_platform) if platform_instance_map else None
)
if original_platform in ("athena", "hive", "mysql"): # Two tier databases
upstream_db = None
@ -613,35 +726,6 @@ def get_overridden_info(
return upstream_db, platform_instance, platform, original_platform
- def make_table_urn(
- env: str,
- upstream_db: Optional[str],
- connection_type: str,
- schema: str,
- full_name: str,
- platform_instance_map: Optional[Dict[str, str]],
- lineage_overrides: Optional[TableauLineageOverrides] = None,
- ) -> str:
- upstream_db, platform_instance, platform, original_platform = get_overridden_info(
- connection_type=connection_type,
- upstream_db=upstream_db,
- lineage_overrides=lineage_overrides,
- platform_instance_map=platform_instance_map,
- )
- table_name = get_fully_qualified_table_name(
- original_platform,
- upstream_db if upstream_db is not None else "",
- schema,
- full_name,
- )
- return builder.make_dataset_urn_with_platform_instance(
- platform, table_name, platform_instance, env
- )
def make_description_from_params(description, formula):
"""
Generate column description

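To round out the refactor above: make_dataset_urn routes through get_overridden_info, which drops the database part for two-tier platforms (athena, hive, mysql). A minimal sketch with hypothetical values; the comment shows the URN this should produce:

from datahub.ingestion.source.tableau_common import TableauUpstreamReference

ref = TableauUpstreamReference(
    database=None,           # hive/athena/mysql are two-tier, so no database prefix
    schema="test-schema",
    table="test-table",
    connection_type="hive",
)
urn = ref.make_dataset_urn(env="PROD", platform_instance_map=None)
# expected: urn:li:dataset:(urn:li:dataPlatform:hive,test-schema.test-table,PROD)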

@ -20,7 +20,7 @@ from datahub.ingestion.run.pipeline import Pipeline, PipelineContext
from datahub.ingestion.source.tableau import TableauConfig, TableauSource
from datahub.ingestion.source.tableau_common import (
TableauLineageOverrides,
- make_table_urn,
TableauUpstreamReference,
)
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
DatasetLineageType,
@ -546,13 +546,13 @@ def test_lineage_overrides():
enable_logging()
# Simple - specify platform instance to presto table
assert (
- make_table_urn(
- DEFAULT_ENV,
TableauUpstreamReference(
"presto_catalog",
"presto",
"test-schema",
"presto_catalog.test-schema.test-table",
platform_instance_map={"presto": "my_presto_instance"},
"test-table",
"presto",
).make_dataset_urn(
env=DEFAULT_ENV, platform_instance_map={"presto": "my_presto_instance"}
)
== "urn:li:dataset:(urn:li:dataPlatform:presto,my_presto_instance.presto_catalog.test-schema.test-table,PROD)"
)
@ -560,12 +560,13 @@ def test_lineage_overrides():
# Transform presto urn to hive urn
# resulting platform instance for hive = mapped platform instance + presto_catalog
assert (
- make_table_urn(
- DEFAULT_ENV,
TableauUpstreamReference(
"presto_catalog",
"presto",
"test-schema",
"presto_catalog.test-schema.test-table",
"test-table",
"presto",
).make_dataset_urn(
env=DEFAULT_ENV,
platform_instance_map={"presto": "my_instance"},
lineage_overrides=TableauLineageOverrides(
platform_override_map={"presto": "hive"},
@ -574,14 +575,15 @@ def test_lineage_overrides():
== "urn:li:dataset:(urn:li:dataPlatform:hive,my_instance.presto_catalog.test-schema.test-table,PROD)"
)
- # tranform hive urn to presto urn
# transform hive urn to presto urn
assert (
- make_table_urn(
- DEFAULT_ENV,
- "",
- "hive",
TableauUpstreamReference(
None,
"test-schema",
"test-schema.test-table",
"test-table",
"hive",
).make_dataset_urn(
env=DEFAULT_ENV,
platform_instance_map={"hive": "my_presto_instance.presto_catalog"},
lineage_overrides=TableauLineageOverrides(
platform_override_map={"hive": "presto"},