fix(tableau): fixes some aspects being emitted multiple times (#12258)

This commit is contained in:
Sergio Gómez Villamor 2025-01-03 09:15:53 +01:00 committed by GitHub
parent 539f521388
commit 1190dd95b2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 66 additions and 1 deletions

View File

@ -109,6 +109,7 @@ from datahub.ingestion.source.tableau.tableau_common import (
make_filter,
make_fine_grained_lineage_class,
make_upstream_class,
optimize_query_filter,
published_datasource_graphql_query,
query_metadata_cursor_based_pagination,
sheet_graphql_query,
@ -1363,6 +1364,8 @@ class TableauSiteSource:
query_filter: dict = {},
page_size_override: Optional[int] = None,
) -> Iterable[dict]:
query_filter = optimize_query_filter(query_filter)
# Calls the get_connection_object_page function to get the objects,
# and automatically handles pagination.
page_size = page_size_override or self.config.page_size

View File

@ -1,3 +1,4 @@
import copy
import html
import json
import logging
@ -35,6 +36,7 @@ from datahub.metadata.schema_classes import (
UpstreamClass,
)
from datahub.sql_parsing.sqlglot_lineage import ColumnLineageInfo, SqlParsingResult
from datahub.utilities.ordered_set import OrderedSet
logger = logging.getLogger(__name__)
@ -1000,3 +1002,19 @@ def get_filter_pages(query_filter: dict, page_size: int) -> List[dict]:
]
return filter_pages
def optimize_query_filter(query_filter: dict) -> dict:
    """
    Return a deep copy of ``query_filter`` with repeated values removed from
    the id and project-name lists.

    Duplicates in the filter cause duplicates in the result, which leads to
    entities/aspects being emitted multiple times unnecessarily. The input
    dict is left untouched, and every other key is carried over as-is.
    """
    deduplicated = copy.deepcopy(query_filter)
    for filter_key in (c.ID_WITH_IN, c.PROJECT_NAME_WITH_IN):
        values = query_filter.get(filter_key)
        if not values:
            # Missing or empty entry: the deep-copied value (if any) stands.
            continue
        deduplicated[filter_key] = list(OrderedSet(values))
    return deduplicated

View File

@ -1,4 +1,4 @@
from typing import Any, Dict
from typing import Any, Dict, List
import pytest
@ -7,6 +7,7 @@ from datahub.ingestion.source.tableau.tableau import TableauSiteSource
from datahub.ingestion.source.tableau.tableau_common import (
get_filter_pages,
make_filter,
optimize_query_filter,
tableau_field_to_schema_field,
)
from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaField
@ -203,3 +204,46 @@ def test_get_filter_pages_id_filter_splits_into_multiple_filters():
{c.ID_WITH_IN: filter_dict[c.ID_WITH_IN][i : i + page_size]}
for i in range(0, num_ids, page_size)
]
def test_optimize_query_filter_removes_duplicates():
    """Repeated ids and project names are dropped, keeping first occurrences."""
    filter_with_repeats = {
        c.ID_WITH_IN: ["id1", "id2", "id1"],
        c.PROJECT_NAME_WITH_IN: ["project1", "project2", "project1"],
    }

    optimized = optimize_query_filter(filter_with_repeats)

    # Comparing the whole dict checks both the key count and each list.
    assert optimized == {
        c.ID_WITH_IN: ["id1", "id2"],
        c.PROJECT_NAME_WITH_IN: ["project1", "project2"],
    }
def test_optimize_query_filter_handles_empty_lists():
    """Empty id and project-name lists stay present and empty."""
    empty_lists_filter: Dict[str, List[str]] = {
        c.ID_WITH_IN: [],
        c.PROJECT_NAME_WITH_IN: [],
    }

    optimized = optimize_query_filter(empty_lists_filter)

    # Both keys must still be there, each mapped to an empty list.
    assert optimized == {c.ID_WITH_IN: [], c.PROJECT_NAME_WITH_IN: []}
def test_optimize_query_filter_handles_missing_keys():
    """A filter with no keys at all yields an empty filter."""
    empty_filter: Dict[str, List[str]] = {}
    assert optimize_query_filter(empty_filter) == {}
def test_optimize_query_filter_handles_other_keys():
    """Keys other than the id/project-name filters keep their duplicates."""
    unrelated_filter = {"any_other_key": ["id1", "id2", "id1"]}

    optimized = optimize_query_filter(unrelated_filter)

    # The single unrelated key is returned with its list unchanged.
    assert optimized == {"any_other_key": ["id1", "id2", "id1"]}
def test_optimize_query_filter_handles_no_duplicates():
    """A filter whose lists are already unique comes back with equal contents."""
    unique_filter = {
        c.ID_WITH_IN: ["id1", "id2"],
        c.PROJECT_NAME_WITH_IN: ["project1", "project2"],
    }

    optimized = optimize_query_filter(unique_filter)

    assert optimized == {
        c.ID_WITH_IN: ["id1", "id2"],
        c.PROJECT_NAME_WITH_IN: ["project1", "project2"],
    }