Fixes 14109 and 14325: Optimised Tableau Connector (#14548)

* Optimised Tableau connector

* Added comment
Onkar Ravgan 2024-01-08 11:03:05 +05:30 committed by GitHub
parent c3903b3f45
commit ecdb7b9f41
14 changed files with 183 additions and 97 deletions

View File

@ -51,6 +51,7 @@ VERSIONS = {
"snowflake": "snowflake-sqlalchemy~=1.4",
"elasticsearch8": "elasticsearch8~=8.9.0",
"giturlparse": "giturlparse",
"validators": "validators~=0.22.0",
}
COMMONS = {
@ -249,7 +250,7 @@ plugins: Dict[str, Set[str]] = {
"sklearn": {VERSIONS["scikit-learn"]},
"snowflake": {VERSIONS["snowflake"]},
"superset": {}, # uses requests
"tableau": {VERSIONS["tableau"]},
"tableau": {VERSIONS["tableau"], VERSIONS["validators"], VERSIONS["packaging"]},
"trino": {VERSIONS["trino"]},
"vertica": {"sqlalchemy-vertica[vertica-python]>=0.0.5"},
"pii-processor": pii_requirements,

View File

@ -105,7 +105,7 @@ class DashboardServiceTopology(ServiceTopology):
),
NodeStage(
type_=OMetaTagAndClassification,
processor="yield_tag",
processor="yield_bulk_tags",
nullable=True,
),
],
@ -133,6 +133,11 @@ class DashboardServiceTopology(ServiceTopology):
dashboard = TopologyNode(
producer="get_dashboard",
stages=[
NodeStage(
type_=OMetaTagAndClassification,
processor="yield_tags",
nullable=True,
),
NodeStage(
type_=Chart,
context="charts",
@ -333,7 +338,16 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
dashboard_details, db_service_name
) or []
def yield_tag(self, *args, **kwargs) -> Iterable[Either[OMetaTagAndClassification]]:
def yield_bulk_tags(
self, *args, **kwargs
) -> Iterable[Either[OMetaTagAndClassification]]:
"""
Method to bulk fetch dashboard tags
"""
def yield_tags(
self, dashboard_details
) -> Iterable[Either[OMetaTagAndClassification]]:
"""
Method to fetch dashboard tags
"""
@ -388,7 +402,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
def process_owner(self, dashboard_details):
"""
Method to process the dashboard onwers
Method to process the dashboard owners
"""
try:
if self.source_config.includeOwners:
@ -429,7 +443,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
self.dashboard_source_state.add(dashboard_fqn)
def register_record_datamodel(
self, datamodel_requst: CreateDashboardDataModelRequest
self, datamodel_request: CreateDashboardDataModelRequest
) -> None:
"""
Mark the datamodel record as scanned and update the datamodel_source_state
@ -437,8 +451,8 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
datamodel_fqn = fqn.build(
self.metadata,
entity_type=DashboardDataModel,
service_name=datamodel_requst.service.__root__,
data_model_name=datamodel_requst.name.__root__,
service_name=datamodel_request.service.__root__,
data_model_name=datamodel_request.name.__root__,
)
self.datamodel_source_state.add(datamodel_fqn)
@ -505,7 +519,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
self.context.project_name = ( # pylint: disable=assignment-from-none
self.get_project_name(dashboard_details=dashboard_details)
)
if self.context.project_name and filter_by_project(
if filter_by_project(
self.source_config.projectFilterPattern,
self.context.project_name,
):
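With this change a connector has two tag hooks: yield_bulk_tags runs once per service (the old yield_tag behaviour), while yield_tags runs once per dashboard with that dashboard's details in hand. A minimal sketch of overriding both, assuming the import paths below match this package layout (the connector class and empty bodies are illustrative only):

from typing import Iterable

# Assumed import paths, mirroring the names used in the hunks above.
from metadata.ingestion.api.models import Either
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource


class ExampleDashboardSource(DashboardServiceSource):  # hypothetical connector
    def yield_bulk_tags(
        self, *_, **__
    ) -> Iterable[Either[OMetaTagAndClassification]]:
        # Invoked once for the whole service, before any dashboard is processed.
        yield from []

    def yield_tags(
        self, dashboard_details
    ) -> Iterable[Either[OMetaTagAndClassification]]:
        # Invoked once per dashboard by the new dashboard-level topology stage.
        yield from []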

View File

@ -407,7 +407,7 @@ class LookerSource(DashboardServiceSource):
project=model.project_name,
)
yield Either(right=explore_datamodel)
self.register_record_datamodel(datamodel_requst=explore_datamodel)
self.register_record_datamodel(datamodel_request=explore_datamodel)
# build datamodel by our hand since ack_sink=False
self.context.dataModel = self._build_data_model(datamodel_name)
@ -508,7 +508,7 @@ class LookerSource(DashboardServiceSource):
self._view_data_model = self._build_data_model(
build_datamodel_name(explore.model_name, view.name)
)
self.register_record_datamodel(datamodel_requst=data_model_request)
self.register_record_datamodel(datamodel_request=data_model_request)
yield from self.add_view_lineage(view, explore)
else:
yield Either(

View File

@ -327,7 +327,7 @@ class PowerbiSource(DashboardServiceSource):
project=self._fetch_dataset_workspace(dataset_id=dataset.id),
)
yield Either(right=data_model_request)
self.register_record_datamodel(datamodel_requst=data_model_request)
self.register_record_datamodel(datamodel_request=data_model_request)
except Exception as exc:
yield Either(

View File

@ -224,7 +224,7 @@ class QliksenseSource(DashboardServiceSource):
columns=self.get_column_info(data_model),
)
yield Either(right=data_model_request)
self.register_record_datamodel(datamodel_requst=data_model_request)
self.register_record_datamodel(datamodel_request=data_model_request)
except Exception as exc:
name = (
data_model.tableName if data_model.tableName else data_model.id

View File

@ -87,7 +87,7 @@ class RedashSource(DashboardServiceSource):
for dashboard in self.dashboard_list:
self.tags.extend(dashboard.get("tags") or [])
def yield_tag(self, *_, **__) -> Iterable[Either[OMetaTagAndClassification]]:
def yield_bulk_tags(self, *_, **__) -> Iterable[Either[OMetaTagAndClassification]]:
"""Fetch Dashboard Tags"""
yield from get_ometa_tag_and_classification(
tags=self.tags,

View File

@ -220,7 +220,7 @@ class SupersetAPISource(SupersetSourceMixin):
dataModelType=DataModelType.SupersetDataModel.value,
)
yield Either(right=data_model_request)
self.register_record_datamodel(datamodel_requst=data_model_request)
self.register_record_datamodel(datamodel_request=data_model_request)
except Exception as exc:
yield Either(
left=StackTraceError(

View File

@ -239,7 +239,7 @@ class SupersetDBSource(SupersetSourceMixin):
dataModelType=DataModelType.SupersetDataModel.value,
)
yield Either(right=data_model_request)
self.register_record_datamodel(datamodel_requst=data_model_request)
self.register_record_datamodel(datamodel_request=data_model_request)
except Exception as exc:
yield Either(

View File

@ -15,7 +15,9 @@ import math
import traceback
from typing import Any, Callable, Dict, List, Optional
import validators
from cached_property import cached_property
from packaging import version
from tableau_api_lib import TableauServerConnection
from tableau_api_lib.utils import extract_pages
@ -60,7 +62,8 @@ class TableauClient:
def __init__(
self,
config: Dict[str, Dict[str, Any]],
tableau_server_config: Dict[str, Dict[str, Any]],
config,
env: str,
ssl_verify: bool,
pagination_limit: int,
@ -70,10 +73,11 @@ class TableauClient:
# In requests (https://requests.readthedocs.io/en/latest/user/advanced.html?highlight=ssl#ssl-cert-verification)
# the param can be None, False to ignore HTTPS certs or a string with the path to the cert.
self._client = TableauServerConnection(
config_json=config,
config_json=tableau_server_config,
env=env,
ssl_verify=ssl_verify,
)
self.config = config
self._client.sign_in().json()
self.pagination_limit = pagination_limit
@ -81,6 +85,10 @@ class TableauClient:
def server_info(self) -> Callable:
return self._client.server_info
@property
def server_api_version(self) -> str:
return self.server_info().json()["serverInfo"]["restApiVersion"]
@property
def site_id(self) -> str:
return self._client.site_id
@ -122,11 +130,79 @@ class TableauClient:
)
]
def get_workbook_charts(self, dashboard_id: str) -> Optional[List[TableauChart]]:
"""
Get the charts for a workbook
"""
try:
return [
TableauChart(**chart)
for chart in self._client.query_views_for_workbook(
workbook_id=dashboard_id,
parameter_dict=TABLEAU_GET_VIEWS_PARAM_DICT,
).json()["views"]["view"]
]
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Error processing charts for dashboard [{dashboard_id}]: {exc}"
)
return None
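Charts are now fetched per workbook instead of being joined from a site-wide listing in prepare(). A short usage sketch; the client instance and workbook luid below are placeholders:

# `client` is assumed to be an already signed-in TableauClient.
charts = client.get_workbook_charts(dashboard_id="workbook-luid-1234") or []
for chart in charts:
    # Each entry is a TableauChart parsed from the REST "views" payload.
    print(chart.id, chart.name)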
def test_api_version(self):
"""
Method to validate the declared v/s server tableau rest api version
"""
server_api_version = version.parse(self.server_api_version)
declared_api_version = version.parse(self.config.apiVersion)
if declared_api_version > server_api_version:
raise ValueError(
f"""
Your API version of '{declared_api_version}' is too damn high!
The server you are establishing a connection with is using REST API version '{server_api_version}'.
"""
)
if declared_api_version < server_api_version:
raise ValueError(
f"""
The Tableau Server REST API version you specified is lower than the version your server uses.
Your Tableau Server is on REST API version {server_api_version}.
The REST API version you specified is {declared_api_version}.
For optimal results, please change the 'api_version' config variable to {server_api_version}.
"""
)
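Both branches rely on packaging.version, so REST API versions compare numerically rather than lexicographically, which matters for values such as '3.19' vs '3.4'. A small self-contained illustration (the version numbers are made up):

from packaging import version

declared_api_version = version.parse("3.19")
server_api_version = version.parse("3.4")

# 3.19 is numerically newer than 3.4, even though it sorts lower as a plain string.
assert declared_api_version > server_api_version
assert "3.19" < "3.4"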
def test_site_url(self):
"""
Method to test the site url and site name fields
"""
validation = validators.url(self.config.siteUrl)
if validation:
raise ValueError(
f"""
The site url "{self.config.siteUrl}" is in incorrect format.
If "https://xxx.tableau.com/#/site/MarketingTeam/home" represents the homepage url for your tableau site,
the "MarketingTeam" from the url should be entered in the Site Name and Site Url fields.
"""
)
return True
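The check reads inverted at first glance but is intentional: validators.url returns True only for a well-formed URL, and the Site Name / Site Url fields are expected to hold a bare site name, so a truthy result means the user pasted the full homepage URL. A quick illustration (the falsy return type for invalid input differs between validators releases):

import validators

# Pasting the whole homepage URL is the misconfiguration being caught here.
assert validators.url("https://xxx.tableau.com/#/site/MarketingTeam/home")

# A bare site name is not a URL, so the result is falsy and the check passes.
assert not validators.url("MarketingTeam")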
def test_get_datamodels(self):
"""
Method to test the datamodels
"""
data = self._query_datasources(entities_per_page=1, offset=0)
workbooks = self.get_workbooks()
if len(workbooks) == 0:
raise TableauDataModelsException(
"Unable to get any workbooks to fetch tableau data sources"
)
# Take the 1st workbook's id and pass to the graphql query
test_workbook = workbooks[0]
data = self._query_datasources(
dashboard_id=test_workbook.id, entities_per_page=1, offset=0
)
if data:
return data
raise TableauDataModelsException(
@ -138,7 +214,7 @@ class TableauClient:
)
def _query_datasources(
self, entities_per_page: int, offset: int
self, dashboard_id: str, entities_per_page: int, offset: int
) -> Optional[TableauDatasources]:
"""
Method to query the graphql endpoint to get data sources
@ -146,14 +222,14 @@ class TableauClient:
try:
datasources_graphql_result = self._client.metadata_graphql_query(
query=TABLEAU_DATASOURCES_QUERY.format(
first=entities_per_page, offset=offset
workbook_id=dashboard_id, first=entities_per_page, offset=offset
)
)
if datasources_graphql_result:
resp = datasources_graphql_result.json()
if resp and resp.get("data"):
tableau_datasource_connection = TableauDatasourcesConnection(
**resp.get("data")
**resp["data"]["workbooks"][0]
)
return tableau_datasource_connection.embeddedDatasourcesConnection
except Exception:
@ -167,13 +243,15 @@ class TableauClient:
)
return None
def get_datasources(self) -> Optional[List[DataSource]]:
def get_datasources(self, dashboard_id: str) -> Optional[List[DataSource]]:
"""
Paginate and get the list of all data sources
Paginate and get the list of all data sources of the workbook
"""
try:
# Query the graphql endpoint once to get total count of data sources
tableau_datasource = self._query_datasources(entities_per_page=1, offset=1)
tableau_datasource = self._query_datasources(
dashboard_id=dashboard_id, entities_per_page=1, offset=1
)
entities_per_page = min(50, self.pagination_limit)
indexes = math.ceil(tableau_datasource.totalCount / entities_per_page)
@ -182,7 +260,9 @@ class TableauClient:
for index in range(indexes):
offset = index * entities_per_page
tableau_datasource = self._query_datasources(
entities_per_page=entities_per_page, offset=offset
dashboard_id=dashboard_id,
entities_per_page=entities_per_page,
offset=offset,
)
if tableau_datasource:
data_sources.extend(tableau_datasource.nodes)
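The paging arithmetic is unchanged: one probe query reads totalCount for the workbook, the page size is capped at 50 or the configured paginationLimit (whichever is smaller), and the loop steps through offsets. A worked example with made-up numbers:

import math

total_count = 173        # totalCount reported by the probe query
pagination_limit = 100   # connector's paginationLimit setting

entities_per_page = min(50, pagination_limit)       # 50
pages = math.ceil(total_count / entities_per_page)  # 4 GraphQL requests
offsets = [index * entities_per_page for index in range(pages)]
print(entities_per_page, pages, offsets)            # 50 4 [0, 50, 100, 150]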

View File

@ -52,7 +52,8 @@ def get_connection(connection: TableauConnection) -> TableauClient:
get_verify_ssl = get_verify_ssl_fn(connection.verifySSL)
try:
return TableauClient(
config=tableau_server_config,
tableau_server_config=tableau_server_config,
config=connection,
env=connection.env,
ssl_verify=get_verify_ssl(connection.sslConfig),
pagination_limit=connection.paginationLimit,
@ -77,6 +78,14 @@ def test_connection(
test_fn = {
"ServerInfo": client.server_info,
# The Tableau server_info API doesn't provide direct access to the API version.
# This is due to the "api_version" being a mandatory field for the tableau library's connection class.
# Without this information, requests to the Tableau server cannot be made,
# including fetching the server info containing the "api_version".
# Consequently, we'll compare the declared api_version with the server's api_version during the test connection
# once the tableau library's connection class is initialized.
"ValidateApiVersion": client.test_api_version,
"ValidateSiteUrl": client.test_site_url,
"GetWorkbooks": partial(
extract_pages,
query_func=client.query_workbooks_for_site,

View File

@ -14,6 +14,8 @@ Tableau source module
import traceback
from typing import Any, Iterable, List, Optional, Set
from requests.utils import urlparse
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.data.createDashboardDataModel import (
@ -85,51 +87,6 @@ class TableauSource(DashboardServiceSource):
metadata_config: OpenMetadataConnection
client: TableauClient
def __init__(
self,
config: WorkflowSource,
metadata: OpenMetadata,
):
super().__init__(config, metadata)
self.workbooks: List[
TableauDashboard
] = [] # We will populate this in `prepare`
self.tags: Set[TableauTag] = set()
def prepare(self):
"""Restructure the API response"""
try:
# get workbooks which are considered Dashboards in OM
self.workbooks = self.client.get_workbooks()
# get views which are considered charts in OM
charts = self.client.get_charts()
# get datasources which are considered as datamodels in OM
data_models = self.client.get_datasources()
# add all the charts (views) and datasources from the API to each workbook
for workbook in self.workbooks:
workbook.charts = [
chart for chart in charts if chart.workbook.id == workbook.id
]
for data_model in data_models or []:
if data_model.workbook and data_model.workbook.luid == workbook.id:
workbook.dataModels.append(data_model)
# collect all the tags from charts and workbooks before yielding final entities
if self.source_config.includeTags:
for container in [self.workbooks, charts]:
for elem in container:
self.tags.update(elem.tags)
except Exception:
logger.debug(traceback.format_exc())
logger.error("Error in fetching the Tableau Workbook metadata")
return super().prepare()
@classmethod
def create(cls, config_dict: dict, metadata: OpenMetadata):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
@ -141,16 +98,22 @@ class TableauSource(DashboardServiceSource):
return cls(config, metadata)
def get_dashboards_list(self) -> Optional[List[TableauDashboard]]:
return self.workbooks
return self.client.get_workbooks()
def get_dashboard_name(self, dashboard: TableauDashboard) -> str:
return dashboard.name
def get_dashboard_details(self, dashboard: TableauDashboard) -> TableauDashboard:
"""
Get Dashboard Details. Returning the identity here as we prepare everything
during the `prepare` stage
Get Dashboard Details including the dashboard charts and datamodels
"""
# Get the tableau views/sheets
dashboard.charts = self.client.get_workbook_charts(dashboard_id=dashboard.id)
# Get the tableau data sources
dashboard.dataModels = self.client.get_datasources(dashboard_id=dashboard.id)
return dashboard
def get_owner_details(
@ -161,14 +124,25 @@ class TableauSource(DashboardServiceSource):
return self.metadata.get_reference_by_email(dashboard_details.owner.email)
return None
def yield_tag(self, *_, **__) -> Iterable[Either[OMetaTagAndClassification]]:
yield from get_ometa_tag_and_classification(
tags=[tag.label for tag in self.tags],
classification_name=TABLEAU_TAG_CATEGORY,
tag_description="Tableau Tag",
classification_description="Tags associated with tableau entities",
include_tags=self.source_config.includeTags,
)
def yield_tags(
self, dashboard_details: TableauDashboard
) -> Iterable[Either[OMetaTagAndClassification]]:
"""
Method to yield tags related to specific dashboards
"""
if self.source_config.includeTags:
tags: Set[TableauTag] = set()
for container in [[dashboard_details], dashboard_details.charts]:
for elem in container:
tags.update(elem.tags)
yield from get_ometa_tag_and_classification(
tags=[tag.label for tag in tags],
classification_name=TABLEAU_TAG_CATEGORY,
tag_description="Tableau Tag",
classification_description="Tags associated with tableau entities",
include_tags=self.source_config.includeTags,
)
def yield_datamodel(
self, dashboard_details: TableauDashboard
@ -191,7 +165,7 @@ class TableauSource(DashboardServiceSource):
columns=self.get_column_info(data_model),
)
yield Either(right=data_model_request)
self.register_record_datamodel(datamodel_requst=data_model_request)
self.register_record_datamodel(datamodel_request=data_model_request)
except Exception as exc:
yield Either(
@ -215,6 +189,10 @@ class TableauSource(DashboardServiceSource):
topology. And they are cleared after processing each Dashboard because of the 'clear_cache' option.
"""
try:
dashboard_url = (
f"{clean_uri(str(self.config.serviceConnection.__root__.config.hostPort))}"
f"/#{urlparse(dashboard_details.webpageUrl).fragment}"
)
dashboard_request = CreateDashboardRequest(
name=dashboard_details.id,
displayName=dashboard_details.name,
@ -244,7 +222,7 @@ class TableauSource(DashboardServiceSource):
classification_name=TABLEAU_TAG_CATEGORY,
include_tags=self.source_config.includeTags,
),
sourceUrl=dashboard_details.webpageUrl,
sourceUrl=dashboard_url,
service=self.context.dashboard_service,
)
yield Either(right=dashboard_request)
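The source URL is now rebuilt from the configured hostPort plus the fragment of the webpageUrl Tableau returns, so the link follows the host the connector was configured with (for example a proxy hostname) rather than whatever address the API reports. A standalone sketch of the same transformation, with made-up values and a simplified stand-in for clean_uri:

from requests.utils import urlparse

host_port = "https://tableau.mycompany.com/"  # made-up hostPort setting
webpage_url = "https://internal-node/#/site/Sales/workbooks/42"  # made-up webpageUrl

clean_host = host_port.rstrip("/")  # simplified stand-in for clean_uri
dashboard_url = f"{clean_host}/#{urlparse(webpage_url).fragment}"
print(dashboard_url)  # https://tableau.mycompany.com/#/site/Sales/workbooks/42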

View File

@ -99,12 +99,6 @@ class DatasourceField(BaseModel):
description: Optional[str]
class Workbook(BaseModel):
id: str
luid: str
name: Optional[str]
class UpstreamTableColumn(BaseModel):
id: str
name: Optional[str]
@ -130,7 +124,6 @@ class DataSource(BaseModel):
id: str
name: Optional[str]
fields: Optional[List[DatasourceField]]
workbook: Optional[Workbook]
upstreamTables: Optional[List[UpstreamTable]]
@ -148,7 +141,6 @@ class TableauChart(TableauBaseModel):
Aux class for Chart object of the tableau_api_lib response
"""
workbook: TableauBaseModel
owner: Optional[TableauOwner]
tags: Optional[List[TableauTag]] = []
_extract_tags = validator("tags", pre=True, allow_reuse=True)(transform_tags)

View File

@ -15,6 +15,10 @@ GraphQL queries used during ingestion
TABLEAU_DATASOURCES_QUERY = """
{{
workbooks(filter:{{luid: "{workbook_id}"}}){{
id
luid
name
embeddedDatasourcesConnection(first: {first}, offset: {offset} ) {{
nodes {{
id
@ -29,11 +33,6 @@ TABLEAU_DATASOURCES_QUERY = """
}}
description
}}
workbook {{
id
luid
name
}}
upstreamTables {{
id
luid
@ -57,5 +56,6 @@ TABLEAU_DATASOURCES_QUERY = """
}}
totalCount
}}
}}
}}
"""

View File

@ -10,6 +10,18 @@
"shortCircuit": true,
"mandatory": true
},
{
"name": "ValidateApiVersion",
"description": "Validate that the entered api version matches with the server api version",
"errorMessage": "Failed to match api versions, please validate the entered version",
"mandatory": false
},
{
"name": "ValidateSiteUrl",
"description": "Validate that the entered site url is in a correct format",
"errorMessage": "Failed to validate site url, please validate the entered value",
"mandatory": false
},
{
"name": "GetWorkbooks",
"description": "List all the workbooks available to the user. We will ingest Workbooks as Dashboards.",