Issue-969: Superset Lineage Implemented (#2659)

This commit is contained in:
Ayush Shah 2022-02-08 05:46:38 +05:30 committed by GitHub
parent 66c3f43bbb
commit 18d963c3d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 54 additions and 6 deletions

View File

@ -2,10 +2,11 @@
"source": { "source": {
"type": "superset", "type": "superset",
"config": { "config": {
"url": "http://localhost:8088", "url": "http://localhost:8080",
"username": "admin", "username": "admin",
"password": "admin", "password": "admin",
"service_name": "local_superset" "service_name": "local_superset",
"db_service_name": "aws_redshift"
} }
}, },
"sink": { "sink": {

View File

@ -45,6 +45,7 @@ class SupersetConfig(ConfigModel):
service_type: str = "Superset" service_type: str = "Superset"
provider: str = "db" provider: str = "db"
options: dict = {} options: dict = {}
db_service_name: Optional[str] = None
class SupersetAuthenticationProvider(AuthenticationProvider): class SupersetAuthenticationProvider(AuthenticationProvider):
@ -91,7 +92,7 @@ class SupersetAPIClient:
client_config = ClientConfig( client_config = ClientConfig(
base_url=config.url, base_url=config.url,
api_version="api/v1", api_version="api/v1",
auth_token=f"Bearer {self._auth_provider.auth_token()}", auth_token=f"{self._auth_provider.auth_token()}",
auth_header="Authorization", auth_header="Authorization",
allow_redirects=True, allow_redirects=True,
) )
@ -149,6 +150,10 @@ class SupersetAPIClient:
) )
return response return response
def fetch_charts_with_id(self, chart_id):
    """
    Fetch a single chart

    Args:
        chart_id: identifier interpolated into the ``/chart/{chart_id}`` path
    Returns:
        whatever ``self.client.get`` returns for that path
    """
    return self.client.get(f"/chart/{chart_id}")
def fetch_datasource(self, datasource_id: str): def fetch_datasource(self, datasource_id: str):
""" """
Fetch data source Fetch data source

View File

@ -12,23 +12,33 @@
Superset source module Superset source module
""" """
import json import json
import logging
import traceback
from typing import Iterable from typing import Iterable
import dateutil.parser as dateparser import dateutil.parser as dateparser
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.dashboard import (
Dashboard as Lineage_Dashboard,
)
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.dashboardService import ( from metadata.generated.schema.entity.services.dashboardService import (
DashboardServiceType, DashboardServiceType,
) )
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Entity, WorkflowContext from metadata.ingestion.api.common import Entity, WorkflowContext
from metadata.ingestion.api.source import Source, SourceStatus from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.models.table_metadata import Chart, Dashboard, DashboardOwner from metadata.ingestion.models.table_metadata import Chart, Dashboard, DashboardOwner
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.ometa.superset_rest import SupersetAPIClient, SupersetConfig from metadata.ingestion.ometa.superset_rest import SupersetAPIClient, SupersetConfig
from metadata.utils.helpers import get_dashboard_service_or_create from metadata.utils.helpers import get_dashboard_service_or_create
logger: logging.Logger = logging.getLogger(__name__)
def get_metric_name(metric): def get_metric_name(metric):
""" """
@ -162,6 +172,7 @@ class SupersetSource(Source[Entity]):
super().__init__(ctx) super().__init__(ctx)
self.config = config self.config = config
self.metadata_config = metadata_config self.metadata_config = metadata_config
self.metadata_client = OpenMetadata(self.metadata_config)
self.status = SourceStatus() self.status = SourceStatus()
self.client = SupersetAPIClient(self.config) self.client = SupersetAPIClient(self.config)
self.service = get_dashboard_service_or_create( self.service = get_dashboard_service_or_create(
@ -253,6 +264,36 @@ class SupersetSource(Source[Entity]):
return dataset_fqn return dataset_fqn
return None return None
def _check_lineage(self, chart_id, datasource_text):
    """
    Yield table -> dashboard lineage requests for a chart

    For every dashboard listed in the chart payload, an AddLineageRequest is
    yielded whose edge goes from the table
    ``<db_service_name>.<datasource_text>`` to the dashboard
    ``<service_name>.<dashboard id>``.

    Nothing is yielded when ``datasource_text`` is empty, when
    ``db_service_name`` is not configured, or when the chart payload lists
    no dashboards. Entity lookup failures are logged and skipped so that a
    single bad edge does not stop the ingestion.

    Args:
        chart_id: Superset chart identifier
        datasource_text: table name of the chart datasource, appended to
            the configured database service name to build the table FQDN
    Returns:
        generator of AddLineageRequest
    """
    if not (datasource_text and self.config.db_service_name):
        return

    chart_data = self.client.fetch_charts_with_id(chart_id)
    # The "dashboards" key may be absent or null in the payload; treat both
    # as "no dashboards" instead of failing on iteration.
    dashboards = chart_data["result"].get("dashboards") or []
    if not dashboards:
        return

    # The source table is the same for every dashboard of this chart, so it
    # is resolved once instead of once per dashboard.
    table_fqdn = f"{self.config.db_service_name}.{datasource_text}"
    try:
        from_entity = self.metadata_client.get_by_name(
            entity=Table,
            fqdn=table_fqdn,
        )
    except Exception as err:
        logger.debug(traceback.format_exc())
        logger.error(err)
        return
    # NOTE(review): get_by_name presumably returns None when the entity does
    # not exist - confirm against the OpenMetadata client.
    if from_entity is None:
        logger.error(f"Table {table_fqdn} not found, skipping lineage")
        return

    for dashboard in dashboards:
        try:
            dashboard_fqdn = f"{self.config.service_name}.{dashboard['id']}"
            to_entity = self.metadata_client.get_by_name(
                entity=Lineage_Dashboard,
                fqdn=dashboard_fqdn,
            )
            if to_entity is None:
                logger.error(
                    f"Dashboard {dashboard_fqdn} not found, skipping lineage"
                )
                continue
            lineage = AddLineageRequest(
                edge=EntitiesEdge(
                    fromEntity=EntityReference(
                        id=from_entity.id.__root__, type="table"
                    ),
                    toEntity=EntityReference(
                        id=to_entity.id.__root__, type="dashboard"
                    ),
                )
            )
        except Exception as err:
            # format_exc() returns the traceback text; print_exc() would
            # write to stderr and hand None to the logger.
            logger.debug(traceback.format_exc())
            logger.error(err)
        else:
            yield lineage
# pylint: disable=too-many-locals # pylint: disable=too-many-locals
def _build_chart(self, chart_json) -> Chart: def _build_chart(self, chart_json) -> Chart:
chart_id = chart_json["id"] chart_id = chart_json["id"]
@ -295,7 +336,8 @@ class SupersetSource(Source[Entity]):
service=EntityReference(id=self.service.id, type="dashboardService"), service=EntityReference(id=self.service.id, type="dashboardService"),
custom_props=custom_properties, custom_props=custom_properties,
) )
return chart yield from self._check_lineage(chart_id, chart_json.get("datasource_name_text"))
yield chart
def _fetch_charts(self): def _fetch_charts(self):
current_page = 0 current_page = 0
@ -305,7 +347,7 @@ class SupersetSource(Source[Entity]):
charts = self.client.fetch_charts(current_page, page_size) charts = self.client.fetch_charts(current_page, page_size)
current_page += 1 current_page += 1
for chart_json in charts["result"]: for chart_json in charts["result"]:
yield self._build_chart(chart_json) yield from self._build_chart(chart_json)
def get_status(self): def get_status(self):
return self.status return self.status