From 18d963c3d88a8a82c902ed30e3b6e523c6c46cae Mon Sep 17 00:00:00 2001 From: Ayush Shah Date: Tue, 8 Feb 2022 05:46:38 +0530 Subject: [PATCH] Issue-969: Superset Lineage Implemented (#2659) --- ingestion/examples/workflows/superset.json | 5 +- .../metadata/ingestion/ometa/superset_rest.py | 7 ++- .../src/metadata/ingestion/source/superset.py | 48 +++++++++++++++++-- 3 files changed, 54 insertions(+), 6 deletions(-) diff --git a/ingestion/examples/workflows/superset.json b/ingestion/examples/workflows/superset.json index 8e7ed34449f..77ec96773d5 100644 --- a/ingestion/examples/workflows/superset.json +++ b/ingestion/examples/workflows/superset.json @@ -2,10 +2,11 @@ "source": { "type": "superset", "config": { - "url": "http://localhost:8088", + "url": "http://localhost:8080", "username": "admin", "password": "admin", - "service_name": "local_superset" + "service_name": "local_superset", + "db_service_name": "aws_redshift" } }, "sink": { diff --git a/ingestion/src/metadata/ingestion/ometa/superset_rest.py b/ingestion/src/metadata/ingestion/ometa/superset_rest.py index c164f0a19a3..d24dee0c3e3 100644 --- a/ingestion/src/metadata/ingestion/ometa/superset_rest.py +++ b/ingestion/src/metadata/ingestion/ometa/superset_rest.py @@ -45,6 +45,7 @@ class SupersetConfig(ConfigModel): service_type: str = "Superset" provider: str = "db" options: dict = {} + db_service_name: Optional[str] = None class SupersetAuthenticationProvider(AuthenticationProvider): @@ -91,7 +92,7 @@ class SupersetAPIClient: client_config = ClientConfig( base_url=config.url, api_version="api/v1", - auth_token=f"Bearer {self._auth_provider.auth_token()}", + auth_token=f"{self._auth_provider.auth_token()}", auth_header="Authorization", allow_redirects=True, ) @@ -149,6 +150,10 @@ class SupersetAPIClient: ) return response + def fetch_charts_with_id(self, chart_id): + response = self.client.get(f"/chart/{chart_id}") + return response + def fetch_datasource(self, datasource_id: str): """ Fetch data source diff --git a/ingestion/src/metadata/ingestion/source/superset.py b/ingestion/src/metadata/ingestion/source/superset.py index ad99248816c..42e1f9425ae 100644 --- a/ingestion/src/metadata/ingestion/source/superset.py +++ b/ingestion/src/metadata/ingestion/source/superset.py @@ -12,23 +12,33 @@ Superset source module """ - import json +import logging +import traceback from typing import Iterable import dateutil.parser as dateparser +from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest +from metadata.generated.schema.entity.data.dashboard import ( + Dashboard as Lineage_Dashboard, +) +from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.dashboardService import ( DashboardServiceType, ) +from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.common import Entity, WorkflowContext from metadata.ingestion.api.source import Source, SourceStatus from metadata.ingestion.models.table_metadata import Chart, Dashboard, DashboardOwner +from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig from metadata.ingestion.ometa.superset_rest import SupersetAPIClient, SupersetConfig from metadata.utils.helpers import get_dashboard_service_or_create +logger: logging.Logger = logging.getLogger(__name__) + def get_metric_name(metric): """ @@ -162,6 +172,7 @@ class SupersetSource(Source[Entity]): super().__init__(ctx) self.config = config self.metadata_config = metadata_config + self.metadata_client = OpenMetadata(self.metadata_config) self.status = SourceStatus() self.client = SupersetAPIClient(self.config) self.service = get_dashboard_service_or_create( @@ -253,6 +264,36 @@ class SupersetSource(Source[Entity]): return dataset_fqn return None + def _check_lineage(self, chart_id, datasource_text): + if datasource_text and self.config.db_service_name: + chart_data = self.client.fetch_charts_with_id(chart_id) + dashboards = chart_data["result"].get("dashboards") + for dashboard in dashboards: + try: + from_entity = self.metadata_client.get_by_name( + entity=Table, + fqdn=f"{self.config.db_service_name}.{datasource_text}", + ) + to_entity = self.metadata_client.get_by_name( + entity=Lineage_Dashboard, + fqdn=f"{self.config.service_name}.{dashboard['id']}", + ) + lineage = AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference( + id=from_entity.id.__root__, type="table" + ), + toEntity=EntityReference( + id=to_entity.id.__root__, type="dashboard" + ), + ) + ) + yield lineage + + except Exception as err: + logger.debug(traceback.print_exc()) + logger.error(err) + # pylint: disable=too-many-locals def _build_chart(self, chart_json) -> Chart: chart_id = chart_json["id"] @@ -295,7 +336,8 @@ class SupersetSource(Source[Entity]): service=EntityReference(id=self.service.id, type="dashboardService"), custom_props=custom_properties, ) - return chart + yield from self._check_lineage(chart_id, chart_json.get("datasource_name_text")) + yield chart def _fetch_charts(self): current_page = 0 @@ -305,7 +347,7 @@ class SupersetSource(Source[Entity]): charts = self.client.fetch_charts(current_page, page_size) current_page += 1 for chart_json in charts["result"]: - yield self._build_chart(chart_json) + yield from self._build_chart(chart_json) def get_status(self): return self.status