diff --git a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py new file mode 100644 index 00000000000..49f98f5d66c --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py @@ -0,0 +1,254 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Base class for ingesting database services +""" +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Any, Iterable, List, Optional + +from metadata.generated.schema.api.data.createChart import CreateChartRequest +from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest +from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest +from metadata.generated.schema.api.teams.createUser import CreateUserRequest +from metadata.generated.schema.entity.data.chart import Chart +from metadata.generated.schema.entity.data.dashboard import Dashboard +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) +from metadata.generated.schema.entity.services.dashboardService import ( + DashboardConnection, + DashboardService, +) +from metadata.generated.schema.metadataIngestion.dashboardServiceMetadataPipeline import ( + DashboardServiceMetadataPipeline, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) +from metadata.ingestion.api.source import Source, SourceStatus +from metadata.ingestion.api.topology_runner import TopologyRunnerMixin +from metadata.ingestion.models.ometa_tag_category import OMetaTagAndCategory +from metadata.ingestion.models.topology import ( + NodeStage, + ServiceTopology, + TopologyNode, + create_source_context, +) +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.utils.connections import get_connection, test_connection +from metadata.utils.filters import filter_by_dashboard +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() + + +class DashboardServiceTopology(ServiceTopology): + """ + Defines the hierarchy in Dashboard Services. + service -> dashboard -> charts. + + We could have a topology validator. We can only consume + data that has been produced by any parent node. + """ + + root = TopologyNode( + producer="get_services", + stages=[ + NodeStage( + type_=DashboardService, + context="dashboard_service", + processor="yield_dashboard_service", + ), + NodeStage( + type_=OMetaTagAndCategory, + context="tags", + processor="yield_tag", + ack_sink=False, + nullable=True, + ), + ], + children=["dashboard"], + ) + dashboard = TopologyNode( + producer="get_dashboard", + stages=[ + NodeStage( + type_=Chart, + context="charts", + processor="yield_dashboard_chart", + consumer=["dashboard_service"], + nullable=True, + cache_all=True, + ), + NodeStage( + type_=CreateUserRequest, + context="owner", + processor="yield_owner", + nullable=True, + ), + NodeStage( + type_=Dashboard, + context="dashboard", + processor="yield_dashboard", + consumer=["dashboard_service"], + ), + NodeStage( + type_=AddLineageRequest, + context="lineage", + processor="yield_dashboard_lineage", + consumer=["dashboard_service"], + ack_sink=False, + nullable=True, + ), + ], + ) + + +@dataclass +class DashboardSourceStatus(SourceStatus): + """ + Reports the source status after ingestion + """ + + def scanned(self, record: str) -> None: + self.success.append(record) + logger.info(f"Scanned: {record}") + + def filter(self, record: str, err: str) -> None: + self.filtered.append(record) + logger.warning(f"Filtered {record}: {err}") + + +class DashboardServiceSource(TopologyRunnerMixin, Source, ABC): + """ + Base class for Database Services. + It implements the topology and context. + """ + + @abstractmethod + def yield_dashboard( + self, dashboard_details: Any + ) -> Iterable[CreateDashboardRequest]: + """ + Method to Get Dashboard Entity + """ + + @abstractmethod + def yield_dashboard_lineage( + self, dashboard_details: Any + ) -> Optional[Iterable[AddLineageRequest]]: + """ + Get lineage between dashboard and data sources + """ + + @abstractmethod + def yield_dashboard_chart( + self, dashboard_details: Any + ) -> Optional[Iterable[CreateChartRequest]]: + """ + Method to fetch charts linked to dashboard + """ + + @abstractmethod + def get_dashboards_list(self) -> Optional[List[Any]]: + """ + Get List of all dashboards + """ + + @abstractmethod + def get_dashboard_name(self, dashboard_details: Any) -> str: + """ + Get Dashboard Name + """ + + @abstractmethod + def get_dashboard_details(self, dashboard: Any) -> Any: + """ + Get Dashboard Details + """ + + def yield_tag(self, *args, **kwargs) -> Optional[Iterable[OMetaTagAndCategory]]: + """ + Method to fetch dashboard tags + """ + return # Dashboard does not supports fetching tags except Tableau + + def yield_owner(self, *args, **kwargs) -> Optional[CreateUserRequest]: + """ + Method to fetch dashboard owner + """ + return # Dashboard does not supports fetching owner details except Tableau + + status: DashboardSourceStatus + source_config: DashboardServiceMetadataPipeline + config: WorkflowSource + metadata: OpenMetadata + # Big union of types we want to fetch dynamically + service_connection: DashboardConnection.__fields__["config"].type_ + + topology = DashboardServiceTopology() + context = create_source_context(topology) + + @abstractmethod + def __init__( + self, + config: WorkflowSource, + metadata_config: OpenMetadataConnection, + ): + super().__init__() + self.config = config + self.metadata_config = metadata_config + self.metadata = OpenMetadata(metadata_config) + self.service_connection = self.config.serviceConnection.__root__.config + self.source_config: DashboardServiceMetadataPipeline = ( + self.config.sourceConfig.config + ) + self.connection = get_connection(self.service_connection) + self.test_connection() + self.status = DashboardSourceStatus() + + self.client = self.connection.client + self.metadata_client = OpenMetadata(self.metadata_config) + + def get_status(self) -> SourceStatus: + return self.status + + def close(self): + pass + + def get_services(self) -> Iterable[WorkflowSource]: + yield self.config + + def yield_dashboard_service(self, config: WorkflowSource): + yield self.metadata.get_create_service_from_source( + entity=DashboardService, config=config + ) + + def get_dashboard(self) -> Any: + for dashboard in self.get_dashboards_list(): + dashboard_details = self.get_dashboard_details(dashboard) + if filter_by_dashboard( + self.source_config.dashboardFilterPattern, + self.get_dashboard_name(dashboard_details), + ): + self.status.filter( + self.get_dashboard_name(dashboard), + "Dashboard Pattern not Allowed", + ) + continue + yield dashboard_details + + def test_connection(self) -> None: + test_connection(self.connection) + + def prepare(self): + pass diff --git a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_source.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_source.py deleted file mode 100644 index 1674170d6cf..00000000000 --- a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_source.py +++ /dev/null @@ -1,143 +0,0 @@ -from abc import ABC, abstractmethod -from dataclasses import dataclass -from typing import Any, Iterable, List, Optional, Union - -from metadata.generated.schema.api.data.createChart import CreateChartRequest -from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest -from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest -from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( - OpenMetadataConnection, -) -from metadata.generated.schema.entity.services.dashboardService import DashboardService -from metadata.generated.schema.metadataIngestion.dashboardServiceMetadataPipeline import ( - DashboardServiceMetadataPipeline, -) -from metadata.generated.schema.metadataIngestion.workflow import ( - Source as WorkflowSource, -) -from metadata.ingestion.api.common import Entity -from metadata.ingestion.api.source import Source, SourceStatus -from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.utils.connections import get_connection, test_connection -from metadata.utils.filters import filter_by_dashboard -from metadata.utils.logger import ingestion_logger - -logger = ingestion_logger() - - -@dataclass -class DashboardSourceStatus(SourceStatus): - """ - Reports the source status after ingestion - """ - - def scanned(self, record: str) -> None: - self.success.append(record) - logger.info(f"Scanned: {record}") - - def filter(self, record: str, err: str) -> None: - self.filtered.append(record) - logger.warning(f"Filtered {record}: {err}") - - -class DashboardSourceService(Source, ABC): - @abstractmethod - def get_dashboards_list(self) -> Optional[List[Any]]: - """ - Get List of all dashboards - """ - - @abstractmethod - def get_dashboard_name(self, dashboard_details: Any) -> str: - """ - Get Dashboard Name - """ - - @abstractmethod - def get_dashboard_details(self, dashboard: Any) -> Any: - """ - Get Dashboard Details - """ - - @abstractmethod - def get_dashboard_entity(self, dashboard_details: Any) -> CreateDashboardRequest: - """ - Method to Get Dashboard Entity - """ - - @abstractmethod - def get_lineage(self, dashboard_details: Any) -> Optional[AddLineageRequest]: - """ - Get lineage between dashboard and data sources - """ - - @abstractmethod - def fetch_dashboard_charts( - self, dashboard: Any - ) -> Optional[Iterable[CreateChartRequest]]: - """ - Method to fetch charts linked to dashboard - """ - - @abstractmethod - def __init__( - self, - config: WorkflowSource, - metadata_config: OpenMetadataConnection, - ): - super().__init__() - self.config = config - self.metadata_config = metadata_config - self.metadata = OpenMetadata(metadata_config) - self.service_connection = self.config.serviceConnection.__root__.config - self.source_config: DashboardServiceMetadataPipeline = ( - self.config.sourceConfig.config - ) - self.connection = get_connection(self.service_connection) - self.test_connection() - - self.client = self.connection.client - self.service = self.metadata.get_service_or_create( - entity=DashboardService, config=config - ) - self.status = DashboardSourceStatus() - self.metadata_client = OpenMetadata(self.metadata_config) - - def next_record(self) -> Iterable[Entity]: - yield from self.process_dashboards() - - def process_dashboards( - self, - ) -> Iterable[Union[CreateDashboardRequest, CreateChartRequest, AddLineageRequest]]: - """Get dashboard method""" - for dashboard in self.get_dashboards_list(): - try: - dashboard_details = self.get_dashboard_details(dashboard) - if filter_by_dashboard( - self.source_config.dashboardFilterPattern, - self.get_dashboard_name(dashboard_details), - ): - self.status.filter( - self.get_dashboard_name(dashboard), - "Dashboard Pattern not Allowed", - ) - continue - yield from self.fetch_dashboard_charts(dashboard_details) or [] - yield from self.get_dashboard_entity(dashboard_details) - if self.source_config.dbServiceName: - yield from self.get_lineage(dashboard_details) - except Exception as err: - logger.error(repr(err)) - self.status.failure(self.get_dashboard_name(dashboard), repr(err)) - - def get_status(self) -> SourceStatus: - return self.status - - def close(self): - pass - - def test_connection(self) -> None: - test_connection(self.connection) - - def prepare(self): - pass diff --git a/ingestion/src/metadata/ingestion/source/dashboard/metabase.py b/ingestion/src/metadata/ingestion/source/dashboard/metabase.py index dbf5273237c..adee1318b6d 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/metabase.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/metabase.py @@ -35,7 +35,7 @@ from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService +from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource from metadata.ingestion.source.database.common_db_source import SQLSourceStatus from metadata.utils import fqn from metadata.utils.connections import get_connection @@ -52,7 +52,7 @@ HEADERS = {"Content-Type": "application/json", "Accept": "*/*"} logger = ingestion_logger() -class MetabaseSource(DashboardSourceService): +class MetabaseSource(DashboardServiceSource): """Metabase entity class Args: @@ -78,13 +78,8 @@ class MetabaseSource(DashboardSourceService): metadata_config: OpenMetadataConnection, ): super().__init__(config, metadata_config) - params = dict() - params["username"] = self.service_connection.username - params["password"] = self.service_connection.password.get_secret_value() self.connection = get_connection(self.service_connection) self.metabase_session = self.connection.client["metabase_session"] - self.charts = [] - self.metric_charts = [] @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): @@ -126,7 +121,9 @@ class MetabaseSource(DashboardSourceService): resp_dashboard = self.req_get(f"/api/dashboard/{dashboard['id']}") return resp_dashboard.json() - def get_dashboard_entity(self, dashboard_details: dict) -> CreateDashboardRequest: + def yield_dashboard( + self, dashboard_details: dict + ) -> Iterable[CreateDashboardRequest]: """ Method to Get Dashboard Entity """ @@ -139,17 +136,18 @@ class MetabaseSource(DashboardSourceService): dashboardUrl=dashboard_url, displayName=dashboard_details["name"], description=dashboard_details.get("description", ""), - charts=get_chart_entities_from_id( - chart_ids=self.charts, - metadata=self.metadata, - service_name=self.config.serviceName, + charts=[ + EntityReference(id=chart.id.__root__, type="chart") + for chart in self.context.charts + ], + service=EntityReference( + id=self.context.dashboard_service.id.__root__, type="dashboardService" ), - service=EntityReference(id=self.service.id, type="dashboardService"), ) - def fetch_dashboard_charts( + def yield_dashboard_chart( self, dashboard_details: dict - ) -> Iterable[CreateChartRequest]: + ) -> Optional[Iterable[CreateChartRequest]]: """Get chart method Args: @@ -157,7 +155,6 @@ class MetabaseSource(DashboardSourceService): Returns: Iterable[CreateChartRequest] """ - self.charts = [] charts = dashboard_details["ordered_cards"] for chart in charts: try: @@ -186,22 +183,26 @@ class MetabaseSource(DashboardSourceService): ).value, chartUrl=chart_url, service=EntityReference( - id=self.service.id, type="dashboardService" + id=self.context.dashboard_service.id.__root__, + type="dashboardService", ), ) - self.charts.append(chart_details["name"]) self.status.scanned(chart_details["name"]) except Exception as err: # pylint: disable=broad-except logger.error(repr(err)) logger.debug(traceback.format_exc()) continue - def get_lineage(self, dashboard_details: dict) -> AddLineageRequest: + def yield_dashboard_lineage( + self, dashboard_details: dict + ) -> Optional[Iterable[AddLineageRequest]]: """Get lineage method Args: dashboard_details """ + if not self.source_config.dbServiceName: + return chart_list, dashboard_name = ( dashboard_details["ordered_cards"], dashboard_details["name"], diff --git a/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py b/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py index 332ffb2ef47..5a260840547 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py @@ -11,7 +11,7 @@ """PowerBI source module""" import traceback -from typing import Any, Iterable, List, Optional +from typing import Iterable, List, Optional from metadata.generated.schema.api.data.createChart import CreateChartRequest from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest @@ -31,16 +31,15 @@ from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.source import InvalidSourceException -from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService +from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource from metadata.utils import fqn from metadata.utils.filters import filter_by_chart -from metadata.utils.helpers import get_chart_entities_from_id from metadata.utils.logger import ingestion_logger logger = ingestion_logger() -class PowerbiSource(DashboardSourceService): +class PowerbiSource(DashboardServiceSource): """PowerBi entity class Args: config: @@ -57,7 +56,6 @@ class PowerbiSource(DashboardSourceService): metadata_config: OpenMetadataConnection, ): super().__init__(config, metadata_config) - self.charts = [] @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): @@ -95,7 +93,9 @@ class PowerbiSource(DashboardSourceService): """ return dashboard - def get_dashboard_entity(self, dashboard_details: dict) -> CreateDashboardRequest: + def yield_dashboard( + self, dashboard_details: dict + ) -> Iterable[CreateDashboardRequest]: """ Method to Get Dashboard Entity, Dashboard Charts & Lineage """ @@ -105,18 +105,23 @@ class PowerbiSource(DashboardSourceService): dashboardUrl=dashboard_details["webUrl"], displayName=dashboard_details["displayName"], description="", - charts=get_chart_entities_from_id( - chart_ids=self.charts, - metadata=self.metadata, - service_name=self.config.serviceName, + charts=[ + EntityReference(id=chart.id.__root__, type="chart") + for chart in self.context.charts + ], + service=EntityReference( + id=self.context.dashboard_service.id.__root__, type="dashboardService" ), - service=EntityReference(id=self.service.id, type="dashboardService"), ) - def get_lineage(self, dashboard_details: Any) -> Optional[AddLineageRequest]: + def yield_dashboard_lineage( + self, dashboard_details: dict + ) -> Optional[Iterable[AddLineageRequest]]: """ Get lineage between dashboard and data sources """ + if not self.source_config.dbServiceName: + return try: charts = self.client.fetch_charts(dashboard_id=dashboard_details["id"]).get( "value" @@ -166,16 +171,15 @@ class PowerbiSource(DashboardSourceService): logger.debug(traceback.format_exc()) logger.error(err) - def fetch_dashboard_charts( + def yield_dashboard_chart( self, dashboard_details: dict - ) -> Iterable[CreateChartRequest]: + ) -> Optional[Iterable[CreateChartRequest]]: """Get chart method Args: dashboard_details: Returns: Iterable[Chart] """ - self.charts = [] charts = self.client.fetch_charts(dashboard_id=dashboard_details["id"]).get( "value" ) @@ -195,10 +199,10 @@ class PowerbiSource(DashboardSourceService): # PBI has no hostPort property. All URL details are present in the webUrl property. chartUrl=chart["embedUrl"], service=EntityReference( - id=self.service.id, type="dashboardService" + id=self.context.dashboard_service.id.__root__, + type="dashboardService", ), ) - self.charts.append(chart["id"]) self.status.scanned(chart["title"]) except Exception as err: # pylint: disable=broad-except logger.debug(traceback.format_exc()) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/redash.py b/ingestion/src/metadata/ingestion/source/dashboard/redash.py index 96b2886ebec..e2986fe2f54 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/redash.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/redash.py @@ -8,9 +8,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -from dataclasses import dataclass, field -from typing import Dict, Iterable, List, Optional +from typing import Iterable, List, Optional from sql_metadata import Parser @@ -31,44 +29,23 @@ from metadata.generated.schema.metadataIngestion.workflow import ( ) from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityReference import EntityReference -from metadata.ingestion.api.common import Entity -from metadata.ingestion.api.source import InvalidSourceException, SourceStatus -from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService +from metadata.ingestion.api.source import InvalidSourceException +from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource from metadata.utils import fqn -from metadata.utils.helpers import get_chart_entities_from_id, get_standard_chart_type +from metadata.utils.helpers import get_standard_chart_type from metadata.utils.logger import ingestion_logger from metadata.utils.sql_lineage import search_table_entities logger = ingestion_logger() -@dataclass -class RedashSourceStatus(SourceStatus): - items_scanned: int = 0 - filtered: List[str] = field(default_factory=list) - - def item_scanned_status(self) -> None: - self.items_scanned += 1 - - def item_dropped_status(self, item: str) -> None: - self.filtered.append(item) - - -class RedashSource(DashboardSourceService): - config: WorkflowSource - metadata_config: OpenMetadataConnection - status: RedashSourceStatus - platform = "redash" - dashboards_to_charts: Dict[str, List[str]] - +class RedashSource(DashboardServiceSource): def __init__( self, config: WorkflowSource, metadata_config: OpenMetadataConnection, ): super().__init__(config, metadata_config) - self.status = RedashSourceStatus() - self.dashboards_to_charts = {} @classmethod def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection): @@ -99,28 +76,32 @@ class RedashSource(DashboardSourceService): """ return self.client.get_dashboard(dashboard["slug"]) - def get_dashboard_entity(self, dashboard_details: dict) -> CreateDashboardRequest: + def yield_dashboard( + self, dashboard_details: dict + ) -> Iterable[CreateDashboardRequest]: """ Method to Get Dashboard Entity """ - self.status.item_scanned_status() + dashboard_description = "" for widgets in dashboard_details.get("widgets", []): dashboard_description = widgets.get("text") yield CreateDashboardRequest( name=dashboard_details.get("id"), displayName=dashboard_details["name"], - description=dashboard_description if dashboard_details else "", - charts=get_chart_entities_from_id( - chart_ids=self.dashboards_to_charts[dashboard_details.get("id")], - metadata=self.metadata, - service_name=self.config.serviceName, + description=dashboard_description, + charts=[ + EntityReference(id=chart.id.__root__, type="chart") + for chart in self.context.charts + ], + service=EntityReference( + id=self.context.dashboard_service.id.__root__, type="dashboardService" ), - service=EntityReference(id=self.service.id, type="dashboardService"), dashboardUrl=f"/dashboard/{dashboard_details.get('slug', '')}", ) + self.status.scanned(dashboard_details["name"]) - def get_lineage( + def yield_dashboard_lineage( self, dashboard_details: dict ) -> Optional[Iterable[AddLineageRequest]]: """ @@ -128,6 +109,8 @@ class RedashSource(DashboardSourceService): In redash we do not get table, database_schema or database name but we do get query the lineage is being generated based on the query """ + if not self.source_config.dbServiceName: + return for widgets in dashboard_details.get("widgets", []): visualization = widgets.get("visualization") if not visualization.get("query"): @@ -169,16 +152,14 @@ class RedashSource(DashboardSourceService): ) yield lineage - def fetch_dashboard_charts( + def yield_dashboard_chart( self, dashboard_details: dict ) -> Optional[Iterable[CreateChartRequest]]: """ Metod to fetch charts linked to dashboard """ - self.dashboards_to_charts[dashboard_details.get("id")] = [] for widgets in dashboard_details.get("widgets", []): visualization = widgets.get("visualization") - self.dashboards_to_charts[dashboard_details.get("id")].append(widgets["id"]) yield CreateChartRequest( name=widgets["id"], displayName=visualization["query"]["name"] @@ -187,7 +168,10 @@ class RedashSource(DashboardSourceService): chartType=get_standard_chart_type( visualization["type"] if visualization else "" ), - service=EntityReference(id=self.service.id, type="dashboardService"), + service=EntityReference( + id=self.context.dashboard_service.id.__root__, + type="dashboardService", + ), chartUrl=f"/dashboard/{dashboard_details.get('slug', '')}", description=visualization["description"] if visualization else "", ) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/superset.py b/ingestion/src/metadata/ingestion/source/dashboard/superset.py index a4953cebebc..ad6fd94e71c 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/superset.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/superset.py @@ -14,7 +14,7 @@ Superset source module import json import traceback -from typing import List, Optional +from typing import Iterable, List, Optional import dateutil.parser as dateparser @@ -40,7 +40,7 @@ from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.source import InvalidSourceException, SourceStatus -from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService +from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource from metadata.utils import fqn from metadata.utils.helpers import get_chart_entities_from_id, get_standard_chart_type from metadata.utils.logger import ingestion_logger @@ -86,7 +86,7 @@ def get_filter_name(filter_obj): return f"{clause} {column} {operator} {comparator}" -class SupersetSource(DashboardSourceService): +class SupersetSource(DashboardServiceSource): """ Superset source class @@ -116,7 +116,6 @@ class SupersetSource(DashboardSourceService): metadata_config: OpenMetadataConnection, ): super().__init__(config, metadata_config) - self.charts = [] @classmethod def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection): @@ -169,7 +168,9 @@ class SupersetSource(DashboardSourceService): """ return dashboard - def get_dashboard_entity(self, dashboard_details: dict) -> CreateDashboardRequest: + def yield_dashboard( + self, dashboard_details: dict + ) -> Iterable[CreateDashboardRequest]: """ Method to Get Dashboard Entity """ @@ -178,12 +179,13 @@ class SupersetSource(DashboardSourceService): displayName=dashboard_details["dashboard_title"], description="", dashboardUrl=dashboard_details["url"], - charts=get_chart_entities_from_id( - chart_ids=self.charts, - metadata=self.metadata, - service_name=self.config.serviceName, + charts=[ + EntityReference(id=chart.id.__root__, type="chart") + for chart in self.context.charts + ], + service=EntityReference( + id=self.context.dashboard_service.id.__root__, type="dashboardService" ), - service=EntityReference(id=self.service.id, type="dashboardService"), ) def _get_charts_of_dashboard(self, dashboard_details: dict) -> List[str]: @@ -200,10 +202,14 @@ class SupersetSource(DashboardSourceService): ] return [] - def get_lineage(self, dashboard_details: dict) -> Optional[AddLineageRequest]: + def yield_dashboard_lineage( + self, dashboard_details: dict + ) -> Optional[Iterable[AddLineageRequest]]: """ Get lineage between dashboard and data sources """ + if not self.source_config.dbServiceName: + return for chart_id in self._get_charts_of_dashboard(dashboard_details): chart_json = self.all_charts.get(chart_id) datasource_fqn = ( @@ -245,14 +251,32 @@ class SupersetSource(DashboardSourceService): logger.debug(traceback.format_exc()) logger.error(err) - def fetch_dashboard_charts(self, dashboard_details: dict) -> None: + def yield_dashboard_chart( + self, dashboard_details: dict + ) -> Optional[Iterable[CreateChartRequest]]: """ Metod to fetch charts linked to dashboard """ - self.charts = [] for chart_id in self._get_charts_of_dashboard(dashboard_details): - yield from self._build_chart(self.all_charts.get(chart_id)) - self.charts.append(chart_id) + chart_json = self.all_charts.get(chart_id) + chart_id = chart_json["id"] + params = json.loads(chart_json["params"]) + group_bys = params.get("groupby", []) or [] + if isinstance(group_bys, str): + group_bys = [group_bys] + + chart = CreateChartRequest( + name=chart_id, + displayName=chart_json["slice_name"], + description="", + chartType=get_standard_chart_type(chart_json["viz_type"]), + chartUrl=chart_json["url"], + service=EntityReference( + id=self.context.dashboard_service.id.__root__, + type="dashboardService", + ), + ) + yield chart def _get_datasource_fqn(self, datasource_id: str) -> Optional[str]: if not self.source_config.dbServiceName: @@ -274,21 +298,3 @@ class SupersetSource(DashboardSourceService): except KeyError: logger.warning(f"Failed to fetch Datasource with id: {datasource_id}") return None - - # pylint: disable=too-many-locals - def _build_chart(self, chart_json: dict) -> CreateChartRequest: - chart_id = chart_json["id"] - params = json.loads(chart_json["params"]) - group_bys = params.get("groupby", []) or [] - if isinstance(group_bys, str): - group_bys = [group_bys] - - chart = CreateChartRequest( - name=chart_id, - displayName=chart_json["slice_name"], - description="", - chartType=get_standard_chart_type(chart_json["viz_type"]), - chartUrl=chart_json["url"], - service=EntityReference(id=self.service.id, type="dashboardService"), - ) - yield chart diff --git a/ingestion/src/metadata/ingestion/source/dashboard/tableau.py b/ingestion/src/metadata/ingestion/source/dashboard/tableau.py index 228e754da86..1ea2dda551b 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/tableau.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/tableau.py @@ -12,14 +12,14 @@ Tableau source module """ import traceback -from typing import Iterable, List, Optional, Union +from typing import Iterable, List, Optional -import dateutil.parser as dateparser from tableau_api_lib.utils.querying import ( get_views_dataframe, get_workbook_connections_dataframe, get_workbooks_dataframe, ) +from tableau_api_lib.utils.querying.users import get_all_user_fields from metadata.generated.schema.api.data.createChart import CreateChartRequest from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest @@ -49,7 +49,7 @@ from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.tagLabel import TagLabel from metadata.ingestion.api.source import InvalidSourceException, SourceStatus from metadata.ingestion.models.ometa_tag_category import OMetaTagAndCategory -from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService +from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource from metadata.utils import fqn from metadata.utils.filters import filter_by_chart from metadata.utils.helpers import get_chart_entities_from_id, get_standard_chart_type @@ -59,7 +59,7 @@ logger = ingestion_logger() TABLEAU_TAG_CATEGORY = "TableauTags" -class TableauSource(DashboardSourceService): +class TableauSource(DashboardServiceSource): """Tableau source entity class Args: @@ -83,9 +83,44 @@ class TableauSource(DashboardSourceService): ): super().__init__(config, metadata_config) - self.charts = [] - self.dashboards = get_workbooks_dataframe(self.client).to_dict() - self.all_dashboard_details = get_views_dataframe(self.client).to_dict() + self.workbooks = {} + self.tags = [] + self.owner = {} + + def prepare(self): + # Restructuring the api response for workbooks + workbook_details = get_workbooks_dataframe(self.client).to_dict() + for i in range(len(workbook_details.get("id"))): + workbook = { + key: workbook_details[key][i] for key in workbook_details.keys() + } + workbook["charts"] = [] + self.workbooks[workbook_details["id"][i]] = workbook + + # Restructuring the api response for views and attaching views to their respective workbooks + all_views_details = get_views_dataframe(self.client).to_dict() + for i in range(len(all_views_details.get("id"))): + chart = { + key: all_views_details[key][i] + for key in all_views_details.keys() + if key != "workbook" + } + self.workbooks[all_views_details["workbook"][i]["id"]]["charts"].append( + chart + ) + + # Collecting all view & workbook tags + for _, tags in workbook_details.get("tags").items(): + self.tags.extend([tag["label"] for tag in tags.get("tag", [])]) + + for _, tags in all_views_details.get("tags").items(): + self.tags.extend([tag["label"] for tag in tags.get("tag", [])]) + + # Fetch User/Owner Details + owner = get_all_user_fields(self.client) + self.owner = {user["id"]: user for user in owner} + + return super().prepare() @classmethod def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection): @@ -101,11 +136,7 @@ class TableauSource(DashboardSourceService): """ Get List of all dashboards """ - dashboards = [{} for _ in range(len(self.dashboards["id"]))] - for key, obj_dicts in self.dashboards.items(): - for index, value in obj_dicts.items(): - dashboards[int(index)][key] = value - return dashboards + return self.workbooks.values() def get_dashboard_name(self, dashboard_details: dict) -> str: """ @@ -119,7 +150,7 @@ class TableauSource(DashboardSourceService): """ return dashboard - def get_dashboard_owner(self, owner: dict) -> Optional[EntityReference]: + def yield_owner(self, dashboard_details: dict) -> Optional[EntityReference]: """Get dashboard owner Args: @@ -127,36 +158,28 @@ class TableauSource(DashboardSourceService): Returns: Optional[EntityReference] """ - try: - user_request = CreateUserRequest( - name=owner["name"], displayName=owner["fullName"], email=owner["email"] - ) - created_user: User = self.metadata.create_or_update(user_request) - return EntityReference( - id=created_user.id.__root__, - type="user", - ) - except Exception as err: - logger.error(err) + owner = self.owner[dashboard_details["owner"]["id"]] + yield CreateUserRequest( + name=owner["name"], displayName=owner["fullName"], email=owner["email"] + ) - def create_tags(self, entity_tags: dict) -> OMetaTagAndCategory: + def yield_tag(self, _) -> OMetaTagAndCategory: """ Fetch Dashboard Tags """ - if entity_tags.get("tag"): - for tag in entity_tags["tag"]: - tag_category = OMetaTagAndCategory( - category_name=CreateTagCategoryRequest( - name=TABLEAU_TAG_CATEGORY, - description="Tags associates with amundsen entities", - categoryType="Descriptive", - ), - category_details=CreateTagRequest( - name=tag["label"], description="Amundsen Table Tag" - ), - ) - yield tag_category - logger.info(f"Tag Category {tag_category}, Primary Tag {tag} Ingested") + for tag in self.tags: + tag_category = OMetaTagAndCategory( + category_name=CreateTagCategoryRequest( + name=TABLEAU_TAG_CATEGORY, + description="Tags associates with tableau entities", + categoryType="Descriptive", + ), + category_details=CreateTagRequest(name=tag, description="Tableau Tag"), + ) + yield tag_category + logger.info( + f"Tag Category {TABLEAU_TAG_CATEGORY}, Primary Tag {tag} Ingested" + ) def get_tag_lables(self, tags: dict) -> Optional[List[TagLabel]]: if tags.get("tag"): @@ -176,33 +199,37 @@ class TableauSource(DashboardSourceService): ] return [] - def get_dashboard_entity( + def yield_dashboard( self, dashboard_details: dict - ) -> Union[CreateDashboardRequest, Optional[OMetaTagAndCategory]]: + ) -> Iterable[CreateDashboardRequest]: """ Method to Get Dashboard Entity """ dashboard_tag = dashboard_details.get("tags") - yield from self.create_tags(dashboard_tag) yield CreateDashboardRequest( - name=dashboard_details.get("name"), + name=dashboard_details.get("id"), displayName=dashboard_details.get("name"), description="", - owner=self.get_dashboard_owner(self.owner), - charts=get_chart_entities_from_id( - chart_ids=self.charts, - metadata=self.metadata, - service_name=self.config.serviceName, - ), + owner=self.context.owner, + charts=[ + EntityReference(id=chart.id.__root__, type="chart") + for chart in self.context.charts + ], tags=self.get_tag_lables(dashboard_tag), dashboardUrl=dashboard_details.get("webpageUrl"), - service=EntityReference(id=self.service.id, type="dashboardService"), + service=EntityReference( + id=self.context.dashboard_service.id.__root__, type="dashboardService" + ), ) - def get_lineage(self, dashboard_details: dict) -> Optional[AddLineageRequest]: + def yield_dashboard_lineage( + self, dashboard_details: dict + ) -> Optional[Iterable[AddLineageRequest]]: """ Get lineage between dashboard and data sources """ + if not self.source_config.dbServiceName: + return datasource_list = ( get_workbook_connections_dataframe(self.client, dashboard_details.get("id")) .get("datasource_name") @@ -254,53 +281,37 @@ class TableauSource(DashboardSourceService): logger.debug(traceback.format_exc()) logger.error(err) - def fetch_dashboard_charts( + def yield_dashboard_chart( self, dashboard_details: dict ) -> Optional[Iterable[CreateChartRequest]]: """ Method to fetch charts linked to dashboard """ - self.charts = [] - self.chart = None - self.owner = None - for index in range(len(self.all_dashboard_details["id"])): + for chart in dashboard_details.get("charts"): try: - self.owner = self.all_dashboard_details["owner"][index] - self.chart = self.all_dashboard_details["workbook"][index] - if self.chart["id"] == dashboard_details.get("id"): - chart_id = self.all_dashboard_details["id"][index] - chart_name = self.all_dashboard_details["name"][index] - if filter_by_chart( - self.source_config.chartFilterPattern, chart_name - ): - self.status.failure(chart_name, "Chart Pattern not allowed") - continue - chart_tags = self.all_dashboard_details["tags"][index] - chart_url = ( - f"{self.service_connection.hostPort}" - f"/#/site/{self.service_connection.siteName}/" - f"views/{self.all_dashboard_details['workbook'][index]['name']}/" - f"{self.all_dashboard_details['viewUrlName'][index]}" - ) - yield from self.create_tags(chart_tags) - yield CreateChartRequest( - name=chart_id, - displayName=chart_name, - description="", - chartType=get_standard_chart_type( - self.all_dashboard_details["sheetType"][index] - ), - chartUrl=chart_url, - owner=self.get_dashboard_owner( - self.all_dashboard_details["owner"][index] - ), - tags=self.get_tag_lables(chart_tags), - service=EntityReference( - id=self.service.id, type="dashboardService" - ), - ) - self.charts.append(chart_id) - self.status.scanned(chart_id) + if filter_by_chart( + self.source_config.chartFilterPattern, chart["name"] + ): + self.status.failure(chart["name"], "Chart Pattern not allowed") + continue + chart_url = ( + f"/#/site/{self.service_connection.siteName}/" + f"views/{dashboard_details['name']}/" + f"{chart['viewUrlName']}" + ) + yield CreateChartRequest( + name=chart["id"], + displayName=chart["name"], + description="", + chartType=get_standard_chart_type(chart["sheetType"]), + chartUrl=chart_url, + tags=self.get_tag_lables(chart["tags"]), + service=EntityReference( + id=self.context.dashboard_service.id.__root__, + type="dashboardService", + ), + ) + self.status.scanned(chart["id"]) except Exception as err: logger.debug(traceback.format_exc()) logger.error(err)