Fix #5409: Implement Dashboard Topology (#5495)

* Fix #5409: Dashboard Service

* Fix #5409: Implement Topology to Dashboards

* Removed print
This commit is contained in:
Mayur Singal 2022-06-17 15:22:54 +05:30 committed by GitHub
parent b03411470d
commit f53a07815e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 462 additions and 345 deletions

View File

@ -0,0 +1,254 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Base class for ingesting dashboard services
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Iterable, List, Optional
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.api.teams.createUser import CreateUserRequest
from metadata.generated.schema.entity.data.chart import Chart
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.dashboardService import (
DashboardConnection,
DashboardService,
)
from metadata.generated.schema.metadataIngestion.dashboardServiceMetadataPipeline import (
DashboardServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.api.topology_runner import TopologyRunnerMixin
from metadata.ingestion.models.ometa_tag_category import OMetaTagAndCategory
from metadata.ingestion.models.topology import (
NodeStage,
ServiceTopology,
TopologyNode,
create_source_context,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.connections import get_connection, test_connection
from metadata.utils.filters import filter_by_dashboard
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
# Declarative description of how a dashboard service is ingested.
# Every `producer` / `processor` string below is the name of a method that
# DashboardServiceSource defines (either concretely or as an abstract hook).
class DashboardServiceTopology(ServiceTopology):
"""
Defines the hierarchy in Dashboard Services.
service -> dashboard -> charts.
We could have a topology validator. We can only consume
data that has been produced by any parent node.
"""
# Root node: items come from `get_services`; its only child is the
# `dashboard` node declared further down.
root = TopologyNode(
producer="get_services",
stages=[
# Dashboard service entity, stored in the context as `dashboard_service`.
NodeStage(
type_=DashboardService,
context="dashboard_service",
processor="yield_dashboard_service",
),
# Tags; the base `yield_tag` returns nothing, hence nullable.
# NOTE(review): exact meaning of `ack_sink=False` lives in NodeStage - confirm there.
NodeStage(
type_=OMetaTagAndCategory,
context="tags",
processor="yield_tag",
ack_sink=False,
nullable=True,
),
],
children=["dashboard"],
)
# Dashboard node: items come from `get_dashboard`.
# The chart stage is listed before the dashboard stage; the concrete
# `yield_dashboard` implementations in this change read `self.context.charts`.
# NOTE(review): presumably stages are processed in list order - confirm
# against TopologyRunnerMixin.
dashboard = TopologyNode(
producer="get_dashboard",
stages=[
# Charts of the current dashboard, stored in the context as `charts`.
# NOTE(review): `cache_all=True` presumably keeps every yielded chart
# rather than only the last one - confirm in NodeStage.
NodeStage(
type_=Chart,
context="charts",
processor="yield_dashboard_chart",
consumer=["dashboard_service"],
nullable=True,
cache_all=True,
),
# Optional owner; the base `yield_owner` returns nothing.
NodeStage(
type_=CreateUserRequest,
context="owner",
processor="yield_owner",
nullable=True,
),
# The dashboard entity itself, stored in the context as `dashboard`.
NodeStage(
type_=Dashboard,
context="dashboard",
processor="yield_dashboard",
consumer=["dashboard_service"],
),
# Optional lineage between the dashboard and its data sources.
NodeStage(
type_=AddLineageRequest,
context="lineage",
processor="yield_dashboard_lineage",
consumer=["dashboard_service"],
ack_sink=False,
nullable=True,
),
],
)
@dataclass
class DashboardSourceStatus(SourceStatus):
    """
    Source status for dashboard ingestion: collects scanned and filtered records
    """

    def scanned(self, record: str) -> None:
        """Append `record` to the success list and log it at INFO level."""
        self.success.append(record)
        message = f"Scanned: {record}"
        logger.info(message)

    def filter(self, record: str, err: str) -> None:
        """Append `record` to the filtered list and log the reason at WARNING level."""
        self.filtered.append(record)
        message = f"Filtered {record}: {err}"
        logger.warning(message)
class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
    """
    Base class for Dashboard Services.
    It implements the topology and context.

    Concrete sources implement the abstract hooks below; the method names
    are the ones referenced as producers / processors by
    DashboardServiceTopology.
    """

    @abstractmethod
    def yield_dashboard(
        self, dashboard_details: Any
    ) -> Iterable[CreateDashboardRequest]:
        """
        Method to Get Dashboard Entity
        """

    @abstractmethod
    def yield_dashboard_lineage(
        self, dashboard_details: Any
    ) -> Optional[Iterable[AddLineageRequest]]:
        """
        Get lineage between dashboard and data sources
        """

    @abstractmethod
    def yield_dashboard_chart(
        self, dashboard_details: Any
    ) -> Optional[Iterable[CreateChartRequest]]:
        """
        Method to fetch charts linked to dashboard
        """

    @abstractmethod
    def get_dashboards_list(self) -> Optional[List[Any]]:
        """
        Get List of all dashboards
        """

    @abstractmethod
    def get_dashboard_name(self, dashboard_details: Any) -> str:
        """
        Get Dashboard Name
        """

    @abstractmethod
    def get_dashboard_details(self, dashboard: Any) -> Any:
        """
        Get Dashboard Details
        """

    def yield_tag(self, *args, **kwargs) -> Optional[Iterable[OMetaTagAndCategory]]:
        """
        Method to fetch dashboard tags
        """
        return  # Dashboard sources do not support fetching tags, except Tableau

    def yield_owner(self, *args, **kwargs) -> Optional[CreateUserRequest]:
        """
        Method to fetch dashboard owner
        """
        return  # Dashboard sources do not support fetching owner details, except Tableau

    status: DashboardSourceStatus
    source_config: DashboardServiceMetadataPipeline
    config: WorkflowSource
    metadata: OpenMetadata
    # Big union of types we want to fetch dynamically
    service_connection: DashboardConnection.__fields__["config"].type_

    # Declared at class level: the topology and the context built from it
    # are class attributes, not per-instance values.
    topology = DashboardServiceTopology()
    context = create_source_context(topology)

    @abstractmethod
    def __init__(
        self,
        config: WorkflowSource,
        metadata_config: OpenMetadataConnection,
    ):
        """
        Store the workflow configuration, build and test the service
        connection and expose its client.

        Args:
            config: workflow source configuration
            metadata_config: OpenMetadata server connection configuration
        """
        super().__init__()
        self.config = config
        self.metadata_config = metadata_config
        self.metadata = OpenMetadata(metadata_config)
        self.service_connection = self.config.serviceConnection.__root__.config
        self.source_config: DashboardServiceMetadataPipeline = (
            self.config.sourceConfig.config
        )
        self.connection = get_connection(self.service_connection)
        self.test_connection()
        self.status = DashboardSourceStatus()
        self.client = self.connection.client
        # A second OpenMetadata client, kept under its own attribute name.
        self.metadata_client = OpenMetadata(self.metadata_config)

    def get_status(self) -> SourceStatus:
        """Return the status object that collects scanned / filtered records."""
        return self.status

    def close(self):
        """Nothing to release in the base class."""

    def get_services(self) -> Iterable[WorkflowSource]:
        """Producer of the root topology node: yields the workflow config once."""
        yield self.config

    def yield_dashboard_service(self, config: WorkflowSource):
        """Yield the create-service request built from the workflow config."""
        yield self.metadata.get_create_service_from_source(
            entity=DashboardService, config=config
        )

    def get_dashboard(self) -> Iterable[Any]:
        """
        Producer of the dashboard topology node.

        Yields the details of every dashboard that is not excluded by
        `dashboardFilterPattern`; excluded dashboards are recorded in the
        status as filtered.
        """
        # get_dashboards_list() is Optional: treat None as "no dashboards".
        for dashboard in self.get_dashboards_list() or []:
            dashboard_details = self.get_dashboard_details(dashboard)
            # Resolve the name once, from the details object (which is what
            # get_dashboard_name is declared to accept), so the filter check
            # and the status report always refer to the same name.
            dashboard_name = self.get_dashboard_name(dashboard_details)
            if filter_by_dashboard(
                self.source_config.dashboardFilterPattern,
                dashboard_name,
            ):
                self.status.filter(
                    dashboard_name,
                    "Dashboard Pattern not Allowed",
                )
                continue
            yield dashboard_details

    def test_connection(self) -> None:
        """Run the connection test against the service connection."""
        test_connection(self.connection)

    def prepare(self):
        """Hook for subclasses that need to pre-fetch data; no-op here."""

View File

@ -1,143 +0,0 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Iterable, List, Optional, Union
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.dashboardService import DashboardService
from metadata.generated.schema.metadataIngestion.dashboardServiceMetadataPipeline import (
DashboardServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.connections import get_connection, test_connection
from metadata.utils.filters import filter_by_dashboard
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@dataclass
class DashboardSourceStatus(SourceStatus):
    """
    Source status for dashboard ingestion: collects scanned and filtered records
    """

    def scanned(self, record: str) -> None:
        """Append `record` to the success list and log it at INFO level."""
        self.success.append(record)
        message = f"Scanned: {record}"
        logger.info(message)

    def filter(self, record: str, err: str) -> None:
        """Append `record` to the filtered list and log the reason at WARNING level."""
        self.filtered.append(record)
        message = f"Filtered {record}: {err}"
        logger.warning(message)
class DashboardSourceService(Source, ABC):
    """
    Base class for dashboard sources.

    Subclasses implement the abstract hooks; `process_dashboards` drives
    them for every dashboard and yields charts, the dashboard entity and,
    when `dbServiceName` is configured, lineage.
    """

    @abstractmethod
    def get_dashboards_list(self) -> Optional[List[Any]]:
        """
        Get List of all dashboards
        """

    @abstractmethod
    def get_dashboard_name(self, dashboard_details: Any) -> str:
        """
        Get Dashboard Name
        """

    @abstractmethod
    def get_dashboard_details(self, dashboard: Any) -> Any:
        """
        Get Dashboard Details
        """

    @abstractmethod
    def get_dashboard_entity(
        self, dashboard_details: Any
    ) -> Iterable[CreateDashboardRequest]:
        """
        Method to Get Dashboard Entity

        The result is consumed with `yield from`, so it must be iterable.
        """

    @abstractmethod
    def get_lineage(
        self, dashboard_details: Any
    ) -> Optional[Iterable[AddLineageRequest]]:
        """
        Get lineage between dashboard and data sources

        May return None when there is no lineage to report.
        """

    @abstractmethod
    def fetch_dashboard_charts(
        self, dashboard: Any
    ) -> Optional[Iterable[CreateChartRequest]]:
        """
        Method to fetch charts linked to dashboard
        """

    @abstractmethod
    def __init__(
        self,
        config: WorkflowSource,
        metadata_config: OpenMetadataConnection,
    ):
        """
        Store the workflow configuration, build and test the service
        connection, and fetch or create the dashboard service entity.

        Args:
            config: workflow source configuration
            metadata_config: OpenMetadata server connection configuration
        """
        super().__init__()
        self.config = config
        self.metadata_config = metadata_config
        self.metadata = OpenMetadata(metadata_config)
        self.service_connection = self.config.serviceConnection.__root__.config
        self.source_config: DashboardServiceMetadataPipeline = (
            self.config.sourceConfig.config
        )
        self.connection = get_connection(self.service_connection)
        self.test_connection()
        self.client = self.connection.client
        self.service = self.metadata.get_service_or_create(
            entity=DashboardService, config=config
        )
        self.status = DashboardSourceStatus()
        # A second OpenMetadata client, kept under its own attribute name.
        self.metadata_client = OpenMetadata(self.metadata_config)

    def next_record(self) -> Iterable[Entity]:
        """Yield every record produced by `process_dashboards`."""
        yield from self.process_dashboards()

    def process_dashboards(
        self,
    ) -> Iterable[Union[CreateDashboardRequest, CreateChartRequest, AddLineageRequest]]:
        """Get dashboard method

        For each dashboard: skip it when the filter pattern excludes it,
        otherwise yield its charts, then the dashboard entity, then (only
        when `dbServiceName` is configured) its lineage. A failure while
        handling one dashboard is logged and recorded in the status, and
        processing continues with the next dashboard.
        """
        # get_dashboards_list() is Optional: treat None as "no dashboards".
        for dashboard in self.get_dashboards_list() or []:
            try:
                dashboard_details = self.get_dashboard_details(dashboard)
                # Resolve the name once from the details object so the filter
                # check and the status report refer to the same name.
                dashboard_name = self.get_dashboard_name(dashboard_details)
                if filter_by_dashboard(
                    self.source_config.dashboardFilterPattern,
                    dashboard_name,
                ):
                    self.status.filter(
                        dashboard_name,
                        "Dashboard Pattern not Allowed",
                    )
                    continue
                # Each hook may return None; `yield from None` would raise a
                # TypeError that the handler below would misreport as a
                # dashboard failure.
                yield from self.fetch_dashboard_charts(dashboard_details) or []
                yield from self.get_dashboard_entity(dashboard_details) or []
                if self.source_config.dbServiceName:
                    yield from self.get_lineage(dashboard_details) or []
            except Exception as err:  # pylint: disable=broad-except
                logger.error(repr(err))
                # `dashboard_details` may not exist if fetching it failed, so
                # the raw list item is used to name the failing dashboard.
                self.status.failure(self.get_dashboard_name(dashboard), repr(err))

    def get_status(self) -> SourceStatus:
        """Return the status object that collects scanned / filtered records."""
        return self.status

    def close(self):
        """Nothing to release in the base class."""

    def test_connection(self) -> None:
        """Run the connection test against the service connection."""
        test_connection(self.connection)

    def prepare(self):
        """Hook for subclasses that need to pre-fetch data; no-op here."""

View File

@ -35,7 +35,7 @@ from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
from metadata.ingestion.source.database.common_db_source import SQLSourceStatus
from metadata.utils import fqn
from metadata.utils.connections import get_connection
@ -52,7 +52,7 @@ HEADERS = {"Content-Type": "application/json", "Accept": "*/*"}
logger = ingestion_logger()
class MetabaseSource(DashboardSourceService):
class MetabaseSource(DashboardServiceSource):
"""Metabase entity class
Args:
@ -78,13 +78,8 @@ class MetabaseSource(DashboardSourceService):
metadata_config: OpenMetadataConnection,
):
super().__init__(config, metadata_config)
params = dict()
params["username"] = self.service_connection.username
params["password"] = self.service_connection.password.get_secret_value()
self.connection = get_connection(self.service_connection)
self.metabase_session = self.connection.client["metabase_session"]
self.charts = []
self.metric_charts = []
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
@ -126,7 +121,9 @@ class MetabaseSource(DashboardSourceService):
resp_dashboard = self.req_get(f"/api/dashboard/{dashboard['id']}")
return resp_dashboard.json()
def get_dashboard_entity(self, dashboard_details: dict) -> CreateDashboardRequest:
def yield_dashboard(
self, dashboard_details: dict
) -> Iterable[CreateDashboardRequest]:
"""
Method to Get Dashboard Entity
"""
@ -139,17 +136,18 @@ class MetabaseSource(DashboardSourceService):
dashboardUrl=dashboard_url,
displayName=dashboard_details["name"],
description=dashboard_details.get("description", ""),
charts=get_chart_entities_from_id(
chart_ids=self.charts,
metadata=self.metadata,
service_name=self.config.serviceName,
charts=[
EntityReference(id=chart.id.__root__, type="chart")
for chart in self.context.charts
],
service=EntityReference(
id=self.context.dashboard_service.id.__root__, type="dashboardService"
),
service=EntityReference(id=self.service.id, type="dashboardService"),
)
def fetch_dashboard_charts(
def yield_dashboard_chart(
self, dashboard_details: dict
) -> Iterable[CreateChartRequest]:
) -> Optional[Iterable[CreateChartRequest]]:
"""Get chart method
Args:
@ -157,7 +155,6 @@ class MetabaseSource(DashboardSourceService):
Returns:
Iterable[CreateChartRequest]
"""
self.charts = []
charts = dashboard_details["ordered_cards"]
for chart in charts:
try:
@ -186,22 +183,26 @@ class MetabaseSource(DashboardSourceService):
).value,
chartUrl=chart_url,
service=EntityReference(
id=self.service.id, type="dashboardService"
id=self.context.dashboard_service.id.__root__,
type="dashboardService",
),
)
self.charts.append(chart_details["name"])
self.status.scanned(chart_details["name"])
except Exception as err: # pylint: disable=broad-except
logger.error(repr(err))
logger.debug(traceback.format_exc())
continue
def get_lineage(self, dashboard_details: dict) -> AddLineageRequest:
def yield_dashboard_lineage(
self, dashboard_details: dict
) -> Optional[Iterable[AddLineageRequest]]:
"""Get lineage method
Args:
dashboard_details
"""
if not self.source_config.dbServiceName:
return
chart_list, dashboard_name = (
dashboard_details["ordered_cards"],
dashboard_details["name"],

View File

@ -11,7 +11,7 @@
"""PowerBI source module"""
import traceback
from typing import Any, Iterable, List, Optional
from typing import Iterable, List, Optional
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
@ -31,16 +31,15 @@ from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
from metadata.utils import fqn
from metadata.utils.filters import filter_by_chart
from metadata.utils.helpers import get_chart_entities_from_id
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class PowerbiSource(DashboardSourceService):
class PowerbiSource(DashboardServiceSource):
"""PowerBi entity class
Args:
config:
@ -57,7 +56,6 @@ class PowerbiSource(DashboardSourceService):
metadata_config: OpenMetadataConnection,
):
super().__init__(config, metadata_config)
self.charts = []
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
@ -95,7 +93,9 @@ class PowerbiSource(DashboardSourceService):
"""
return dashboard
def get_dashboard_entity(self, dashboard_details: dict) -> CreateDashboardRequest:
def yield_dashboard(
self, dashboard_details: dict
) -> Iterable[CreateDashboardRequest]:
"""
Method to Get Dashboard Entity, Dashboard Charts & Lineage
"""
@ -105,18 +105,23 @@ class PowerbiSource(DashboardSourceService):
dashboardUrl=dashboard_details["webUrl"],
displayName=dashboard_details["displayName"],
description="",
charts=get_chart_entities_from_id(
chart_ids=self.charts,
metadata=self.metadata,
service_name=self.config.serviceName,
charts=[
EntityReference(id=chart.id.__root__, type="chart")
for chart in self.context.charts
],
service=EntityReference(
id=self.context.dashboard_service.id.__root__, type="dashboardService"
),
service=EntityReference(id=self.service.id, type="dashboardService"),
)
def get_lineage(self, dashboard_details: Any) -> Optional[AddLineageRequest]:
def yield_dashboard_lineage(
self, dashboard_details: dict
) -> Optional[Iterable[AddLineageRequest]]:
"""
Get lineage between dashboard and data sources
"""
if not self.source_config.dbServiceName:
return
try:
charts = self.client.fetch_charts(dashboard_id=dashboard_details["id"]).get(
"value"
@ -166,16 +171,15 @@ class PowerbiSource(DashboardSourceService):
logger.debug(traceback.format_exc())
logger.error(err)
def fetch_dashboard_charts(
def yield_dashboard_chart(
self, dashboard_details: dict
) -> Iterable[CreateChartRequest]:
) -> Optional[Iterable[CreateChartRequest]]:
"""Get chart method
Args:
dashboard_details:
Returns:
Iterable[Chart]
"""
self.charts = []
charts = self.client.fetch_charts(dashboard_id=dashboard_details["id"]).get(
"value"
)
@ -195,10 +199,10 @@ class PowerbiSource(DashboardSourceService):
# PBI has no hostPort property. All URL details are present in the webUrl property.
chartUrl=chart["embedUrl"],
service=EntityReference(
id=self.service.id, type="dashboardService"
id=self.context.dashboard_service.id.__root__,
type="dashboardService",
),
)
self.charts.append(chart["id"])
self.status.scanned(chart["title"])
except Exception as err: # pylint: disable=broad-except
logger.debug(traceback.format_exc())

View File

@ -8,9 +8,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Optional
from typing import Iterable, List, Optional
from sql_metadata import Parser
@ -31,44 +29,23 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import InvalidSourceException, SourceStatus
from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
from metadata.utils import fqn
from metadata.utils.helpers import get_chart_entities_from_id, get_standard_chart_type
from metadata.utils.helpers import get_standard_chart_type
from metadata.utils.logger import ingestion_logger
from metadata.utils.sql_lineage import search_table_entities
logger = ingestion_logger()
@dataclass
class RedashSourceStatus(SourceStatus):
items_scanned: int = 0
filtered: List[str] = field(default_factory=list)
def item_scanned_status(self) -> None:
self.items_scanned += 1
def item_dropped_status(self, item: str) -> None:
self.filtered.append(item)
class RedashSource(DashboardSourceService):
config: WorkflowSource
metadata_config: OpenMetadataConnection
status: RedashSourceStatus
platform = "redash"
dashboards_to_charts: Dict[str, List[str]]
class RedashSource(DashboardServiceSource):
def __init__(
self,
config: WorkflowSource,
metadata_config: OpenMetadataConnection,
):
super().__init__(config, metadata_config)
self.status = RedashSourceStatus()
self.dashboards_to_charts = {}
@classmethod
def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection):
@ -99,28 +76,32 @@ class RedashSource(DashboardSourceService):
"""
return self.client.get_dashboard(dashboard["slug"])
def get_dashboard_entity(self, dashboard_details: dict) -> CreateDashboardRequest:
def yield_dashboard(
self, dashboard_details: dict
) -> Iterable[CreateDashboardRequest]:
"""
Method to Get Dashboard Entity
"""
self.status.item_scanned_status()
dashboard_description = ""
for widgets in dashboard_details.get("widgets", []):
dashboard_description = widgets.get("text")
yield CreateDashboardRequest(
name=dashboard_details.get("id"),
displayName=dashboard_details["name"],
description=dashboard_description if dashboard_details else "",
charts=get_chart_entities_from_id(
chart_ids=self.dashboards_to_charts[dashboard_details.get("id")],
metadata=self.metadata,
service_name=self.config.serviceName,
description=dashboard_description,
charts=[
EntityReference(id=chart.id.__root__, type="chart")
for chart in self.context.charts
],
service=EntityReference(
id=self.context.dashboard_service.id.__root__, type="dashboardService"
),
service=EntityReference(id=self.service.id, type="dashboardService"),
dashboardUrl=f"/dashboard/{dashboard_details.get('slug', '')}",
)
self.status.scanned(dashboard_details["name"])
def get_lineage(
def yield_dashboard_lineage(
self, dashboard_details: dict
) -> Optional[Iterable[AddLineageRequest]]:
"""
@ -128,6 +109,8 @@ class RedashSource(DashboardSourceService):
In redash we do not get table, database_schema or database name but we do get query
the lineage is being generated based on the query
"""
if not self.source_config.dbServiceName:
return
for widgets in dashboard_details.get("widgets", []):
visualization = widgets.get("visualization")
if not visualization.get("query"):
@ -169,16 +152,14 @@ class RedashSource(DashboardSourceService):
)
yield lineage
def fetch_dashboard_charts(
def yield_dashboard_chart(
self, dashboard_details: dict
) -> Optional[Iterable[CreateChartRequest]]:
"""
Method to fetch charts linked to dashboard
"""
self.dashboards_to_charts[dashboard_details.get("id")] = []
for widgets in dashboard_details.get("widgets", []):
visualization = widgets.get("visualization")
self.dashboards_to_charts[dashboard_details.get("id")].append(widgets["id"])
yield CreateChartRequest(
name=widgets["id"],
displayName=visualization["query"]["name"]
@ -187,7 +168,10 @@ class RedashSource(DashboardSourceService):
chartType=get_standard_chart_type(
visualization["type"] if visualization else ""
),
service=EntityReference(id=self.service.id, type="dashboardService"),
service=EntityReference(
id=self.context.dashboard_service.id.__root__,
type="dashboardService",
),
chartUrl=f"/dashboard/{dashboard_details.get('slug', '')}",
description=visualization["description"] if visualization else "",
)

View File

@ -14,7 +14,7 @@ Superset source module
import json
import traceback
from typing import List, Optional
from typing import Iterable, List, Optional
import dateutil.parser as dateparser
@ -40,7 +40,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import InvalidSourceException, SourceStatus
from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
from metadata.utils import fqn
from metadata.utils.helpers import get_chart_entities_from_id, get_standard_chart_type
from metadata.utils.logger import ingestion_logger
@ -86,7 +86,7 @@ def get_filter_name(filter_obj):
return f"{clause} {column} {operator} {comparator}"
class SupersetSource(DashboardSourceService):
class SupersetSource(DashboardServiceSource):
"""
Superset source class
@ -116,7 +116,6 @@ class SupersetSource(DashboardSourceService):
metadata_config: OpenMetadataConnection,
):
super().__init__(config, metadata_config)
self.charts = []
@classmethod
def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection):
@ -169,7 +168,9 @@ class SupersetSource(DashboardSourceService):
"""
return dashboard
def get_dashboard_entity(self, dashboard_details: dict) -> CreateDashboardRequest:
def yield_dashboard(
self, dashboard_details: dict
) -> Iterable[CreateDashboardRequest]:
"""
Method to Get Dashboard Entity
"""
@ -178,12 +179,13 @@ class SupersetSource(DashboardSourceService):
displayName=dashboard_details["dashboard_title"],
description="",
dashboardUrl=dashboard_details["url"],
charts=get_chart_entities_from_id(
chart_ids=self.charts,
metadata=self.metadata,
service_name=self.config.serviceName,
charts=[
EntityReference(id=chart.id.__root__, type="chart")
for chart in self.context.charts
],
service=EntityReference(
id=self.context.dashboard_service.id.__root__, type="dashboardService"
),
service=EntityReference(id=self.service.id, type="dashboardService"),
)
def _get_charts_of_dashboard(self, dashboard_details: dict) -> List[str]:
@ -200,10 +202,14 @@ class SupersetSource(DashboardSourceService):
]
return []
def get_lineage(self, dashboard_details: dict) -> Optional[AddLineageRequest]:
def yield_dashboard_lineage(
self, dashboard_details: dict
) -> Optional[Iterable[AddLineageRequest]]:
"""
Get lineage between dashboard and data sources
"""
if not self.source_config.dbServiceName:
return
for chart_id in self._get_charts_of_dashboard(dashboard_details):
chart_json = self.all_charts.get(chart_id)
datasource_fqn = (
@ -245,14 +251,32 @@ class SupersetSource(DashboardSourceService):
logger.debug(traceback.format_exc())
logger.error(err)
def fetch_dashboard_charts(self, dashboard_details: dict) -> None:
def yield_dashboard_chart(
self, dashboard_details: dict
) -> Optional[Iterable[CreateChartRequest]]:
"""
Method to fetch charts linked to dashboard
"""
self.charts = []
for chart_id in self._get_charts_of_dashboard(dashboard_details):
yield from self._build_chart(self.all_charts.get(chart_id))
self.charts.append(chart_id)
chart_json = self.all_charts.get(chart_id)
chart_id = chart_json["id"]
params = json.loads(chart_json["params"])
group_bys = params.get("groupby", []) or []
if isinstance(group_bys, str):
group_bys = [group_bys]
chart = CreateChartRequest(
name=chart_id,
displayName=chart_json["slice_name"],
description="",
chartType=get_standard_chart_type(chart_json["viz_type"]),
chartUrl=chart_json["url"],
service=EntityReference(
id=self.context.dashboard_service.id.__root__,
type="dashboardService",
),
)
yield chart
def _get_datasource_fqn(self, datasource_id: str) -> Optional[str]:
if not self.source_config.dbServiceName:
@ -274,21 +298,3 @@ class SupersetSource(DashboardSourceService):
except KeyError:
logger.warning(f"Failed to fetch Datasource with id: {datasource_id}")
return None
# pylint: disable=too-many-locals
def _build_chart(self, chart_json: dict) -> CreateChartRequest:
chart_id = chart_json["id"]
params = json.loads(chart_json["params"])
group_bys = params.get("groupby", []) or []
if isinstance(group_bys, str):
group_bys = [group_bys]
chart = CreateChartRequest(
name=chart_id,
displayName=chart_json["slice_name"],
description="",
chartType=get_standard_chart_type(chart_json["viz_type"]),
chartUrl=chart_json["url"],
service=EntityReference(id=self.service.id, type="dashboardService"),
)
yield chart

View File

@ -12,14 +12,14 @@
Tableau source module
"""
import traceback
from typing import Iterable, List, Optional, Union
from typing import Iterable, List, Optional
import dateutil.parser as dateparser
from tableau_api_lib.utils.querying import (
get_views_dataframe,
get_workbook_connections_dataframe,
get_workbooks_dataframe,
)
from tableau_api_lib.utils.querying.users import get_all_user_fields
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
@ -49,7 +49,7 @@ from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.tagLabel import TagLabel
from metadata.ingestion.api.source import InvalidSourceException, SourceStatus
from metadata.ingestion.models.ometa_tag_category import OMetaTagAndCategory
from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
from metadata.utils import fqn
from metadata.utils.filters import filter_by_chart
from metadata.utils.helpers import get_chart_entities_from_id, get_standard_chart_type
@ -59,7 +59,7 @@ logger = ingestion_logger()
TABLEAU_TAG_CATEGORY = "TableauTags"
class TableauSource(DashboardSourceService):
class TableauSource(DashboardServiceSource):
"""Tableau source entity class
Args:
@ -83,9 +83,44 @@ class TableauSource(DashboardSourceService):
):
super().__init__(config, metadata_config)
self.charts = []
self.dashboards = get_workbooks_dataframe(self.client).to_dict()
self.all_dashboard_details = get_views_dataframe(self.client).to_dict()
self.workbooks = {}
self.tags = []
self.owner = {}
def prepare(self):
    """
    Pre-fetch workbooks, views, tags and users from Tableau.

    Fills `self.workbooks` (workbook id -> workbook dict with a `charts`
    list of its views), `self.tags` (labels of all workbook and view tags)
    and `self.owner` (user id -> user dict), then delegates to the parent
    `prepare`.
    """
    # Restructuring the api response for workbooks.
    # The `.to_dict()` result is indexed as {column: {row_label: value}}
    # (presumably a pandas DataFrame - confirm against tableau_api_lib).
    # Iterating the real row labels avoids assuming they are 0..n-1, and the
    # `{}` default avoids a TypeError when the "id" column is missing.
    workbook_details = get_workbooks_dataframe(self.client).to_dict()
    for row, workbook_id in workbook_details.get("id", {}).items():
        workbook = {key: column[row] for key, column in workbook_details.items()}
        workbook["charts"] = []
        self.workbooks[workbook_id] = workbook

    # Restructuring the api response for views and attaching views to their
    # respective workbooks
    all_views_details = get_views_dataframe(self.client).to_dict()
    for row in all_views_details.get("id", {}):
        chart = {
            key: column[row]
            for key, column in all_views_details.items()
            if key != "workbook"
        }
        parent_workbook_id = all_views_details["workbook"][row]["id"]
        self.workbooks[parent_workbook_id]["charts"].append(chart)

    # Collecting all view & workbook tags (workbooks first, then views)
    for details in (workbook_details, all_views_details):
        for tags in details.get("tags", {}).values():
            self.tags.extend(tag["label"] for tag in tags.get("tag", []))

    # Fetch User/Owner Details
    users = get_all_user_fields(self.client)
    self.owner = {user["id"]: user for user in users}

    return super().prepare()
@classmethod
def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection):
@ -101,11 +136,7 @@ class TableauSource(DashboardSourceService):
"""
Get List of all dashboards
"""
dashboards = [{} for _ in range(len(self.dashboards["id"]))]
for key, obj_dicts in self.dashboards.items():
for index, value in obj_dicts.items():
dashboards[int(index)][key] = value
return dashboards
return self.workbooks.values()
def get_dashboard_name(self, dashboard_details: dict) -> str:
"""
@ -119,7 +150,7 @@ class TableauSource(DashboardSourceService):
"""
return dashboard
def get_dashboard_owner(self, owner: dict) -> Optional[EntityReference]:
def yield_owner(self, dashboard_details: dict) -> Optional[EntityReference]:
"""Get dashboard owner
Args:
@ -127,36 +158,28 @@ class TableauSource(DashboardSourceService):
Returns:
Optional[EntityReference]
"""
try:
user_request = CreateUserRequest(
name=owner["name"], displayName=owner["fullName"], email=owner["email"]
)
created_user: User = self.metadata.create_or_update(user_request)
return EntityReference(
id=created_user.id.__root__,
type="user",
)
except Exception as err:
logger.error(err)
owner = self.owner[dashboard_details["owner"]["id"]]
yield CreateUserRequest(
name=owner["name"], displayName=owner["fullName"], email=owner["email"]
)
def create_tags(self, entity_tags: dict) -> OMetaTagAndCategory:
def yield_tag(self, _) -> OMetaTagAndCategory:
"""
Fetch Dashboard Tags
"""
if entity_tags.get("tag"):
for tag in entity_tags["tag"]:
tag_category = OMetaTagAndCategory(
category_name=CreateTagCategoryRequest(
name=TABLEAU_TAG_CATEGORY,
description="Tags associates with amundsen entities",
categoryType="Descriptive",
),
category_details=CreateTagRequest(
name=tag["label"], description="Amundsen Table Tag"
),
)
yield tag_category
logger.info(f"Tag Category {tag_category}, Primary Tag {tag} Ingested")
for tag in self.tags:
tag_category = OMetaTagAndCategory(
category_name=CreateTagCategoryRequest(
name=TABLEAU_TAG_CATEGORY,
description="Tags associates with tableau entities",
categoryType="Descriptive",
),
category_details=CreateTagRequest(name=tag, description="Tableau Tag"),
)
yield tag_category
logger.info(
f"Tag Category {TABLEAU_TAG_CATEGORY}, Primary Tag {tag} Ingested"
)
def get_tag_lables(self, tags: dict) -> Optional[List[TagLabel]]:
if tags.get("tag"):
@@ -176,33 +199,37 @@ class TableauSource(DashboardSourceService):
]
return []
def get_dashboard_entity(
def yield_dashboard(
self, dashboard_details: dict
) -> Union[CreateDashboardRequest, Optional[OMetaTagAndCategory]]:
) -> Iterable[CreateDashboardRequest]:
"""
Method to Get Dashboard Entity
"""
dashboard_tag = dashboard_details.get("tags")
yield from self.create_tags(dashboard_tag)
yield CreateDashboardRequest(
name=dashboard_details.get("name"),
name=dashboard_details.get("id"),
displayName=dashboard_details.get("name"),
description="",
owner=self.get_dashboard_owner(self.owner),
charts=get_chart_entities_from_id(
chart_ids=self.charts,
metadata=self.metadata,
service_name=self.config.serviceName,
),
owner=self.context.owner,
charts=[
EntityReference(id=chart.id.__root__, type="chart")
for chart in self.context.charts
],
tags=self.get_tag_lables(dashboard_tag),
dashboardUrl=dashboard_details.get("webpageUrl"),
service=EntityReference(id=self.service.id, type="dashboardService"),
service=EntityReference(
id=self.context.dashboard_service.id.__root__, type="dashboardService"
),
)
def get_lineage(self, dashboard_details: dict) -> Optional[AddLineageRequest]:
def yield_dashboard_lineage(
self, dashboard_details: dict
) -> Optional[Iterable[AddLineageRequest]]:
"""
Get lineage between dashboard and data sources
"""
if not self.source_config.dbServiceName:
return
datasource_list = (
get_workbook_connections_dataframe(self.client, dashboard_details.get("id"))
.get("datasource_name")
@@ -254,53 +281,37 @@ class TableauSource(DashboardSourceService):
logger.debug(traceback.format_exc())
logger.error(err)
def fetch_dashboard_charts(
def yield_dashboard_chart(
self, dashboard_details: dict
) -> Optional[Iterable[CreateChartRequest]]:
"""
Method to fetch charts linked to dashboard
"""
self.charts = []
self.chart = None
self.owner = None
for index in range(len(self.all_dashboard_details["id"])):
for chart in dashboard_details.get("charts"):
try:
self.owner = self.all_dashboard_details["owner"][index]
self.chart = self.all_dashboard_details["workbook"][index]
if self.chart["id"] == dashboard_details.get("id"):
chart_id = self.all_dashboard_details["id"][index]
chart_name = self.all_dashboard_details["name"][index]
if filter_by_chart(
self.source_config.chartFilterPattern, chart_name
):
self.status.failure(chart_name, "Chart Pattern not allowed")
continue
chart_tags = self.all_dashboard_details["tags"][index]
chart_url = (
f"{self.service_connection.hostPort}"
f"/#/site/{self.service_connection.siteName}/"
f"views/{self.all_dashboard_details['workbook'][index]['name']}/"
f"{self.all_dashboard_details['viewUrlName'][index]}"
)
yield from self.create_tags(chart_tags)
yield CreateChartRequest(
name=chart_id,
displayName=chart_name,
description="",
chartType=get_standard_chart_type(
self.all_dashboard_details["sheetType"][index]
),
chartUrl=chart_url,
owner=self.get_dashboard_owner(
self.all_dashboard_details["owner"][index]
),
tags=self.get_tag_lables(chart_tags),
service=EntityReference(
id=self.service.id, type="dashboardService"
),
)
self.charts.append(chart_id)
self.status.scanned(chart_id)
if filter_by_chart(
self.source_config.chartFilterPattern, chart["name"]
):
self.status.failure(chart["name"], "Chart Pattern not allowed")
continue
chart_url = (
f"/#/site/{self.service_connection.siteName}/"
f"views/{dashboard_details['name']}/"
f"{chart['viewUrlName']}"
)
yield CreateChartRequest(
name=chart["id"],
displayName=chart["name"],
description="",
chartType=get_standard_chart_type(chart["sheetType"]),
chartUrl=chart_url,
tags=self.get_tag_lables(chart["tags"]),
service=EntityReference(
id=self.context.dashboard_service.id.__root__,
type="dashboardService",
),
)
self.status.scanned(chart["id"])
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(err)