feat(ingestion/tableau): optionally ingest multiple sites and create site containers (#10498)

Co-authored-by: Yanik Häni <Yanik.Haeni1@swisscom.com>
This commit is contained in:
haeniya 2024-07-09 20:49:41 +02:00 committed by GitHub
parent d204d5654a
commit 3e86192b29
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 175592 additions and 122 deletions

View File

@ -32,6 +32,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
No loss of functionality expected unless explicitly mentioned in Breaking Changes.
### Other Notable Changes
- #10498 - Tableau ingestion can now be configured to ingest multiple sites at once and add the sites as containers. The feature is currently only available for Tableau Server.
## 0.13.3

View File

@ -1,5 +1,6 @@
import logging
import re
from collections import OrderedDict
from dataclasses import dataclass
from datetime import datetime
from functools import lru_cache
@ -26,6 +27,7 @@ from tableauserverclient import (
PersonalAccessTokenAuth,
Server,
ServerResponseError,
SiteItem,
TableauAuth,
)
from tableauserverclient.server.endpoint.exceptions import NonXMLResponseError
@ -205,24 +207,31 @@ class TableauConnectionConfig(ConfigModel):
def remove_trailing_slash(cls, v):
return config_clean.remove_trailing_slashes(v)
def make_tableau_client(self) -> Server:
def get_tableau_auth(
    self, site: str
) -> Union[TableauAuth, PersonalAccessTokenAuth]:
    """Build a Tableau authentication object for the given site content-url.

    Credentials precedence: username/password first, then personal access
    token (token_name/token_value).

    :param site: site content-url to authenticate against (empty string means
        the default site).
    :raises ConfigurationError: if neither credential pair is configured.
    """
    # https://tableau.github.io/server-client-python/docs/api-ref#authentication
    authentication: Union[TableauAuth, PersonalAccessTokenAuth]
    if self.username and self.password:
        authentication = TableauAuth(
            username=self.username,
            password=self.password,
            site_id=site,
        )
    elif self.token_name and self.token_value:
        authentication = PersonalAccessTokenAuth(
            self.token_name, self.token_value, site
        )
    else:
        raise ConfigurationError(
            "Tableau Source: Either username/password or token_name/token_value must be set"
        )
    return authentication
def make_tableau_client(self, site: str) -> Server:
authentication: Union[
TableauAuth, PersonalAccessTokenAuth
] = self.get_tableau_auth(site)
try:
server = Server(
self.connect_uri,
@ -397,6 +406,25 @@ class TableauConfig(
"(allows to workaround ingestion errors when pre ingested schema and queries are out of sync)",
)
ingest_multiple_sites: bool = Field(
False,
description="When enabled, ingests multiple sites the user has access to. If the user doesn't have access to the default site, specify an initial site to query in the site property. By default all sites the user has access to will be ingested. You can filter sites with the site_name_pattern property. This flag is currently only supported for Tableau Server. Tableau Cloud is not supported.",
)
site_name_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Filter for specific Tableau sites. "
"By default, all sites will be included in the ingestion. "
"You can both allow and deny sites based on their name using their name, or a Regex pattern. "
"Deny patterns always take precedence over allow patterns. "
"This property is currently only supported for Tableau Server. Tableau Cloud is not supported. ",
)
add_site_container: bool = Field(
False,
description="When enabled, sites are added as containers and therefore visible in the folder structure within Datahub.",
)
# pre = True because we want to take some decision before pydantic initialize the configuration to default values
@root_validator(pre=True)
def projects_backward_compatibility(cls, values: Dict) -> Dict:
@ -425,6 +453,10 @@ class ProjectKey(ContainerKey):
project_id: str
class SiteKey(ContainerKey):
    """Container key identifying a Tableau site (emitted when add_site_container is enabled)."""

    site_id: str  # Tableau site id (SiteItem.id)
@dataclass
class UsageStat:
view_count: int
@ -534,10 +566,126 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
ctx: PipelineContext,
):
super().__init__(config, ctx)
self.config: TableauConfig = config
self.report: TableauSourceReport = TableauSourceReport()
self.server: Optional[Server] = None
self._authenticate(self.config.site)
def _authenticate(self, site_content_url: str) -> None:
    """Create and store an authenticated Tableau server client for the given site.

    On authentication failure (ValueError) the error is recorded in the
    report and ``self.server`` stays None; callers must check it before use.

    :param site_content_url: content-url of the site to sign in to.
    """
    try:
        self.server = self.config.make_tableau_client(site_content_url)
        # Log success only AFTER the client is created; previously this line
        # ran before make_tableau_client, so "Authenticated" was logged even
        # when authentication subsequently failed.
        logger.info(f"Authenticated to Tableau site: '{site_content_url}'")
    # Note that we're not catching ConfigurationError, since we want that to throw.
    except ValueError as e:
        self.report.failure(
            title="Tableau Login Error",
            message="Failed to authenticate with Tableau.",
            exc=e,
        )
@staticmethod
def test_connection(config_dict: dict) -> TestConnectionReport:
    """Check basic connectivity by parsing the raw config and creating a client.

    Any exception (parse or sign-in) is captured as a failed capability
    report rather than propagated.
    """
    report = TestConnectionReport()
    try:
        parsed = TableauConfig.parse_obj_allow_extras(config_dict)
        parsed.make_tableau_client(parsed.site)
    except Exception as e:
        report.basic_connectivity = CapabilityReport(
            capable=False, failure_reason=str(e)
        )
    else:
        report.basic_connectivity = CapabilityReport(capable=True)
    return report
def get_report(self) -> TableauSourceReport:
    """Return the report accumulated during ingestion."""
    return self.report
@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
    """Standard DataHub factory: parse the raw recipe dict and construct the source."""
    config = TableauConfig.parse_obj(config_dict)
    return cls(config, ctx)
def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
    """Extend the default processors with stale-entity removal (stateful ingestion)."""
    return [
        *super().get_workunit_processors(),
        StaleEntityRemovalHandler.create(
            self, self.config, self.ctx
        ).workunit_processor,
    ]
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
    """Yield metadata workunits for one or many Tableau sites.

    In multi-site mode every active site matching site_name_pattern is
    ingested by switching the authenticated session to it and delegating to
    a per-site TableauSiteSource; otherwise only the currently signed-in
    site is ingested.
    """
    # Authentication may have failed in __init__ (self.server left as None);
    # in that case there is nothing to ingest.
    if self.server is None or not self.server.is_signed_in():
        return
    try:
        if self.config.ingest_multiple_sites:
            for site in list(TSC.Pager(self.server.sites)):
                if (
                    site.state != "Active"
                    or not self.config.site_name_pattern.allowed(site.name)
                ):
                    logger.info(
                        f"Skip site '{site.name}' as it's excluded in site_name_pattern or inactive."
                    )
                    continue
                # Re-scope the existing session to this site before querying it.
                self.server.auth.switch_site(site)
                site_source = TableauSiteSource(
                    config=self.config,
                    ctx=self.ctx,
                    site=site,
                    report=self.report,
                    server=self.server,
                    platform=self.platform,
                )
                logger.info(f"Ingesting assets of site '{site.content_url}'.")
                yield from site_source.ingest_tableau_site()
        else:
            # Single-site mode: ingest whatever site the session is signed in to.
            site = self.server.sites.get_by_id(self.server.site_id)
            site_source = TableauSiteSource(
                config=self.config,
                ctx=self.ctx,
                site=site,
                report=self.report,
                server=self.server,
                platform=self.platform,
            )
            yield from site_source.ingest_tableau_site()
    except MetadataQueryException as md_exception:
        self.report.failure(
            title="Failed to Retrieve Tableau Metadata",
            message="Unable to retrieve metadata from tableau.",
            context=str(md_exception),
        )
def close(self) -> None:
    """Sign out of Tableau (best-effort) and release the client before closing.

    A failing sign-out must never prevent the source from closing, so the
    exception is only logged.
    """
    try:
        if self.server is not None:
            self.server.auth.sign_out()
    except Exception as ex:
        logger.warning(
            "During graceful closing of Tableau source a sign-out call was tried but ended up with"
            " an Exception (%s). Continuing closing of the source",
            ex,
        )
    self.server = None
    super().close()
class TableauSiteSource:
def __init__(
self,
config: TableauConfig,
ctx: PipelineContext,
site: SiteItem,
report: TableauSourceReport,
server: Server,
platform: str,
):
self.config: TableauConfig = config
self.report = report
self.server: Server = server
self.ctx: PipelineContext = ctx
self.site: SiteItem = site
self.platform = platform
self.database_tables: Dict[str, DatabaseTable] = {}
self.tableau_stat_registry: Dict[str, UsageStat] = {}
self.tableau_project_registry: Dict[str, TableauProject] = {}
@ -562,34 +710,6 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
# when emitting custom SQL data sources.
self.custom_sql_ids_being_used: List[str] = []
self._authenticate()
@staticmethod
def test_connection(config_dict: dict) -> TestConnectionReport:
test_report = TestConnectionReport()
try:
source_config = TableauConfig.parse_obj_allow_extras(config_dict)
source_config.make_tableau_client()
test_report.basic_connectivity = CapabilityReport(capable=True)
except Exception as e:
test_report.basic_connectivity = CapabilityReport(
capable=False, failure_reason=str(e)
)
return test_report
def close(self) -> None:
try:
if self.server is not None:
self.server.auth.sign_out()
except Exception as ex:
logger.warning(
"During graceful closing of Tableau source a sign-out call was tried but ended up with"
" an Exception (%s). Continuing closing of the source",
ex,
)
self.server = None
super().close()
@property
def no_env_browse_prefix(self) -> str:
# Prefix to use with browse path (v1)
@ -600,13 +720,26 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
if self.config.platform_instance
else self.platform
)
return f"/{platform_with_instance}"
return f"/{platform_with_instance}{self.site_name_browse_path}"
@property
def site_name_browse_path(self) -> str:
    """Optional '/<site name>' browse-path segment.

    Non-empty only when a site is set, add_site_container is enabled, and
    the site has a (truthy) name.
    """
    if self.site and self.config.add_site_container and self.site.name:
        return f"/{self.site.name}"
    return ""
@property
def dataset_browse_prefix(self) -> str:
    """Browse-path prefix for datasets; unlike other assets it includes the env."""
    # datasets also have the env in the browse path
    return f"/{self.config.env.lower()}{self.no_env_browse_prefix}"
def _re_authenticate(self) -> None:
    """Sign in to this site again on the existing server client.

    Used when Tableau's temporary credentials expire during a long-running
    ingestion and a query must be retried.
    """
    tableau_auth: Union[
        TableauAuth, PersonalAccessTokenAuth
    ] = self.config.get_tableau_auth(self.site.content_url)
    self.server.auth.sign_in(tableau_auth)
def _populate_usage_stat_registry(self) -> None:
if self.server is None:
return
@ -713,7 +846,7 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
def _init_tableau_project_registry(self, all_project_map: dict) -> None:
list_of_skip_projects: List[TableauProject] = []
projects_to_ingest = {}
for project in all_project_map.values():
# Skip project if it is not allowed
logger.debug(f"Evaluating project pattern for {project.name}")
@ -722,7 +855,13 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
logger.debug(f"Project {project.name} is skipped")
continue
logger.debug(f"Project {project.name} is added in project registry")
self.tableau_project_registry[project.id] = project
projects_to_ingest[project.id] = project
# We rely on automatic browse paths (v2) when creating containers. That's why we need to sort the projects here.
# Otherwise, nested projects will not have the correct browse paths if not created in correct order / hierarchy.
self.tableau_project_registry = OrderedDict(
sorted(projects_to_ingest.items(), key=lambda item: len(item[1].path))
)
if self.config.extract_project_hierarchy is False:
logger.debug(
@ -793,18 +932,6 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
f"Tableau workbooks {self.workbook_project_map}",
)
def _authenticate(self) -> None:
try:
self.server = self.config.make_tableau_client()
logger.info("Authenticated to Tableau server")
# Note that we're not catching ConfigurationError, since we want that to throw.
except ValueError as e:
self.report.failure(
title="Tableau Login Error",
message="Failed to authenticate with Tableau.",
exc=e,
)
def get_data_platform_instance(self) -> DataPlatformInstanceClass:
return DataPlatformInstanceClass(
platform=builder.make_data_platform_urn(self.platform),
@ -845,7 +972,7 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
# If ingestion has been running for over 2 hours, the Tableau
# temporary credentials will expire. If this happens, this exception
# will be thrown and we need to re-authenticate and retry.
self._authenticate()
self._re_authenticate()
return self.get_connection_object_page(
query,
connection_type,
@ -1427,6 +1554,7 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
datasource_name = None
project = None
columns: List[Dict[Any, Any]] = []
if len(csql[c.DATA_SOURCES]) > 0:
# CustomSQLTable id owned by exactly one tableau data source
logger.debug(
@ -1474,7 +1602,7 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
project = self._get_project_browse_path_name(datasource)
# if condition is needed as graphQL return "columns": None
columns: List[Dict[Any, Any]] = (
columns = (
cast(List[Dict[Any, Any]], csql.get(c.COLUMNS))
if c.COLUMNS in csql and csql.get(c.COLUMNS) is not None
else []
@ -2318,7 +2446,7 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
last_modified = self.get_last_modified(creator, created_at, updated_at)
if sheet.get(c.PATH):
site_part = f"/site/{self.config.site}" if self.config.site else ""
site_part = f"/site/{self.site.content_url}" if self.site else ""
sheet_external_url = (
f"{self.config.connect_uri}/#{site_part}/views/{sheet.get(c.PATH)}"
)
@ -2329,7 +2457,7 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
and sheet[c.CONTAINED_IN_DASHBOARDS][0].get(c.PATH)
):
# sheet contained in dashboard
site_part = f"/t/{self.config.site}" if self.config.site else ""
site_part = f"/t/{self.site.content_url}" if self.site else ""
dashboard_path = sheet[c.CONTAINED_IN_DASHBOARDS][0][c.PATH]
sheet_external_url = f"{self.config.connect_uri}{site_part}/authoring/{dashboard_path}/{quote(sheet.get(c.NAME, ''), safe='')}"
else:
@ -2461,7 +2589,7 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
else None
)
site_part = f"/site/{self.config.site}" if self.config.site else ""
site_part = f"/site/{self.site.content_url}" if self.site else ""
workbook_uri = workbook.get("uri")
workbook_part = (
workbook_uri[workbook_uri.index("/workbooks/") :] if workbook_uri else None
@ -2510,6 +2638,13 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
project_id=project_luid,
)
def gen_site_key(self, site_id: str) -> SiteKey:
    """Build the container key for a Tableau site.

    :param site_id: Tableau site id to key the container by.
    """
    return SiteKey(
        platform=self.platform,
        instance=self.config.platform_instance,
        site_id=site_id,
    )
@staticmethod
def _create_datahub_dashboard_usage_stat(
usage_stat: UsageStat,
@ -2613,7 +2748,7 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
updated_at = dashboard.get(c.UPDATED_AT, datetime.now())
last_modified = self.get_last_modified(creator, created_at, updated_at)
site_part = f"/site/{self.config.site}" if self.config.site else ""
site_part = f"/site/{self.site.content_url}" if self.site else ""
dashboard_external_url = (
f"{self.config.connect_uri}/#{site_part}/views/{dashboard.get(c.PATH, '')}"
)
@ -2756,23 +2891,20 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
return None
@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
config = TableauConfig.parse_obj(config_dict)
return cls(config, ctx)
def emit_project_containers(self) -> Iterable[MetadataWorkUnit]:
for _id, project in self.tableau_project_registry.items():
parent_container_key: Optional[ContainerKey] = None
if project.parent_id:
parent_container_key = self.gen_project_key(project.parent_id)
elif self.config.add_site_container and self.site and self.site.id:
parent_container_key = self.gen_site_key(self.site.id)
yield from gen_containers(
container_key=self.gen_project_key(_id),
name=project.name,
description=project.description,
sub_types=[c.PROJECT],
parent_container_key=(
self.gen_project_key(project.parent_id)
if project.parent_id
else None
),
parent_container_key=parent_container_key,
)
if (
project.parent_id is not None
@ -2787,49 +2919,42 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
sub_types=[c.PROJECT],
)
def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
return [
*super().get_workunit_processors(),
StaleEntityRemovalHandler.create(
self, self.config, self.ctx
).workunit_processor,
]
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
if self.server is None or not self.server.is_signed_in():
def emit_site_container(self):
if not self.site or not self.site.id:
logger.warning("Can not ingest site container. No site information found.")
return
try:
# Initialise the dictionary to later look-up for chart and dashboard stat
if self.config.extract_usage_stats:
self._populate_usage_stat_registry()
# Populate the map of database names and database hostnames to be used later to map
# databases to platform instances.
if self.config.database_hostname_to_platform_instance_map:
self._populate_database_server_hostname_map()
yield from gen_containers(
container_key=self.gen_site_key(self.site.id),
name=self.site.name or "Default",
sub_types=[c.SITE],
)
self._populate_projects_registry()
yield from self.emit_project_containers()
yield from self.emit_workbooks()
if self.sheet_ids:
yield from self.emit_sheets()
if self.dashboard_ids:
yield from self.emit_dashboards()
if self.embedded_datasource_ids_being_used:
yield from self.emit_embedded_datasources()
if self.datasource_ids_being_used:
yield from self.emit_published_datasources()
if self.custom_sql_ids_being_used:
yield from self.emit_custom_sql_datasources()
if self.database_tables:
yield from self.emit_upstream_tables()
except MetadataQueryException as md_exception:
self.report.failure(
title="Failed to Retrieve Tableau Metadata",
message="Unable to retrieve metadata from tableau.",
context=str(md_exception),
)
def ingest_tableau_site(self):
# Initialise the dictionary to later look-up for chart and dashboard stat
if self.config.extract_usage_stats:
self._populate_usage_stat_registry()
def get_report(self) -> TableauSourceReport:
return self.report
return self.report
# Populate the map of database names and database hostnames to be used later to map
# databases to platform instances.
if self.config.database_hostname_to_platform_instance_map:
self._populate_database_server_hostname_map()
self._populate_projects_registry()
if self.config.add_site_container:
yield from self.emit_site_container()
yield from self.emit_project_containers()
yield from self.emit_workbooks()
if self.sheet_ids:
yield from self.emit_sheets()
if self.dashboard_ids:
yield from self.emit_dashboards()
if self.embedded_datasource_ids_being_used:
yield from self.emit_embedded_datasources()
if self.datasource_ids_being_used:
yield from self.emit_published_datasources()
if self.custom_sql_ids_being_used:
yield from self.emit_custom_sql_datasources()
if self.database_tables:
yield from self.emit_upstream_tables()

View File

@ -767,7 +767,12 @@ def get_overridden_info(
):
platform_instance = database_hostname_to_platform_instance_map.get(hostname)
if original_platform in ("athena", "hive", "mysql"): # Two tier databases
if original_platform in (
"athena",
"hive",
"mysql",
"teradata",
): # Two tier databases
upstream_db = None
return upstream_db, platform_instance, platform, original_platform

View File

@ -78,4 +78,5 @@ DASHBOARD = "dashboard"
DASHBOARDS_CONNECTION = "dashboardsConnection"
EMBEDDED_DATA_SOURCES_CONNECTION = "embeddedDatasourcesConnection"
PROJECT = "Project"
SITE = "Site"
IS_UNSUPPORTED_CUSTOM_SQL = "isUnsupportedCustomSql"

View File

@ -8,9 +8,11 @@ from unittest import mock
import pytest
from freezegun import freeze_time
from requests.adapters import ConnectionError
from tableauserverclient import Server
from tableauserverclient.models import (
DatasourceItem,
ProjectItem,
SiteItem,
ViewItem,
WorkbookItem,
)
@ -18,7 +20,12 @@ from tableauserverclient.models import (
from datahub.configuration.source_common import DEFAULT_ENV
from datahub.emitter.mce_builder import make_schema_field_urn
from datahub.ingestion.run.pipeline import Pipeline, PipelineContext
from datahub.ingestion.source.tableau import TableauConfig, TableauSource
from datahub.ingestion.source.tableau import (
TableauConfig,
TableauSiteSource,
TableauSource,
TableauSourceReport,
)
from datahub.ingestion.source.tableau_common import (
TableauLineageOverrides,
TableauUpstreamReference,
@ -124,6 +131,25 @@ def side_effect_project_data(*arg, **kwargs):
return [project1, project2, project3, project4], mock_pagination
def side_effect_site_data(*arg, **kwargs):
    """Mock for Server.sites.get: three sites, one of them suspended."""
    pagination = mock.MagicMock()
    pagination.total_available = None

    # (name, content_url, id, state) for each mocked site.
    specs = [
        ("Acryl", "acryl", "190a6a5c-63ed-4de1-8045-site1", "Active"),
        ("Site 2", "site2", "190a6a5c-63ed-4de1-8045-site2", "Active"),
        ("Site 3", "site3", "190a6a5c-63ed-4de1-8045-site3", "Suspended"),
    ]
    sites = []
    for name, content_url, site_id, state in specs:
        site = SiteItem(name=name, content_url=content_url)
        site._id = site_id
        site.state = state
        sites.append(site)

    return sites, pagination
def side_effect_datasource_data(*arg, **kwargs):
mock_pagination = mock.MagicMock()
mock_pagination.total_available = None
@ -199,6 +225,13 @@ def side_effect_datasource_get_by_id(id, *arg, **kwargs):
return ds
def side_effect_site_get_by_id(id, *arg, **kwargs):
    """Mock for Server.sites.get_by_id: look up a mocked site by id (None if absent)."""
    sites, _ = side_effect_site_data()
    return next((site for site in sites if site._id == id), None)
def tableau_ingest_common(
pytestconfig,
tmp_path,
@ -223,9 +256,14 @@ def tableau_ingest_common(
mocked_metadata.query.side_effect = side_effect_query_metadata_response
mock_client.metadata = mocked_metadata
mock_client.auth = mock.Mock()
mock_client.site_id = "190a6a5c-63ed-4de1-8045-site1"
mock_client.views = mock.Mock()
mock_client.projects = mock.Mock()
mock_client.sites = mock.Mock()
mock_client.projects.get.side_effect = side_effect_project_data
mock_client.sites.get.side_effect = side_effect_site_data
mock_client.sites.get_by_id.side_effect = side_effect_site_get_by_id
mock_client.datasources = mock.Mock()
mock_client.datasources.get.side_effect = datasources_side_effect
mock_client.datasources.get_by_id.side_effect = (
@ -893,9 +931,16 @@ def test_tableau_unsupported_csql():
"user_source": "user_source",
}
source = TableauSource(config=config, ctx=context)
site_source = TableauSiteSource(
config=config,
ctx=context,
platform="tableau",
site=SiteItem(name="Site 1", content_url="site1"),
report=TableauSourceReport(),
server=Server("https://test-tableau-server.com"),
)
lineage = source._create_lineage_from_unsupported_csql(
lineage = site_source._create_lineage_from_unsupported_csql(
csql_urn=csql_urn,
csql={
"query": "SELECT user_id, source, user_source FROM (SELECT *, ROW_NUMBER() OVER (partition BY user_id ORDER BY __partition_day DESC) AS rank_ FROM invent_dw.UserDetail ) source_user WHERE rank_ = 1",
@ -916,7 +961,7 @@ def test_tableau_unsupported_csql():
)
# With database as None
lineage = source._create_lineage_from_unsupported_csql(
lineage = site_source._create_lineage_from_unsupported_csql(
csql_urn=csql_urn,
csql={
"query": "SELECT user_id, source, user_source FROM (SELECT *, ROW_NUMBER() OVER (partition BY user_id ORDER BY __partition_day DESC) AS rank_ FROM my_bigquery_project.invent_dw.UserDetail ) source_user WHERE rank_ = 1",
@ -957,3 +1002,107 @@ def test_get_all_datasources_failure(pytestconfig, tmp_path, mock_datahub_graph)
pipeline_name="test_tableau_ingest",
datasources_side_effect=ValueError("project_id must be defined."),
)
@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_tableau_ingest_multiple_sites(pytestconfig, tmp_path, mock_datahub_graph):
    """End-to-end run with ingest_multiple_sites + add_site_container enabled.

    The metadata responses are listed twice because two of the three mocked
    sites are Active (the Suspended one is skipped), and each ingested site
    issues the full query sequence.
    """
    enable_logging()
    output_file_name: str = "tableau_mces_multiple_sites.json"
    golden_file_name: str = "tableau_multiple_sites_mces_golden.json"

    new_pipeline_config: Dict[Any, Any] = {
        **config_source_default,
        "add_site_container": True,
        "ingest_multiple_sites": True,
    }

    tableau_ingest_common(
        pytestconfig=pytestconfig,
        tmp_path=tmp_path,
        side_effect_query_metadata_response=[
            read_response(pytestconfig, "workbooksConnection_all.json"),
            read_response(pytestconfig, "sheetsConnection_all.json"),
            read_response(pytestconfig, "dashboardsConnection_all.json"),
            read_response(pytestconfig, "embeddedDatasourcesConnection_all.json"),
            read_response(pytestconfig, "publishedDatasourcesConnection_all.json"),
            read_response(pytestconfig, "customSQLTablesConnection_all.json"),
            read_response(pytestconfig, "databaseTablesConnection_all.json"),
            read_response(pytestconfig, "workbooksConnection_all.json"),
            read_response(pytestconfig, "sheetsConnection_all.json"),
            read_response(pytestconfig, "dashboardsConnection_all.json"),
            read_response(pytestconfig, "embeddedDatasourcesConnection_all.json"),
            read_response(pytestconfig, "publishedDatasourcesConnection_all.json"),
            read_response(pytestconfig, "customSQLTablesConnection_all.json"),
            read_response(pytestconfig, "databaseTablesConnection_all.json"),
        ],
        golden_file_name=golden_file_name,
        output_file_name=output_file_name,
        mock_datahub_graph=mock_datahub_graph,
        pipeline_name="test_tableau_multiple_site_ingestion",
        pipeline_config=new_pipeline_config,
    )
@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_tableau_ingest_sites_as_container(pytestconfig, tmp_path, mock_datahub_graph):
    """End-to-end run with add_site_container enabled for a single site.

    Only one query sequence is listed: ingest_multiple_sites stays off, so
    just the signed-in site is ingested (as a container).
    """
    enable_logging()
    output_file_name: str = "tableau_mces_ingest_sites_as_container.json"
    golden_file_name: str = "tableau_sites_as_container_mces_golden.json"

    new_pipeline_config: Dict[Any, Any] = {
        **config_source_default,
        "add_site_container": True,
    }

    tableau_ingest_common(
        pytestconfig=pytestconfig,
        tmp_path=tmp_path,
        side_effect_query_metadata_response=[
            read_response(pytestconfig, "workbooksConnection_all.json"),
            read_response(pytestconfig, "sheetsConnection_all.json"),
            read_response(pytestconfig, "dashboardsConnection_all.json"),
            read_response(pytestconfig, "embeddedDatasourcesConnection_all.json"),
            read_response(pytestconfig, "publishedDatasourcesConnection_all.json"),
            read_response(pytestconfig, "customSQLTablesConnection_all.json"),
            read_response(pytestconfig, "databaseTablesConnection_all.json"),
        ],
        golden_file_name=golden_file_name,
        output_file_name=output_file_name,
        mock_datahub_graph=mock_datahub_graph,
        pipeline_name="test_tableau_multiple_site_ingestion",
        pipeline_config=new_pipeline_config,
    )
@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_site_name_pattern(pytestconfig, tmp_path, mock_datahub_graph):
    """End-to-end run with site_name_pattern filtering multi-site ingestion.

    Only sites whose name matches ``^Site.*$`` are ingested; of the mocked
    sites that leaves one Active match, so a single query sequence is listed.
    """
    enable_logging()
    output_file_name: str = "tableau_site_name_pattern_mces.json"
    golden_file_name: str = "tableau_site_name_pattern_mces_golden.json"

    # Build the config via dict unpacking and call the helper with keyword
    # arguments, consistent with the other multi-site tests above.
    new_config: Dict[Any, Any] = {
        **config_source_default,
        "ingest_multiple_sites": True,
        "add_site_container": True,
        "site_name_pattern": {"allow": ["^Site.*$"]},
    }

    tableau_ingest_common(
        pytestconfig=pytestconfig,
        tmp_path=tmp_path,
        side_effect_query_metadata_response=[
            read_response(pytestconfig, "workbooksConnection_all.json"),
            read_response(pytestconfig, "sheetsConnection_all.json"),
            read_response(pytestconfig, "dashboardsConnection_all.json"),
            read_response(pytestconfig, "embeddedDatasourcesConnection_all.json"),
            read_response(pytestconfig, "publishedDatasourcesConnection_all.json"),
            read_response(pytestconfig, "customSQLTablesConnection_all.json"),
            read_response(pytestconfig, "databaseTablesConnection_all.json"),
        ],
        golden_file_name=golden_file_name,
        output_file_name=output_file_name,
        mock_datahub_graph=mock_datahub_graph,
        pipeline_config=new_config,
        pipeline_name="test_tableau_site_name_pattern_ingest",
    )

View File

@ -1,12 +1,12 @@
import pytest
import datahub.ingestion.source.tableau_constant as c
from datahub.ingestion.source.tableau import TableauSource
from datahub.ingestion.source.tableau import TableauSiteSource
from datahub.ingestion.source.tableau_common import get_filter_pages, make_filter
def test_tableau_source_unescapes_lt():
res = TableauSource._clean_tableau_query_parameters(
res = TableauSiteSource._clean_tableau_query_parameters(
"select * from t where c1 << 135"
)
@ -14,7 +14,7 @@ def test_tableau_source_unescapes_lt():
def test_tableau_source_unescapes_gt():
res = TableauSource._clean_tableau_query_parameters(
res = TableauSiteSource._clean_tableau_query_parameters(
"select * from t where c1 >> 135"
)
@ -22,7 +22,7 @@ def test_tableau_source_unescapes_gt():
def test_tableau_source_unescapes_gte():
res = TableauSource._clean_tableau_query_parameters(
res = TableauSiteSource._clean_tableau_query_parameters(
"select * from t where c1 >>= 135"
)
@ -30,7 +30,7 @@ def test_tableau_source_unescapes_gte():
def test_tableau_source_unescapeslgte():
res = TableauSource._clean_tableau_query_parameters(
res = TableauSiteSource._clean_tableau_query_parameters(
"select * from t where c1 <<= 135"
)
@ -38,7 +38,7 @@ def test_tableau_source_unescapeslgte():
def test_tableau_source_doesnt_touch_not_escaped():
res = TableauSource._clean_tableau_query_parameters(
res = TableauSiteSource._clean_tableau_query_parameters(
"select * from t where c1 < 135 and c2 > 15"
)
@ -70,7 +70,7 @@ TABLEAU_PARAMS = [
@pytest.mark.parametrize("p", TABLEAU_PARAMS)
def test_tableau_source_cleanups_tableau_parameters_in_equi_predicates(p):
assert (
TableauSource._clean_tableau_query_parameters(
TableauSiteSource._clean_tableau_query_parameters(
f"select * from t where c1 = {p} and c2 = {p} and c3 = 7"
)
== "select * from t where c1 = 1 and c2 = 1 and c3 = 7"
@ -80,7 +80,7 @@ def test_tableau_source_cleanups_tableau_parameters_in_equi_predicates(p):
@pytest.mark.parametrize("p", TABLEAU_PARAMS)
def test_tableau_source_cleanups_tableau_parameters_in_lt_gt_predicates(p):
assert (
TableauSource._clean_tableau_query_parameters(
TableauSiteSource._clean_tableau_query_parameters(
f"select * from t where c1 << {p} and c2<<{p} and c3 >> {p} and c4>>{p} or {p} >> c1 and {p}>>c2 and {p} << c3 and {p}<<c4"
)
== "select * from t where c1 < 1 and c2<1 and c3 > 1 and c4>1 or 1 > c1 and 1>c2 and 1 < c3 and 1<c4"
@ -90,7 +90,7 @@ def test_tableau_source_cleanups_tableau_parameters_in_lt_gt_predicates(p):
@pytest.mark.parametrize("p", TABLEAU_PARAMS)
def test_tableau_source_cleanups_tableau_parameters_in_lte_gte_predicates(p):
assert (
TableauSource._clean_tableau_query_parameters(
TableauSiteSource._clean_tableau_query_parameters(
f"select * from t where c1 <<= {p} and c2<<={p} and c3 >>= {p} and c4>>={p} or {p} >>= c1 and {p}>>=c2 and {p} <<= c3 and {p}<<=c4"
)
== "select * from t where c1 <= 1 and c2<=1 and c3 >= 1 and c4>=1 or 1 >= c1 and 1>=c2 and 1 <= c3 and 1<=c4"
@ -100,7 +100,7 @@ def test_tableau_source_cleanups_tableau_parameters_in_lte_gte_predicates(p):
@pytest.mark.parametrize("p", TABLEAU_PARAMS)
def test_tableau_source_cleanups_tableau_parameters_in_join_predicate(p):
assert (
TableauSource._clean_tableau_query_parameters(
TableauSiteSource._clean_tableau_query_parameters(
f"select * from t1 inner join t2 on t1.id = t2.id and t2.c21 = {p} and t1.c11 = 123 + {p}"
)
== "select * from t1 inner join t2 on t1.id = t2.id and t2.c21 = 1 and t1.c11 = 123 + 1"
@ -110,7 +110,7 @@ def test_tableau_source_cleanups_tableau_parameters_in_join_predicate(p):
@pytest.mark.parametrize("p", TABLEAU_PARAMS)
def test_tableau_source_cleanups_tableau_parameters_in_complex_expressions(p):
assert (
TableauSource._clean_tableau_query_parameters(
TableauSiteSource._clean_tableau_query_parameters(
f"select myudf1(c1, {p}, c2) / myudf2({p}) > ({p} + 3 * {p} * c5) * {p} - c4"
)
== "select myudf1(c1, 1, c2) / myudf2(1) > (1 + 3 * 1 * c5) * 1 - c4"
@ -120,7 +120,7 @@ def test_tableau_source_cleanups_tableau_parameters_in_complex_expressions(p):
@pytest.mark.parametrize("p", TABLEAU_PARAMS)
def test_tableau_source_cleanups_tableau_parameters_in_udfs(p):
assert (
TableauSource._clean_tableau_query_parameters(f"select myudf({p}) from t")
TableauSiteSource._clean_tableau_query_parameters(f"select myudf({p}) from t")
== "select myudf(1) from t"
)