mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-12-05 20:15:15 +00:00
* Fix #355: Tableau Implemented * Tableau pipeline location modification
This commit is contained in:
parent
ed698dc40f
commit
d2df40cf2b
@ -6,11 +6,13 @@
|
||||
"type": "object",
|
||||
"definitions": {
|
||||
"dashboardServiceType": {
|
||||
"description": "Type of Dashboard service - Superset or Lookr",
|
||||
"description": "Type of Dashboard service - Superset, Looker, Redash or Tableau",
|
||||
"type": "string",
|
||||
"enum": [
|
||||
"Superset",
|
||||
"Looker"
|
||||
"Looker",
|
||||
"Tableau",
|
||||
"Redash"
|
||||
],
|
||||
"javaEnums": [
|
||||
{
|
||||
@ -18,6 +20,12 @@
|
||||
},
|
||||
{
|
||||
"name": "Looker"
|
||||
},
|
||||
{
|
||||
"name": "Tableau"
|
||||
},
|
||||
{
|
||||
"name": "Redash"
|
||||
}
|
||||
]
|
||||
}
|
||||
@ -34,7 +42,7 @@
|
||||
"maxLength": 64
|
||||
},
|
||||
"serviceType": {
|
||||
"description": "Type of dashboard service such as Lookr or Superset...",
|
||||
"description": "Type of dashboard service such as Looker or Superset...",
|
||||
"$ref": "#/definitions/dashboardServiceType"
|
||||
},
|
||||
"description": {
|
||||
|
||||
33
ingestion/examples/workflows/tableau.json
Normal file
33
ingestion/examples/workflows/tableau.json
Normal file
@ -0,0 +1,33 @@
|
||||
{
|
||||
"source": {
|
||||
"type": "tableau",
|
||||
"config": {
|
||||
"username": "username",
|
||||
"password": "password",
|
||||
"service_name": "local_tableau",
|
||||
"server": "server_address",
|
||||
"site_name": "site_name",
|
||||
"site_url": "site_url",
|
||||
"api_version": "api version",
|
||||
"env": "env"
|
||||
}
|
||||
},
|
||||
"sink": {
|
||||
"type": "metadata-rest",
|
||||
"config": {}
|
||||
},
|
||||
"metadata_server": {
|
||||
"type": "metadata-server",
|
||||
"config": {
|
||||
"api_endpoint": "http://localhost:8585/api",
|
||||
"auth_provider_type": "no-auth"
|
||||
}
|
||||
},
|
||||
"cron": {
|
||||
"minute": "*/5",
|
||||
"hour": null,
|
||||
"day": null,
|
||||
"month": null,
|
||||
"day_of_week": null
|
||||
}
|
||||
}
|
||||
@ -87,14 +87,15 @@ plugins: Dict[str, Set[str]] = {
|
||||
"oracle": {"cx_Oracle"},
|
||||
"presto": {"pyhive~=0.6.3"},
|
||||
"postgres": {"pymysql>=1.0.2", "psycopg2-binary", "GeoAlchemy2"},
|
||||
"redshift": {"sqlalchemy-redshift", "GeoAlchemy2", "psycopg2-binary"},
|
||||
"redshift": {"sqlalchemy-redshift", "GeoAlchemy2", "psycopg2-binary"},
|
||||
"redshift-usage": {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"},
|
||||
"scheduler": scheduler_requirements,
|
||||
"snowflake": {"snowflake-sqlalchemy<=1.2.4"},
|
||||
"snowflake-usage": {"snowflake-sqlalchemy<=1.2.4"},
|
||||
"sample-data": {"faker~=8.1.1"},
|
||||
"superset": {},
|
||||
"pii-processor": {"pandas~=1.3.1"}
|
||||
"tableau": {"tableau-api-lib==0.1.22"},
|
||||
"pii-processor": {"pandas~=1.3.1"},
|
||||
}
|
||||
|
||||
build_options = {"includes": ["_cffi_backend"]}
|
||||
@ -129,12 +130,13 @@ setup(
|
||||
plugin: list(dependencies)
|
||||
for (plugin, dependencies) in plugins.items()
|
||||
},
|
||||
"all": list(base_requirements.union(
|
||||
*[
|
||||
"all": list(
|
||||
base_requirements.union(
|
||||
*[
|
||||
requirements
|
||||
for plugin, requirements in plugins.items()
|
||||
]
|
||||
)
|
||||
]
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@ -15,6 +15,7 @@ from ...type import basic, schedule
|
||||
class DashboardServiceType(Enum):
|
||||
Superset = 'Superset'
|
||||
Looker = 'Looker'
|
||||
Tableau = 'Tableau'
|
||||
|
||||
|
||||
class DashboardService(BaseModel):
|
||||
|
||||
@ -73,17 +73,22 @@ class LookerSource(Source):
|
||||
metadata_config: MetadataServerConfig
|
||||
status: LookerDashboardSourceStatus
|
||||
|
||||
def __init__(self, config: LookerSourceConfig, metadata_config: MetadataServerConfig, ctx: WorkflowContext):
|
||||
def __init__(
|
||||
self, config: LookerSourceConfig, metadata_config: MetadataServerConfig,
|
||||
ctx: WorkflowContext
|
||||
):
|
||||
super().__init__(ctx)
|
||||
self.config = config
|
||||
self.metadata_config = metadata_config
|
||||
self.client = self.looker_client()
|
||||
self.service = get_dashboard_service_or_create(config.service_name,
|
||||
DashboardServiceType.Looker.name,
|
||||
config.username,
|
||||
config.password,
|
||||
config.url,
|
||||
metadata_config)
|
||||
self.service = get_dashboard_service_or_create(
|
||||
config.service_name,
|
||||
DashboardServiceType.Looker.name,
|
||||
config.username,
|
||||
config.password,
|
||||
config.url,
|
||||
metadata_config
|
||||
)
|
||||
|
||||
def looker_client(self):
|
||||
os.environ["LOOKERSDK_CLIENT_ID"] = self.config.username
|
||||
@ -108,18 +113,20 @@ class LookerSource(Source):
|
||||
|
||||
def _yield_charts(self, chart: DashboardElement):
|
||||
yield Chart(
|
||||
id=uuid.uuid4(),
|
||||
name=chart.id,
|
||||
displayName=chart.title,
|
||||
chartType=chart.type,
|
||||
service=EntityReference(id=self.service.id, type="dashboardService"),
|
||||
description=chart.subtitle_text,
|
||||
)
|
||||
id=uuid.uuid4(),
|
||||
name=chart.id,
|
||||
displayName=chart.title,
|
||||
chartType=chart.type,
|
||||
service=EntityReference(id=self.service.id, type="dashboardService"),
|
||||
description=chart.subtitle_text,
|
||||
)
|
||||
|
||||
def _get_looker_charts(self) -> Optional[Chart]:
|
||||
for child_dashboard in self.client.all_dashboards(fields="id"):
|
||||
fields = ["id", "title", "dashboard_elements", "dashboard_filters", "view_count"]
|
||||
charts = self.client.dashboard_dashboard_elements(dashboard_id=child_dashboard.id, fields=",".join(fields))
|
||||
charts = self.client.dashboard_dashboard_elements(
|
||||
dashboard_id=child_dashboard.id, fields=",".join(fields)
|
||||
)
|
||||
for chart in charts:
|
||||
self._yield_charts(chart)
|
||||
|
||||
@ -161,8 +168,13 @@ class LookerSource(Source):
|
||||
dashboard_id=child_dashboard, fields=",".join(fields)
|
||||
)
|
||||
except SDKError:
|
||||
self.status.warning(child_dashboard, f"Error occurred while loading dashboard {child_dashboard}.",)
|
||||
self.status.warning(
|
||||
child_dashboard,
|
||||
f"Error occurred while loading dashboard {child_dashboard}.", )
|
||||
return self.looker_dashboard(dashboard)
|
||||
|
||||
def get_status(self) -> SourceStatus:
|
||||
return self.status
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
@ -39,9 +39,11 @@ def get_filter_name(filter_obj):
|
||||
def get_owners(owners_obj):
|
||||
owners = []
|
||||
for owner in owners_obj:
|
||||
dashboard_owner = DashboardOwner(first_name=owner['first_name'],
|
||||
last_name=owner['last_name'],
|
||||
username=owner['username'])
|
||||
dashboard_owner = DashboardOwner(
|
||||
first_name=owner['first_name'],
|
||||
last_name=owner['last_name'],
|
||||
username=owner['username']
|
||||
)
|
||||
owners.append(dashboard_owner)
|
||||
return owners
|
||||
|
||||
@ -84,19 +86,23 @@ class SupersetSource(Source):
|
||||
platform = "superset"
|
||||
service_type = "Superset"
|
||||
|
||||
def __init__(self, config: SupersetConfig, metadata_config: MetadataServerConfig, ctx: WorkflowContext):
|
||||
def __init__(
|
||||
self, config: SupersetConfig, metadata_config: MetadataServerConfig,
|
||||
ctx: WorkflowContext
|
||||
):
|
||||
super().__init__(ctx)
|
||||
self.config = config
|
||||
self.metadata_config = metadata_config
|
||||
self.status = SourceStatus()
|
||||
self.client = SupersetAPIClient(self.config)
|
||||
self.service = get_dashboard_service_or_create(config.service_name,
|
||||
DashboardServiceType.Superset.name,
|
||||
config.username,
|
||||
config.password,
|
||||
config.url,
|
||||
metadata_config)
|
||||
|
||||
self.service = get_dashboard_service_or_create(
|
||||
config.service_name,
|
||||
DashboardServiceType.Superset.name,
|
||||
config.username,
|
||||
config.password,
|
||||
config.url,
|
||||
metadata_config
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext):
|
||||
@ -115,7 +121,9 @@ class SupersetSource(Source):
|
||||
dashboard_id = dashboard_json['id']
|
||||
name = dashboard_json['dashboard_title']
|
||||
dashboard_url = f"{self.config.url[:-1]}{dashboard_json['url']}"
|
||||
last_modified = dateparser.parse(dashboard_json.get("changed_on_utc", "now")).timestamp() * 1000
|
||||
last_modified = dateparser.parse(
|
||||
dashboard_json.get("changed_on_utc", "now")
|
||||
).timestamp() * 1000
|
||||
owners = get_owners(dashboard_json['owners'])
|
||||
raw_position_data = dashboard_json.get("position_json", "{}")
|
||||
charts = []
|
||||
@ -127,14 +135,16 @@ class SupersetSource(Source):
|
||||
chart_id = value.get('meta', {}).get('chartId', 'unknown')
|
||||
charts.append(chart_id)
|
||||
|
||||
return Dashboard(name=dashboard_id,
|
||||
displayName=name,
|
||||
description="",
|
||||
url=dashboard_url,
|
||||
owners=owners,
|
||||
charts=charts,
|
||||
service=EntityReference(id=self.service.id, type="dashboardService"),
|
||||
lastModified=last_modified)
|
||||
return Dashboard(
|
||||
name=dashboard_id,
|
||||
displayName=name,
|
||||
description="",
|
||||
url=dashboard_url,
|
||||
owners=owners,
|
||||
charts=charts,
|
||||
service=EntityReference(id=self.service.id, type="dashboardService"),
|
||||
lastModified=last_modified
|
||||
)
|
||||
|
||||
def _fetch_dashboards(self) -> Iterable[Record]:
|
||||
current_page = 0
|
||||
@ -174,7 +184,9 @@ class SupersetSource(Source):
|
||||
def _build_chart(self, chart_json) -> Chart:
|
||||
chart_id = chart_json['id']
|
||||
name = chart_json['slice_name']
|
||||
last_modified = dateparser.parse(chart_json.get("changed_on_utc", "now")).timestamp() * 1000
|
||||
last_modified = dateparser.parse(
|
||||
chart_json.get("changed_on_utc", "now")
|
||||
).timestamp() * 1000
|
||||
chart_type = chart_json["viz_type"]
|
||||
chart_url = f"{self.config.url}{chart_json['url']}"
|
||||
datasource_id = chart_json["datasource_id"]
|
||||
@ -198,16 +210,18 @@ class SupersetSource(Source):
|
||||
"Dimensions": ", ".join(group_bys),
|
||||
}
|
||||
|
||||
chart = Chart(name=chart_id,
|
||||
displayName=name,
|
||||
description="",
|
||||
chart_type=chart_type,
|
||||
url=chart_url,
|
||||
owners=owners,
|
||||
datasource_fqn=datasource_fqn,
|
||||
lastModified=last_modified,
|
||||
service=EntityReference(id=self.service.id, type="dashboardService"),
|
||||
custom_props=custom_properties)
|
||||
chart = Chart(
|
||||
name=chart_id,
|
||||
displayName=name,
|
||||
description="",
|
||||
chart_type=chart_type,
|
||||
url=chart_url,
|
||||
owners=owners,
|
||||
datasource_fqn=datasource_fqn,
|
||||
lastModified=last_modified,
|
||||
service=EntityReference(id=self.service.id, type="dashboardService"),
|
||||
custom_props=custom_properties
|
||||
)
|
||||
return chart
|
||||
|
||||
def _fetch_charts(self):
|
||||
|
||||
161
ingestion/src/metadata/ingestion/source/tableau.py
Normal file
161
ingestion/src/metadata/ingestion/source/tableau.py
Normal file
@ -0,0 +1,161 @@
|
||||
import logging
|
||||
import uuid
|
||||
from typing import Iterable, Optional
|
||||
import dateutil.parser as dateparser
|
||||
from metadata.generated.schema.type.entityReference import EntityReference
|
||||
from metadata.ingestion.api.common import ConfigModel, Record, WorkflowContext
|
||||
from metadata.generated.schema.entity.services.dashboardService import DashboardServiceType
|
||||
from metadata.ingestion.api.common import IncludeFilterPattern
|
||||
from metadata.ingestion.api.source import Source, SourceStatus
|
||||
from metadata.ingestion.models.table_metadata import Dashboard, Chart, DashboardOwner
|
||||
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
|
||||
from metadata.utils.helpers import get_dashboard_service_or_create
|
||||
from tableau_api_lib import TableauServerConnection
|
||||
from tableau_api_lib.utils.querying import get_workbooks_dataframe, get_views_dataframe
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TableauSourceConfig(ConfigModel):
    """Pydantic config for the Tableau dashboard source connector.

    Fields map one-to-one onto the ``source.config`` section of a workflow
    JSON (see ingestion/examples/workflows/tableau.json).
    """

    # Credentials used both for the Tableau REST sign-in and for
    # registering the dashboard service in OpenMetadata.
    username: str
    password: str
    # Tableau server address, e.g. "https://tableau.example.com".
    server: str
    # Tableau REST API version string, e.g. "3.12".
    api_version: str
    # Key of the environment block in the tableau-api-lib config dict.
    env: Optional[str] = 'tableau_prod'
    site_name: str
    site_url: str
    # Name under which the dashboard service is registered in OpenMetadata.
    service_name: str
    service_type: str = "Tableau"
    # Include/exclude filters; the defaults allow everything through.
    dashboard_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
    chart_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
|
||||
|
||||
|
||||
class TableauSource(Source):
    """OpenMetadata source that ingests dashboards and charts from Tableau.

    Workbooks are emitted as ``Dashboard`` records and individual views as
    ``Chart`` records, using tableau-api-lib to query the Tableau REST API.
    """

    config: TableauSourceConfig
    metadata_config: MetadataServerConfig
    status: SourceStatus

    def __init__(
        self, config: TableauSourceConfig, metadata_config: MetadataServerConfig,
        ctx: WorkflowContext
    ):
        super().__init__(ctx)
        self.config = config
        self.metadata_config = metadata_config
        self.client = self.tableau_client()
        self.service = get_dashboard_service_or_create(
            config.service_name,
            DashboardServiceType.Tableau.name,
            config.username,
            config.password,
            config.server,
            metadata_config
        )
        self.status = SourceStatus()
        # Both helpers return pandas DataFrames; keep them as
        # column -> row-index -> value dicts so rows can be addressed
        # positionally in the generators below.
        self.dashboards = get_workbooks_dataframe(self.client).to_dict()
        self.all_dashboard_details = get_views_dataframe(self.client).to_dict()

    def tableau_client(self):
        """Sign in to Tableau and return the authenticated connection.

        Raises:
            Whatever tableau-api-lib raises when the connection cannot be
            created or the sign-in fails.
        """
        tableau_server_config = {
            f'{self.config.env}': {
                'server': self.config.server,
                'api_version': self.config.api_version,
                'username': self.config.username,
                'password': self.config.password,
                'site_name': self.config.site_name,
                'site_url': self.config.site_url
            }
        }
        try:
            # Bug fix: the config dict is keyed by self.config.env, so the
            # connection must be created with the same env. The original
            # hard-coded 'tableau_prod', raising KeyError for any other env.
            conn = TableauServerConnection(
                config_json=tableau_server_config, env=self.config.env
            )
            conn.sign_in().json()
        except Exception as err:
            # Bug fix: a failed construction previously left `conn` unbound
            # (UnboundLocalError on return) and the error was only printed.
            # Log and re-raise so the workflow fails loudly and clearly.
            logger.error("%r: %s", err, err)
            raise
        return conn

    @classmethod
    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext):
        """Build a TableauSource from the raw workflow config dicts."""
        config = TableauSourceConfig.parse_obj(config_dict)
        metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
        return cls(config, metadata_config, ctx)

    def prepare(self):
        # Nothing to prepare; all state is built in __init__.
        pass

    def next_record(self) -> Iterable[Record]:
        """Emit all charts first, then all dashboards."""
        yield from self._get_tableau_charts()
        yield from self._get_tableau_dashboard()

    @staticmethod
    def get_owner(owner) -> DashboardOwner:
        """Wrap a Tableau owner record in a single-element owner list.

        `owner` is expected to be a mapping with 'fullName' and 'name' keys
        (as returned by the Tableau views/workbooks API).
        """
        name_parts = owner['fullName'].split(" ")
        return [DashboardOwner(
            first_name=name_parts[0],
            # Robustness: a single-word fullName previously raised IndexError.
            last_name=name_parts[1] if len(name_parts) > 1 else "",
            username=owner['name']
        )]

    def _get_tableau_dashboard(self) -> Dashboard:
        """Yield one Dashboard per workbook returned by the Tableau API.

        For each workbook, collect its tags, the names of its views
        (charts), and the owner / last-modified info of the matching rows
        in the views DataFrame.
        """
        for index in range(len(self.dashboards['id'])):
            dashboard_id = self.dashboards['id'][index]
            dashboard_name = self.dashboards['name'][index]
            dashboard_tag = self.dashboards['tags'][index]
            dashboard_url = self.dashboards['webpageUrl'][index]
            tag_labels = []
            # Bug fix: `tags` is a mapping like {'tag': [...]}; hasattr()
            # on a dict never sees its keys, so tags were silently dropped.
            if isinstance(dashboard_tag, dict) and 'tag' in dashboard_tag:
                for tag in dashboard_tag['tag']:
                    tag_labels.append(tag['label'])
            dashboard_chart = []
            dashboard_owner = None
            last_modified = None
            for chart_index in self.all_dashboard_details['workbook']:
                workbook = self.all_dashboard_details['workbook'][chart_index]
                if workbook['id'] == dashboard_id:
                    # Bug fix: owner/updatedAt were previously read from
                    # whatever row the inner loop ended on (and raised
                    # UnboundLocalError when there were no views at all);
                    # take them from the rows that actually match.
                    dashboard_owner = self.all_dashboard_details['owner'][chart_index]
                    last_modified = self.all_dashboard_details['updatedAt'][chart_index]
                    dashboard_chart.append(self.all_dashboard_details["name"][chart_index])
            yield Dashboard(
                id=uuid.uuid4(),
                name=dashboard_id,
                displayName=dashboard_name,
                description='',
                # A workbook with no matching views has no owner row to use.
                owner=self.get_owner(dashboard_owner) if dashboard_owner else None,
                charts=dashboard_chart,
                tags=list(tag_labels),
                url=dashboard_url,
                service=EntityReference(
                    id=self.service.id, type="dashboardService"
                ),
                last_modified=dateparser.parse(last_modified).timestamp() * 1000
                if last_modified else None
            )

    def _get_tableau_charts(self):
        """Yield one Chart per view (sheet/dashboard) known to Tableau."""
        for index in range(len(self.all_dashboard_details['id'])):
            chart_name = self.all_dashboard_details["name"][index]
            chart_id = self.all_dashboard_details["id"][index]
            chart_tags = self.all_dashboard_details["tags"][index]
            chart_type = self.all_dashboard_details["sheetType"][index]
            # NOTE(review): contentUrl is appended directly after the site
            # name with no separator — confirm the resulting URL against a
            # live Tableau server before relying on it.
            chart_url = f"{self.config.server}/#/site/{self.config.site_name}" \
                        f"{self.all_dashboard_details['contentUrl'][index]}"
            chart_owner = self.all_dashboard_details["owner"][index]
            chart_datasource_fqn = chart_url.replace("/", ".")
            chart_last_modified = self.all_dashboard_details["updatedAt"][index]
            tag_labels = []
            # Bug fix: same dict-vs-attribute issue as in
            # _get_tableau_dashboard — hasattr() never matched a dict key.
            if isinstance(chart_tags, dict) and 'tag' in chart_tags:
                for tag in chart_tags['tag']:
                    tag_labels.append(tag['label'])
            yield Chart(
                name=chart_id,
                displayName=chart_name,
                description="",
                chart_type=chart_type,
                url=chart_url,
                owners=self.get_owner(chart_owner),
                datasource_fqn=chart_datasource_fqn,
                last_modified=dateparser.parse(
                    chart_last_modified
                ).timestamp() * 1000,
                service=EntityReference(id=self.service.id, type="dashboardService"),
            )

    def get_status(self) -> SourceStatus:
        return self.status

    def close(self):
        # The Tableau connection is left to the interpreter; tableau-api-lib
        # exposes sign_out(), but the original deliberately did nothing here.
        pass
|
||||
Loading…
x
Reference in New Issue
Block a user