diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py index ea3fb6c979..0424a0e692 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py @@ -419,6 +419,17 @@ class TableauConfig( description="[advanced] Number of workbooks to query at a time using the Tableau API.", ) + # Since the field upstream query was separated from the embedded datasource queries into an independent query, + # the number of queries increased significantly, and so did the execution time. + # To increase batching and thereby reduce the number of queries, we can increase the page size for that + # particular case. + # + # The `num_(filter_|paginated_)?queries_by_connection_type` metrics in the report help to understand the impact of this change. + embedded_datasource_field_upstream_page_size: int = Field( + default=100, + description="[advanced] Number of upstream fields to query at a time for embedded datasources using the Tableau API.", + ) + env: str = Field( default=builder.DEFAULT_ENV, description="Environment to use in namespace when constructing URNs.", @@ -700,6 +711,23 @@ class TableauSourceReport( default_factory=(lambda: defaultdict(int)) ) + # Counters for tracking the queries made by the get_connection_objects method, + # keyed by connection type (a small, static set of keys): + # - num_queries_by_connection_type: number of calls to get_connection_objects + # - num_filter_queries_by_connection_type: number of filter pages produced by splitting the query filter + # - num_paginated_queries_by_connection_type: number of queries actually issued, including Tableau pagination + # These counters are useful to understand the impact of changing the page size.
+ + num_queries_by_connection_type: Dict[str, int] = dataclass_field( + default_factory=(lambda: defaultdict(int)) + ) + num_filter_queries_by_connection_type: Dict[str, int] = dataclass_field( + default_factory=(lambda: defaultdict(int)) + ) + num_paginated_queries_by_connection_type: Dict[str, int] = dataclass_field( + default_factory=(lambda: defaultdict(int)) + ) + def report_user_role(report: TableauSourceReport, server: Server) -> None: title: str = "Insufficient Permissions" @@ -1430,12 +1458,21 @@ class TableauSiteSource: page_size = page_size_override or self.config.page_size filter_pages = get_filter_pages(query_filter, page_size) + self.report.num_queries_by_connection_type[connection_type] += 1 + self.report.num_filter_queries_by_connection_type[connection_type] += len( + filter_pages + ) + for filter_page in filter_pages: has_next_page = 1 current_cursor: Optional[str] = None while has_next_page: filter_: str = make_filter(filter_page) + self.report.num_paginated_queries_by_connection_type[ + connection_type + ] += 1 + self.report.num_expected_tableau_metadata_queries += 1 ( connection_objects, @@ -2632,6 +2669,7 @@ class TableauSiteSource: self, datasource: dict, field_upstream_query: str, + page_size_override: Optional[int] = None, ) -> dict: # Collect field ids to fetch field upstreams field_ids: List[str] = [] @@ -2642,9 +2680,10 @@ class TableauSiteSource: # Fetch field upstreams and arrange them in map field_vs_upstream: Dict[str, dict] = {} for field_upstream in self.get_connection_objects( - field_upstream_query, - c.FIELDS_CONNECTION, - {c.ID_WITH_IN: field_ids}, + query=field_upstream_query, + connection_type=c.FIELDS_CONNECTION, + query_filter={c.ID_WITH_IN: field_ids}, + page_size_override=page_size_override, ): if field_upstream.get(c.ID): field_id = field_upstream[c.ID] @@ -3356,6 +3395,7 @@ class TableauSiteSource: datasource = self.update_datasource_for_field_upstream( datasource=datasource, 
field_upstream_query=datasource_upstream_fields_graphql_query, + page_size_override=self.config.embedded_datasource_field_upstream_page_size, ) yield from self.emit_datasource( datasource, diff --git a/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py b/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py index d7868038a4..417496c559 100644 --- a/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py +++ b/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py @@ -70,6 +70,7 @@ config_source_default = { "extract_project_hierarchy": False, "page_size": 1000, "workbook_page_size": 1000, + "embedded_datasource_field_upstream_page_size": 1000, "ingest_tags": True, "ingest_owner": True, "ingest_tables_external": True, @@ -646,6 +647,7 @@ def test_tableau_ingest_with_platform_instance( "projects": ["default", "Project 2"], "page_size": 1000, "workbook_page_size": 1000, + "embedded_datasource_field_upstream_page_size": 1000, "ingest_tags": True, "ingest_owner": True, "ingest_tables_external": True,