Refactored Superset (#3979)

This commit is contained in:
Ayush Shah 2022-04-09 05:40:00 -07:00 committed by GitHub
parent 3840eb77c0
commit fbf5861128
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 67 additions and 61 deletions

View File

@@ -39,6 +39,11 @@
"type": "string", "type": "string",
"default": "db" "default": "db"
}, },
"dbServiceConnection": {
"description": "Database Service to create lineage",
"type": "string",
"default": null
},
"connectionOptions": { "connectionOptions": {
"description": "Additional connection options that can be sent to service during the connection.", "description": "Additional connection options that can be sent to service during the connection.",
"type": "object" "type": "object"

View File

@@ -1,24 +1,30 @@
{ {
"source": { "source": {
"type": "superset", "type": "superset",
"serviceName": "local_superset",
"serviceConnection": {
"config": { "config": {
"url": "http://localhost:8080", "supersetURL": "http://localhost:8080",
"username": "admin", "username": "admin",
"password": "admin", "password": "admin",
"service_name": "local_superset", "type": "Superset"
"db_service_name": "aws_redshift" }
},
"sourceConfig": {
"config": {
"chartFilterPattern": {},
"dashboardFilterPattern": {}
}
} }
}, },
"sink": { "sink": {
"type": "metadata-rest", "type": "metadata-rest",
"config": { "config": {}
}
}, },
"metadata_server": { "workflowConfig": {
"type": "metadata-server", "openMetadataServerConfig": {
"config": { "hostPort": "http://localhost:8585/api",
"api_endpoint": "http://localhost:8585/api", "authProvider": "no-auth"
"auth_provider_type": "no-auth"
} }
} }
} }

View File

@@ -13,53 +13,35 @@ REST Auth & Client for Apache Superset
""" """
import json import json
import logging import logging
from typing import Optional
from pydantic import SecretStr from pydantic import SecretStr
from metadata.config.common import ConfigModel from metadata.config.common import ConfigModel
from metadata.generated.schema.entity.services.connections.dashboard.supersetConnection import (
SupersetConnection,
)
from metadata.generated.schema.entity.services.dashboardService import ( from metadata.generated.schema.entity.services.dashboardService import (
DashboardServiceType, DashboardServiceType,
) )
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.ometa.auth_provider import AuthenticationProvider from metadata.ingestion.ometa.auth_provider import AuthenticationProvider
from metadata.ingestion.ometa.client import REST, ClientConfig from metadata.ingestion.ometa.client import REST, ClientConfig
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class SupersetConfig(ConfigModel):
"""
Superset Configuration class
Attributes:
url (str):
username (Optional[str]):
password (Optional[str]):
service_name (str):
service_type (str):
provider (str):
options (dict):
"""
url: str = "localhost:8088"
username: Optional[str] = None
password: Optional[SecretStr] = None
service_name: str
service_type: str = DashboardServiceType.Superset.value
provider: str = "db"
options: dict = {}
db_service_name: Optional[str] = None
class SupersetAuthenticationProvider(AuthenticationProvider): class SupersetAuthenticationProvider(AuthenticationProvider):
""" """
Handle SuperSet Auth Handle SuperSet Auth
""" """
def __init__(self, config: SupersetConfig): def __init__(self, config: WorkflowSource):
self.config = config self.config = config
self.service_connection = config.serviceConnection.__root__.config
client_config = ClientConfig( client_config = ClientConfig(
base_url=config.url, base_url=config.serviceConnection.__root__.config.supersetURL,
api_version="api/v1", api_version="api/v1",
auth_token=lambda: ("no_token", 0), auth_token=lambda: ("no_token", 0),
auth_header="Authorization", auth_header="Authorization",
@@ -69,7 +51,7 @@ class SupersetAuthenticationProvider(AuthenticationProvider):
super().__init__() super().__init__()
@classmethod @classmethod
def create(cls, config: SupersetConfig): def create(cls, config: WorkflowSource):
return cls(config) return cls(config)
def auth_token(self) -> str: def auth_token(self) -> str:
@@ -80,10 +62,10 @@ class SupersetAuthenticationProvider(AuthenticationProvider):
def _login_request(self) -> str: def _login_request(self) -> str:
auth_request = { auth_request = {
"username": self.config.username, "username": self.service_connection.username,
"password": self.config.password.get_secret_value(), "password": self.service_connection.password.get_secret_value(),
"refresh": True, "refresh": True,
"provider": self.config.provider, "provider": self.service_connection.provider,
} }
return json.dumps(auth_request) return json.dumps(auth_request)
@@ -100,11 +82,11 @@ class SupersetAPIClient:
client: REST client: REST
_auth_provider: AuthenticationProvider _auth_provider: AuthenticationProvider
def __init__(self, config: SupersetConfig): def __init__(self, config: WorkflowSource):
self.config = config self.config = config
self._auth_provider = SupersetAuthenticationProvider.create(config) self._auth_provider = SupersetAuthenticationProvider.create(config)
client_config = ClientConfig( client_config = ClientConfig(
base_url=config.url, base_url=config.serviceConnection.__root__.config.supersetURL,
api_version="api/v1", api_version="api/v1",
auth_token=lambda: self._auth_provider.get_access_token(), auth_token=lambda: self._auth_provider.get_access_token(),
auth_header="Authorization", auth_header="Authorization",

View File

@@ -25,19 +25,25 @@ from metadata.generated.schema.entity.data.dashboard import (
Dashboard as Lineage_Dashboard, Dashboard as Lineage_Dashboard,
) )
from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.dashboard.supersetConnection import (
SupersetConnection,
)
from metadata.generated.schema.entity.services.dashboardService import ( from metadata.generated.schema.entity.services.dashboardService import (
DashboardServiceType, DashboardServiceType,
) )
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataServerConfig, OpenMetadataServerConfig,
) )
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Entity from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import Source, SourceStatus from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.ingestion.models.table_metadata import Chart, Dashboard, DashboardOwner from metadata.ingestion.models.table_metadata import Chart, Dashboard, DashboardOwner
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.superset_rest import SupersetAPIClient, SupersetConfig from metadata.ingestion.ometa.superset_rest import SupersetAPIClient
from metadata.utils.helpers import get_dashboard_service_or_create from metadata.utils.helpers import get_dashboard_service_or_create
logger: logging.Logger = logging.getLogger(__name__) logger: logging.Logger = logging.getLogger(__name__)
@@ -159,7 +165,7 @@ class SupersetSource(Source[Entity]):
""" """
config: SupersetConfig config: WorkflowSource
metadata_config: OpenMetadataServerConfig metadata_config: OpenMetadataServerConfig
status: SourceStatus status: SourceStatus
platform = "superset" platform = "superset"
@@ -167,27 +173,32 @@ class SupersetSource(Source[Entity]):
def __init__( def __init__(
self, self,
config: SupersetConfig, config: WorkflowSource,
metadata_config: OpenMetadataServerConfig, metadata_config: OpenMetadataServerConfig,
): ):
super().__init__() super().__init__()
self.config = config self.config = config
self.service_connection = self.config.serviceConnection.__root__.config
self.source_config = self.config.sourceConfig.config
self.metadata_config = metadata_config self.metadata_config = metadata_config
self.metadata_client = OpenMetadata(self.metadata_config) self.metadata_client = OpenMetadata(self.metadata_config)
self.status = SourceStatus() self.status = SourceStatus()
self.client = SupersetAPIClient(self.config) self.client = SupersetAPIClient(self.config)
self.service = get_dashboard_service_or_create( self.service = get_dashboard_service_or_create(
service_name=config.service_name, service_name=config.serviceName,
dashboard_service_type=DashboardServiceType.Superset.name, dashboard_service_type=DashboardServiceType.Superset.name,
username=config.username, config=self.service_connection.dict(),
password=config.password.get_secret_value(),
dashboard_url=config.url,
metadata_config=metadata_config, metadata_config=metadata_config,
) )
@classmethod @classmethod
def create(cls, config_dict: dict, metadata_config: OpenMetadataServerConfig): def create(cls, config_dict: dict, metadata_config: OpenMetadataServerConfig):
config = SupersetConfig.parse_obj(config_dict) config = WorkflowSource.parse_obj(config_dict)
connection: SupersetConnection = config.serviceConnection.__root__.config
if not isinstance(connection, SupersetConnection):
raise InvalidSourceException(
f"Expected SupersetConnection, but got {connection}"
)
return cls(config, metadata_config) return cls(config, metadata_config)
def prepare(self): def prepare(self):
@@ -200,7 +211,9 @@ class SupersetSource(Source[Entity]):
def _build_dashboard(self, dashboard_json) -> Dashboard: def _build_dashboard(self, dashboard_json) -> Dashboard:
dashboard_id = dashboard_json["id"] dashboard_id = dashboard_json["id"]
name = dashboard_json["dashboard_title"] name = dashboard_json["dashboard_title"]
dashboard_url = f"{self.config.url[:-1]}{dashboard_json['url']}" dashboard_url = (
f"{self.service_connection.supersetURL[:-1]}{dashboard_json['url']}"
)
last_modified = ( last_modified = (
dateparser.parse(dashboard_json.get("changed_on_utc", "now")).timestamp() dateparser.parse(dashboard_json.get("changed_on_utc", "now")).timestamp()
* 1000 * 1000
@@ -263,18 +276,18 @@ class SupersetSource(Source[Entity]):
return None return None
def _check_lineage(self, chart_id, datasource_text): def _check_lineage(self, chart_id, datasource_text):
if datasource_text and self.config.db_service_name: if datasource_text and hasattr(self.service_connection, "dbServiceName"):
chart_data = self.client.fetch_charts_with_id(chart_id) chart_data = self.client.fetch_charts_with_id(chart_id)
dashboards = chart_data["result"].get("dashboards") dashboards = chart_data["result"].get("dashboards")
for dashboard in dashboards: for dashboard in dashboards:
try: try:
from_entity = self.metadata_client.get_by_name( from_entity = self.metadata_client.get_by_name(
entity=Table, entity=Table,
fqdn=f"{self.config.db_service_name}.{datasource_text}", fqdn=f"{self.service_connection.dbServiceName}.{datasource_text}",
) )
to_entity = self.metadata_client.get_by_name( to_entity = self.metadata_client.get_by_name(
entity=Lineage_Dashboard, entity=Lineage_Dashboard,
fqdn=f"{self.config.service_name}.{dashboard['id']}", fqdn=f"{self.config.serviceName}.{dashboard['id']}",
) )
if from_entity and to_entity: if from_entity and to_entity:
lineage = AddLineageRequest( lineage = AddLineageRequest(
@@ -301,7 +314,7 @@ class SupersetSource(Source[Entity]):
dateparser.parse(chart_json.get("changed_on_utc", "now")).timestamp() * 1000 dateparser.parse(chart_json.get("changed_on_utc", "now")).timestamp() * 1000
) )
chart_type = chart_json["viz_type"] chart_type = chart_json["viz_type"]
chart_url = f"{self.config.url}{chart_json['url']}" chart_url = f"{self.service_connection.supersetURL}{chart_json['url']}"
datasource_id = chart_json["datasource_id"] datasource_id = chart_json["datasource_id"]
datasource_fqn = self._get_datasource_from_id(datasource_id) datasource_fqn = self._get_datasource_from_id(datasource_id)
owners = get_owners(chart_json["owners"]) owners = get_owners(chart_json["owners"])