diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/dashboard/supersetConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/dashboard/supersetConnection.json index a15ed811c49..dac2b0f8f18 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/dashboard/supersetConnection.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/dashboard/supersetConnection.json @@ -39,6 +39,11 @@ "type": "string", "default": "db" }, + "dbServiceConnection": { + "description": "Database Service to create lineage", + "type": "string", + "default": null + }, "connectionOptions": { "description": "Additional connection options that can be sent to service during the connection.", "type": "object" diff --git a/ingestion/examples/workflows/superset.json b/ingestion/examples/workflows/superset.json index 77ec96773d5..0170596587d 100644 --- a/ingestion/examples/workflows/superset.json +++ b/ingestion/examples/workflows/superset.json @@ -1,24 +1,30 @@ { "source": { "type": "superset", - "config": { - "url": "http://localhost:8080", - "username": "admin", - "password": "admin", - "service_name": "local_superset", - "db_service_name": "aws_redshift" + "serviceName": "local_superset", + "serviceConnection": { + "config": { + "supersetURL": "http://localhost:8080", + "username": "admin", + "password": "admin", + "type": "Superset" + } + }, + "sourceConfig": { + "config": { + "chartFilterPattern": {}, + "dashboardFilterPattern": {} + } } }, "sink": { "type": "metadata-rest", - "config": { - } + "config": {} }, - "metadata_server": { - "type": "metadata-server", - "config": { - "api_endpoint": "http://localhost:8585/api", - "auth_provider_type": "no-auth" + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "no-auth" } } -} +} \ No newline at end of file diff --git a/ingestion/src/metadata/ingestion/ometa/superset_rest.py b/ingestion/src/metadata/ingestion/ometa/superset_rest.py index 0aaf161fc16..f3e32ca9f72 100644 --- a/ingestion/src/metadata/ingestion/ometa/superset_rest.py +++ b/ingestion/src/metadata/ingestion/ometa/superset_rest.py @@ -13,53 +13,35 @@ REST Auth & Client for Apache Superset """ import json import logging -from typing import Optional from pydantic import SecretStr from metadata.config.common import ConfigModel +from metadata.generated.schema.entity.services.connections.dashboard.supersetConnection import ( + SupersetConnection, +) from metadata.generated.schema.entity.services.dashboardService import ( DashboardServiceType, ) +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) from metadata.ingestion.ometa.auth_provider import AuthenticationProvider from metadata.ingestion.ometa.client import REST, ClientConfig logger = logging.getLogger(__name__) -class SupersetConfig(ConfigModel): - """ - Superset Configuration class - - Attributes: - url (str): - username (Optional[str]): - password (Optional[str]): - service_name (str): - service_type (str): - provider (str): - options (dict): - """ - - url: str = "localhost:8088" - username: Optional[str] = None - password: Optional[SecretStr] = None - service_name: str - service_type: str = DashboardServiceType.Superset.value - provider: str = "db" - options: dict = {} - db_service_name: Optional[str] = None - - class SupersetAuthenticationProvider(AuthenticationProvider): """ Handle SuperSet Auth """ - def __init__(self, config: SupersetConfig): + def __init__(self, config: WorkflowSource): self.config = config + self.service_connection = config.serviceConnection.__root__.config client_config = ClientConfig( - base_url=config.url, + base_url=config.serviceConnection.__root__.config.supersetURL, api_version="api/v1", auth_token=lambda: ("no_token", 0), auth_header="Authorization", @@ -69,7 +51,7 @@ class SupersetAuthenticationProvider(AuthenticationProvider): super().__init__() @classmethod - def create(cls, config: SupersetConfig): + def create(cls, config: WorkflowSource): return cls(config) def auth_token(self) -> str: @@ -80,10 +62,10 @@ class SupersetAuthenticationProvider(AuthenticationProvider): def _login_request(self) -> str: auth_request = { - "username": self.config.username, - "password": self.config.password.get_secret_value(), + "username": self.service_connection.username, + "password": self.service_connection.password.get_secret_value(), "refresh": True, - "provider": self.config.provider, + "provider": self.service_connection.provider, } return json.dumps(auth_request) @@ -100,11 +82,11 @@ class SupersetAPIClient: client: REST _auth_provider: AuthenticationProvider - def __init__(self, config: SupersetConfig): + def __init__(self, config: WorkflowSource): self.config = config self._auth_provider = SupersetAuthenticationProvider.create(config) client_config = ClientConfig( - base_url=config.url, + base_url=config.serviceConnection.__root__.config.supersetURL, api_version="api/v1", auth_token=lambda: self._auth_provider.get_access_token(), auth_header="Authorization", diff --git a/ingestion/src/metadata/ingestion/source/superset.py b/ingestion/src/metadata/ingestion/source/superset.py index 7c43ef45c5a..a55b8205ec0 100644 --- a/ingestion/src/metadata/ingestion/source/superset.py +++ b/ingestion/src/metadata/ingestion/source/superset.py @@ -25,19 +25,25 @@ from metadata.generated.schema.entity.data.dashboard import ( Dashboard as Lineage_Dashboard, ) from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.entity.services.connections.dashboard.supersetConnection import ( + SupersetConnection, +) from metadata.generated.schema.entity.services.dashboardService import ( DashboardServiceType, ) from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataServerConfig, ) +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.common import Entity -from metadata.ingestion.api.source import Source, SourceStatus +from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus from metadata.ingestion.models.table_metadata import Chart, Dashboard, DashboardOwner from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.ingestion.ometa.superset_rest import SupersetAPIClient, SupersetConfig +from metadata.ingestion.ometa.superset_rest import SupersetAPIClient from metadata.utils.helpers import get_dashboard_service_or_create logger: logging.Logger = logging.getLogger(__name__) @@ -159,7 +165,7 @@ class SupersetSource(Source[Entity]): """ - config: SupersetConfig + config: WorkflowSource metadata_config: OpenMetadataServerConfig status: SourceStatus platform = "superset" @@ -167,27 +173,32 @@ class SupersetSource(Source[Entity]): def __init__( self, - config: SupersetConfig, + config: WorkflowSource, metadata_config: OpenMetadataServerConfig, ): super().__init__() self.config = config + self.service_connection = self.config.serviceConnection.__root__.config + self.source_config = self.config.sourceConfig.config self.metadata_config = metadata_config self.metadata_client = OpenMetadata(self.metadata_config) self.status = SourceStatus() self.client = SupersetAPIClient(self.config) self.service = get_dashboard_service_or_create( - service_name=config.service_name, + service_name=config.serviceName, dashboard_service_type=DashboardServiceType.Superset.name, - username=config.username, - password=config.password.get_secret_value(), - dashboard_url=config.url, + config=self.service_connection.dict(), metadata_config=metadata_config, ) @classmethod def create(cls, config_dict: dict, metadata_config: OpenMetadataServerConfig): - config = SupersetConfig.parse_obj(config_dict) + config = WorkflowSource.parse_obj(config_dict) + connection: SupersetConnection = config.serviceConnection.__root__.config + if not isinstance(connection, SupersetConnection): + raise InvalidSourceException( + f"Expected SupersetConnection, but got {connection}" + ) return cls(config, metadata_config) def prepare(self): @@ -200,7 +211,9 @@ class SupersetSource(Source[Entity]): def _build_dashboard(self, dashboard_json) -> Dashboard: dashboard_id = dashboard_json["id"] name = dashboard_json["dashboard_title"] - dashboard_url = f"{self.config.url[:-1]}{dashboard_json['url']}" + dashboard_url = ( + f"{self.service_connection.supersetURL[:-1]}{dashboard_json['url']}" + ) last_modified = ( dateparser.parse(dashboard_json.get("changed_on_utc", "now")).timestamp() * 1000 @@ -263,18 +276,18 @@ class SupersetSource(Source[Entity]): return None def _check_lineage(self, chart_id, datasource_text): - if datasource_text and self.config.db_service_name: + if datasource_text and hasattr(self.service_connection, "dbServiceName"): chart_data = self.client.fetch_charts_with_id(chart_id) dashboards = chart_data["result"].get("dashboards") for dashboard in dashboards: try: from_entity = self.metadata_client.get_by_name( entity=Table, - fqdn=f"{self.config.db_service_name}.{datasource_text}", + fqdn=f"{self.service_connection.dbServiceName}.{datasource_text}", ) to_entity = self.metadata_client.get_by_name( entity=Lineage_Dashboard, - fqdn=f"{self.config.service_name}.{dashboard['id']}", + fqdn=f"{self.config.serviceName}.{dashboard['id']}", ) if from_entity and to_entity: lineage = AddLineageRequest( @@ -301,7 +314,7 @@ class SupersetSource(Source[Entity]): dateparser.parse(chart_json.get("changed_on_utc", "now")).timestamp() * 1000 ) chart_type = chart_json["viz_type"] - chart_url = f"{self.config.url}{chart_json['url']}" + chart_url = f"{self.service_connection.supersetURL}{chart_json['url']}" datasource_id = chart_json["datasource_id"] datasource_fqn = self._get_datasource_from_id(datasource_id) owners = get_owners(chart_json["owners"])