chore(ingest/tableau): miscellaneous cleanup refactor (#8417)

Co-authored-by: Andrew Sikowitz <andrew.sikowitz@acryl.io>
This commit is contained in:
Mayuri Nehate 2023-07-27 17:02:48 +05:30 committed by GitHub
parent 5a3f91de53
commit b9060db515
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 1071 additions and 37041 deletions

View File

@ -613,7 +613,6 @@ class TableauSource(StatefulIngestionSourceBase):
) )
continue continue
self.datasource_project_map[ds.id] = ds.project_id self.datasource_project_map[ds.id] = ds.project_id
self.report.get_all_datasources_query_failed = False
except Exception as e: except Exception as e:
self.report.get_all_datasources_query_failed = True self.report.get_all_datasources_query_failed = True
logger.info(f"Get all datasources query failed due to error {e}") logger.info(f"Get all datasources query failed due to error {e}")
@ -763,7 +762,7 @@ class TableauSource(StatefulIngestionSourceBase):
offset += count offset += count
for obj in connection_objects.get(tableau_constant.NODES, []): for obj in connection_objects.get(tableau_constant.NODES) or []:
yield obj yield obj
def emit_workbooks(self) -> Iterable[MetadataWorkUnit]: def emit_workbooks(self) -> Iterable[MetadataWorkUnit]:
@ -977,18 +976,19 @@ class TableauSource(StatefulIngestionSourceBase):
) )
continue continue
schema = table.get(tableau_constant.SCHEMA, "") schema = table.get(tableau_constant.SCHEMA) or ""
table_name = table.get(tableau_constant.NAME, "") table_name = table.get(tableau_constant.NAME) or ""
full_name = table.get(tableau_constant.FULL_NAME, "") full_name = table.get(tableau_constant.FULL_NAME) or ""
upstream_db = ( upstream_db = (
table.get(tableau_constant.DATABASE, {}).get(tableau_constant.NAME, "") table[tableau_constant.DATABASE][tableau_constant.NAME]
if table.get(tableau_constant.DATABASE) is not None if table.get(tableau_constant.DATABASE)
and table[tableau_constant.DATABASE].get(tableau_constant.NAME)
else "" else ""
) )
logger.debug( logger.debug(
"Processing Table with Connection Type: {0} and id {1}".format( "Processing Table with Connection Type: {0} and id {1}".format(
table.get(tableau_constant.CONNECTION_TYPE, ""), table.get(tableau_constant.CONNECTION_TYPE) or "",
table.get(tableau_constant.ID, ""), table.get(tableau_constant.ID) or "",
) )
) )
schema = self._get_schema(schema, upstream_db, full_name) schema = self._get_schema(schema, upstream_db, full_name)
@ -1008,7 +1008,7 @@ class TableauSource(StatefulIngestionSourceBase):
table_urn = make_table_urn( table_urn = make_table_urn(
self.config.env, self.config.env,
upstream_db, upstream_db,
table.get(tableau_constant.CONNECTION_TYPE, ""), table.get(tableau_constant.CONNECTION_TYPE) or "",
schema, schema,
table_name, table_name,
self.config.platform_instance_map, self.config.platform_instance_map,
@ -1209,7 +1209,9 @@ class TableauSource(StatefulIngestionSourceBase):
f"Adding datasource {datasource_name}({datasource.get('id')}) to container" f"Adding datasource {datasource_name}({datasource.get('id')}) to container"
) )
yield from add_entity_to_container( yield from add_entity_to_container(
self.gen_workbook_key(datasource[tableau_constant.WORKBOOK]), self.gen_workbook_key(
datasource[tableau_constant.WORKBOOK][tableau_constant.ID]
),
tableau_constant.DATASET, tableau_constant.DATASET,
dataset_snapshot.urn, dataset_snapshot.urn,
) )
@ -1254,10 +1256,11 @@ class TableauSource(StatefulIngestionSourceBase):
dataset_snapshot.aspects.append(dataset_properties) dataset_snapshot.aspects.append(dataset_properties)
if csql.get(tableau_constant.QUERY):
view_properties = ViewPropertiesClass( view_properties = ViewPropertiesClass(
materialized=False, materialized=False,
viewLanguage=tableau_constant.SQL, viewLanguage=tableau_constant.SQL,
viewLogic=clean_query(csql.get(tableau_constant.QUERY) or ""), viewLogic=clean_query(csql[tableau_constant.QUERY]),
) )
dataset_snapshot.aspects.append(view_properties) dataset_snapshot.aspects.append(view_properties)
@ -1292,7 +1295,7 @@ class TableauSource(StatefulIngestionSourceBase):
fieldPath=field[tableau_constant.NAME], fieldPath=field[tableau_constant.NAME],
type=SchemaFieldDataType(type=TypeClass()), type=SchemaFieldDataType(type=TypeClass()),
nativeDataType=nativeDataType, nativeDataType=nativeDataType,
description=field.get(tableau_constant.DESCRIPTION, ""), description=field.get(tableau_constant.DESCRIPTION),
) )
fields.append(schema_field) fields.append(schema_field)
@ -1314,8 +1317,12 @@ class TableauSource(StatefulIngestionSourceBase):
and ds[tableau_constant.LUID] not in self.datasource_project_map.keys() and ds[tableau_constant.LUID] not in self.datasource_project_map.keys()
and self.report.get_all_datasources_query_failed and self.report.get_all_datasources_query_failed
): ):
logger.debug(
f"published datasource {ds.get(tableau_constant.NAME)} project_luid not found."
f" Running get datasource query for {ds[tableau_constant.LUID]}"
)
# Query and update self.datasource_project_map with luid # Query and update self.datasource_project_map with luid
self._query_published_datasource_for_project_luid(ds) self._query_published_datasource_for_project_luid(ds[tableau_constant.LUID])
if ( if (
ds.get(tableau_constant.LUID) ds.get(tableau_constant.LUID)
@ -1331,16 +1338,12 @@ class TableauSource(StatefulIngestionSourceBase):
return None return None
def _query_published_datasource_for_project_luid(self, ds: dict) -> None: def _query_published_datasource_for_project_luid(self, ds_luid: str) -> None:
if self.server is None: if self.server is None:
return return
try: try:
logger.debug( ds_result = self.server.datasources.get_by_id(ds_luid)
f"published datasource {ds.get(tableau_constant.NAME)} project_luid not found."
f" Running get datasource query for {ds[tableau_constant.LUID]}"
)
ds_result = self.server.datasources.get_by_id(ds[tableau_constant.LUID])
if ds_result.project_id not in self.tableau_project_registry: if ds_result.project_id not in self.tableau_project_registry:
logger.debug( logger.debug(
f"project id ({ds_result.project_id}) of datasource {ds_result.name} is not present in project " f"project id ({ds_result.project_id}) of datasource {ds_result.name} is not present in project "
@ -1351,7 +1354,7 @@ class TableauSource(StatefulIngestionSourceBase):
except Exception as e: except Exception as e:
self.report.num_get_datasource_query_failures += 1 self.report.num_get_datasource_query_failures += 1
logger.warning( logger.warning(
f"Failed to get datasource project_luid for {ds[tableau_constant.LUID]} due to error {e}" f"Failed to get datasource project_luid for {ds_luid} due to error {e}"
) )
logger.debug("Error stack trace", exc_info=True) logger.debug("Error stack trace", exc_info=True)
@ -1589,11 +1592,11 @@ class TableauSource(StatefulIngestionSourceBase):
# Ownership # Ownership
owner = ( owner = (
self._get_ownership( self._get_ownership(
datasource_info.get(tableau_constant.OWNER, {}).get( datasource_info[tableau_constant.OWNER][tableau_constant.USERNAME]
tableau_constant.USERNAME, ""
)
) )
if datasource_info if datasource_info
and datasource_info.get(tableau_constant.OWNER)
and datasource_info[tableau_constant.OWNER].get(tableau_constant.USERNAME)
else None else None
) )
if owner is not None: if owner is not None:
@ -1603,24 +1606,15 @@ class TableauSource(StatefulIngestionSourceBase):
dataset_props = DatasetPropertiesClass( dataset_props = DatasetPropertiesClass(
name=datasource.get(tableau_constant.NAME), name=datasource.get(tableau_constant.NAME),
description=datasource.get(tableau_constant.DESCRIPTION), description=datasource.get(tableau_constant.DESCRIPTION),
customProperties={ customProperties=self.get_custom_props_from_dict(
tableau_constant.HAS_EXTRACTS: str( datasource,
datasource.get(tableau_constant.HAS_EXTRACTS, "") [
tableau_constant.HAS_EXTRACTS,
tableau_constant.EXTRACT_LAST_REFRESH_TIME,
tableau_constant.EXTRACT_LAST_INCREMENTAL_UPDATE_TIME,
tableau_constant.EXTRACT_LAST_UPDATE_TIME,
],
), ),
tableau_constant.EXTRACT_LAST_REFRESH_TIME: datasource.get(
tableau_constant.EXTRACT_LAST_REFRESH_TIME, ""
)
or "",
tableau_constant.EXTRACT_LAST_INCREMENTAL_UPDATE_TIME: datasource.get(
tableau_constant.EXTRACT_LAST_INCREMENTAL_UPDATE_TIME, ""
)
or "",
tableau_constant.EXTRACT_LAST_UPDATE_TIME: datasource.get(
tableau_constant.EXTRACT_LAST_UPDATE_TIME, ""
)
or "",
tableau_constant.TYPE: datasource.get(tableau_constant.TYPE_NAME, ""),
},
) )
dataset_snapshot.aspects.append(dataset_props) dataset_snapshot.aspects.append(dataset_props)
@ -1681,11 +1675,14 @@ class TableauSource(StatefulIngestionSourceBase):
dataset_snapshot.urn, dataset_snapshot.urn,
) )
def get_custom_props_from_dict(self, obj: dict, keys: List[str]) -> Optional[dict]:
return {key: str(obj[key]) for key in keys if obj.get(key)} or None
def _get_datasource_container_key(self, datasource, workbook, is_embedded_ds): def _get_datasource_container_key(self, datasource, workbook, is_embedded_ds):
container_key: Optional[ContainerKey] = None container_key: Optional[ContainerKey] = None
if is_embedded_ds: # It is embedded then parent is container is workbook if is_embedded_ds: # It is embedded then parent is container is workbook
if workbook is not None: if workbook is not None:
container_key = self.gen_workbook_key(workbook) container_key = self.gen_workbook_key(workbook[tableau_constant.ID])
else: else:
logger.warning( logger.warning(
f"Parent container not set for embedded datasource {datasource[tableau_constant.ID]}" f"Parent container not set for embedded datasource {datasource[tableau_constant.ID]}"
@ -1784,7 +1781,7 @@ class TableauSource(StatefulIngestionSourceBase):
schema_field = SchemaField( schema_field = SchemaField(
fieldPath=field[tableau_constant.NAME], fieldPath=field[tableau_constant.NAME],
type=SchemaFieldDataType(type=TypeClass()), type=SchemaFieldDataType(type=TypeClass()),
description="", description=field.get(tableau_constant.DESCRIPTION),
nativeDataType=nativeDataType, nativeDataType=nativeDataType,
) )
@ -1804,7 +1801,7 @@ class TableauSource(StatefulIngestionSourceBase):
def get_sheetwise_upstream_datasources(self, sheet: dict) -> set: def get_sheetwise_upstream_datasources(self, sheet: dict) -> set:
sheet_upstream_datasources = set() sheet_upstream_datasources = set()
for field in sheet.get(tableau_constant.DATA_SOURCE_FIELDS, ""): for field in sheet.get(tableau_constant.DATA_SOURCE_FIELDS) or []:
if field and field.get(tableau_constant.DATA_SOURCE): if field and field.get(tableau_constant.DATA_SOURCE):
sheet_upstream_datasources.add( sheet_upstream_datasources.add(
field[tableau_constant.DATA_SOURCE][tableau_constant.ID] field[tableau_constant.DATA_SOURCE][tableau_constant.ID]
@ -1891,12 +1888,15 @@ class TableauSource(StatefulIngestionSourceBase):
sheet.get(tableau_constant.CONTAINED_IN_DASHBOARDS) is not None sheet.get(tableau_constant.CONTAINED_IN_DASHBOARDS) is not None
and len(sheet[tableau_constant.CONTAINED_IN_DASHBOARDS]) > 0 and len(sheet[tableau_constant.CONTAINED_IN_DASHBOARDS]) > 0
and sheet[tableau_constant.CONTAINED_IN_DASHBOARDS][0] is not None and sheet[tableau_constant.CONTAINED_IN_DASHBOARDS][0] is not None
and sheet[tableau_constant.CONTAINED_IN_DASHBOARDS][0].get(
tableau_constant.PATH
)
): ):
# sheet contained in dashboard # sheet contained in dashboard
site_part = f"/t/{self.config.site}" if self.config.site else "" site_part = f"/t/{self.config.site}" if self.config.site else ""
dashboard_path = sheet[tableau_constant.CONTAINED_IN_DASHBOARDS][0].get( dashboard_path = sheet[tableau_constant.CONTAINED_IN_DASHBOARDS][0][
tableau_constant.PATH, "" tableau_constant.PATH
) ]
sheet_external_url = f"{self.config.connect_uri}{site_part}/authoring/{dashboard_path}/{sheet.get(tableau_constant.NAME, '')}" sheet_external_url = f"{self.config.connect_uri}{site_part}/authoring/{dashboard_path}/{sheet.get(tableau_constant.NAME, '')}"
else: else:
# hidden or viz-in-tooltip sheet # hidden or viz-in-tooltip sheet
@ -1920,15 +1920,15 @@ class TableauSource(StatefulIngestionSourceBase):
# Chart Info # Chart Info
chart_info = ChartInfoClass( chart_info = ChartInfoClass(
description="", description="",
title=sheet.get(tableau_constant.NAME, ""), title=sheet.get(tableau_constant.NAME) or "",
lastModified=last_modified, lastModified=last_modified,
externalUrl=sheet_external_url externalUrl=sheet_external_url
if self.config.ingest_external_links_for_charts if self.config.ingest_external_links_for_charts
else None, else None,
inputs=sorted(datasource_urn), inputs=sorted(datasource_urn),
customProperties={ customProperties=self.get_custom_props_from_dict(
tableau_constant.LUID: sheet.get(tableau_constant.LUID) or "" sheet, [tableau_constant.LUID]
}, ),
) )
chart_snapshot.aspects.append(chart_info) chart_snapshot.aspects.append(chart_info)
# chart_snapshot doesn't support the stat aspect as list element and hence need to emit MCP # chart_snapshot doesn't support the stat aspect as list element and hence need to emit MCP
@ -1964,13 +1964,10 @@ class TableauSource(StatefulIngestionSourceBase):
chart_snapshot.aspects.append(owner) chart_snapshot.aspects.append(owner)
# Tags # Tags
tag_list = sheet.get(tableau_constant.TAGS, []) tags = self.get_tags(sheet)
if tag_list and self.config.ingest_tags: if tags:
tag_list_str = [
t.get(tableau_constant.NAME, "") for t in tag_list if t is not None
]
chart_snapshot.aspects.append( chart_snapshot.aspects.append(
builder.make_global_tag_aspect_with_tag_list(tag_list_str) builder.make_global_tag_aspect_with_tag_list(tags)
) )
yield self.get_metadata_change_event(chart_snapshot) yield self.get_metadata_change_event(chart_snapshot)
if sheet_external_url is not None and self.config.ingest_embed_url is True: if sheet_external_url is not None and self.config.ingest_embed_url is True:
@ -1982,7 +1979,7 @@ class TableauSource(StatefulIngestionSourceBase):
) )
if workbook is not None: if workbook is not None:
yield from add_entity_to_container( yield from add_entity_to_container(
self.gen_workbook_key(workbook), self.gen_workbook_key(workbook[tableau_constant.ID]),
tableau_constant.CHART, tableau_constant.CHART,
chart_snapshot.urn, chart_snapshot.urn,
) )
@ -2037,7 +2034,7 @@ class TableauSource(StatefulIngestionSourceBase):
) )
def emit_workbook_as_container(self, workbook: Dict) -> Iterable[MetadataWorkUnit]: def emit_workbook_as_container(self, workbook: Dict) -> Iterable[MetadataWorkUnit]:
workbook_container_key = self.gen_workbook_key(workbook) workbook_container_key = self.gen_workbook_key(workbook[tableau_constant.ID])
creator = workbook.get(tableau_constant.OWNER, {}).get( creator = workbook.get(tableau_constant.OWNER, {}).get(
tableau_constant.USERNAME tableau_constant.USERNAME
) )
@ -2049,11 +2046,9 @@ class TableauSource(StatefulIngestionSourceBase):
) )
site_part = f"/site/{self.config.site}" if self.config.site else "" site_part = f"/site/{self.config.site}" if self.config.site else ""
workbook_uri = workbook.get("uri", "") workbook_uri = workbook.get("uri")
workbook_part = ( workbook_part = (
workbook_uri[workbook_uri.index("/workbooks/") :] workbook_uri[workbook_uri.index("/workbooks/") :] if workbook_uri else None
if workbook.get("uri")
else None
) )
workbook_external_url = ( workbook_external_url = (
f"{self.config.connect_uri}/#{site_part}{workbook_part}" f"{self.config.connect_uri}/#{site_part}{workbook_part}"
@ -2061,12 +2056,8 @@ class TableauSource(StatefulIngestionSourceBase):
else None else None
) )
tag_list = workbook.get(tableau_constant.TAGS, []) tags = self.get_tags(workbook)
tag_list_str = (
[t.get(tableau_constant.NAME, "") for t in tag_list if t is not None]
if (tag_list and self.config.ingest_tags)
else None
)
parent_key = None parent_key = None
project_luid: Optional[str] = self._get_workbook_project_luid(workbook) project_luid: Optional[str] = self._get_workbook_project_luid(workbook)
if project_luid and project_luid in self.tableau_project_registry.keys(): if project_luid and project_luid in self.tableau_project_registry.keys():
@ -2080,20 +2071,20 @@ class TableauSource(StatefulIngestionSourceBase):
yield from gen_containers( yield from gen_containers(
container_key=workbook_container_key, container_key=workbook_container_key,
name=workbook.get(tableau_constant.NAME, ""), name=workbook.get(tableau_constant.NAME) or "",
parent_container_key=parent_key, parent_container_key=parent_key,
description=workbook.get(tableau_constant.DESCRIPTION), description=workbook.get(tableau_constant.DESCRIPTION),
sub_types=[BIContainerSubTypes.TABLEAU_WORKBOOK], sub_types=[BIContainerSubTypes.TABLEAU_WORKBOOK],
owner_urn=owner_urn, owner_urn=owner_urn,
external_url=workbook_external_url, external_url=workbook_external_url,
tags=tag_list_str, tags=tags,
) )
def gen_workbook_key(self, workbook: Dict) -> WorkbookKey: def gen_workbook_key(self, workbook_id: str) -> WorkbookKey:
return WorkbookKey( return WorkbookKey(
platform=self.platform, platform=self.platform,
instance=self.config.platform_instance, instance=self.config.platform_instance,
workbook_id=workbook[tableau_constant.ID], workbook_id=workbook_id,
) )
def gen_project_key(self, project_luid): def gen_project_key(self, project_luid):
@ -2182,6 +2173,18 @@ class TableauSource(StatefulIngestionSourceBase):
dashboard, dashboard.get(tableau_constant.WORKBOOK) dashboard, dashboard.get(tableau_constant.WORKBOOK)
) )
def get_tags(self, obj: dict) -> Optional[List[str]]:
tag_list = obj.get(tableau_constant.TAGS, [])
if tag_list and self.config.ingest_tags:
tag_list_str = [
t[tableau_constant.NAME]
for t in tag_list
if t is not None and t.get(tableau_constant.NAME)
]
return tag_list_str
return None
def emit_dashboard( def emit_dashboard(
self, dashboard: dict, workbook: Optional[Dict] self, dashboard: dict, workbook: Optional[Dict]
) -> Iterable[MetadataWorkUnit]: ) -> Iterable[MetadataWorkUnit]:
@ -2223,19 +2226,16 @@ class TableauSource(StatefulIngestionSourceBase):
dashboardUrl=dashboard_external_url dashboardUrl=dashboard_external_url
if self.config.ingest_external_links_for_dashboards if self.config.ingest_external_links_for_dashboards
else None, else None,
customProperties={ customProperties=self.get_custom_props_from_dict(
tableau_constant.LUID: dashboard.get(tableau_constant.LUID) or "" dashboard, [tableau_constant.LUID]
}, ),
) )
dashboard_snapshot.aspects.append(dashboard_info_class) dashboard_snapshot.aspects.append(dashboard_info_class)
tag_list = dashboard.get(tableau_constant.TAGS, []) tags = self.get_tags(dashboard)
if tag_list and self.config.ingest_tags: if tags:
tag_list_str = [
t.get(tableau_constant.NAME, "") for t in tag_list if t is not None
]
dashboard_snapshot.aspects.append( dashboard_snapshot.aspects.append(
builder.make_global_tag_aspect_with_tag_list(tag_list_str) builder.make_global_tag_aspect_with_tag_list(tags)
) )
if self.config.extract_usage_stats: if self.config.extract_usage_stats:
@ -2293,7 +2293,7 @@ class TableauSource(StatefulIngestionSourceBase):
if workbook is not None: if workbook is not None:
yield from add_entity_to_container( yield from add_entity_to_container(
self.gen_workbook_key(workbook), self.gen_workbook_key(workbook[tableau_constant.ID]),
tableau_constant.DASHBOARD, tableau_constant.DASHBOARD,
dashboard_snapshot.urn, dashboard_snapshot.urn,
) )

View File

@ -63,7 +63,6 @@ HAS_EXTRACTS = "hasExtracts"
EXTRACT_LAST_REFRESH_TIME = "extractLastRefreshTime" EXTRACT_LAST_REFRESH_TIME = "extractLastRefreshTime"
EXTRACT_LAST_INCREMENTAL_UPDATE_TIME = "extractLastIncrementalUpdateTime" EXTRACT_LAST_INCREMENTAL_UPDATE_TIME = "extractLastIncrementalUpdateTime"
EXTRACT_LAST_UPDATE_TIME = "extractLastUpdateTime" EXTRACT_LAST_UPDATE_TIME = "extractLastUpdateTime"
TYPE = "type"
PUBLISHED_DATA_SOURCES_CONNECTION = "publishedDatasourcesConnection" PUBLISHED_DATA_SOURCES_CONNECTION = "publishedDatasourcesConnection"
DATA_SOURCE_FIELDS = "datasourceFields" DATA_SOURCE_FIELDS = "datasourceFields"
SHEETS_CONNECTION = "sheetsConnection" SHEETS_CONNECTION = "sheetsConnection"