datahub/metadata-ingestion/tests/unit/tableau/test_tableau_source.py

719 lines
25 KiB
Python

import json
import pathlib
from typing import Any, Dict, List, cast
from unittest import mock
import pytest
from freezegun import freeze_time
from tableauserverclient import Server
import datahub.ingestion.source.tableau.tableau_constant as c
from datahub.emitter.mce_builder import DEFAULT_ENV, make_schema_field_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import TestConnectionReport
from datahub.ingestion.source.tableau.tableau import (
DEFAULT_PAGE_SIZE,
SiteIdContentUrl,
TableauConfig,
TableauPageSizeConfig,
TableauSiteSource,
TableauSource,
TableauSourceReport,
)
from datahub.ingestion.source.tableau.tableau_common import (
TableauLineageOverrides,
TableauUpstreamReference,
get_filter_pages,
make_filter,
optimize_query_filter,
tableau_field_to_schema_field,
)
from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaField
from datahub.metadata.schema_classes import (
DatasetLineageTypeClass,
FineGrainedLineageClass,
FineGrainedLineageDownstreamTypeClass,
FineGrainedLineageUpstreamTypeClass,
UpstreamClass,
UpstreamLineageClass,
)
from tests.test_helpers import test_connection_helpers
from tests.unit.tableau.test_tableau_config import default_config
FROZEN_TIME = "2021-12-07 07:00:00"
test_resources_dir = pathlib.Path(__file__).parent
def read_response(file_name):
response_json_path = f"{test_resources_dir}/setup/{file_name}"
with open(response_json_path) as file:
data = json.loads(file.read())
return data
@freeze_time(FROZEN_TIME)
def test_tableau_test_connection_success():
with mock.patch("datahub.ingestion.source.tableau.tableau.Server"):
report = test_connection_helpers.run_test_connection(
TableauSource, default_config
)
test_connection_helpers.assert_basic_connectivity_success(report)
@freeze_time(FROZEN_TIME)
def test_tableau_test_connection_failure():
report = test_connection_helpers.run_test_connection(TableauSource, default_config)
test_connection_helpers.assert_basic_connectivity_failure(report, "Unable to login")
def test_connection_report_test(requests_mock):
server_info_response = """
<tsResponse xmlns:t="http://tableau.com/api">
<t:serverInfo>
<t:productVersion build="build-number">foo</t:productVersion>
<t:restApiVersion>2.4</t:restApiVersion>
</t:serverInfo>
</tsResponse>
"""
requests_mock.register_uri(
"GET",
"https://do-not-connect/api/2.4/serverInfo",
text=server_info_response,
status_code=200,
headers={"Content-Type": "application/xml"},
)
signin_response = """
<tsResponse xmlns:t="http://tableau.com/api">
<t:credentials token="fake_token">
<t:site id="fake_site_luid" contentUrl="fake_site_content_url"/>
<t:user id="fake_user_id"/>
</t:credentials>
</tsResponse>
"""
requests_mock.register_uri(
"POST",
"https://do-not-connect/api/2.4/auth/signin",
text=signin_response,
status_code=200,
headers={"Content-Type": "application/xml"},
)
user_by_id_response = """
<tsResponse xmlns:t="http://tableau.com/api">
<t:user id="user-id" name="foo@abc.com" siteRole="SiteAdministratorExplorer" />
</tsResponse>
"""
requests_mock.register_uri(
"GET",
"https://do-not-connect/api/2.4/sites/fake_site_luid/users/fake_user_id",
text=user_by_id_response,
status_code=200,
headers={"Content-Type": "application/xml"},
)
report: TestConnectionReport = TableauSource.test_connection(default_config)
assert report
assert report.capability_report
assert report.capability_report.get(c.SITE_PERMISSION)
assert report.capability_report[c.SITE_PERMISSION].capable
# Role other than SiteAdministratorExplorer
user_by_id_response = """
<tsResponse xmlns:t="http://tableau.com/api">
<t:user id="user-id" name="foo@abc.com" siteRole="Explorer" />
</tsResponse>
"""
requests_mock.register_uri(
"GET",
"https://do-not-connect/api/2.4/sites/fake_site_luid/users/fake_user_id",
text=user_by_id_response,
status_code=200,
headers={"Content-Type": "application/xml"},
)
report = TableauSource.test_connection(default_config)
assert report
assert report.capability_report
assert report.capability_report.get(c.SITE_PERMISSION)
assert report.capability_report[c.SITE_PERMISSION].capable is False
assert (
report.capability_report[c.SITE_PERMISSION].failure_reason
== "The user does not have the `Site Administrator Explorer` role. Their current role is Explorer."
)
def test_tableau_no_verify():
# This test ensures that we can connect to a self-signed certificate
# when ssl_verify is set to False.
source = TableauSource.create(
{
"connect_uri": "https://self-signed.badssl.com/",
"ssl_verify": False,
"site": "bogus",
# Credentials
"username": "bogus",
"password": "bogus",
},
PipelineContext(run_id="0"),
)
list(source.get_workunits())
report = source.get_report().as_string()
assert "SSL" not in report
assert "Unable to login" in report
def test_tableau_unsupported_csql():
context = PipelineContext(run_id="0", pipeline_name="test_tableau")
config_dict = default_config.copy()
del config_dict["stateful_ingestion"]
config = TableauConfig.parse_obj(config_dict)
config.extract_lineage_from_unsupported_custom_sql_queries = True
config.lineage_overrides = TableauLineageOverrides(
database_override_map={"production database": "prod"}
)
def check_lineage_metadata(
lineage, expected_entity_urn, expected_upstream_table, expected_cll
):
mcp = cast(MetadataChangeProposalWrapper, list(lineage)[0].metadata)
expected = UpstreamLineageClass(
upstreams=[
UpstreamClass(
dataset=expected_upstream_table,
type=DatasetLineageTypeClass.TRANSFORMED,
)
],
fineGrainedLineages=[
FineGrainedLineageClass(
upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
upstreams=[
make_schema_field_urn(expected_upstream_table, upstream_column)
],
downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
downstreams=[
make_schema_field_urn(expected_entity_urn, downstream_column)
],
)
for upstream_column, downstream_column in expected_cll.items()
],
)
assert mcp.entityUrn == expected_entity_urn
actual_aspect = mcp.aspect
assert actual_aspect == expected
csql_urn = "urn:li:dataset:(urn:li:dataPlatform:tableau,09988088-05ad-173c-a2f1-f33ba3a13d1a,PROD)"
expected_upstream_table = "urn:li:dataset:(urn:li:dataPlatform:bigquery,my_bigquery_project.invent_dw.UserDetail,PROD)"
expected_cll = {
"user_id": "user_id",
"source": "source",
"user_source": "user_source",
}
site_source = TableauSiteSource(
config=config,
ctx=context,
platform="tableau",
site=SiteIdContentUrl(site_id="id1", site_content_url="site1"),
report=TableauSourceReport(),
server=Server("https://test-tableau-server.com"),
)
lineage = site_source._create_lineage_from_unsupported_csql(
csql_urn=csql_urn,
csql={
"query": "SELECT user_id, source, user_source FROM (SELECT *, ROW_NUMBER() OVER (partition BY user_id ORDER BY __partition_day DESC) AS rank_ FROM invent_dw.UserDetail ) source_user WHERE rank_ = 1",
"isUnsupportedCustomSql": "true",
"connectionType": "bigquery",
"database": {
"name": "my_bigquery_project",
"connectionType": "bigquery",
},
},
out_columns=[],
)
check_lineage_metadata(
lineage=lineage,
expected_entity_urn=csql_urn,
expected_upstream_table=expected_upstream_table,
expected_cll=expected_cll,
)
# With database as None
lineage = site_source._create_lineage_from_unsupported_csql(
csql_urn=csql_urn,
csql={
"query": "SELECT user_id, source, user_source FROM (SELECT *, ROW_NUMBER() OVER (partition BY user_id ORDER BY __partition_day DESC) AS rank_ FROM my_bigquery_project.invent_dw.UserDetail ) source_user WHERE rank_ = 1",
"isUnsupportedCustomSql": "true",
"connectionType": "bigquery",
"database": None,
},
out_columns=[],
)
check_lineage_metadata(
lineage=lineage,
expected_entity_urn=csql_urn,
expected_upstream_table=expected_upstream_table,
expected_cll=expected_cll,
)
def test_lineage_overrides():
# Simple - specify platform instance to presto table
assert (
TableauUpstreamReference(
"presto_catalog",
"test-database-id",
"test-schema",
"test-table",
"presto",
).make_dataset_urn(
env=DEFAULT_ENV, platform_instance_map={"presto": "my_presto_instance"}
)
== "urn:li:dataset:(urn:li:dataPlatform:presto,my_presto_instance.presto_catalog.test-schema.test-table,PROD)"
)
# Transform presto urn to hive urn
# resulting platform instance for hive = mapped platform instance + presto_catalog
assert (
TableauUpstreamReference(
"presto_catalog",
"test-database-id",
"test-schema",
"test-table",
"presto",
).make_dataset_urn(
env=DEFAULT_ENV,
platform_instance_map={"presto": "my_instance"},
lineage_overrides=TableauLineageOverrides(
platform_override_map={"presto": "hive"},
),
)
== "urn:li:dataset:(urn:li:dataPlatform:hive,my_instance.presto_catalog.test-schema.test-table,PROD)"
)
# transform hive urn to presto urn
assert (
TableauUpstreamReference(
None,
None,
"test-schema",
"test-table",
"hive",
).make_dataset_urn(
env=DEFAULT_ENV,
platform_instance_map={"hive": "my_presto_instance.presto_catalog"},
lineage_overrides=TableauLineageOverrides(
platform_override_map={"hive": "presto"},
),
)
== "urn:li:dataset:(urn:li:dataPlatform:presto,my_presto_instance.presto_catalog.test-schema.test-table,PROD)"
)
def test_database_hostname_to_platform_instance_map():
# Simple - snowflake table
assert (
TableauUpstreamReference(
"test-database-name",
"test-database-id",
"test-schema",
"test-table",
"snowflake",
).make_dataset_urn(env=DEFAULT_ENV, platform_instance_map={})
== "urn:li:dataset:(urn:li:dataPlatform:snowflake,test-database-name.test-schema.test-table,PROD)"
)
# Finding platform instance based off hostname to platform instance mappings
assert (
TableauUpstreamReference(
"test-database-name",
"test-database-id",
"test-schema",
"test-table",
"snowflake",
).make_dataset_urn(
env=DEFAULT_ENV,
platform_instance_map={},
database_hostname_to_platform_instance_map={
"test-hostname": "test-platform-instance"
},
database_server_hostname_map={"test-database-id": "test-hostname"},
)
== "urn:li:dataset:(urn:li:dataPlatform:snowflake,test-platform-instance.test-database-name.test-schema.test-table,PROD)"
)
def test_tableau_source_handles_none_nativedatatype():
field: Dict[str, Any] = {
"__typename": "CalculatedField",
"id": "abcd",
"name": "Test Field",
"description": None,
"isHidden": False,
"folderName": None,
"upstreamFields": [],
"upstreamColumns": [],
"role": None,
"dataType": None,
"defaultFormat": "s",
"aggregation": None,
"formula": "a/b + d",
}
schema_field: SchemaField = tableau_field_to_schema_field(
field=field, ingest_tags=False
)
assert schema_field.nativeDataType == "UNKNOWN"
def test_tableau_source_unescapes_lt():
res = TableauSiteSource._clean_tableau_query_parameters(
"select * from t where c1 << 135"
)
assert res == "select * from t where c1 < 135"
def test_tableau_source_unescapes_gt():
res = TableauSiteSource._clean_tableau_query_parameters(
"select * from t where c1 >> 135"
)
assert res == "select * from t where c1 > 135"
def test_tableau_source_unescapes_gte():
res = TableauSiteSource._clean_tableau_query_parameters(
"select * from t where c1 >>= 135"
)
assert res == "select * from t where c1 >= 135"
def test_tableau_source_unescapeslgte():
res = TableauSiteSource._clean_tableau_query_parameters(
"select * from t where c1 <<= 135"
)
assert res == "select * from t where c1 <= 135"
def test_tableau_source_doesnt_touch_not_escaped():
res = TableauSiteSource._clean_tableau_query_parameters(
"select * from t where c1 < 135 and c2 > 15"
)
assert res == "select * from t where c1 < 135 and c2 > 15"
TABLEAU_PARAMS = [
"<Parameters.MyParam>",
"<Parameters.MyParam_1>",
"<Parameters.My Param _ 1>",
"<Parameters.My Param 1 !@\"',.#$%^:;&*()-_+={}|\\ /<>",
"<[Parameters].MyParam>",
"<[Parameters].MyParam_1>",
"<[Parameters].My Param _ 1>",
"<[Parameters].My Param 1 !@\"',.#$%^:;&*()-_+={}|\\ /<>",
"<Parameters.[MyParam]>",
"<Parameters.[MyParam_1]>",
"<Parameters.[My Param _ 1]>",
"<Parameters.[My Param 1 !@\"',.#$%^:;&*()-_+={}|\\ /<]>",
"<[Parameters].[MyParam]>",
"<[Parameters].[MyParam_1]>",
"<[Parameters].[My Param _ 1]>",
"<[Parameters].[My Param 1 !@\"',.#$%^:;&*()-_+={}|\\ /<]>",
"<Parameters.[My Param 1 !@\"',.#$%^:;&*()-_+={}|\\ /<>]>",
"<[Parameters].[My Param 1 !@\"',.#$%^:;&*()-_+={}|\\ /<>]>",
]
@pytest.mark.parametrize("p", TABLEAU_PARAMS)
def test_tableau_source_cleanups_tableau_parameters_in_equi_predicates(p):
assert (
TableauSiteSource._clean_tableau_query_parameters(
f"select * from t where c1 = {p} and c2 = {p} and c3 = 7"
)
== "select * from t where c1 = 1 and c2 = 1 and c3 = 7"
)
@pytest.mark.parametrize("p", TABLEAU_PARAMS)
def test_tableau_source_cleanups_tableau_parameters_in_lt_gt_predicates(p):
assert (
TableauSiteSource._clean_tableau_query_parameters(
f"select * from t where c1 << {p} and c2<<{p} and c3 >> {p} and c4>>{p} or {p} >> c1 and {p}>>c2 and {p} << c3 and {p}<<c4"
)
== "select * from t where c1 < 1 and c2<1 and c3 > 1 and c4>1 or 1 > c1 and 1>c2 and 1 < c3 and 1<c4"
)
@pytest.mark.parametrize("p", TABLEAU_PARAMS)
def test_tableau_source_cleanups_tableau_parameters_in_lte_gte_predicates(p):
assert (
TableauSiteSource._clean_tableau_query_parameters(
f"select * from t where c1 <<= {p} and c2<<={p} and c3 >>= {p} and c4>>={p} or {p} >>= c1 and {p}>>=c2 and {p} <<= c3 and {p}<<=c4"
)
== "select * from t where c1 <= 1 and c2<=1 and c3 >= 1 and c4>=1 or 1 >= c1 and 1>=c2 and 1 <= c3 and 1<=c4"
)
@pytest.mark.parametrize("p", TABLEAU_PARAMS)
def test_tableau_source_cleanups_tableau_parameters_in_join_predicate(p):
assert (
TableauSiteSource._clean_tableau_query_parameters(
f"select * from t1 inner join t2 on t1.id = t2.id and t2.c21 = {p} and t1.c11 = 123 + {p}"
)
== "select * from t1 inner join t2 on t1.id = t2.id and t2.c21 = 1 and t1.c11 = 123 + 1"
)
@pytest.mark.parametrize("p", TABLEAU_PARAMS)
def test_tableau_source_cleanups_tableau_parameters_in_complex_expressions(p):
assert (
TableauSiteSource._clean_tableau_query_parameters(
f"select myudf1(c1, {p}, c2) / myudf2({p}) > ({p} + 3 * {p} * c5) * {p} - c4"
)
== "select myudf1(c1, 1, c2) / myudf2(1) > (1 + 3 * 1 * c5) * 1 - c4"
)
@pytest.mark.parametrize("p", TABLEAU_PARAMS)
def test_tableau_source_cleanups_tableau_parameters_in_udfs(p):
assert (
TableauSiteSource._clean_tableau_query_parameters(f"select myudf({p}) from t")
== "select myudf(1) from t"
)
def test_make_id_filter():
ids = [i for i in range(1, 6)]
filter_dict = {c.ID_WITH_IN: ids}
assert make_filter(filter_dict) == f"{c.ID_WITH_IN}: [1, 2, 3, 4, 5]"
def test_make_project_filter():
projects = ["x", "y", "z"]
filter_dict = {c.PROJECT_NAME_WITH_IN: projects}
assert make_filter(filter_dict) == f'{c.PROJECT_NAME_WITH_IN}: ["x", "y", "z"]'
def test_make_multiple_filters():
ids = [i for i in range(1, 6)]
projects = ["x", "y", "z"]
filter_dict = {c.ID_WITH_IN: ids, c.PROJECT_NAME_WITH_IN: projects}
assert (
make_filter(filter_dict)
== f'{c.ID_WITH_IN}: [1, 2, 3, 4, 5], {c.PROJECT_NAME_WITH_IN}: ["x", "y", "z"]'
)
def test_get_filter_pages_simple():
ids = [i for i in range(5)]
filter_dict = {c.ID_WITH_IN: ids}
assert get_filter_pages(filter_dict, 10) == [filter_dict]
def test_get_filter_pages_non_id_large_filter():
projects = [f"project{i}" for i in range(10)]
filter_dict = {c.PROJECT_NAME_WITH_IN: projects}
assert get_filter_pages(filter_dict, 10) == [filter_dict]
def test_get_filter_pages_for_single_key():
projects = ["project1"]
filter_dict = {c.PROJECT_NAME_WITH_IN: projects}
assert get_filter_pages(filter_dict, 10) == [filter_dict]
def test_get_filter_pages_id_filter_splits_into_multiple_filters():
page_size = 10
num_ids = 20000
ids = [f"id_{i}" for i in range(num_ids)]
filter_dict = {c.ID_WITH_IN: ids}
assert get_filter_pages(filter_dict, page_size) == [
{c.ID_WITH_IN: filter_dict[c.ID_WITH_IN][i : i + page_size]}
for i in range(0, num_ids, page_size)
]
def test_optimize_query_filter_removes_duplicates():
query_filter = {
c.ID_WITH_IN: ["id1", "id2", "id1"],
c.PROJECT_NAME_WITH_IN: ["project1", "project2", "project1"],
}
result = optimize_query_filter(query_filter)
assert len(result) == 2
assert result[c.ID_WITH_IN] == ["id1", "id2"]
assert result[c.PROJECT_NAME_WITH_IN] == ["project1", "project2"]
def test_optimize_query_filter_handles_empty_lists():
query_filter: Dict[str, List[str]] = {c.ID_WITH_IN: [], c.PROJECT_NAME_WITH_IN: []}
result = optimize_query_filter(query_filter)
assert len(result) == 2
assert result[c.ID_WITH_IN] == []
assert result[c.PROJECT_NAME_WITH_IN] == []
def test_optimize_query_filter_handles_missing_keys():
query_filter: Dict[str, List[str]] = {}
result = optimize_query_filter(query_filter)
assert result == {}
def test_optimize_query_filter_handles_other_keys():
query_filter = {"any_other_key": ["id1", "id2", "id1"]}
result = optimize_query_filter(query_filter)
assert len(result) == 1
assert result["any_other_key"] == ["id1", "id2", "id1"]
def test_optimize_query_filter_handles_no_duplicates():
query_filter = {
c.ID_WITH_IN: ["id1", "id2"],
c.PROJECT_NAME_WITH_IN: ["project1", "project2"],
}
result = optimize_query_filter(query_filter)
assert len(result) == 2
assert result[c.ID_WITH_IN] == ["id1", "id2"]
assert result[c.PROJECT_NAME_WITH_IN] == ["project1", "project2"]
def test_tableau_upstream_reference():
d = {
"id": "7127b695-3df5-4a3a-4837-eb0f4b572337",
"name": "TABLE1",
"database": None,
"schema": "SCHEMA1",
"fullName": "DB1.SCHEMA1.TABLE1",
"connectionType": "snowflake",
"description": "",
"columnsConnection": {"totalCount": 0},
}
ref = TableauUpstreamReference.create(d)
assert ref
assert ref.database == "DB1"
assert ref.schema == "SCHEMA1"
assert ref.table == "TABLE1"
assert ref.connection_type == "snowflake"
try:
ref = TableauUpstreamReference.create(None) # type: ignore[arg-type]
raise AssertionError(
"TableauUpstreamReference.create with None should have raised exception"
)
except ValueError:
assert True
class TestTableauPageSizeConfig:
def test_defaults(self):
config = TableauPageSizeConfig()
assert config.effective_database_server_page_size == DEFAULT_PAGE_SIZE
assert config.effective_workbook_page_size == 1
assert config.effective_sheet_page_size == DEFAULT_PAGE_SIZE
assert config.effective_dashboard_page_size == DEFAULT_PAGE_SIZE
assert config.effective_embedded_datasource_page_size == DEFAULT_PAGE_SIZE
assert (
config.effective_embedded_datasource_field_upstream_page_size
== DEFAULT_PAGE_SIZE * 10
)
assert config.effective_published_datasource_page_size == DEFAULT_PAGE_SIZE
assert (
config.effective_published_datasource_field_upstream_page_size
== DEFAULT_PAGE_SIZE * 10
)
assert config.effective_custom_sql_table_page_size == DEFAULT_PAGE_SIZE
assert config.effective_database_table_page_size == DEFAULT_PAGE_SIZE
def test_page_size_fallbacks(self):
page_size = 33
config = TableauPageSizeConfig(page_size=page_size)
assert config.effective_database_server_page_size == page_size
assert config.effective_workbook_page_size == 1
assert config.effective_sheet_page_size == page_size
assert config.effective_dashboard_page_size == page_size
assert config.effective_embedded_datasource_page_size == page_size
assert (
config.effective_embedded_datasource_field_upstream_page_size
== page_size * 10
)
assert config.effective_published_datasource_page_size == page_size
assert (
config.effective_published_datasource_field_upstream_page_size
== page_size * 10
)
assert config.effective_custom_sql_table_page_size == page_size
assert config.effective_database_table_page_size == page_size
def test_fine_grained(self):
any_page_size = 55
config = TableauPageSizeConfig(database_server_page_size=any_page_size)
assert config.page_size == DEFAULT_PAGE_SIZE
assert config.effective_database_server_page_size == any_page_size
config = TableauPageSizeConfig(workbook_page_size=any_page_size)
assert config.page_size == DEFAULT_PAGE_SIZE
assert config.effective_workbook_page_size == any_page_size
config = TableauPageSizeConfig(workbook_page_size=None)
assert config.page_size == DEFAULT_PAGE_SIZE
assert config.effective_workbook_page_size == DEFAULT_PAGE_SIZE
config = TableauPageSizeConfig(sheet_page_size=any_page_size)
assert config.page_size == DEFAULT_PAGE_SIZE
assert config.effective_sheet_page_size == any_page_size
config = TableauPageSizeConfig(dashboard_page_size=any_page_size)
assert config.page_size == DEFAULT_PAGE_SIZE
assert config.effective_dashboard_page_size == any_page_size
config = TableauPageSizeConfig(embedded_datasource_page_size=any_page_size)
assert config.page_size == DEFAULT_PAGE_SIZE
assert config.effective_embedded_datasource_page_size == any_page_size
config = TableauPageSizeConfig(
embedded_datasource_field_upstream_page_size=any_page_size
)
assert config.page_size == DEFAULT_PAGE_SIZE
assert (
config.effective_embedded_datasource_field_upstream_page_size
== any_page_size
)
config = TableauPageSizeConfig(published_datasource_page_size=any_page_size)
assert config.page_size == DEFAULT_PAGE_SIZE
assert config.effective_published_datasource_page_size == any_page_size
config = TableauPageSizeConfig(
published_datasource_field_upstream_page_size=any_page_size
)
assert config.page_size == DEFAULT_PAGE_SIZE
assert (
config.effective_published_datasource_field_upstream_page_size
== any_page_size
)
config = TableauPageSizeConfig(custom_sql_table_page_size=any_page_size)
assert config.page_size == DEFAULT_PAGE_SIZE
assert config.effective_custom_sql_table_page_size == any_page_size
config = TableauPageSizeConfig(database_table_page_size=any_page_size)
assert config.page_size == DEFAULT_PAGE_SIZE
assert config.effective_database_table_page_size == any_page_size