From f73ecfdcbbc35437fcb80c9e27e78908dae23ea7 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Wed, 8 Nov 2023 18:17:49 -0500 Subject: [PATCH] style(ingest/tableau): Rename tableau_constant to c (#9207) --- .../src/datahub/ingestion/source/tableau.py | 597 ++++++++---------- .../ingestion/source/tableau_common.py | 14 +- 2 files changed, 272 insertions(+), 339 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau.py index 4bc40b0aac..08df759951 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau.py @@ -59,7 +59,7 @@ from datahub.ingestion.api.decorators import ( ) from datahub.ingestion.api.source import MetadataWorkUnitProcessor, Source from datahub.ingestion.api.workunit import MetadataWorkUnit -from datahub.ingestion.source import tableau_constant +from datahub.ingestion.source import tableau_constant as c from datahub.ingestion.source.common.subtypes import ( BIContainerSubTypes, DatasetSubTypes, @@ -720,16 +720,12 @@ class TableauSource(StatefulIngestionSourceBase): query, connection_type, query_filter, count, offset, False ) - if tableau_constant.ERRORS in query_data: - errors = query_data[tableau_constant.ERRORS] + if c.ERRORS in query_data: + errors = query_data[c.ERRORS] if all( # The format of the error messages is highly unpredictable, so we have to # be extra defensive with our parsing. - error - and (error.get(tableau_constant.EXTENSIONS) or {}).get( - tableau_constant.SEVERITY - ) - == tableau_constant.WARNING + error and (error.get(c.EXTENSIONS) or {}).get(c.SEVERITY) == c.WARNING for error in errors ): self.report.report_warning(key=connection_type, reason=f"{errors}") @@ -737,14 +733,14 @@ class TableauSource(StatefulIngestionSourceBase): raise RuntimeError(f"Query {connection_type} error: {errors}") connection_object = ( - query_data.get(tableau_constant.DATA).get(connection_type, {}) - if query_data.get(tableau_constant.DATA) + query_data.get(c.DATA).get(connection_type, {}) + if query_data.get(c.DATA) else {} ) - total_count = connection_object.get(tableau_constant.TOTAL_COUNT, 0) - has_next_page = connection_object.get(tableau_constant.PAGE_INFO, {}).get( - tableau_constant.HAS_NEXT_PAGE, False + total_count = connection_object.get(c.TOTAL_COUNT, 0) + has_next_page = connection_object.get(c.PAGE_INFO, {}).get( + c.HAS_NEXT_PAGE, False ) return connection_object, total_count, has_next_page @@ -781,7 +777,7 @@ class TableauSource(StatefulIngestionSourceBase): offset += count - for obj in connection_objects.get(tableau_constant.NODES) or []: + for obj in connection_objects.get(c.NODES) or []: yield obj def emit_workbooks(self) -> Iterable[MetadataWorkUnit]: @@ -790,11 +786,11 @@ class TableauSource(StatefulIngestionSourceBase): project.name for project in self.tableau_project_registry.values() ] project_names_str: str = json.dumps(project_names) - projects = f"{tableau_constant.PROJECT_NAME_WITH_IN}: {project_names_str}" + projects = f"{c.PROJECT_NAME_WITH_IN}: {project_names_str}" for workbook in self.get_connection_objects( workbook_graphql_query, - tableau_constant.WORKBOOKS_CONNECTION, + c.WORKBOOKS_CONNECTION, projects, page_size_override=self.config.workbook_page_size, ): @@ -804,11 +800,9 @@ class TableauSource(StatefulIngestionSourceBase): # however Tableau supports projectLuidWithin in Tableau Cloud June 2022 / Server 2022.3 and later. project_luid: Optional[str] = self._get_workbook_project_luid(workbook) if project_luid not in self.tableau_project_registry.keys(): - wrk_name: Optional[str] = workbook.get(tableau_constant.NAME) - wrk_id: Optional[str] = workbook.get(tableau_constant.ID) - prj_name: Optional[str] = workbook.get( - tableau_constant.PROJECT_NAME - ) + wrk_name: Optional[str] = workbook.get(c.NAME) + wrk_id: Optional[str] = workbook.get(c.ID) + prj_name: Optional[str] = workbook.get(c.PROJECT_NAME) logger.debug( f"Skipping workbook {wrk_name}({wrk_id}) as it is project {prj_name}({project_luid}) not " @@ -818,25 +812,22 @@ class TableauSource(StatefulIngestionSourceBase): yield from self.emit_workbook_as_container(workbook) - for sheet in workbook.get(tableau_constant.SHEETS, []): - self.sheet_ids.append(sheet[tableau_constant.ID]) + for sheet in workbook.get(c.SHEETS, []): + self.sheet_ids.append(sheet[c.ID]) - for dashboard in workbook.get(tableau_constant.DASHBOARDS, []): - self.dashboard_ids.append(dashboard[tableau_constant.ID]) + for dashboard in workbook.get(c.DASHBOARDS, []): + self.dashboard_ids.append(dashboard[c.ID]) - for ds in workbook.get(tableau_constant.EMBEDDED_DATA_SOURCES, []): - self.embedded_datasource_ids_being_used.append( - ds[tableau_constant.ID] - ) + for ds in workbook.get(c.EMBEDDED_DATA_SOURCES, []): + self.embedded_datasource_ids_being_used.append(ds[c.ID]) def _track_custom_sql_ids(self, field: dict) -> None: # Tableau shows custom sql datasource as a table in ColumnField's upstreamColumns. - for column in field.get(tableau_constant.UPSTREAM_COLUMNS, []): + for column in field.get(c.UPSTREAM_COLUMNS, []): table_id = ( - column.get(tableau_constant.TABLE, {}).get(tableau_constant.ID) - if column.get(tableau_constant.TABLE) - and column[tableau_constant.TABLE][tableau_constant.TYPE_NAME] - == tableau_constant.CUSTOM_SQL_TABLE + column.get(c.TABLE, {}).get(c.ID) + if column.get(c.TABLE) + and column[c.TABLE][c.TYPE_NAME] == c.CUSTOM_SQL_TABLE else None ) @@ -861,15 +852,15 @@ class TableauSource(StatefulIngestionSourceBase): # and published datasource have same upstreamTables in this case. if upstream_tables and is_embedded_ds: logger.debug( - f"Embedded datasource {datasource.get(tableau_constant.ID)} has upstreamDatasources.\ + f"Embedded datasource {datasource.get(c.ID)} has upstreamDatasources.\ Setting only upstreamDatasources lineage. The upstreamTables lineage \ will be set via upstream published datasource." ) else: # This adds an edge to upstream DatabaseTables using `upstreamTables` upstreams, id_to_urn = self.get_upstream_tables( - datasource.get(tableau_constant.UPSTREAM_TABLES, []), - datasource.get(tableau_constant.NAME), + datasource.get(c.UPSTREAM_TABLES, []), + datasource.get(c.NAME), browse_path, is_custom_sql=False, ) @@ -878,23 +869,23 @@ class TableauSource(StatefulIngestionSourceBase): # This adds an edge to upstream CustomSQLTables using `fields`.`upstreamColumns`.`table` csql_upstreams, csql_id_to_urn = self.get_upstream_csql_tables( - datasource.get(tableau_constant.FIELDS) or [], + datasource.get(c.FIELDS) or [], ) upstream_tables.extend(csql_upstreams) table_id_to_urn.update(csql_id_to_urn) logger.debug( - f"A total of {len(upstream_tables)} upstream table edges found for datasource {datasource[tableau_constant.ID]}" + f"A total of {len(upstream_tables)} upstream table edges found for datasource {datasource[c.ID]}" ) datasource_urn = builder.make_dataset_urn_with_platform_instance( platform=self.platform, - name=datasource[tableau_constant.ID], + name=datasource[c.ID], platform_instance=self.config.platform_instance, env=self.config.env, ) - if datasource.get(tableau_constant.FIELDS): + if datasource.get(c.FIELDS): if self.config.extract_column_level_lineage: # Find fine grained lineage for datasource column to datasource column edge, # upstream columns may be from same datasource @@ -912,20 +903,20 @@ class TableauSource(StatefulIngestionSourceBase): fine_grained_lineages.extend(upstream_columns) logger.debug( - f"A total of {len(fine_grained_lineages)} upstream column edges found for datasource {datasource[tableau_constant.ID]}" + f"A total of {len(fine_grained_lineages)} upstream column edges found for datasource {datasource[c.ID]}" ) return upstream_tables, fine_grained_lineages def get_upstream_datasources(self, datasource: dict) -> List[Upstream]: upstream_tables = [] - for ds in datasource.get(tableau_constant.UPSTREAM_DATA_SOURCES, []): - if ds[tableau_constant.ID] not in self.datasource_ids_being_used: - self.datasource_ids_being_used.append(ds[tableau_constant.ID]) + for ds in datasource.get(c.UPSTREAM_DATA_SOURCES, []): + if ds[c.ID] not in self.datasource_ids_being_used: + self.datasource_ids_being_used.append(ds[c.ID]) upstream_ds_urn = builder.make_dataset_urn_with_platform_instance( platform=self.platform, - name=ds[tableau_constant.ID], + name=ds[c.ID], platform_instance=self.config.platform_instance, env=self.config.env, ) @@ -943,20 +934,15 @@ class TableauSource(StatefulIngestionSourceBase): csql_id_to_urn = {} for field in fields: - if not field.get(tableau_constant.UPSTREAM_COLUMNS): + if not field.get(c.UPSTREAM_COLUMNS): continue - for upstream_col in field[tableau_constant.UPSTREAM_COLUMNS]: + for upstream_col in field[c.UPSTREAM_COLUMNS]: if ( upstream_col - and upstream_col.get(tableau_constant.TABLE) - and upstream_col.get(tableau_constant.TABLE)[ - tableau_constant.TYPE_NAME - ] - == tableau_constant.CUSTOM_SQL_TABLE + and upstream_col.get(c.TABLE) + and upstream_col.get(c.TABLE)[c.TYPE_NAME] == c.CUSTOM_SQL_TABLE ): - upstream_table_id = upstream_col.get(tableau_constant.TABLE)[ - tableau_constant.ID - ] + upstream_table_id = upstream_col.get(c.TABLE)[c.ID] csql_urn = builder.make_dataset_urn_with_platform_instance( platform=self.platform, @@ -986,18 +972,18 @@ class TableauSource(StatefulIngestionSourceBase): for table in tables: # skip upstream tables when there is no column info when retrieving datasource # Lineage and Schema details for these will be taken care in self.emit_custom_sql_datasources() - num_tbl_cols: Optional[int] = table.get( - tableau_constant.COLUMNS_CONNECTION - ) and table[tableau_constant.COLUMNS_CONNECTION].get("totalCount") + num_tbl_cols: Optional[int] = table.get(c.COLUMNS_CONNECTION) and table[ + c.COLUMNS_CONNECTION + ].get("totalCount") if not is_custom_sql and not num_tbl_cols: logger.debug( - f"Skipping upstream table with id {table[tableau_constant.ID]}, no columns: {table}" + f"Skipping upstream table with id {table[c.ID]}, no columns: {table}" ) continue - elif table[tableau_constant.NAME] is None: + elif table[c.NAME] is None: self.report.num_upstream_table_skipped_no_name += 1 logger.warning( - f"Skipping upstream table {table[tableau_constant.ID]} from lineage since its name is none: {table}" + f"Skipping upstream table {table[c.ID]} from lineage since its name is none: {table}" ) continue @@ -1014,7 +1000,7 @@ class TableauSource(StatefulIngestionSourceBase): self.config.platform_instance_map, self.config.lineage_overrides, ) - table_id_to_urn[table[tableau_constant.ID]] = table_urn + table_id_to_urn[table[c.ID]] = table_urn upstream_table = Upstream( dataset=table_urn, @@ -1029,13 +1015,13 @@ class TableauSource(StatefulIngestionSourceBase): if table_urn not in self.database_tables: self.database_tables[table_urn] = DatabaseTable( urn=table_urn, - id=table[tableau_constant.ID], + id=table[c.ID], num_cols=num_tbl_cols, paths={table_path} if table_path else set(), ) else: self.database_tables[table_urn].update_table( - table[tableau_constant.ID], num_tbl_cols, table_path + table[c.ID], num_tbl_cols, table_path ) return upstream_tables, table_id_to_urn @@ -1047,24 +1033,24 @@ class TableauSource(StatefulIngestionSourceBase): table_id_to_urn: Dict[str, str], ) -> List[FineGrainedLineage]: fine_grained_lineages = [] - for field in datasource.get(tableau_constant.FIELDS) or []: - field_name = field.get(tableau_constant.NAME) + for field in datasource.get(c.FIELDS) or []: + field_name = field.get(c.NAME) # upstreamColumns lineage will be set via upstreamFields. # such as for CalculatedField if ( not field_name - or not field.get(tableau_constant.UPSTREAM_COLUMNS) - or field.get(tableau_constant.UPSTREAM_FIELDS) + or not field.get(c.UPSTREAM_COLUMNS) + or field.get(c.UPSTREAM_FIELDS) ): continue input_columns = [] - for upstream_col in field.get(tableau_constant.UPSTREAM_COLUMNS): + for upstream_col in field.get(c.UPSTREAM_COLUMNS): if not upstream_col: continue - name = upstream_col.get(tableau_constant.NAME) + name = upstream_col.get(c.NAME) upstream_table_id = ( - upstream_col.get(tableau_constant.TABLE)[tableau_constant.ID] - if upstream_col.get(tableau_constant.TABLE) + upstream_col.get(c.TABLE)[c.ID] + if upstream_col.get(c.TABLE) else None ) if ( @@ -1110,23 +1096,21 @@ class TableauSource(StatefulIngestionSourceBase): self, datasource: dict, datasource_urn: str ) -> List[FineGrainedLineage]: fine_grained_lineages = [] - for field in datasource.get(tableau_constant.FIELDS) or []: - field_name = field.get(tableau_constant.NAME) + for field in datasource.get(c.FIELDS) or []: + field_name = field.get(c.NAME) # It is observed that upstreamFields gives one-hop field # lineage, and not multi-hop field lineage # This behavior is as desired in our case. - if not field_name or not field.get(tableau_constant.UPSTREAM_FIELDS): + if not field_name or not field.get(c.UPSTREAM_FIELDS): continue input_fields = [] - for upstream_field in field.get(tableau_constant.UPSTREAM_FIELDS): + for upstream_field in field.get(c.UPSTREAM_FIELDS): if not upstream_field: continue - name = upstream_field.get(tableau_constant.NAME) + name = upstream_field.get(c.NAME) upstream_ds_id = ( - upstream_field.get(tableau_constant.DATA_SOURCE)[ - tableau_constant.ID - ] - if upstream_field.get(tableau_constant.DATA_SOURCE) + upstream_field.get(c.DATA_SOURCE)[c.ID] + if upstream_field.get(c.DATA_SOURCE) else None ) if name and upstream_ds_id: @@ -1212,35 +1196,37 @@ class TableauSource(StatefulIngestionSourceBase): return fine_grained_lineages def get_transform_operation(self, field: dict) -> str: - field_type = field[tableau_constant.TYPE_NAME] + field_type = field[c.TYPE_NAME] if field_type in ( - tableau_constant.DATA_SOURCE_FIELD, - tableau_constant.COLUMN_FIELD, + c.DATA_SOURCE_FIELD, + c.COLUMN_FIELD, ): - op = tableau_constant.IDENTITY # How to specify exact same - elif field_type == tableau_constant.CALCULATED_FIELD: + op = c.IDENTITY # How to specify exact same + elif field_type == c.CALCULATED_FIELD: op = field_type - if field.get(tableau_constant.FORMULA): - op += f"formula: {field.get(tableau_constant.FORMULA)}" + if field.get(c.FORMULA): + op += f"formula: {field.get(c.FORMULA)}" else: op = field_type # BinField, CombinedField, etc return op def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]: - custom_sql_filter = f"{tableau_constant.ID_WITH_IN}: {json.dumps(self.custom_sql_ids_being_used)}" + custom_sql_filter = ( + f"{c.ID_WITH_IN}: {json.dumps(self.custom_sql_ids_being_used)}" + ) custom_sql_connection = list( self.get_connection_objects( custom_sql_graphql_query, - tableau_constant.CUSTOM_SQL_TABLE_CONNECTION, + c.CUSTOM_SQL_TABLE_CONNECTION, custom_sql_filter, ) ) unique_custom_sql = get_unique_custom_sql(custom_sql_connection) for csql in unique_custom_sql: - csql_id: str = csql[tableau_constant.ID] + csql_id: str = csql[c.ID] csql_urn = builder.make_dataset_urn_with_platform_instance( platform=self.platform, name=csql_id, @@ -1256,40 +1242,33 @@ class TableauSource(StatefulIngestionSourceBase): datasource_name = None project = None - if len(csql[tableau_constant.DATA_SOURCES]) > 0: + if len(csql[c.DATA_SOURCES]) > 0: # CustomSQLTable id owned by exactly one tableau data source logger.debug( - f"Number of datasources referencing CustomSQLTable: {len(csql[tableau_constant.DATA_SOURCES])}" + f"Number of datasources referencing CustomSQLTable: {len(csql[c.DATA_SOURCES])}" ) - datasource = csql[tableau_constant.DATA_SOURCES][0] - datasource_name = datasource.get(tableau_constant.NAME) + datasource = csql[c.DATA_SOURCES][0] + datasource_name = datasource.get(c.NAME) if datasource.get( - tableau_constant.TYPE_NAME - ) == tableau_constant.EMBEDDED_DATA_SOURCE and datasource.get( - tableau_constant.WORKBOOK - ): + c.TYPE_NAME + ) == c.EMBEDDED_DATA_SOURCE and datasource.get(c.WORKBOOK): datasource_name = ( - f"{datasource.get(tableau_constant.WORKBOOK).get(tableau_constant.NAME)}/{datasource_name}" - if datasource_name - and datasource.get(tableau_constant.WORKBOOK).get( - tableau_constant.NAME - ) + f"{datasource.get(c.WORKBOOK).get(c.NAME)}/{datasource_name}" + if datasource_name and datasource.get(c.WORKBOOK).get(c.NAME) else None ) logger.debug( f"Adding datasource {datasource_name}({datasource.get('id')}) to container" ) yield from add_entity_to_container( - self.gen_workbook_key( - datasource[tableau_constant.WORKBOOK][tableau_constant.ID] - ), - tableau_constant.DATASET, + self.gen_workbook_key(datasource[c.WORKBOOK][c.ID]), + c.DATASET, dataset_snapshot.urn, ) project = self._get_project_browse_path_name(datasource) - tables = csql.get(tableau_constant.TABLES, []) + tables = csql.get(c.TABLES, []) if tables: # lineage from custom sql -> datasets/tables # @@ -1306,9 +1285,8 @@ class TableauSource(StatefulIngestionSourceBase): # Schema Metadata # if condition is needed as graphQL return "cloumns": None columns: List[Dict[Any, Any]] = ( - cast(List[Dict[Any, Any]], csql.get(tableau_constant.COLUMNS)) - if tableau_constant.COLUMNS in csql - and csql.get(tableau_constant.COLUMNS) is not None + cast(List[Dict[Any, Any]], csql.get(c.COLUMNS)) + if c.COLUMNS in csql and csql.get(c.COLUMNS) is not None else [] ) schema_metadata = self.get_schema_metadata_for_custom_sql(columns) @@ -1320,7 +1298,7 @@ class TableauSource(StatefulIngestionSourceBase): if project and datasource_name: browse_paths = BrowsePathsClass( paths=[ - f"/{self.config.env.lower()}/{self.platform}/{project}/{datasource[tableau_constant.NAME]}" + f"/{self.config.env.lower()}/{self.platform}/{project}/{datasource[c.NAME]}" ] ) dataset_snapshot.aspects.append(browse_paths) @@ -1328,27 +1306,25 @@ class TableauSource(StatefulIngestionSourceBase): logger.debug(f"Browse path not set for Custom SQL table {csql_id}") dataset_properties = DatasetPropertiesClass( - name=csql.get(tableau_constant.NAME), - description=csql.get(tableau_constant.DESCRIPTION), + name=csql.get(c.NAME), + description=csql.get(c.DESCRIPTION), ) dataset_snapshot.aspects.append(dataset_properties) - if csql.get(tableau_constant.QUERY): + if csql.get(c.QUERY): view_properties = ViewPropertiesClass( materialized=False, - viewLanguage=tableau_constant.SQL, - viewLogic=clean_query(csql[tableau_constant.QUERY]), + viewLanguage=c.SQL, + viewLogic=clean_query(csql[c.QUERY]), ) dataset_snapshot.aspects.append(view_properties) yield self.get_metadata_change_event(dataset_snapshot) yield self.get_metadata_change_proposal( dataset_snapshot.urn, - aspect_name=tableau_constant.SUB_TYPES, - aspect=SubTypesClass( - typeNames=[DatasetSubTypes.VIEW, tableau_constant.CUSTOM_SQL] - ), + aspect_name=c.SUB_TYPES, + aspect=SubTypesClass(typeNames=[DatasetSubTypes.VIEW, c.CUSTOM_SQL]), ) def get_schema_metadata_for_custom_sql( @@ -1359,21 +1335,19 @@ class TableauSource(StatefulIngestionSourceBase): for field in columns: # Datasource fields - if field.get(tableau_constant.NAME) is None: + if field.get(c.NAME) is None: self.report.num_csql_field_skipped_no_name += 1 logger.warning( - f"Skipping field {field[tableau_constant.ID]} from schema since its name is none" + f"Skipping field {field[c.ID]} from schema since its name is none" ) continue - nativeDataType = field.get( - tableau_constant.REMOTE_TYPE, tableau_constant.UNKNOWN - ) + nativeDataType = field.get(c.REMOTE_TYPE, c.UNKNOWN) TypeClass = FIELD_TYPE_MAPPING.get(nativeDataType, NullTypeClass) schema_field = SchemaField( - fieldPath=field[tableau_constant.NAME], + fieldPath=field[c.NAME], type=SchemaFieldDataType(type=TypeClass()), nativeDataType=nativeDataType, - description=field.get(tableau_constant.DESCRIPTION), + description=field.get(c.DESCRIPTION), ) fields.append(schema_field) @@ -1391,28 +1365,25 @@ class TableauSource(StatefulIngestionSourceBase): # This is fallback in case "get all datasources" query fails for some reason. # It is possible due to https://github.com/tableau/server-client-python/issues/1210 if ( - ds.get(tableau_constant.LUID) - and ds[tableau_constant.LUID] not in self.datasource_project_map.keys() + ds.get(c.LUID) + and ds[c.LUID] not in self.datasource_project_map.keys() and self.report.get_all_datasources_query_failed ): logger.debug( - f"published datasource {ds.get(tableau_constant.NAME)} project_luid not found." - f" Running get datasource query for {ds[tableau_constant.LUID]}" + f"published datasource {ds.get(c.NAME)} project_luid not found." + f" Running get datasource query for {ds[c.LUID]}" ) # Query and update self.datasource_project_map with luid - self._query_published_datasource_for_project_luid(ds[tableau_constant.LUID]) + self._query_published_datasource_for_project_luid(ds[c.LUID]) if ( - ds.get(tableau_constant.LUID) - and ds[tableau_constant.LUID] in self.datasource_project_map.keys() - and self.datasource_project_map[ds[tableau_constant.LUID]] - in self.tableau_project_registry + ds.get(c.LUID) + and ds[c.LUID] in self.datasource_project_map.keys() + and self.datasource_project_map[ds[c.LUID]] in self.tableau_project_registry ): - return self.datasource_project_map[ds[tableau_constant.LUID]] + return self.datasource_project_map[ds[c.LUID]] - logger.debug( - f"published datasource {ds.get(tableau_constant.NAME)} project_luid not found" - ) + logger.debug(f"published datasource {ds.get(c.NAME)} project_luid not found") return None @@ -1437,60 +1408,52 @@ class TableauSource(StatefulIngestionSourceBase): logger.debug("Error stack trace", exc_info=True) def _get_workbook_project_luid(self, wb: dict) -> Optional[str]: - if wb.get(tableau_constant.LUID) and self.workbook_project_map.get( - wb[tableau_constant.LUID] - ): - return self.workbook_project_map[wb[tableau_constant.LUID]] + if wb.get(c.LUID) and self.workbook_project_map.get(wb[c.LUID]): + return self.workbook_project_map[wb[c.LUID]] - logger.debug(f"workbook {wb.get(tableau_constant.NAME)} project_luid not found") + logger.debug(f"workbook {wb.get(c.NAME)} project_luid not found") return None def _get_embedded_datasource_project_luid(self, ds: dict) -> Optional[str]: - if ds.get(tableau_constant.WORKBOOK): + if ds.get(c.WORKBOOK): project_luid: Optional[str] = self._get_workbook_project_luid( - ds[tableau_constant.WORKBOOK] + ds[c.WORKBOOK] ) if project_luid and project_luid in self.tableau_project_registry: return project_luid - logger.debug( - f"embedded datasource {ds.get(tableau_constant.NAME)} project_luid not found" - ) + logger.debug(f"embedded datasource {ds.get(c.NAME)} project_luid not found") return None def _get_datasource_project_luid(self, ds: dict) -> Optional[str]: # Only published and embedded data-sources are supported - ds_type: Optional[str] = ds.get(tableau_constant.TYPE_NAME) + ds_type: Optional[str] = ds.get(c.TYPE_NAME) if ds_type not in ( - tableau_constant.PUBLISHED_DATA_SOURCE, - tableau_constant.EMBEDDED_DATA_SOURCE, + c.PUBLISHED_DATA_SOURCE, + c.EMBEDDED_DATA_SOURCE, ): logger.debug( - f"datasource {ds.get(tableau_constant.NAME)} type {ds.get(tableau_constant.TYPE_NAME)} is " + f"datasource {ds.get(c.NAME)} type {ds.get(c.TYPE_NAME)} is " f"unsupported" ) return None func_selector: Any = { - tableau_constant.PUBLISHED_DATA_SOURCE: self._get_published_datasource_project_luid, - tableau_constant.EMBEDDED_DATA_SOURCE: self._get_embedded_datasource_project_luid, + c.PUBLISHED_DATA_SOURCE: self._get_published_datasource_project_luid, + c.EMBEDDED_DATA_SOURCE: self._get_embedded_datasource_project_luid, } return func_selector[ds_type](ds) @staticmethod def _get_datasource_project_name(ds: dict) -> Optional[str]: - if ds.get( - tableau_constant.TYPE_NAME - ) == tableau_constant.EMBEDDED_DATA_SOURCE and ds.get( - tableau_constant.WORKBOOK - ): - return ds[tableau_constant.WORKBOOK].get(tableau_constant.PROJECT_NAME) - if ds.get(tableau_constant.TYPE_NAME) == tableau_constant.PUBLISHED_DATA_SOURCE: - return ds.get(tableau_constant.PROJECT_NAME) + if ds.get(c.TYPE_NAME) == c.EMBEDDED_DATA_SOURCE and ds.get(c.WORKBOOK): + return ds[c.WORKBOOK].get(c.PROJECT_NAME) + if ds.get(c.TYPE_NAME) == c.PUBLISHED_DATA_SOURCE: + return ds.get(c.PROJECT_NAME) return None def _get_project_browse_path_name(self, ds: dict) -> Optional[str]: @@ -1502,7 +1465,7 @@ class TableauSource(StatefulIngestionSourceBase): project_luid = self._get_datasource_project_luid(ds) if project_luid is None: logger.warning( - f"Could not load project hierarchy for datasource {ds.get(tableau_constant.NAME)}. Please check permissions." + f"Could not load project hierarchy for datasource {ds.get(c.NAME)}. Please check permissions." ) logger.debug(f"datasource = {ds}") return None @@ -1515,7 +1478,7 @@ class TableauSource(StatefulIngestionSourceBase): # This adds an edge to upstream DatabaseTables using `upstreamTables` upstream_tables, _ = self.get_upstream_tables( tables, - datasource.get(tableau_constant.NAME) or "", + datasource.get(c.NAME) or "", self._get_project_browse_path_name(datasource), is_custom_sql=True, ) @@ -1524,7 +1487,7 @@ class TableauSource(StatefulIngestionSourceBase): upstream_lineage = UpstreamLineage(upstreams=upstream_tables) yield self.get_metadata_change_proposal( csql_urn, - aspect_name=tableau_constant.UPSTREAM_LINEAGE, + aspect_name=c.UPSTREAM_LINEAGE, aspect=upstream_lineage, ) @@ -1547,22 +1510,19 @@ class TableauSource(StatefulIngestionSourceBase): ] ], ) -> Optional["SqlParsingResult"]: - database_info = datasource.get(tableau_constant.DATABASE) or {} + database_info = datasource.get(c.DATABASE) or {} - if datasource.get(tableau_constant.IS_UNSUPPORTED_CUSTOM_SQL) in (None, False): + if datasource.get(c.IS_UNSUPPORTED_CUSTOM_SQL) in (None, False): logger.debug(f"datasource {datasource_urn} is not created from custom sql") return None - if ( - tableau_constant.NAME not in database_info - or tableau_constant.CONNECTION_TYPE not in database_info - ): + if c.NAME not in database_info or c.CONNECTION_TYPE not in database_info: logger.debug( f"database information is missing from datasource {datasource_urn}" ) return None - query = datasource.get(tableau_constant.QUERY) + query = datasource.get(c.QUERY) if query is None: logger.debug( f"raw sql query is not available for datasource {datasource_urn}" @@ -1571,13 +1531,13 @@ class TableauSource(StatefulIngestionSourceBase): logger.debug(f"Parsing sql={query}") - upstream_db = database_info.get(tableau_constant.NAME) + upstream_db = database_info.get(c.NAME) if func_overridden_info is not None: # Override the information as per configuration upstream_db, platform_instance, platform, _ = func_overridden_info( - database_info[tableau_constant.CONNECTION_TYPE], - database_info.get(tableau_constant.NAME), + database_info[c.CONNECTION_TYPE], + database_info.get(c.NAME), self.config.platform_instance_map, self.config.lineage_overrides, ) @@ -1631,7 +1591,7 @@ class TableauSource(StatefulIngestionSourceBase): yield self.get_metadata_change_proposal( csql_urn, - aspect_name=tableau_constant.UPSTREAM_LINEAGE, + aspect_name=c.UPSTREAM_LINEAGE, aspect=upstream_lineage, ) @@ -1642,10 +1602,10 @@ class TableauSource(StatefulIngestionSourceBase): for field in datasource_fields: # check datasource - custom sql relations from a field being referenced self._track_custom_sql_ids(field) - if field.get(tableau_constant.NAME) is None: + if field.get(c.NAME) is None: self.report.num_upstream_table_skipped_no_name += 1 logger.warning( - f"Skipping field {field[tableau_constant.ID]} from schema since its name is none" + f"Skipping field {field[c.ID]} from schema since its name is none" ) continue @@ -1678,7 +1638,7 @@ class TableauSource(StatefulIngestionSourceBase): aspect: Union["UpstreamLineage", "SubTypesClass"], ) -> MetadataWorkUnit: return MetadataChangeProposalWrapper( - entityType=tableau_constant.DATASET, + entityType=c.DATASET, changeType=ChangeTypeClass.UPSERT, entityUrn=urn, aspectName=aspect_name, @@ -1696,10 +1656,8 @@ class TableauSource(StatefulIngestionSourceBase): datasource_info = datasource browse_path = self._get_project_browse_path_name(datasource) - logger.debug( - f"datasource {datasource.get(tableau_constant.NAME)} browse-path {browse_path}" - ) - datasource_id = datasource[tableau_constant.ID] + logger.debug(f"datasource {datasource.get(c.NAME)} browse-path {browse_path}") + datasource_id = datasource[c.ID] datasource_urn = builder.make_dataset_urn_with_platform_instance( self.platform, datasource_id, self.config.platform_instance, self.config.env ) @@ -1713,13 +1671,10 @@ class TableauSource(StatefulIngestionSourceBase): # Browse path - if ( - browse_path - and is_embedded_ds - and workbook - and workbook.get(tableau_constant.NAME) - ): - browse_path = f"{browse_path}/{workbook[tableau_constant.NAME].replace('/', REPLACE_SLASH_CHAR)}" + if browse_path and is_embedded_ds and workbook and workbook.get(c.NAME): + browse_path = ( + f"{browse_path}/{workbook[c.NAME].replace('/', REPLACE_SLASH_CHAR)}" + ) if browse_path: browse_paths = BrowsePathsClass( @@ -1729,12 +1684,10 @@ class TableauSource(StatefulIngestionSourceBase): # Ownership owner = ( - self._get_ownership( - datasource_info[tableau_constant.OWNER][tableau_constant.USERNAME] - ) + self._get_ownership(datasource_info[c.OWNER][c.USERNAME]) if datasource_info - and datasource_info.get(tableau_constant.OWNER) - and datasource_info[tableau_constant.OWNER].get(tableau_constant.USERNAME) + and datasource_info.get(c.OWNER) + and datasource_info[c.OWNER].get(c.USERNAME) else None ) if owner is not None: @@ -1742,24 +1695,22 @@ class TableauSource(StatefulIngestionSourceBase): # Dataset properties dataset_props = DatasetPropertiesClass( - name=datasource.get(tableau_constant.NAME), - description=datasource.get(tableau_constant.DESCRIPTION), + name=datasource.get(c.NAME), + description=datasource.get(c.DESCRIPTION), customProperties=self.get_custom_props_from_dict( datasource, [ - tableau_constant.HAS_EXTRACTS, - tableau_constant.EXTRACT_LAST_REFRESH_TIME, - tableau_constant.EXTRACT_LAST_INCREMENTAL_UPDATE_TIME, - tableau_constant.EXTRACT_LAST_UPDATE_TIME, + c.HAS_EXTRACTS, + c.EXTRACT_LAST_REFRESH_TIME, + c.EXTRACT_LAST_INCREMENTAL_UPDATE_TIME, + c.EXTRACT_LAST_UPDATE_TIME, ], ), ) dataset_snapshot.aspects.append(dataset_props) # Upstream Tables - if datasource.get(tableau_constant.UPSTREAM_TABLES) or datasource.get( - tableau_constant.UPSTREAM_DATA_SOURCES - ): + if datasource.get(c.UPSTREAM_TABLES) or datasource.get(c.UPSTREAM_DATA_SOURCES): # datasource -> db table relations ( upstream_tables, @@ -1779,13 +1730,13 @@ class TableauSource(StatefulIngestionSourceBase): ) yield self.get_metadata_change_proposal( datasource_urn, - aspect_name=tableau_constant.UPSTREAM_LINEAGE, + aspect_name=c.UPSTREAM_LINEAGE, aspect=upstream_lineage, ) # Datasource Fields schema_metadata = self._get_schema_metadata_for_datasource( - datasource.get(tableau_constant.FIELDS, []) + datasource.get(c.FIELDS, []) ) if schema_metadata is not None: dataset_snapshot.aspects.append(schema_metadata) @@ -1793,7 +1744,7 @@ class TableauSource(StatefulIngestionSourceBase): yield self.get_metadata_change_event(dataset_snapshot) yield self.get_metadata_change_proposal( dataset_snapshot.urn, - aspect_name=tableau_constant.SUB_TYPES, + aspect_name=c.SUB_TYPES, aspect=SubTypesClass( typeNames=( ["Embedded Data Source"] @@ -1809,7 +1760,7 @@ class TableauSource(StatefulIngestionSourceBase): if container_key is not None: yield from add_entity_to_container( container_key, - tableau_constant.DATASET, + c.DATASET, dataset_snapshot.urn, ) @@ -1822,10 +1773,10 @@ class TableauSource(StatefulIngestionSourceBase): container_key: Optional[ContainerKey] = None if is_embedded_ds: # It is embedded then parent is container is workbook if workbook is not None: - container_key = self.gen_workbook_key(workbook[tableau_constant.ID]) + container_key = self.gen_workbook_key(workbook[c.ID]) else: logger.warning( - f"Parent container not set for embedded datasource {datasource[tableau_constant.ID]}" + f"Parent container not set for embedded datasource {datasource[c.ID]}" ) else: parent_project_luid = self._get_published_datasource_project_luid( @@ -1836,17 +1787,19 @@ class TableauSource(StatefulIngestionSourceBase): container_key = self.gen_project_key(parent_project_luid) else: logger.warning( - f"Parent container not set for published datasource {datasource[tableau_constant.ID]}" + f"Parent container not set for published datasource {datasource[c.ID]}" ) return container_key def emit_published_datasources(self) -> Iterable[MetadataWorkUnit]: - datasource_filter = f"{tableau_constant.ID_WITH_IN}: {json.dumps(self.datasource_ids_being_used)}" + datasource_filter = ( + f"{c.ID_WITH_IN}: {json.dumps(self.datasource_ids_being_used)}" + ) for datasource in self.get_connection_objects( published_datasource_graphql_query, - tableau_constant.PUBLISHED_DATA_SOURCES_CONNECTION, + c.PUBLISHED_DATA_SOURCES_CONNECTION, datasource_filter, ): yield from self.emit_datasource(datasource) @@ -1855,11 +1808,13 @@ class TableauSource(StatefulIngestionSourceBase): database_table_id_to_urn_map: Dict[str, str] = dict() for urn, tbl in self.database_tables.items(): database_table_id_to_urn_map[tbl.id] = urn - tables_filter = f"{tableau_constant.ID_WITH_IN}: {json.dumps(list(database_table_id_to_urn_map.keys()))}" + tables_filter = ( + f"{c.ID_WITH_IN}: {json.dumps(list(database_table_id_to_urn_map.keys()))}" + ) for table in self.get_connection_objects( database_tables_graphql_query, - tableau_constant.DATABASE_TABLES_CONNECTION, + c.DATABASE_TABLES_CONNECTION, tables_filter, ): yield from self.emit_table(table, database_table_id_to_urn_map) @@ -1867,11 +1822,9 @@ class TableauSource(StatefulIngestionSourceBase): def emit_table( self, table: dict, database_table_id_to_urn_map: Dict[str, str] ) -> Iterable[MetadataWorkUnit]: - database_table = self.database_tables[ - database_table_id_to_urn_map[table[tableau_constant.ID]] - ] - columns = table.get(tableau_constant.COLUMNS, []) - is_embedded = table.get(tableau_constant.IS_EMBEDDED) or False + database_table = self.database_tables[database_table_id_to_urn_map[table[c.ID]]] + columns = table.get(c.COLUMNS, []) + is_embedded = table.get(c.IS_EMBEDDED) or False if not is_embedded and not self.config.ingest_tables_external: logger.debug( f"Skipping external table {database_table.urn} as ingest_tables_external is set to False" @@ -1907,21 +1860,19 @@ class TableauSource(StatefulIngestionSourceBase): if columns: fields = [] for field in columns: - if field.get(tableau_constant.NAME) is None: + if field.get(c.NAME) is None: self.report.num_table_field_skipped_no_name += 1 logger.warning( - f"Skipping field {field[tableau_constant.ID]} from schema since its name is none" + f"Skipping field {field[c.ID]} from schema since its name is none" ) continue - nativeDataType = field.get( - tableau_constant.REMOTE_TYPE, tableau_constant.UNKNOWN - ) + nativeDataType = field.get(c.REMOTE_TYPE, c.UNKNOWN) TypeClass = FIELD_TYPE_MAPPING.get(nativeDataType, NullTypeClass) schema_field = SchemaField( - fieldPath=field[tableau_constant.NAME], + fieldPath=field[c.NAME], type=SchemaFieldDataType(type=TypeClass()), - description=field.get(tableau_constant.DESCRIPTION), + description=field.get(c.DESCRIPTION), nativeDataType=nativeDataType, ) @@ -1941,11 +1892,9 @@ class TableauSource(StatefulIngestionSourceBase): def get_sheetwise_upstream_datasources(self, sheet: dict) -> set: sheet_upstream_datasources = set() - for field in sheet.get(tableau_constant.DATA_SOURCE_FIELDS) or []: - if field and field.get(tableau_constant.DATA_SOURCE): - sheet_upstream_datasources.add( - field[tableau_constant.DATA_SOURCE][tableau_constant.ID] - ) + for field in sheet.get(c.DATA_SOURCE_FIELDS) or []: + if field and field.get(c.DATA_SOURCE): + sheet_upstream_datasources.add(field[c.DATA_SOURCE][c.ID]) return sheet_upstream_datasources @@ -1961,20 +1910,20 @@ class TableauSource(StatefulIngestionSourceBase): def _get_chart_stat_wu( self, sheet: dict, sheet_urn: str ) -> Optional[MetadataWorkUnit]: - luid: Optional[str] = sheet.get(tableau_constant.LUID) + luid: Optional[str] = sheet.get(c.LUID) if luid is None: logger.debug( "stat:luid is none for sheet %s(id:%s)", - sheet.get(tableau_constant.NAME), - sheet.get(tableau_constant.ID), + sheet.get(c.NAME), + sheet.get(c.ID), ) return None usage_stat: Optional[UsageStat] = self.tableau_stat_registry.get(luid) if usage_stat is None: logger.debug( "stat:UsageStat is not available in tableau_stat_registry for sheet %s(id:%s)", - sheet.get(tableau_constant.NAME), - sheet.get(tableau_constant.ID), + sheet.get(c.NAME), + sheet.get(c.ID), ) return None @@ -1983,8 +1932,8 @@ class TableauSource(StatefulIngestionSourceBase): ) logger.debug( "stat: Chart usage stat work unit is created for %s(id:%s)", - sheet.get(tableau_constant.NAME), - sheet.get(tableau_constant.ID), + sheet.get(c.NAME), + sheet.get(c.ID), ) return MetadataChangeProposalWrapper( aspect=aspect, @@ -1992,22 +1941,20 @@ class TableauSource(StatefulIngestionSourceBase): ).as_workunit() def emit_sheets(self) -> Iterable[MetadataWorkUnit]: - sheets_filter = f"{tableau_constant.ID_WITH_IN}: {json.dumps(self.sheet_ids)}" + sheets_filter = f"{c.ID_WITH_IN}: {json.dumps(self.sheet_ids)}" for sheet in self.get_connection_objects( sheet_graphql_query, - tableau_constant.SHEETS_CONNECTION, + c.SHEETS_CONNECTION, sheets_filter, ): - yield from self.emit_sheets_as_charts( - sheet, sheet.get(tableau_constant.WORKBOOK) - ) + yield from self.emit_sheets_as_charts(sheet, sheet.get(c.WORKBOOK)) def emit_sheets_as_charts( self, sheet: dict, workbook: Optional[Dict] ) -> Iterable[MetadataWorkUnit]: sheet_urn: str = builder.make_chart_urn( - self.platform, sheet[tableau_constant.ID], self.config.platform_instance + self.platform, sheet[c.ID], self.config.platform_instance ) chart_snapshot = ChartSnapshot( urn=sheet_urn, @@ -2015,34 +1962,32 @@ class TableauSource(StatefulIngestionSourceBase): ) creator: Optional[str] = None - if workbook is not None and workbook.get(tableau_constant.OWNER) is not None: - creator = workbook[tableau_constant.OWNER].get(tableau_constant.USERNAME) - created_at = sheet.get(tableau_constant.CREATED_AT, datetime.now()) - updated_at = sheet.get(tableau_constant.UPDATED_AT, datetime.now()) + if workbook is not None and workbook.get(c.OWNER) is not None: + creator = workbook[c.OWNER].get(c.USERNAME) + created_at = sheet.get(c.CREATED_AT, datetime.now()) + updated_at = sheet.get(c.UPDATED_AT, datetime.now()) last_modified = self.get_last_modified(creator, created_at, updated_at) - if sheet.get(tableau_constant.PATH): + if sheet.get(c.PATH): site_part = f"/site/{self.config.site}" if self.config.site else "" - sheet_external_url = f"{self.config.connect_uri}/#{site_part}/views/{sheet.get(tableau_constant.PATH)}" - elif ( - sheet.get(tableau_constant.CONTAINED_IN_DASHBOARDS) is not None - and len(sheet[tableau_constant.CONTAINED_IN_DASHBOARDS]) > 0 - and sheet[tableau_constant.CONTAINED_IN_DASHBOARDS][0] is not None - and sheet[tableau_constant.CONTAINED_IN_DASHBOARDS][0].get( - tableau_constant.PATH + sheet_external_url = ( + f"{self.config.connect_uri}/#{site_part}/views/{sheet.get(c.PATH)}" ) + elif ( + sheet.get(c.CONTAINED_IN_DASHBOARDS) is not None + and len(sheet[c.CONTAINED_IN_DASHBOARDS]) > 0 + and sheet[c.CONTAINED_IN_DASHBOARDS][0] is not None + and sheet[c.CONTAINED_IN_DASHBOARDS][0].get(c.PATH) ): # sheet contained in dashboard site_part = f"/t/{self.config.site}" if self.config.site else "" - dashboard_path = sheet[tableau_constant.CONTAINED_IN_DASHBOARDS][0][ - tableau_constant.PATH - ] - sheet_external_url = f"{self.config.connect_uri}{site_part}/authoring/{dashboard_path}/{sheet.get(tableau_constant.NAME, '')}" + dashboard_path = sheet[c.CONTAINED_IN_DASHBOARDS][0][c.PATH] + sheet_external_url = f"{self.config.connect_uri}{site_part}/authoring/{dashboard_path}/{sheet.get(c.NAME, '')}" else: # hidden or viz-in-tooltip sheet sheet_external_url = None input_fields: List[InputField] = [] - if sheet.get(tableau_constant.DATA_SOURCE_FIELDS): + if sheet.get(c.DATA_SOURCE_FIELDS): self.populate_sheet_upstream_fields(sheet, input_fields) # datasource urn @@ -2060,15 +2005,13 @@ class TableauSource(StatefulIngestionSourceBase): # Chart Info chart_info = ChartInfoClass( description="", - title=sheet.get(tableau_constant.NAME) or "", + title=sheet.get(c.NAME) or "", lastModified=last_modified, externalUrl=sheet_external_url if self.config.ingest_external_links_for_charts else None, inputs=sorted(datasource_urn), - customProperties=self.get_custom_props_from_dict( - sheet, [tableau_constant.LUID] - ), + customProperties=self.get_custom_props_from_dict(sheet, [c.LUID]), ) chart_snapshot.aspects.append(chart_info) # chart_snapshot doesn't support the stat aspect as list element and hence need to emit MCP @@ -2083,7 +2026,7 @@ class TableauSource(StatefulIngestionSourceBase): chart_snapshot.aspects.append(browse_paths) else: logger.warning( - f"Could not set browse path for workbook {sheet[tableau_constant.ID]}. Please check permissions." + f"Could not set browse path for workbook {sheet[c.ID]}. Please check permissions." ) # Ownership @@ -2107,9 +2050,7 @@ class TableauSource(StatefulIngestionSourceBase): ) if workbook is not None: yield from add_entity_to_container( - self.gen_workbook_key(workbook[tableau_constant.ID]), - tableau_constant.CHART, - chart_snapshot.urn, + self.gen_workbook_key(workbook[c.ID]), c.CHART, chart_snapshot.urn ) if input_fields: @@ -2134,14 +2075,12 @@ class TableauSource(StatefulIngestionSourceBase): def populate_sheet_upstream_fields( self, sheet: dict, input_fields: List[InputField] ) -> None: - for field in sheet.get(tableau_constant.DATA_SOURCE_FIELDS): # type: ignore + for field in sheet.get(c.DATA_SOURCE_FIELDS): # type: ignore if not field: continue - name = field.get(tableau_constant.NAME) + name = field.get(c.NAME) upstream_ds_id = ( - field.get(tableau_constant.DATA_SOURCE)[tableau_constant.ID] - if field.get(tableau_constant.DATA_SOURCE) - else None + field.get(c.DATA_SOURCE)[c.ID] if field.get(c.DATA_SOURCE) else None ) if name and upstream_ds_id: input_fields.append( @@ -2162,10 +2101,8 @@ class TableauSource(StatefulIngestionSourceBase): ) def emit_workbook_as_container(self, workbook: Dict) -> Iterable[MetadataWorkUnit]: - workbook_container_key = self.gen_workbook_key(workbook[tableau_constant.ID]) - creator = workbook.get(tableau_constant.OWNER, {}).get( - tableau_constant.USERNAME - ) + workbook_container_key = self.gen_workbook_key(workbook[c.ID]) + creator = workbook.get(c.OWNER, {}).get(c.USERNAME) owner_urn = ( builder.make_user_urn(creator) @@ -2191,17 +2128,17 @@ class TableauSource(StatefulIngestionSourceBase): if project_luid and project_luid in self.tableau_project_registry.keys(): parent_key = self.gen_project_key(project_luid) else: - workbook_id: Optional[str] = workbook.get(tableau_constant.ID) - workbook_name: Optional[str] = workbook.get(tableau_constant.NAME) + workbook_id: Optional[str] = workbook.get(c.ID) + workbook_name: Optional[str] = workbook.get(c.NAME) logger.warning( f"Could not load project hierarchy for workbook {workbook_name}({workbook_id}). Please check permissions." ) yield from gen_containers( container_key=workbook_container_key, - name=workbook.get(tableau_constant.NAME) or "", + name=workbook.get(c.NAME) or "", parent_container_key=parent_key, - description=workbook.get(tableau_constant.DESCRIPTION), + description=workbook.get(c.DESCRIPTION), sub_types=[BIContainerSubTypes.TABLEAU_WORKBOOK], owner_urn=owner_urn, external_url=workbook_external_url, @@ -2237,20 +2174,20 @@ class TableauSource(StatefulIngestionSourceBase): def _get_dashboard_stat_wu( self, dashboard: dict, dashboard_urn: str ) -> Optional[MetadataWorkUnit]: - luid: Optional[str] = dashboard.get(tableau_constant.LUID) + luid: Optional[str] = dashboard.get(c.LUID) if luid is None: logger.debug( "stat:luid is none for dashboard %s(id:%s)", - dashboard.get(tableau_constant.NAME), - dashboard.get(tableau_constant.ID), + dashboard.get(c.NAME), + dashboard.get(c.ID), ) return None usage_stat: Optional[UsageStat] = self.tableau_stat_registry.get(luid) if usage_stat is None: logger.debug( "stat:UsageStat is not available in tableau_stat_registry for dashboard %s(id:%s)", - dashboard.get(tableau_constant.NAME), - dashboard.get(tableau_constant.ID), + dashboard.get(c.NAME), + dashboard.get(c.ID), ) return None @@ -2259,8 +2196,8 @@ class TableauSource(StatefulIngestionSourceBase): ) logger.debug( "stat: Dashboard usage stat is created for %s(id:%s)", - dashboard.get(tableau_constant.NAME), - dashboard.get(tableau_constant.ID), + dashboard.get(c.NAME), + dashboard.get(c.ID), ) return MetadataChangeProposalWrapper( @@ -2288,26 +2225,20 @@ class TableauSource(StatefulIngestionSourceBase): ) def emit_dashboards(self) -> Iterable[MetadataWorkUnit]: - dashboards_filter = ( - f"{tableau_constant.ID_WITH_IN}: {json.dumps(self.dashboard_ids)}" - ) + dashboards_filter = f"{c.ID_WITH_IN}: {json.dumps(self.dashboard_ids)}" for dashboard in self.get_connection_objects( dashboard_graphql_query, - tableau_constant.DASHBOARDS_CONNECTION, + c.DASHBOARDS_CONNECTION, dashboards_filter, ): - yield from self.emit_dashboard( - dashboard, dashboard.get(tableau_constant.WORKBOOK) - ) + yield from self.emit_dashboard(dashboard, dashboard.get(c.WORKBOOK)) def get_tags(self, obj: dict) -> Optional[List[str]]: - tag_list = obj.get(tableau_constant.TAGS, []) + tag_list = obj.get(c.TAGS, []) if tag_list and self.config.ingest_tags: tag_list_str = [ - t[tableau_constant.NAME] - for t in tag_list - if t is not None and t.get(tableau_constant.NAME) + t[c.NAME] for t in tag_list if t is not None and t.get(c.NAME) ] return tag_list_str @@ -2317,7 +2248,7 @@ class TableauSource(StatefulIngestionSourceBase): self, dashboard: dict, workbook: Optional[Dict] ) -> Iterable[MetadataWorkUnit]: dashboard_urn: str = builder.make_dashboard_urn( - self.platform, dashboard[tableau_constant.ID], self.config.platform_instance + self.platform, dashboard[c.ID], self.config.platform_instance ) dashboard_snapshot = DashboardSnapshot( urn=dashboard_urn, @@ -2325,26 +2256,28 @@ class TableauSource(StatefulIngestionSourceBase): ) creator: Optional[str] = None - if workbook is not None and workbook.get(tableau_constant.OWNER) is not None: - creator = workbook[tableau_constant.OWNER].get(tableau_constant.USERNAME) - created_at = dashboard.get(tableau_constant.CREATED_AT, datetime.now()) - updated_at = dashboard.get(tableau_constant.UPDATED_AT, datetime.now()) + if workbook is not None and workbook.get(c.OWNER) is not None: + creator = workbook[c.OWNER].get(c.USERNAME) + created_at = dashboard.get(c.CREATED_AT, datetime.now()) + updated_at = dashboard.get(c.UPDATED_AT, datetime.now()) last_modified = self.get_last_modified(creator, created_at, updated_at) site_part = f"/site/{self.config.site}" if self.config.site else "" - dashboard_external_url = f"{self.config.connect_uri}/#{site_part}/views/{dashboard.get(tableau_constant.PATH, '')}" + dashboard_external_url = ( + f"{self.config.connect_uri}/#{site_part}/views/{dashboard.get(c.PATH, '')}" + ) title = ( - dashboard[tableau_constant.NAME].replace("/", REPLACE_SLASH_CHAR) - if dashboard.get(tableau_constant.NAME) + dashboard[c.NAME].replace("/", REPLACE_SLASH_CHAR) + if dashboard.get(c.NAME) else "" ) chart_urns = [ builder.make_chart_urn( self.platform, - sheet.get(tableau_constant.ID), + sheet.get(c.ID), self.config.platform_instance, ) - for sheet in dashboard.get(tableau_constant.SHEETS, []) + for sheet in dashboard.get(c.SHEETS, []) ] dashboard_info_class = DashboardInfoClass( description="", @@ -2354,9 +2287,7 @@ class TableauSource(StatefulIngestionSourceBase): dashboardUrl=dashboard_external_url if self.config.ingest_external_links_for_dashboards else None, - customProperties=self.get_custom_props_from_dict( - dashboard, [tableau_constant.LUID] - ), + customProperties=self.get_custom_props_from_dict(dashboard, [c.LUID]), ) dashboard_snapshot.aspects.append(dashboard_info_class) @@ -2377,7 +2308,7 @@ class TableauSource(StatefulIngestionSourceBase): dashboard_snapshot.aspects.append(browse_paths) else: logger.warning( - f"Could not set browse path for dashboard {dashboard[tableau_constant.ID]}. Please check permissions." + f"Could not set browse path for dashboard {dashboard[c.ID]}. Please check permissions." ) # Ownership @@ -2397,8 +2328,8 @@ class TableauSource(StatefulIngestionSourceBase): if workbook is not None: yield from add_entity_to_container( - self.gen_workbook_key(workbook[tableau_constant.ID]), - tableau_constant.DASHBOARD, + self.gen_workbook_key(workbook[c.ID]), + c.DASHBOARD, dashboard_snapshot.urn, ) @@ -2406,38 +2337,40 @@ class TableauSource(StatefulIngestionSourceBase): self, workbook: Optional[Dict] ) -> Optional[BrowsePathsClass]: browse_paths: Optional[BrowsePathsClass] = None - if workbook and workbook.get(tableau_constant.NAME): + if workbook and workbook.get(c.NAME): project_luid: Optional[str] = self._get_workbook_project_luid(workbook) if project_luid in self.tableau_project_registry: browse_paths = BrowsePathsClass( paths=[ f"/{self.platform}/{self._project_luid_to_browse_path_name(project_luid)}" - f"/{workbook[tableau_constant.NAME].replace('/', REPLACE_SLASH_CHAR)}" + f"/{workbook[c.NAME].replace('/', REPLACE_SLASH_CHAR)}" ] ) - elif workbook.get(tableau_constant.PROJECT_NAME): + elif workbook.get(c.PROJECT_NAME): # browse path browse_paths = BrowsePathsClass( paths=[ - f"/{self.platform}/{workbook[tableau_constant.PROJECT_NAME].replace('/', REPLACE_SLASH_CHAR)}" - f"/{workbook[tableau_constant.NAME].replace('/', REPLACE_SLASH_CHAR)}" + f"/{self.platform}/{workbook[c.PROJECT_NAME].replace('/', REPLACE_SLASH_CHAR)}" + f"/{workbook[c.NAME].replace('/', REPLACE_SLASH_CHAR)}" ] ) return browse_paths def emit_embedded_datasources(self) -> Iterable[MetadataWorkUnit]: - datasource_filter = f"{tableau_constant.ID_WITH_IN}: {json.dumps(self.embedded_datasource_ids_being_used)}" + datasource_filter = ( + f"{c.ID_WITH_IN}: {json.dumps(self.embedded_datasource_ids_being_used)}" + ) for datasource in self.get_connection_objects( embedded_datasource_graphql_query, - tableau_constant.EMBEDDED_DATA_SOURCES_CONNECTION, + c.EMBEDDED_DATA_SOURCES_CONNECTION, datasource_filter, ): yield from self.emit_datasource( datasource, - datasource.get(tableau_constant.WORKBOOK), + datasource.get(c.WORKBOOK), is_embedded_ds=True, ) @@ -2483,7 +2416,7 @@ class TableauSource(StatefulIngestionSourceBase): container_key=self.gen_project_key(_id), name=project.name, description=project.description, - sub_types=[tableau_constant.PROJECT], + sub_types=[c.PROJECT], parent_container_key=self.gen_project_key(project.parent_id) if project.parent_id else None, @@ -2498,7 +2431,7 @@ class TableauSource(StatefulIngestionSourceBase): yield from gen_containers( container_key=self.gen_project_key(project.parent_id), name=cast(str, project.parent_name), - sub_types=[tableau_constant.PROJECT], + sub_types=[c.PROJECT], ) def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py b/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py index 7c4852042c..65d779b7f4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py @@ -8,7 +8,7 @@ from pydantic.fields import Field import datahub.emitter.mce_builder as builder from datahub.configuration.common import ConfigModel -from datahub.ingestion.source import tableau_constant as tc +from datahub.ingestion.source import tableau_constant as c from datahub.metadata.com.linkedin.pegasus2avro.dataset import ( DatasetLineageType, FineGrainedLineage, @@ -591,12 +591,12 @@ class TableauUpstreamReference: cls, d: dict, default_schema_map: Optional[Dict[str, str]] = None ) -> "TableauUpstreamReference": # Values directly from `table` object from Tableau - database = t_database = d.get(tc.DATABASE, {}).get(tc.NAME) - schema = t_schema = d.get(tc.SCHEMA) - table = t_table = d.get(tc.NAME) or "" - t_full_name = d.get(tc.FULL_NAME) - t_connection_type = d[tc.CONNECTION_TYPE] # required to generate urn - t_id = d[tc.ID] + database = t_database = d.get(c.DATABASE, {}).get(c.NAME) + schema = t_schema = d.get(c.SCHEMA) + table = t_table = d.get(c.NAME) or "" + t_full_name = d.get(c.FULL_NAME) + t_connection_type = d[c.CONNECTION_TYPE] # required to generate urn + t_id = d[c.ID] parsed_full_name = cls.parse_full_name(t_full_name) if parsed_full_name and len(parsed_full_name) == 3: