From d2df40cf2b132b876f9a6302b46852ba33298e3a Mon Sep 17 00:00:00 2001 From: Ayush Shah Date: Sun, 12 Sep 2021 00:16:10 +0530 Subject: [PATCH] Fix #355: Tableau Implemented (#468) * Fix #355: Tableau Implemented * Tableau pipeline location modification --- .../entity/services/dashboardService.json | 14 +- ingestion/examples/workflows/tableau.json | 33 ++++ ingestion/setup.py | 14 +- .../entity/services/dashboardService.py | 1 + .../src/metadata/ingestion/source/looker.py | 44 +++-- .../src/metadata/ingestion/source/superset.py | 76 +++++---- .../src/metadata/ingestion/source/tableau.py | 161 ++++++++++++++++++ 7 files changed, 287 insertions(+), 56 deletions(-) create mode 100644 ingestion/examples/workflows/tableau.json create mode 100644 ingestion/src/metadata/ingestion/source/tableau.py diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/dashboardService.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/dashboardService.json index fdfccb5100d..9f45d952fcc 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/dashboardService.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/dashboardService.json @@ -6,11 +6,13 @@ "type": "object", "definitions": { "dashboardServiceType": { - "description": "Type of Dashboard service - Superset or Lookr", + "description": "Type of Dashboard service - Superset, Looker, Redash or Tableau", "type": "string", "enum": [ "Superset", - "Looker" + "Looker", + "Tableau", + "Redash" ], "javaEnums": [ { @@ -18,6 +20,12 @@ }, { "name": "Looker" + }, + { + "name": "Tableau" + }, + { + "name": "Redash" } ] } @@ -34,7 +42,7 @@ "maxLength": 64 }, "serviceType": { - "description": "Type of dashboard service such as Lookr or Superset...", + "description": "Type of dashboard service such as Looker or Superset...", "$ref": "#/definitions/dashboardServiceType" }, "description": { diff --git a/ingestion/examples/workflows/tableau.json b/ingestion/examples/workflows/tableau.json new file mode 100644 index 00000000000..89ced0654c3 --- /dev/null +++ b/ingestion/examples/workflows/tableau.json @@ -0,0 +1,33 @@ +{ + "source": { + "type": "tableau", + "config": { + "username": "username", + "password": "password", + "service_name": "local_tableau", + "server": "server_address", + "site_name": "site_name", + "site_url": "site_url", + "api_version": "api version", + "env": "env" + } + }, + "sink": { + "type": "metadata-rest", + "config": {} + }, + "metadata_server": { + "type": "metadata-server", + "config": { + "api_endpoint": "http://localhost:8585/api", + "auth_provider_type": "no-auth" + } + }, + "cron": { + "minute": "*/5", + "hour": null, + "day": null, + "month": null, + "day_of_week": null + } +} diff --git a/ingestion/setup.py b/ingestion/setup.py index 05328bbbe5f..ef7e4d5d573 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -87,14 +87,15 @@ plugins: Dict[str, Set[str]] = { "oracle": {"cx_Oracle"}, "presto": {"pyhive~=0.6.3"}, "postgres": {"pymysql>=1.0.2", "psycopg2-binary", "GeoAlchemy2"}, - "redshift": {"sqlalchemy-redshift", "GeoAlchemy2", "psycopg2-binary"}, + "redshift": {"sqlalchemy-redshift", "GeoAlchemy2", "psycopg2-binary"}, "redshift-usage": {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"}, "scheduler": scheduler_requirements, "snowflake": {"snowflake-sqlalchemy<=1.2.4"}, "snowflake-usage": {"snowflake-sqlalchemy<=1.2.4"}, "sample-data": {"faker~=8.1.1"}, "superset": {}, - "pii-processor": {"pandas~=1.3.1"} + "tableau": {"tableau-api-lib==0.1.22"}, + "pii-processor": {"pandas~=1.3.1"}, } build_options = {"includes": ["_cffi_backend"]} @@ -129,12 +130,13 @@ setup( plugin: list(dependencies) for (plugin, dependencies) in plugins.items() }, - "all": list(base_requirements.union( - *[ + "all": list( + base_requirements.union( + *[ requirements for plugin, requirements in plugins.items() - ] - ) + ] + ) ) } diff --git a/ingestion/src/metadata/generated/schema/entity/services/dashboardService.py b/ingestion/src/metadata/generated/schema/entity/services/dashboardService.py index 55a3f7d3621..bf99845d6ec 100644 --- a/ingestion/src/metadata/generated/schema/entity/services/dashboardService.py +++ b/ingestion/src/metadata/generated/schema/entity/services/dashboardService.py @@ -15,6 +15,7 @@ from ...type import basic, schedule class DashboardServiceType(Enum): Superset = 'Superset' Looker = 'Looker' + Tableau = 'Tableau' class DashboardService(BaseModel): diff --git a/ingestion/src/metadata/ingestion/source/looker.py b/ingestion/src/metadata/ingestion/source/looker.py index b5b5c24c04e..30be2573a11 100644 --- a/ingestion/src/metadata/ingestion/source/looker.py +++ b/ingestion/src/metadata/ingestion/source/looker.py @@ -73,17 +73,22 @@ class LookerSource(Source): metadata_config: MetadataServerConfig status: LookerDashboardSourceStatus - def __init__(self, config: LookerSourceConfig, metadata_config: MetadataServerConfig, ctx: WorkflowContext): + def __init__( + self, config: LookerSourceConfig, metadata_config: MetadataServerConfig, + ctx: WorkflowContext + ): super().__init__(ctx) self.config = config self.metadata_config = metadata_config self.client = self.looker_client() - self.service = get_dashboard_service_or_create(config.service_name, - DashboardServiceType.Looker.name, - config.username, - config.password, - config.url, - metadata_config) + self.service = get_dashboard_service_or_create( + config.service_name, + DashboardServiceType.Looker.name, + config.username, + config.password, + config.url, + metadata_config + ) def looker_client(self): os.environ["LOOKERSDK_CLIENT_ID"] = self.config.username @@ -108,18 +113,20 @@ class LookerSource(Source): def _yield_charts(self, chart: DashboardElement): yield Chart( - id=uuid.uuid4(), - name=chart.id, - displayName=chart.title, - chartType=chart.type, - service=EntityReference(id=self.service.id, type="dashboardService"), - description=chart.subtitle_text, - ) + id=uuid.uuid4(), + name=chart.id, + displayName=chart.title, + chartType=chart.type, + service=EntityReference(id=self.service.id, type="dashboardService"), + description=chart.subtitle_text, + ) def _get_looker_charts(self) -> Optional[Chart]: for child_dashboard in self.client.all_dashboards(fields="id"): fields = ["id", "title", "dashboard_elements", "dashboard_filters", "view_count"] - charts = self.client.dashboard_dashboard_elements(dashboard_id=child_dashboard.id, fields=",".join(fields)) + charts = self.client.dashboard_dashboard_elements( + dashboard_id=child_dashboard.id, fields=",".join(fields) + ) for chart in charts: self._yield_charts(chart) @@ -161,8 +168,13 @@ class LookerSource(Source): dashboard_id=child_dashboard, fields=",".join(fields) ) except SDKError: - self.status.warning(child_dashboard, f"Error occurred while loading dashboard {child_dashboard}.",) + self.status.warning( + child_dashboard, + f"Error occurred while loading dashboard {child_dashboard}.", ) return self.looker_dashboard(dashboard) def get_status(self) -> SourceStatus: return self.status + + def close(self): + pass diff --git a/ingestion/src/metadata/ingestion/source/superset.py b/ingestion/src/metadata/ingestion/source/superset.py index 2943a3355ce..b3f7992eeef 100644 --- a/ingestion/src/metadata/ingestion/source/superset.py +++ b/ingestion/src/metadata/ingestion/source/superset.py @@ -39,9 +39,11 @@ def get_filter_name(filter_obj): def get_owners(owners_obj): owners = [] for owner in owners_obj: - dashboard_owner = DashboardOwner(first_name=owner['first_name'], - last_name=owner['last_name'], - username=owner['username']) + dashboard_owner = DashboardOwner( + first_name=owner['first_name'], + last_name=owner['last_name'], + username=owner['username'] + ) owners.append(dashboard_owner) return owners @@ -84,19 +86,23 @@ class SupersetSource(Source): platform = "superset" service_type = "Superset" - def __init__(self, config: SupersetConfig, metadata_config: MetadataServerConfig, ctx: WorkflowContext): + def __init__( + self, config: SupersetConfig, metadata_config: MetadataServerConfig, + ctx: WorkflowContext + ): super().__init__(ctx) self.config = config self.metadata_config = metadata_config self.status = SourceStatus() self.client = SupersetAPIClient(self.config) - self.service = get_dashboard_service_or_create(config.service_name, - DashboardServiceType.Superset.name, - config.username, - config.password, - config.url, - metadata_config) - + self.service = get_dashboard_service_or_create( + config.service_name, + DashboardServiceType.Superset.name, + config.username, + config.password, + config.url, + metadata_config + ) @classmethod def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext): @@ -115,7 +121,9 @@ class SupersetSource(Source): dashboard_id = dashboard_json['id'] name = dashboard_json['dashboard_title'] dashboard_url = f"{self.config.url[:-1]}{dashboard_json['url']}" - last_modified = dateparser.parse(dashboard_json.get("changed_on_utc", "now")).timestamp() * 1000 + last_modified = dateparser.parse( + dashboard_json.get("changed_on_utc", "now") + ).timestamp() * 1000 owners = get_owners(dashboard_json['owners']) raw_position_data = dashboard_json.get("position_json", "{}") charts = [] @@ -127,14 +135,16 @@ class SupersetSource(Source): chart_id = value.get('meta', {}).get('chartId', 'unknown') charts.append(chart_id) - return Dashboard(name=dashboard_id, - displayName=name, - description="", - url=dashboard_url, - owners=owners, - charts=charts, - service=EntityReference(id=self.service.id, type="dashboardService"), - lastModified=last_modified) + return Dashboard( + name=dashboard_id, + displayName=name, + description="", + url=dashboard_url, + owners=owners, + charts=charts, + service=EntityReference(id=self.service.id, type="dashboardService"), + lastModified=last_modified + ) def _fetch_dashboards(self) -> Iterable[Record]: current_page = 0 @@ -174,7 +184,9 @@ class SupersetSource(Source): def _build_chart(self, chart_json) -> Chart: chart_id = chart_json['id'] name = chart_json['slice_name'] - last_modified = dateparser.parse(chart_json.get("changed_on_utc", "now")).timestamp() * 1000 + last_modified = dateparser.parse( + chart_json.get("changed_on_utc", "now") + ).timestamp() * 1000 chart_type = chart_json["viz_type"] chart_url = f"{self.config.url}{chart_json['url']}" datasource_id = chart_json["datasource_id"] @@ -198,16 +210,18 @@ class SupersetSource(Source): "Dimensions": ", ".join(group_bys), } - chart = Chart(name=chart_id, - displayName=name, - description="", - chart_type=chart_type, - url=chart_url, - owners=owners, - datasource_fqn=datasource_fqn, - lastModified=last_modified, - service=EntityReference(id=self.service.id, type="dashboardService"), - custom_props=custom_properties) + chart = Chart( + name=chart_id, + displayName=name, + description="", + chart_type=chart_type, + url=chart_url, + owners=owners, + datasource_fqn=datasource_fqn, + lastModified=last_modified, + service=EntityReference(id=self.service.id, type="dashboardService"), + custom_props=custom_properties + ) return chart def _fetch_charts(self): diff --git a/ingestion/src/metadata/ingestion/source/tableau.py b/ingestion/src/metadata/ingestion/source/tableau.py new file mode 100644 index 00000000000..aaef1370559 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/tableau.py @@ -0,0 +1,161 @@ +import logging +import uuid +from typing import Iterable, Optional +import dateutil.parser as dateparser +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.api.common import ConfigModel, Record, WorkflowContext +from metadata.generated.schema.entity.services.dashboardService import DashboardServiceType +from metadata.ingestion.api.common import IncludeFilterPattern +from metadata.ingestion.api.source import Source, SourceStatus +from metadata.ingestion.models.table_metadata import Dashboard, Chart, DashboardOwner +from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig +from metadata.utils.helpers import get_dashboard_service_or_create +from tableau_api_lib import TableauServerConnection +from tableau_api_lib.utils.querying import get_workbooks_dataframe, get_views_dataframe + +logger = logging.getLogger(__name__) + + +class TableauSourceConfig(ConfigModel): + username: str + password: str + server: str + api_version: str + env: Optional[str] = 'tableau_prod' + site_name: str + site_url: str + service_name: str + service_type: str = "Tableau" + dashboard_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all() + chart_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all() + + +class TableauSource(Source): + config: TableauSourceConfig + metadata_config: MetadataServerConfig + status: SourceStatus + + def __init__( + self, config: TableauSourceConfig, metadata_config: MetadataServerConfig, + ctx: WorkflowContext + ): + super().__init__(ctx) + self.config = config + self.metadata_config = metadata_config + self.client = self.tableau_client() + self.service = get_dashboard_service_or_create( + config.service_name, + DashboardServiceType.Tableau.name, + config.username, + config.password, + config.server, + metadata_config + ) + self.status = SourceStatus() + self.dashboards = get_workbooks_dataframe(self.client).to_dict() + self.all_dashboard_details = get_views_dataframe(self.client).to_dict() + + def tableau_client(self): + tableau_server_config = { + f'{self.config.env}': { + 'server': self.config.server, + 'api_version': self.config.api_version, + 'username': self.config.username, + 'password': self.config.password, + 'site_name': self.config.site_name, + 'site_url': self.config.site_url + } + } + try: + conn = TableauServerConnection(config_json=tableau_server_config, env='tableau_prod') + conn.sign_in().json() + except Exception as err: + print(f"{repr(err)}: {err}") + return conn + + @classmethod + def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext): + config = TableauSourceConfig.parse_obj(config_dict) + metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict) + return cls(config, metadata_config, ctx) + + def prepare(self): + pass + + def next_record(self) -> Iterable[Record]: + yield from self._get_tableau_charts() + yield from self._get_tableau_dashboard() + + @staticmethod + def get_owner(owner) -> DashboardOwner: + return [DashboardOwner( + first_name=owner['fullName'].split(" ")[0], + last_name=owner['fullName'].split(" ")[1], + username=owner['name'] + )] + + def _get_tableau_dashboard(self) -> Dashboard: + for index in range(len(self.dashboards['id'])): + dashboard_id = self.dashboards['id'][index] + dashboard_name = self.dashboards['name'][index] + dashboard_tag = self.dashboards['tags'][index] + dashboard_url = self.dashboards['webpageUrl'][index] + tag_labels = [] + if hasattr(dashboard_tag, 'tag'): + for tag in dashboard_tag['tag']: + tag_labels.append(tag['label']) + dashboard_chart = [] + for chart_index in self.all_dashboard_details['workbook']: + dashboard_owner = self.all_dashboard_details['owner'][chart_index] + chart = self.all_dashboard_details['workbook'][chart_index] + if chart['id'] == dashboard_id: + dashboard_chart.append(self.all_dashboard_details["name"][chart_index]) + yield Dashboard( + id=uuid.uuid4(), + name=dashboard_id, + displayName=dashboard_name, + description='', + owner=self.get_owner(dashboard_owner), + charts=dashboard_chart, + tags=list(tag_labels), + url=dashboard_url, + service=EntityReference( + id=self.service.id, type="dashboardService" + ), + last_modified=dateparser.parse(chart["updatedAt"]).timestamp() * 1000 + ) + + def _get_tableau_charts(self): + for index in range(len(self.all_dashboard_details['id'])): + chart_name = self.all_dashboard_details["name"][index] + chart_id = self.all_dashboard_details["id"][index] + chart_tags = self.all_dashboard_details["tags"][index] + chart_type = self.all_dashboard_details["sheetType"][index] + chart_url = f"{self.config.server}/#/site/{self.config.site_name}" \ + f"{self.all_dashboard_details['contentUrl'][index]}" + chart_owner = self.all_dashboard_details["owner"][index] + chart_datasource_fqn = chart_url.replace("/", ".") + chart_last_modified = self.all_dashboard_details["updatedAt"][index] + tag_labels = [] + if hasattr(chart_tags, 'tag'): + for tag in chart_tags['tag']: + tag_labels.append(tag['label']) + yield Chart( + name=chart_id, + displayName=chart_name, + description="", + chart_type=chart_type, + url=chart_url, + owners=self.get_owner(chart_owner), + datasource_fqn=chart_datasource_fqn, + last_modified=dateparser.parse( + chart_last_modified + ).timestamp() * 1000, + service=EntityReference(id=self.service.id, type="dashboardService"), + ) + + def get_status(self) -> SourceStatus: + return self.status + + def close(self): + pass