diff --git a/ingestion/setup.py b/ingestion/setup.py index b2a80fff4ee..9f44a9e8012 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -51,6 +51,7 @@ VERSIONS = { "snowflake": "snowflake-sqlalchemy~=1.4", "elasticsearch8": "elasticsearch8~=8.9.0", "giturlparse": "giturlparse", + "validators": "validators~=0.22.0", } COMMONS = { @@ -249,7 +250,7 @@ plugins: Dict[str, Set[str]] = { "sklearn": {VERSIONS["scikit-learn"]}, "snowflake": {VERSIONS["snowflake"]}, "superset": {}, # uses requests - "tableau": {VERSIONS["tableau"]}, + "tableau": {VERSIONS["tableau"], VERSIONS["validators"], VERSIONS["packaging"]}, "trino": {VERSIONS["trino"]}, "vertica": {"sqlalchemy-vertica[vertica-python]>=0.0.5"}, "pii-processor": pii_requirements, diff --git a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py index 7046f4efd23..bbda61c836e 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py @@ -105,7 +105,7 @@ class DashboardServiceTopology(ServiceTopology): ), NodeStage( type_=OMetaTagAndClassification, - processor="yield_tag", + processor="yield_bulk_tags", nullable=True, ), ], @@ -133,6 +133,11 @@ class DashboardServiceTopology(ServiceTopology): dashboard = TopologyNode( producer="get_dashboard", stages=[ + NodeStage( + type_=OMetaTagAndClassification, + processor="yield_tags", + nullable=True, + ), NodeStage( type_=Chart, context="charts", @@ -333,7 +338,16 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC): dashboard_details, db_service_name ) or [] - def yield_tag(self, *args, **kwargs) -> Iterable[Either[OMetaTagAndClassification]]: + def yield_bulk_tags( + self, *args, **kwargs + ) -> Iterable[Either[OMetaTagAndClassification]]: + """ + Method to bulk fetch dashboard tags + """ + + def yield_tags( + self, dashboard_details + ) -> Iterable[Either[OMetaTagAndClassification]]: """ Method to fetch dashboard tags """ @@ -388,7 +402,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC): def process_owner(self, dashboard_details): """ - Method to process the dashboard onwers + Method to process the dashboard owners """ try: if self.source_config.includeOwners: @@ -429,7 +443,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC): self.dashboard_source_state.add(dashboard_fqn) def register_record_datamodel( - self, datamodel_requst: CreateDashboardDataModelRequest + self, datamodel_request: CreateDashboardDataModelRequest ) -> None: """ Mark the datamodel record as scanned and update the datamodel_source_state @@ -437,8 +451,8 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC): datamodel_fqn = fqn.build( self.metadata, entity_type=DashboardDataModel, - service_name=datamodel_requst.service.__root__, - data_model_name=datamodel_requst.name.__root__, + service_name=datamodel_request.service.__root__, + data_model_name=datamodel_request.name.__root__, ) self.datamodel_source_state.add(datamodel_fqn) @@ -505,7 +519,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC): self.context.project_name = ( # pylint: disable=assignment-from-none self.get_project_name(dashboard_details=dashboard_details) ) - if self.context.project_name and filter_by_project( + if filter_by_project( self.source_config.projectFilterPattern, self.context.project_name, ): diff --git a/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py index be5fc575f04..1aa867fa290 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py @@ -407,7 +407,7 @@ class LookerSource(DashboardServiceSource): project=model.project_name, ) yield Either(right=explore_datamodel) - self.register_record_datamodel(datamodel_requst=explore_datamodel) + self.register_record_datamodel(datamodel_request=explore_datamodel) # build datamodel by our hand since ack_sink=False self.context.dataModel = self._build_data_model(datamodel_name) @@ -508,7 +508,7 @@ class LookerSource(DashboardServiceSource): self._view_data_model = self._build_data_model( build_datamodel_name(explore.model_name, view.name) ) - self.register_record_datamodel(datamodel_requst=data_model_request) + self.register_record_datamodel(datamodel_request=data_model_request) yield from self.add_view_lineage(view, explore) else: yield Either( diff --git a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py index 2f22493adeb..3d050657e35 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py @@ -327,7 +327,7 @@ class PowerbiSource(DashboardServiceSource): project=self._fetch_dataset_workspace(dataset_id=dataset.id), ) yield Either(right=data_model_request) - self.register_record_datamodel(datamodel_requst=data_model_request) + self.register_record_datamodel(datamodel_request=data_model_request) except Exception as exc: yield Either( diff --git a/ingestion/src/metadata/ingestion/source/dashboard/qliksense/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/qliksense/metadata.py index b2d92158ea9..cc7c9e4af32 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/qliksense/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/qliksense/metadata.py @@ -224,7 +224,7 @@ class QliksenseSource(DashboardServiceSource): columns=self.get_column_info(data_model), ) yield Either(right=data_model_request) - self.register_record_datamodel(datamodel_requst=data_model_request) + self.register_record_datamodel(datamodel_request=data_model_request) except Exception as exc: name = ( data_model.tableName if data_model.tableName else data_model.id diff --git a/ingestion/src/metadata/ingestion/source/dashboard/redash/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/redash/metadata.py index 45c4f07e6c6..6c9a6b2804b 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/redash/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/redash/metadata.py @@ -87,7 +87,7 @@ class RedashSource(DashboardServiceSource): for dashboard in self.dashboard_list: self.tags.extend(dashboard.get("tags") or []) - def yield_tag(self, *_, **__) -> Iterable[Either[OMetaTagAndClassification]]: + def yield_bulk_tags(self, *_, **__) -> Iterable[Either[OMetaTagAndClassification]]: """Fetch Dashboard Tags""" yield from get_ometa_tag_and_classification( tags=self.tags, diff --git a/ingestion/src/metadata/ingestion/source/dashboard/superset/api_source.py b/ingestion/src/metadata/ingestion/source/dashboard/superset/api_source.py index 88e9c02a176..b1ce2d9906e 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/superset/api_source.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/superset/api_source.py @@ -220,7 +220,7 @@ class SupersetAPISource(SupersetSourceMixin): dataModelType=DataModelType.SupersetDataModel.value, ) yield Either(right=data_model_request) - self.register_record_datamodel(datamodel_requst=data_model_request) + self.register_record_datamodel(datamodel_request=data_model_request) except Exception as exc: yield Either( left=StackTraceError( diff --git a/ingestion/src/metadata/ingestion/source/dashboard/superset/db_source.py b/ingestion/src/metadata/ingestion/source/dashboard/superset/db_source.py index 098ff1f347f..1b03c5744ba 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/superset/db_source.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/superset/db_source.py @@ -239,7 +239,7 @@ class SupersetDBSource(SupersetSourceMixin): dataModelType=DataModelType.SupersetDataModel.value, ) yield Either(right=data_model_request) - self.register_record_datamodel(datamodel_requst=data_model_request) + self.register_record_datamodel(datamodel_request=data_model_request) except Exception as exc: yield Either( diff --git a/ingestion/src/metadata/ingestion/source/dashboard/tableau/client.py b/ingestion/src/metadata/ingestion/source/dashboard/tableau/client.py index ab29472bb23..c0cdc242823 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/tableau/client.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/tableau/client.py @@ -15,7 +15,9 @@ import math import traceback from typing import Any, Callable, Dict, List, Optional +import validators from cached_property import cached_property +from packaging import version from tableau_api_lib import TableauServerConnection from tableau_api_lib.utils import extract_pages @@ -60,7 +62,8 @@ class TableauClient: def __init__( self, - config: Dict[str, Dict[str, Any]], + tableau_server_config: Dict[str, Dict[str, Any]], + config, env: str, ssl_verify: bool, pagination_limit: int, @@ -70,10 +73,11 @@ class TableauClient: # In requests (https://requests.readthedocs.io/en/latest/user/advanced.html?highlight=ssl#ssl-cert-verification) # the param can be None, False to ignore HTTPS certs or a string with the path to the cert. self._client = TableauServerConnection( - config_json=config, + config_json=tableau_server_config, env=env, ssl_verify=ssl_verify, ) + self.config = config self._client.sign_in().json() self.pagination_limit = pagination_limit @@ -81,6 +85,10 @@ class TableauClient: def server_info(self) -> Callable: return self._client.server_info + @property + def server_api_version(self) -> str: + return self.server_info().json()["serverInfo"]["restApiVersion"] + @property def site_id(self) -> str: return self._client.site_id @@ -122,11 +130,79 @@ class TableauClient: ) ] + def get_workbook_charts(self, dashboard_id: str) -> Optional[List[TableauChart]]: + """ + Get the charts for a workbook + """ + try: + return [ + TableauChart(**chart) + for chart in self._client.query_views_for_workbook( + workbook_id=dashboard_id, + parameter_dict=TABLEAU_GET_VIEWS_PARAM_DICT, + ).json()["views"]["view"] + ] + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning( + f"Error processing charts for dashboard [{dashboard_id}]: {exc}" + ) + return None + + def test_api_version(self): + """ + Method to validate the declared v/s server tableau rest api version + """ + server_api_version = version.parse(self.server_api_version) + declared_api_version = version.parse(self.config.apiVersion) + if declared_api_version > server_api_version: + raise ValueError( + f""" + Your API version of '{declared_api_version}' is too damn high! + The server you are establishing a connection with is using REST API version '{server_api_version}'. + """ + ) + if declared_api_version < server_api_version: + raise ValueError( + f""" + The Tableau Server REST API version you specified is lower than the version your server uses. + Your Tableau Server is on REST API version {server_api_version}. + The REST API version you specified is {declared_api_version}. + For optimal results, please change the 'api_version' config variable to {server_api_version}. + """ + ) + + def test_site_url(self): + """ + Method to test the site url and site name fields + """ + validation = validators.url(self.config.siteUrl) + if validation: + raise ValueError( + f""" + The site url "{self.config.siteUrl}" is in incorrect format. + If "https://xxx.tableau.com/#/site/MarketingTeam/home" represents the homepage url for your tableau site, + the "MarketingTeam" from the url should be entered in the Site Name and Site Url fields. + """ + ) + return True + def test_get_datamodels(self): """ Method to test the datamodels """ - data = self._query_datasources(entities_per_page=1, offset=0) + workbooks = self.get_workbooks() + + if len(workbooks) == 0: + raise TableauDataModelsException( + "Unable to get any workbooks to fetch tableau data sources" + ) + + # Take the 1st workbook's id and pass to the graphql query + test_workbook = workbooks[0] + data = self._query_datasources( + dashboard_id=test_workbook.id, entities_per_page=1, offset=0 + ) if data: return data raise TableauDataModelsException( @@ -138,7 +214,7 @@ class TableauClient: ) def _query_datasources( - self, entities_per_page: int, offset: int + self, dashboard_id: str, entities_per_page: int, offset: int ) -> Optional[TableauDatasources]: """ Method to query the graphql endpoint to get data sources @@ -146,14 +222,14 @@ class TableauClient: try: datasources_graphql_result = self._client.metadata_graphql_query( query=TABLEAU_DATASOURCES_QUERY.format( - first=entities_per_page, offset=offset + workbook_id=dashboard_id, first=entities_per_page, offset=offset ) ) if datasources_graphql_result: resp = datasources_graphql_result.json() if resp and resp.get("data"): tableau_datasource_connection = TableauDatasourcesConnection( - **resp.get("data") + **resp["data"]["workbooks"][0] ) return tableau_datasource_connection.embeddedDatasourcesConnection except Exception: @@ -167,13 +243,15 @@ class TableauClient: ) return None - def get_datasources(self) -> Optional[List[DataSource]]: + def get_datasources(self, dashboard_id: str) -> Optional[List[DataSource]]: """ - Paginate and get the list of all data sources + Paginate and get the list of all data sources of the workbook """ try: # Query the graphql endpoint once to get total count of data sources - tableau_datasource = self._query_datasources(entities_per_page=1, offset=1) + tableau_datasource = self._query_datasources( + dashboard_id=dashboard_id, entities_per_page=1, offset=1 + ) entities_per_page = min(50, self.pagination_limit) indexes = math.ceil(tableau_datasource.totalCount / entities_per_page) @@ -182,7 +260,9 @@ class TableauClient: for index in range(indexes): offset = index * entities_per_page tableau_datasource = self._query_datasources( - entities_per_page=entities_per_page, offset=offset + dashboard_id=dashboard_id, + entities_per_page=entities_per_page, + offset=offset, ) if tableau_datasource: data_sources.extend(tableau_datasource.nodes) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/tableau/connection.py b/ingestion/src/metadata/ingestion/source/dashboard/tableau/connection.py index 8f52f6121a8..7b0670bc6e5 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/tableau/connection.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/tableau/connection.py @@ -52,7 +52,8 @@ def get_connection(connection: TableauConnection) -> TableauClient: get_verify_ssl = get_verify_ssl_fn(connection.verifySSL) try: return TableauClient( - config=tableau_server_config, + tableau_server_config=tableau_server_config, + config=connection, env=connection.env, ssl_verify=get_verify_ssl(connection.sslConfig), pagination_limit=connection.paginationLimit, @@ -77,6 +78,14 @@ def test_connection( test_fn = { "ServerInfo": client.server_info, + # The Tableau server_info API doesn't provide direct access to the API version. + # This is due to the "api_version" being a mandatory field for the tableau library's connection class. + # Without this information, requests to the Tableau server cannot be made, + # including fetching the server info containing the "api_version". + # Consequently, we'll compare the declared api_version with the server's api_version during the test connection + # once the tableau library's connection class is initialized. + "ValidateApiVersion": client.test_api_version, + "ValidateSiteUrl": client.test_site_url, "GetWorkbooks": partial( extract_pages, query_func=client.query_workbooks_for_site, diff --git a/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py index ff247e7861a..c4fb770e576 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py @@ -14,6 +14,8 @@ Tableau source module import traceback from typing import Any, Iterable, List, Optional, Set +from requests.utils import urlparse + from metadata.generated.schema.api.data.createChart import CreateChartRequest from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest from metadata.generated.schema.api.data.createDashboardDataModel import ( @@ -85,51 +87,6 @@ class TableauSource(DashboardServiceSource): metadata_config: OpenMetadataConnection client: TableauClient - def __init__( - self, - config: WorkflowSource, - metadata: OpenMetadata, - ): - super().__init__(config, metadata) - self.workbooks: List[ - TableauDashboard - ] = [] # We will populate this in `prepare` - self.tags: Set[TableauTag] = set() - - def prepare(self): - """Restructure the API response""" - try: - # get workbooks which are considered Dashboards in OM - self.workbooks = self.client.get_workbooks() - - # get views which are considered charts in OM - charts = self.client.get_charts() - - # get datasources which are considered as datamodels in OM - data_models = self.client.get_datasources() - - # add all the charts (views) and datasources from the API to each workbook - for workbook in self.workbooks: - workbook.charts = [ - chart for chart in charts if chart.workbook.id == workbook.id - ] - - for data_model in data_models or []: - if data_model.workbook and data_model.workbook.luid == workbook.id: - workbook.dataModels.append(data_model) - - # collect all the tags from charts and workbooks before yielding final entities - if self.source_config.includeTags: - for container in [self.workbooks, charts]: - for elem in container: - self.tags.update(elem.tags) - - except Exception: - logger.debug(traceback.format_exc()) - logger.error("Error in fetching the Tableau Workbook metadata") - - return super().prepare() - @classmethod def create(cls, config_dict: dict, metadata: OpenMetadata): config: WorkflowSource = WorkflowSource.parse_obj(config_dict) @@ -141,16 +98,22 @@ class TableauSource(DashboardServiceSource): return cls(config, metadata) def get_dashboards_list(self) -> Optional[List[TableauDashboard]]: - return self.workbooks + return self.client.get_workbooks() def get_dashboard_name(self, dashboard: TableauDashboard) -> str: return dashboard.name def get_dashboard_details(self, dashboard: TableauDashboard) -> TableauDashboard: """ - Get Dashboard Details. Returning the identity here as we prepare everything - during the `prepare` stage + Get Dashboard Details including the dashboard charts and datamodels """ + + # Get the tableau views/sheets + dashboard.charts = self.client.get_workbook_charts(dashboard_id=dashboard.id) + + # Get the tableau data sources + dashboard.dataModels = self.client.get_datasources(dashboard_id=dashboard.id) + return dashboard def get_owner_details( @@ -161,14 +124,25 @@ class TableauSource(DashboardServiceSource): return self.metadata.get_reference_by_email(dashboard_details.owner.email) return None - def yield_tag(self, *_, **__) -> Iterable[Either[OMetaTagAndClassification]]: - yield from get_ometa_tag_and_classification( - tags=[tag.label for tag in self.tags], - classification_name=TABLEAU_TAG_CATEGORY, - tag_description="Tableau Tag", - classification_description="Tags associated with tableau entities", - include_tags=self.source_config.includeTags, - ) + def yield_tags( + self, dashboard_details: TableauDashboard + ) -> Iterable[Either[OMetaTagAndClassification]]: + """ + Method to yield tags related to specific dashboards + """ + if self.source_config.includeTags: + tags: Set[TableauTag] = set() + for container in [[dashboard_details], dashboard_details.charts]: + for elem in container: + tags.update(elem.tags) + + yield from get_ometa_tag_and_classification( + tags=[tag.label for tag in tags], + classification_name=TABLEAU_TAG_CATEGORY, + tag_description="Tableau Tag", + classification_description="Tags associated with tableau entities", + include_tags=self.source_config.includeTags, + ) def yield_datamodel( self, dashboard_details: TableauDashboard @@ -191,7 +165,7 @@ class TableauSource(DashboardServiceSource): columns=self.get_column_info(data_model), ) yield Either(right=data_model_request) - self.register_record_datamodel(datamodel_requst=data_model_request) + self.register_record_datamodel(datamodel_request=data_model_request) except Exception as exc: yield Either( @@ -215,6 +189,10 @@ class TableauSource(DashboardServiceSource): topology. And they are cleared after processing each Dashboard because of the 'clear_cache' option. """ try: + dashboard_url = ( + f"{clean_uri(str(self.config.serviceConnection.__root__.config.hostPort))}" + f"/#{urlparse(dashboard_details.webpageUrl).fragment}" + ) dashboard_request = CreateDashboardRequest( name=dashboard_details.id, displayName=dashboard_details.name, @@ -244,7 +222,7 @@ class TableauSource(DashboardServiceSource): classification_name=TABLEAU_TAG_CATEGORY, include_tags=self.source_config.includeTags, ), - sourceUrl=dashboard_details.webpageUrl, + sourceUrl=dashboard_url, service=self.context.dashboard_service, ) yield Either(right=dashboard_request) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/tableau/models.py b/ingestion/src/metadata/ingestion/source/dashboard/tableau/models.py index caf0a74d3ab..ded6e5194dc 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/tableau/models.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/tableau/models.py @@ -99,12 +99,6 @@ class DatasourceField(BaseModel): description: Optional[str] -class Workbook(BaseModel): - id: str - luid: str - name: Optional[str] - - class UpstreamTableColumn(BaseModel): id: str name: Optional[str] @@ -130,7 +124,6 @@ class DataSource(BaseModel): id: str name: Optional[str] fields: Optional[List[DatasourceField]] - workbook: Optional[Workbook] upstreamTables: Optional[List[UpstreamTable]] @@ -148,7 +141,6 @@ class TableauChart(TableauBaseModel): Aux class for Chart object of the tableau_api_lib response """ - workbook: TableauBaseModel owner: Optional[TableauOwner] tags: Optional[List[TableauTag]] = [] _extract_tags = validator("tags", pre=True, allow_reuse=True)(transform_tags) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/tableau/queries.py b/ingestion/src/metadata/ingestion/source/dashboard/tableau/queries.py index 89dac28cd27..0f18924b20b 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/tableau/queries.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/tableau/queries.py @@ -15,6 +15,10 @@ GraphQL queries used during ingestion TABLEAU_DATASOURCES_QUERY = """ {{ +workbooks(filter:{{luid: "{workbook_id}"}}){{ + id + luid + name embeddedDatasourcesConnection(first: {first}, offset: {offset} ) {{ nodes {{ id @@ -29,11 +33,6 @@ TABLEAU_DATASOURCES_QUERY = """ }} description }} - workbook {{ - id - luid - name - }} upstreamTables {{ id luid @@ -57,5 +56,6 @@ TABLEAU_DATASOURCES_QUERY = """ }} totalCount }} + }} }} """ diff --git a/openmetadata-service/src/main/resources/json/data/testConnections/dashboard/tableau.json b/openmetadata-service/src/main/resources/json/data/testConnections/dashboard/tableau.json index 5168e891e49..1c2d5f3e4b5 100644 --- a/openmetadata-service/src/main/resources/json/data/testConnections/dashboard/tableau.json +++ b/openmetadata-service/src/main/resources/json/data/testConnections/dashboard/tableau.json @@ -10,6 +10,18 @@ "shortCircuit": true, "mandatory": true }, + { + "name": "ValidateApiVersion", + "description": "Validate that the entered api version matches with the server api version", + "errorMessage": "Failed to match api versions, please validate the entered version", + "mandatory": false + }, + { + "name": "ValidateSiteUrl", + "description": "Validate that the entered site url is in a correct format", + "errorMessage": "Failed to validate site url, please validate the entered value", + "mandatory": false + }, { "name": "GetWorkbooks", "description": "List all the workbooks available to the user. We will ingest Workbooks as Dashboards.",