mirror of
https://github.com/datahub-project/datahub.git
synced 2026-01-03 05:16:20 +00:00
fix(ingest/tableau): graceful handling of get all datasources failure (#8406)
This commit is contained in:
parent
6ce157d0ff
commit
60b9623675
@ -411,6 +411,15 @@ class DatabaseTable:
|
||||
self.num_cols = num_tbl_cols
|
||||
|
||||
|
||||
class TableauSourceReport(StaleEntityRemovalSourceReport):
|
||||
get_all_datasources_query_failed: bool = False
|
||||
num_get_datasource_query_failures: int = 0
|
||||
num_datasource_field_skipped_no_name: int = 0
|
||||
num_csql_field_skipped_no_name: int = 0
|
||||
num_table_field_skipped_no_name: int = 0
|
||||
num_upstream_table_skipped_no_name: int = 0
|
||||
|
||||
|
||||
@platform_name("Tableau")
|
||||
@config_class(TableauConfig)
|
||||
@support_status(SupportStatus.CERTIFIED)
|
||||
@ -442,7 +451,7 @@ class TableauSource(StatefulIngestionSourceBase):
|
||||
super().__init__(config, ctx)
|
||||
|
||||
self.config: TableauConfig = config
|
||||
self.report: StaleEntityRemovalSourceReport = StaleEntityRemovalSourceReport()
|
||||
self.report: TableauSourceReport = TableauSourceReport()
|
||||
self.server: Optional[Server] = None
|
||||
self.database_tables: Dict[str, DatabaseTable] = {}
|
||||
self.tableau_stat_registry: Dict[str, UsageStat] = {}
|
||||
@ -595,14 +604,20 @@ class TableauSource(StatefulIngestionSourceBase):
|
||||
if self.server is None:
|
||||
return
|
||||
|
||||
for ds in TSC.Pager(self.server.datasources):
|
||||
if ds.project_id not in self.tableau_project_registry:
|
||||
logger.debug(
|
||||
f"project id ({ds.project_id}) of datasource {ds.name} is not present in project "
|
||||
f"registry"
|
||||
)
|
||||
continue
|
||||
self.datasource_project_map[ds.id] = ds.project_id
|
||||
try:
|
||||
for ds in TSC.Pager(self.server.datasources):
|
||||
if ds.project_id not in self.tableau_project_registry:
|
||||
logger.debug(
|
||||
f"project id ({ds.project_id}) of datasource {ds.name} is not present in project "
|
||||
f"registry"
|
||||
)
|
||||
continue
|
||||
self.datasource_project_map[ds.id] = ds.project_id
|
||||
self.report.get_all_datasources_query_failed = False
|
||||
except Exception as e:
|
||||
self.report.get_all_datasources_query_failed = True
|
||||
logger.info(f"Get all datasources query failed due to error {e}")
|
||||
logger.debug("Error stack trace", exc_info=True)
|
||||
|
||||
def _init_workbook_registry(self) -> None:
|
||||
if self.server is None:
|
||||
@ -956,6 +971,7 @@ class TableauSource(StatefulIngestionSourceBase):
|
||||
)
|
||||
continue
|
||||
elif table[tableau_constant.NAME] is None:
|
||||
self.report.num_upstream_table_skipped_no_name += 1
|
||||
logger.warning(
|
||||
f"Skipping upstream table {table[tableau_constant.ID]} from lineage since its name is none: {table}"
|
||||
)
|
||||
@ -1263,6 +1279,7 @@ class TableauSource(StatefulIngestionSourceBase):
|
||||
# Datasource fields
|
||||
|
||||
if field.get(tableau_constant.NAME) is None:
|
||||
self.report.num_csql_field_skipped_no_name += 1
|
||||
logger.warning(
|
||||
f"Skipping field {field[tableau_constant.ID]} from schema since its name is none"
|
||||
)
|
||||
@ -1290,6 +1307,16 @@ class TableauSource(StatefulIngestionSourceBase):
|
||||
return schema_metadata
|
||||
|
||||
def _get_published_datasource_project_luid(self, ds):
|
||||
# This is fallback in case "get all datasources" query fails for some reason.
|
||||
# It is possible due to https://github.com/tableau/server-client-python/issues/1210
|
||||
if (
|
||||
ds.get(tableau_constant.LUID)
|
||||
and ds[tableau_constant.LUID] not in self.datasource_project_map.keys()
|
||||
and self.report.get_all_datasources_query_failed
|
||||
):
|
||||
# Query and update self.datasource_project_map with luid
|
||||
self._query_published_datasource_for_project_luid(ds)
|
||||
|
||||
if (
|
||||
ds.get(tableau_constant.LUID)
|
||||
and ds[tableau_constant.LUID] in self.datasource_project_map.keys()
|
||||
@ -1304,6 +1331,30 @@ class TableauSource(StatefulIngestionSourceBase):
|
||||
|
||||
return None
|
||||
|
||||
def _query_published_datasource_for_project_luid(self, ds: dict) -> None:
|
||||
if self.server is None:
|
||||
return
|
||||
|
||||
try:
|
||||
logger.debug(
|
||||
f"published datasource {ds.get(tableau_constant.NAME)} project_luid not found."
|
||||
f" Running get datasource query for {ds[tableau_constant.LUID]}"
|
||||
)
|
||||
ds_result = self.server.datasources.get_by_id(ds[tableau_constant.LUID])
|
||||
if ds_result.project_id not in self.tableau_project_registry:
|
||||
logger.debug(
|
||||
f"project id ({ds_result.project_id}) of datasource {ds_result.name} is not present in project "
|
||||
f"registry"
|
||||
)
|
||||
else:
|
||||
self.datasource_project_map[ds_result.id] = ds_result.project_id
|
||||
except Exception as e:
|
||||
self.report.num_get_datasource_query_failures += 1
|
||||
logger.warning(
|
||||
f"Failed to get datasource project_luid for {ds[tableau_constant.LUID]} due to error {e}"
|
||||
)
|
||||
logger.debug("Error stack trace", exc_info=True)
|
||||
|
||||
def _get_workbook_project_luid(self, wb):
|
||||
if wb.get(tableau_constant.LUID) and self.workbook_project_map.get(
|
||||
wb[tableau_constant.LUID]
|
||||
@ -1451,6 +1502,7 @@ class TableauSource(StatefulIngestionSourceBase):
|
||||
# check datasource - custom sql relations from a field being referenced
|
||||
self._track_custom_sql_ids(field)
|
||||
if field.get(tableau_constant.NAME) is None:
|
||||
self.report.num_upstream_table_skipped_no_name += 1
|
||||
logger.warning(
|
||||
f"Skipping field {field[tableau_constant.ID]} from schema since its name is none"
|
||||
)
|
||||
@ -1619,29 +1671,38 @@ class TableauSource(StatefulIngestionSourceBase):
|
||||
),
|
||||
)
|
||||
|
||||
if (
|
||||
is_embedded_ds and workbook is not None
|
||||
): # It is embedded then parent is container is workbook
|
||||
container_key = self._get_datasource_container_key(
|
||||
datasource, workbook, is_embedded_ds
|
||||
)
|
||||
if container_key is not None:
|
||||
yield from add_entity_to_container(
|
||||
self.gen_workbook_key(workbook),
|
||||
tableau_constant.DATASET,
|
||||
dataset_snapshot.urn,
|
||||
)
|
||||
elif (
|
||||
datasource.get(tableau_constant.LUID)
|
||||
and datasource[tableau_constant.LUID] in self.datasource_project_map.keys()
|
||||
): # It is published datasource and hence parent container is project
|
||||
yield from add_entity_to_container(
|
||||
self.gen_project_key(
|
||||
self.datasource_project_map[datasource[tableau_constant.LUID]]
|
||||
),
|
||||
container_key,
|
||||
tableau_constant.DATASET,
|
||||
dataset_snapshot.urn,
|
||||
)
|
||||
|
||||
def _get_datasource_container_key(self, datasource, workbook, is_embedded_ds):
|
||||
container_key: Optional[PlatformKey] = None
|
||||
if is_embedded_ds: # It is embedded then parent is container is workbook
|
||||
if workbook is not None:
|
||||
container_key = self.gen_workbook_key(workbook)
|
||||
else:
|
||||
logger.warning(
|
||||
f"Parent container not set for embedded datasource {datasource[tableau_constant.ID]}"
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
f"Parent container not set for datasource {datasource[tableau_constant.ID]}"
|
||||
parent_project_luid = self._get_published_datasource_project_luid(
|
||||
datasource
|
||||
)
|
||||
# It is published datasource and hence parent container is project
|
||||
if parent_project_luid is not None:
|
||||
container_key = self.gen_project_key(parent_project_luid)
|
||||
else:
|
||||
logger.warning(
|
||||
f"Parent container not set for published datasource {datasource[tableau_constant.ID]}"
|
||||
)
|
||||
|
||||
return container_key
|
||||
|
||||
def emit_published_datasources(self) -> Iterable[MetadataWorkUnit]:
|
||||
datasource_filter = f"{tableau_constant.ID_WITH_IN}: {json.dumps(self.datasource_ids_being_used)}"
|
||||
@ -1695,11 +1756,22 @@ class TableauSource(StatefulIngestionSourceBase):
|
||||
dataset_snapshot.aspects.append(browse_paths)
|
||||
else:
|
||||
logger.debug(f"Browse path not set for table {database_table.urn}")
|
||||
schema_metadata = None
|
||||
|
||||
schema_metadata = self.get_schema_metadata_for_table(columns or [])
|
||||
if schema_metadata is not None:
|
||||
dataset_snapshot.aspects.append(schema_metadata)
|
||||
|
||||
yield self.get_metadata_change_event(dataset_snapshot)
|
||||
|
||||
def get_schema_metadata_for_table(
|
||||
self, columns: List[dict]
|
||||
) -> Optional[SchemaMetadata]:
|
||||
schema_metadata: Optional[SchemaMetadata] = None
|
||||
if columns:
|
||||
fields = []
|
||||
for field in columns:
|
||||
if field.get(tableau_constant.NAME) is None:
|
||||
self.report.num_table_field_skipped_no_name += 1
|
||||
logger.warning(
|
||||
f"Skipping field {field[tableau_constant.ID]} from schema since its name is none"
|
||||
)
|
||||
@ -1726,10 +1798,8 @@ class TableauSource(StatefulIngestionSourceBase):
|
||||
hash="",
|
||||
platformSchema=OtherSchema(rawSchema=""),
|
||||
)
|
||||
if schema_metadata is not None:
|
||||
dataset_snapshot.aspects.append(schema_metadata)
|
||||
|
||||
yield self.get_metadata_change_event(dataset_snapshot)
|
||||
return schema_metadata
|
||||
|
||||
def get_sheetwise_upstream_datasources(self, sheet: dict) -> set:
|
||||
sheet_upstream_datasources = set()
|
||||
@ -2368,5 +2438,5 @@ class TableauSource(StatefulIngestionSourceBase):
|
||||
reason=f"Unable to retrieve metadata from tableau. Information: {str(md_exception)}",
|
||||
)
|
||||
|
||||
def get_report(self) -> StaleEntityRemovalSourceReport:
|
||||
def get_report(self) -> TableauSourceReport:
|
||||
return self.report
|
||||
|
||||
@ -188,6 +188,13 @@ def side_effect_workbook_data(*arg, **kwargs):
|
||||
], mock_pagination
|
||||
|
||||
|
||||
def side_effect_datasource_get_by_id(id, *arg, **kwargs):
|
||||
datasources, _ = side_effect_datasource_data()
|
||||
for ds in datasources:
|
||||
if ds._id == id:
|
||||
return ds
|
||||
|
||||
|
||||
def tableau_ingest_common(
|
||||
pytestconfig,
|
||||
tmp_path,
|
||||
@ -198,6 +205,7 @@ def tableau_ingest_common(
|
||||
pipeline_config=config_source_default,
|
||||
sign_out_side_effect=lambda: None,
|
||||
pipeline_name="tableau-test-pipeline",
|
||||
datasources_side_effect=side_effect_datasource_data,
|
||||
):
|
||||
with mock.patch(
|
||||
"datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
|
||||
@ -215,7 +223,10 @@ def tableau_ingest_common(
|
||||
mock_client.projects = mock.Mock()
|
||||
mock_client.projects.get.side_effect = side_effect_project_data
|
||||
mock_client.datasources = mock.Mock()
|
||||
mock_client.datasources.get.side_effect = side_effect_datasource_data
|
||||
mock_client.datasources.get.side_effect = datasources_side_effect
|
||||
mock_client.datasources.get_by_id.side_effect = (
|
||||
side_effect_datasource_get_by_id
|
||||
)
|
||||
mock_client.workbooks = mock.Mock()
|
||||
mock_client.workbooks.get.side_effect = side_effect_workbook_data
|
||||
mock_client.views.get.side_effect = side_effect_usage_stat
|
||||
@ -243,12 +254,13 @@ def tableau_ingest_common(
|
||||
pipeline.run()
|
||||
pipeline.raise_from_status()
|
||||
|
||||
mce_helpers.check_golden_file(
|
||||
pytestconfig,
|
||||
output_path=f"{tmp_path}/{output_file_name}",
|
||||
golden_path=test_resources_dir / golden_file_name,
|
||||
ignore_paths=mce_helpers.IGNORE_PATH_TIMESTAMPS,
|
||||
)
|
||||
if golden_file_name:
|
||||
mce_helpers.check_golden_file(
|
||||
pytestconfig,
|
||||
output_path=f"{tmp_path}/{output_file_name}",
|
||||
golden_path=test_resources_dir / golden_file_name,
|
||||
ignore_paths=mce_helpers.IGNORE_PATH_TIMESTAMPS,
|
||||
)
|
||||
return pipeline
|
||||
|
||||
|
||||
@ -767,3 +779,28 @@ def test_tableau_unsupported_csql(mock_datahub_graph):
|
||||
mcp.entityUrn
|
||||
== "urn:li:dataset:(urn:li:dataPlatform:tableau,09988088-05ad-173c-a2f1-f33ba3a13d1a,PROD)"
|
||||
)
|
||||
|
||||
|
||||
@freeze_time(FROZEN_TIME)
|
||||
@pytest.mark.integration
|
||||
def test_get_all_datasources_failure(pytestconfig, tmp_path, mock_datahub_graph):
|
||||
output_file_name: str = "tableau_mces.json"
|
||||
golden_file_name: str = "tableau_mces_golden.json"
|
||||
tableau_ingest_common(
|
||||
pytestconfig,
|
||||
tmp_path,
|
||||
[
|
||||
read_response(pytestconfig, "workbooksConnection_all.json"),
|
||||
read_response(pytestconfig, "sheetsConnection_all.json"),
|
||||
read_response(pytestconfig, "dashboardsConnection_all.json"),
|
||||
read_response(pytestconfig, "embeddedDatasourcesConnection_all.json"),
|
||||
read_response(pytestconfig, "publishedDatasourcesConnection_all.json"),
|
||||
read_response(pytestconfig, "customSQLTablesConnection_all.json"),
|
||||
read_response(pytestconfig, "databaseTablesConnection_all.json"),
|
||||
],
|
||||
golden_file_name,
|
||||
output_file_name,
|
||||
mock_datahub_graph,
|
||||
pipeline_name="test_tableau_ingest",
|
||||
datasources_side_effect=ValueError("project_id must be defined."),
|
||||
)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user