mirror of
https://github.com/datahub-project/datahub.git
synced 2025-08-19 06:38:04 +00:00
feat(tableau): fine-grained page size (#12354)
This commit is contained in:
parent
0392a22d9d
commit
b7b541ca7d
@ -174,6 +174,8 @@ from datahub.utilities.perf_timer import PerfTimer
|
||||
from datahub.utilities.stats_collections import TopKDict
|
||||
from datahub.utilities.urns.dataset_urn import DatasetUrn
|
||||
|
||||
DEFAULT_PAGE_SIZE = 10
|
||||
|
||||
try:
|
||||
# On earlier versions of the tableauserverclient, the NonXMLResponseError
|
||||
# was thrown when reauthentication was necessary. We'll keep both exceptions
|
||||
@ -342,11 +344,140 @@ class PermissionIngestionConfig(ConfigModel):
|
||||
)
|
||||
|
||||
|
||||
class TableauPageSizeConfig(ConfigModel):
|
||||
"""
|
||||
Configuration for setting page sizes for different Tableau metadata objects.
|
||||
|
||||
Some considerations:
|
||||
- All have default values, so no setting is mandatory.
|
||||
- In general, with the `effective_` methods, if not specifically set fine-grained metrics fallback to `page_size`
|
||||
or correlate with `page_size`.
|
||||
|
||||
Measuring the impact of changing these values can be done by looking at the
|
||||
`num_(filter_|paginated_)?queries_by_connection_type` metrics in the report.
|
||||
"""
|
||||
|
||||
page_size: int = Field(
|
||||
default=DEFAULT_PAGE_SIZE,
|
||||
description="[advanced] Number of metadata objects (e.g. CustomSQLTable, PublishedDatasource, etc) to query at a time using the Tableau API.",
|
||||
)
|
||||
|
||||
database_server_page_size: Optional[int] = Field(
|
||||
default=None,
|
||||
description="[advanced] Number of database servers to query at a time using the Tableau API; fallbacks to `page_size` if not set.",
|
||||
)
|
||||
|
||||
@property
|
||||
def effective_database_server_page_size(self) -> int:
|
||||
return self.database_server_page_size or self.page_size
|
||||
|
||||
# We've found that even with a small workbook page size (e.g. 10), the Tableau API often
|
||||
# returns warnings like this:
|
||||
# {
|
||||
# 'message': 'Showing partial results. The request exceeded the 20000 node limit. Use pagination, additional filtering, or both in the query to adjust results.',
|
||||
# 'extensions': {
|
||||
# 'severity': 'WARNING',
|
||||
# 'code': 'NODE_LIMIT_EXCEEDED',
|
||||
# 'properties': {
|
||||
# 'nodeLimit': 20000
|
||||
# }
|
||||
# }
|
||||
# }
|
||||
# Reducing the page size for the workbook queries helps to avoid this.
|
||||
workbook_page_size: Optional[int] = Field(
|
||||
default=1,
|
||||
description="[advanced] Number of workbooks to query at a time using the Tableau API; defaults to `1` and fallbacks to `page_size` if not set.",
|
||||
)
|
||||
|
||||
@property
|
||||
def effective_workbook_page_size(self) -> int:
|
||||
return self.workbook_page_size or self.page_size
|
||||
|
||||
sheet_page_size: Optional[int] = Field(
|
||||
default=None,
|
||||
description="[advanced] Number of sheets to query at a time using the Tableau API; fallbacks to `page_size` if not set.",
|
||||
)
|
||||
|
||||
@property
|
||||
def effective_sheet_page_size(self) -> int:
|
||||
return self.sheet_page_size or self.page_size
|
||||
|
||||
dashboard_page_size: Optional[int] = Field(
|
||||
default=None,
|
||||
description="[advanced] Number of dashboards to query at a time using the Tableau API; fallbacks to `page_size` if not set.",
|
||||
)
|
||||
|
||||
@property
|
||||
def effective_dashboard_page_size(self) -> int:
|
||||
return self.dashboard_page_size or self.page_size
|
||||
|
||||
embedded_datasource_page_size: Optional[int] = Field(
|
||||
default=None,
|
||||
description="[advanced] Number of embedded datasources to query at a time using the Tableau API; fallbacks to `page_size` if not set.",
|
||||
)
|
||||
|
||||
@property
|
||||
def effective_embedded_datasource_page_size(self) -> int:
|
||||
return self.embedded_datasource_page_size or self.page_size
|
||||
|
||||
# Since the field upstream query was separated from the embedded datasource queries into an independent query,
|
||||
# the number of queries increased significantly and so the execution time.
|
||||
# To increase the batching and so reduce the number of queries, we can increase the page size for that
|
||||
# particular case.
|
||||
#
|
||||
# That's why unless specifically set, we will effectively use 10 times the page size as the default page size.
|
||||
embedded_datasource_field_upstream_page_size: Optional[int] = Field(
|
||||
default=None,
|
||||
description="[advanced] Number of upstream fields to query at a time for embedded datasources using the Tableau API; fallbacks to `page_size` * 10 if not set.",
|
||||
)
|
||||
|
||||
@property
|
||||
def effective_embedded_datasource_field_upstream_page_size(self) -> int:
|
||||
return self.embedded_datasource_field_upstream_page_size or self.page_size * 10
|
||||
|
||||
published_datasource_page_size: Optional[int] = Field(
|
||||
default=None,
|
||||
description="[advanced] Number of published datasources to query at a time using the Tableau API; fallbacks to `page_size` if not set.",
|
||||
)
|
||||
|
||||
@property
|
||||
def effective_published_datasource_page_size(self) -> int:
|
||||
return self.published_datasource_page_size or self.page_size
|
||||
|
||||
published_datasource_field_upstream_page_size: Optional[int] = Field(
|
||||
default=None,
|
||||
description="[advanced] Number of upstream fields to query at a time for published datasources using the Tableau API; fallbacks to `page_size` * 10 if not set.",
|
||||
)
|
||||
|
||||
@property
|
||||
def effective_published_datasource_field_upstream_page_size(self) -> int:
|
||||
return self.published_datasource_field_upstream_page_size or self.page_size * 10
|
||||
|
||||
custom_sql_table_page_size: Optional[int] = Field(
|
||||
default=None,
|
||||
description="[advanced] Number of custom sql datasources to query at a time using the Tableau API; fallbacks to `page_size` if not set.",
|
||||
)
|
||||
|
||||
@property
|
||||
def effective_custom_sql_table_page_size(self) -> int:
|
||||
return self.custom_sql_table_page_size or self.page_size
|
||||
|
||||
database_table_page_size: Optional[int] = Field(
|
||||
default=None,
|
||||
description="[advanced] Number of database tables to query at a time using the Tableau API; fallbacks to `page_size` if not set.",
|
||||
)
|
||||
|
||||
@property
|
||||
def effective_database_table_page_size(self) -> int:
|
||||
return self.database_table_page_size or self.page_size
|
||||
|
||||
|
||||
class TableauConfig(
|
||||
DatasetLineageProviderConfigBase,
|
||||
StatefulIngestionConfigBase,
|
||||
DatasetSourceConfigMixin,
|
||||
TableauConnectionConfig,
|
||||
TableauPageSizeConfig,
|
||||
):
|
||||
projects: Optional[List[str]] = Field(
|
||||
default=["default"],
|
||||
@ -396,40 +527,6 @@ class TableauConfig(
|
||||
description="Ingest details for tables external to (not embedded in) tableau as entities.",
|
||||
)
|
||||
|
||||
page_size: int = Field(
|
||||
default=10,
|
||||
description="[advanced] Number of metadata objects (e.g. CustomSQLTable, PublishedDatasource, etc) to query at a time using the Tableau API.",
|
||||
)
|
||||
|
||||
# We've found that even with a small workbook page size (e.g. 10), the Tableau API often
|
||||
# returns warnings like this:
|
||||
# {
|
||||
# 'message': 'Showing partial results. The request exceeded the 20000 node limit. Use pagination, additional filtering, or both in the query to adjust results.',
|
||||
# 'extensions': {
|
||||
# 'severity': 'WARNING',
|
||||
# 'code': 'NODE_LIMIT_EXCEEDED',
|
||||
# 'properties': {
|
||||
# 'nodeLimit': 20000
|
||||
# }
|
||||
# }
|
||||
# }
|
||||
# Reducing the page size for the workbook queries helps to avoid this.
|
||||
workbook_page_size: int = Field(
|
||||
default=1,
|
||||
description="[advanced] Number of workbooks to query at a time using the Tableau API.",
|
||||
)
|
||||
|
||||
# Since the field upstream query was separated from the embedded datasource queries into an independent query,
|
||||
# the number of queries increased significantly and so the execution time.
|
||||
# To increase the batching and so reduce the number of queries, we can increase the page size for that
|
||||
# particular case.
|
||||
#
|
||||
# `num_(filter_|paginated_)?queries_by_connection_type` metrics in the report will help to understand the impact of this change.
|
||||
embedded_datasource_field_upstream_page_size: int = Field(
|
||||
default=100,
|
||||
description="[advanced] Number of upstream fields to query at a time for embedded datasources using the Tableau API.",
|
||||
)
|
||||
|
||||
env: str = Field(
|
||||
default=builder.DEFAULT_ENV,
|
||||
description="Environment to use in namespace when constructing URNs.",
|
||||
@ -1022,7 +1119,9 @@ class TableauSiteSource:
|
||||
return server_connection
|
||||
|
||||
for database_server in self.get_connection_objects(
|
||||
database_servers_graphql_query, c.DATABASE_SERVERS_CONNECTION
|
||||
query=database_servers_graphql_query,
|
||||
connection_type=c.DATABASE_SERVERS_CONNECTION,
|
||||
page_size=self.config.effective_database_server_page_size,
|
||||
):
|
||||
database_server_id = database_server.get(c.ID)
|
||||
server_connection = database_server.get(c.HOST_NAME)
|
||||
@ -1448,14 +1547,13 @@ class TableauSiteSource:
|
||||
self,
|
||||
query: str,
|
||||
connection_type: str,
|
||||
page_size: int,
|
||||
query_filter: dict = {},
|
||||
page_size_override: Optional[int] = None,
|
||||
) -> Iterable[dict]:
|
||||
query_filter = optimize_query_filter(query_filter)
|
||||
|
||||
# Calls the get_connection_object_page function to get the objects,
|
||||
# and automatically handles pagination.
|
||||
page_size = page_size_override or self.config.page_size
|
||||
|
||||
filter_pages = get_filter_pages(query_filter, page_size)
|
||||
self.report.num_queries_by_connection_type[connection_type] += 1
|
||||
@ -1500,10 +1598,10 @@ class TableauSiteSource:
|
||||
projects = {c.PROJECT_NAME_WITH_IN: project_names}
|
||||
|
||||
for workbook in self.get_connection_objects(
|
||||
workbook_graphql_query,
|
||||
c.WORKBOOKS_CONNECTION,
|
||||
projects,
|
||||
page_size_override=self.config.workbook_page_size,
|
||||
query=workbook_graphql_query,
|
||||
connection_type=c.WORKBOOKS_CONNECTION,
|
||||
query_filter=projects,
|
||||
page_size=self.config.effective_workbook_page_size,
|
||||
):
|
||||
# This check is needed as we are using projectNameWithin which return project as per project name so if
|
||||
# user want to ingest only nested project C from A->B->C then tableau might return more than one Project
|
||||
@ -1958,9 +2056,10 @@ class TableauSiteSource:
|
||||
|
||||
custom_sql_connection = list(
|
||||
self.get_connection_objects(
|
||||
custom_sql_graphql_query,
|
||||
c.CUSTOM_SQL_TABLE_CONNECTION,
|
||||
custom_sql_filter,
|
||||
query=custom_sql_graphql_query,
|
||||
connection_type=c.CUSTOM_SQL_TABLE_CONNECTION,
|
||||
query_filter=custom_sql_filter,
|
||||
page_size=self.config.effective_custom_sql_table_page_size,
|
||||
)
|
||||
)
|
||||
|
||||
@ -2669,7 +2768,7 @@ class TableauSiteSource:
|
||||
self,
|
||||
datasource: dict,
|
||||
field_upstream_query: str,
|
||||
page_size_override: Optional[int] = None,
|
||||
page_size: int,
|
||||
) -> dict:
|
||||
# Collect field ids to fetch field upstreams
|
||||
field_ids: List[str] = []
|
||||
@ -2683,7 +2782,7 @@ class TableauSiteSource:
|
||||
query=field_upstream_query,
|
||||
connection_type=c.FIELDS_CONNECTION,
|
||||
query_filter={c.ID_WITH_IN: field_ids},
|
||||
page_size_override=page_size_override,
|
||||
page_size=page_size,
|
||||
):
|
||||
if field_upstream.get(c.ID):
|
||||
field_id = field_upstream[c.ID]
|
||||
@ -2706,13 +2805,15 @@ class TableauSiteSource:
|
||||
datasource_filter = {c.ID_WITH_IN: self.datasource_ids_being_used}
|
||||
|
||||
for datasource in self.get_connection_objects(
|
||||
published_datasource_graphql_query,
|
||||
c.PUBLISHED_DATA_SOURCES_CONNECTION,
|
||||
datasource_filter,
|
||||
query=published_datasource_graphql_query,
|
||||
connection_type=c.PUBLISHED_DATA_SOURCES_CONNECTION,
|
||||
query_filter=datasource_filter,
|
||||
page_size=self.config.effective_published_datasource_page_size,
|
||||
):
|
||||
datasource = self.update_datasource_for_field_upstream(
|
||||
datasource=datasource,
|
||||
field_upstream_query=datasource_upstream_fields_graphql_query,
|
||||
page_size=self.config.effective_published_datasource_field_upstream_page_size,
|
||||
)
|
||||
|
||||
yield from self.emit_datasource(datasource)
|
||||
@ -2730,9 +2831,10 @@ class TableauSiteSource:
|
||||
|
||||
# Emitting tables that came from Tableau metadata
|
||||
for tableau_table in self.get_connection_objects(
|
||||
database_tables_graphql_query,
|
||||
c.DATABASE_TABLES_CONNECTION,
|
||||
tables_filter,
|
||||
query=database_tables_graphql_query,
|
||||
connection_type=c.DATABASE_TABLES_CONNECTION,
|
||||
query_filter=tables_filter,
|
||||
page_size=self.config.effective_database_table_page_size,
|
||||
):
|
||||
database_table = self.database_tables[
|
||||
tableau_database_table_id_to_urn_map[tableau_table[c.ID]]
|
||||
@ -2921,9 +3023,10 @@ class TableauSiteSource:
|
||||
sheets_filter = {c.ID_WITH_IN: self.sheet_ids}
|
||||
|
||||
for sheet in self.get_connection_objects(
|
||||
sheet_graphql_query,
|
||||
c.SHEETS_CONNECTION,
|
||||
sheets_filter,
|
||||
query=sheet_graphql_query,
|
||||
connection_type=c.SHEETS_CONNECTION,
|
||||
query_filter=sheets_filter,
|
||||
page_size=self.config.effective_sheet_page_size,
|
||||
):
|
||||
if self.config.ingest_hidden_assets or not self._is_hidden_view(sheet):
|
||||
yield from self.emit_sheets_as_charts(sheet, sheet.get(c.WORKBOOK))
|
||||
@ -3241,9 +3344,10 @@ class TableauSiteSource:
|
||||
dashboards_filter = {c.ID_WITH_IN: self.dashboard_ids}
|
||||
|
||||
for dashboard in self.get_connection_objects(
|
||||
dashboard_graphql_query,
|
||||
c.DASHBOARDS_CONNECTION,
|
||||
dashboards_filter,
|
||||
query=dashboard_graphql_query,
|
||||
connection_type=c.DASHBOARDS_CONNECTION,
|
||||
query_filter=dashboards_filter,
|
||||
page_size=self.config.effective_dashboard_page_size,
|
||||
):
|
||||
if self.config.ingest_hidden_assets or not self._is_hidden_view(dashboard):
|
||||
yield from self.emit_dashboard(dashboard, dashboard.get(c.WORKBOOK))
|
||||
@ -3388,14 +3492,15 @@ class TableauSiteSource:
|
||||
datasource_filter = {c.ID_WITH_IN: self.embedded_datasource_ids_being_used}
|
||||
|
||||
for datasource in self.get_connection_objects(
|
||||
embedded_datasource_graphql_query,
|
||||
c.EMBEDDED_DATA_SOURCES_CONNECTION,
|
||||
datasource_filter,
|
||||
query=embedded_datasource_graphql_query,
|
||||
connection_type=c.EMBEDDED_DATA_SOURCES_CONNECTION,
|
||||
query_filter=datasource_filter,
|
||||
page_size=self.config.effective_embedded_datasource_page_size,
|
||||
):
|
||||
datasource = self.update_datasource_for_field_upstream(
|
||||
datasource=datasource,
|
||||
field_upstream_query=datasource_upstream_fields_graphql_query,
|
||||
page_size_override=self.config.embedded_datasource_field_upstream_page_size,
|
||||
page_size=self.config.effective_embedded_datasource_field_upstream_page_size,
|
||||
)
|
||||
yield from self.emit_datasource(
|
||||
datasource,
|
||||
|
@ -69,8 +69,7 @@ config_source_default = {
|
||||
"projects": ["default", "Project 2", "Samples"],
|
||||
"extract_project_hierarchy": False,
|
||||
"page_size": 1000,
|
||||
"workbook_page_size": 1000,
|
||||
"embedded_datasource_field_upstream_page_size": 1000,
|
||||
"workbook_page_size": None,
|
||||
"ingest_tags": True,
|
||||
"ingest_owner": True,
|
||||
"ingest_tables_external": True,
|
||||
@ -646,8 +645,7 @@ def test_tableau_ingest_with_platform_instance(
|
||||
"platform_instance": "acryl_site1",
|
||||
"projects": ["default", "Project 2"],
|
||||
"page_size": 1000,
|
||||
"workbook_page_size": 1000,
|
||||
"embedded_datasource_field_upstream_page_size": 1000,
|
||||
"workbook_page_size": None,
|
||||
"ingest_tags": True,
|
||||
"ingest_owner": True,
|
||||
"ingest_tables_external": True,
|
||||
|
@ -3,7 +3,11 @@ from typing import Any, Dict, List
|
||||
import pytest
|
||||
|
||||
import datahub.ingestion.source.tableau.tableau_constant as c
|
||||
from datahub.ingestion.source.tableau.tableau import TableauSiteSource
|
||||
from datahub.ingestion.source.tableau.tableau import (
|
||||
DEFAULT_PAGE_SIZE,
|
||||
TableauPageSizeConfig,
|
||||
TableauSiteSource,
|
||||
)
|
||||
from datahub.ingestion.source.tableau.tableau_common import (
|
||||
TableauUpstreamReference,
|
||||
get_filter_pages,
|
||||
@ -276,3 +280,100 @@ def test_tableau_upstream_reference():
|
||||
)
|
||||
except ValueError:
|
||||
assert True
|
||||
|
||||
|
||||
class TestTableauPageSizeConfig:
|
||||
def test_defaults(self):
|
||||
config = TableauPageSizeConfig()
|
||||
assert config.effective_database_server_page_size == DEFAULT_PAGE_SIZE
|
||||
assert config.effective_workbook_page_size == 1
|
||||
assert config.effective_sheet_page_size == DEFAULT_PAGE_SIZE
|
||||
assert config.effective_dashboard_page_size == DEFAULT_PAGE_SIZE
|
||||
assert config.effective_embedded_datasource_page_size == DEFAULT_PAGE_SIZE
|
||||
assert (
|
||||
config.effective_embedded_datasource_field_upstream_page_size
|
||||
== DEFAULT_PAGE_SIZE * 10
|
||||
)
|
||||
assert config.effective_published_datasource_page_size == DEFAULT_PAGE_SIZE
|
||||
assert (
|
||||
config.effective_published_datasource_field_upstream_page_size
|
||||
== DEFAULT_PAGE_SIZE * 10
|
||||
)
|
||||
assert config.effective_custom_sql_table_page_size == DEFAULT_PAGE_SIZE
|
||||
assert config.effective_database_table_page_size == DEFAULT_PAGE_SIZE
|
||||
|
||||
def test_page_size_fallbacks(self):
|
||||
page_size = 33
|
||||
config = TableauPageSizeConfig(page_size=page_size)
|
||||
assert config.effective_database_server_page_size == page_size
|
||||
assert config.effective_workbook_page_size == 1
|
||||
assert config.effective_sheet_page_size == page_size
|
||||
assert config.effective_dashboard_page_size == page_size
|
||||
assert config.effective_embedded_datasource_page_size == page_size
|
||||
assert (
|
||||
config.effective_embedded_datasource_field_upstream_page_size
|
||||
== page_size * 10
|
||||
)
|
||||
assert config.effective_published_datasource_page_size == page_size
|
||||
assert (
|
||||
config.effective_published_datasource_field_upstream_page_size
|
||||
== page_size * 10
|
||||
)
|
||||
assert config.effective_custom_sql_table_page_size == page_size
|
||||
assert config.effective_database_table_page_size == page_size
|
||||
|
||||
def test_fine_grained(self):
|
||||
any_page_size = 55
|
||||
config = TableauPageSizeConfig(database_server_page_size=any_page_size)
|
||||
assert config.page_size == DEFAULT_PAGE_SIZE
|
||||
assert config.effective_database_server_page_size == any_page_size
|
||||
|
||||
config = TableauPageSizeConfig(workbook_page_size=any_page_size)
|
||||
assert config.page_size == DEFAULT_PAGE_SIZE
|
||||
assert config.effective_workbook_page_size == any_page_size
|
||||
|
||||
config = TableauPageSizeConfig(workbook_page_size=None)
|
||||
assert config.page_size == DEFAULT_PAGE_SIZE
|
||||
assert config.effective_workbook_page_size == DEFAULT_PAGE_SIZE
|
||||
|
||||
config = TableauPageSizeConfig(sheet_page_size=any_page_size)
|
||||
assert config.page_size == DEFAULT_PAGE_SIZE
|
||||
assert config.effective_sheet_page_size == any_page_size
|
||||
|
||||
config = TableauPageSizeConfig(dashboard_page_size=any_page_size)
|
||||
assert config.page_size == DEFAULT_PAGE_SIZE
|
||||
assert config.effective_dashboard_page_size == any_page_size
|
||||
|
||||
config = TableauPageSizeConfig(embedded_datasource_page_size=any_page_size)
|
||||
assert config.page_size == DEFAULT_PAGE_SIZE
|
||||
assert config.effective_embedded_datasource_page_size == any_page_size
|
||||
|
||||
config = TableauPageSizeConfig(
|
||||
embedded_datasource_field_upstream_page_size=any_page_size
|
||||
)
|
||||
assert config.page_size == DEFAULT_PAGE_SIZE
|
||||
assert (
|
||||
config.effective_embedded_datasource_field_upstream_page_size
|
||||
== any_page_size
|
||||
)
|
||||
|
||||
config = TableauPageSizeConfig(published_datasource_page_size=any_page_size)
|
||||
assert config.page_size == DEFAULT_PAGE_SIZE
|
||||
assert config.effective_published_datasource_page_size == any_page_size
|
||||
|
||||
config = TableauPageSizeConfig(
|
||||
published_datasource_field_upstream_page_size=any_page_size
|
||||
)
|
||||
assert config.page_size == DEFAULT_PAGE_SIZE
|
||||
assert (
|
||||
config.effective_published_datasource_field_upstream_page_size
|
||||
== any_page_size
|
||||
)
|
||||
|
||||
config = TableauPageSizeConfig(custom_sql_table_page_size=any_page_size)
|
||||
assert config.page_size == DEFAULT_PAGE_SIZE
|
||||
assert config.effective_custom_sql_table_page_size == any_page_size
|
||||
|
||||
config = TableauPageSizeConfig(database_table_page_size=any_page_size)
|
||||
assert config.page_size == DEFAULT_PAGE_SIZE
|
||||
assert config.effective_database_table_page_size == any_page_size
|
||||
|
Loading…
x
Reference in New Issue
Block a user