Fix #8973: Superset fetch metadata from DB (#9645)

* Docs #8973: Superset with SSO docs

* Superset support for mysql & postgres db

* remove unwanted field from yaml

* Fix pylint

* Fix superset tests

* Fix sample data connection

* ci fix & review comments

* cypress & defualt provider fix
This commit is contained in:
Mayur Singal 2023-01-19 21:09:43 +05:30 committed by GitHub
parent ddff6e2875
commit 939adf887b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 710 additions and 332 deletions

View File

@ -50,3 +50,16 @@ SET json = JSON_INSERT(
REPLACE(JSON_UNQUOTE(JSON_EXTRACT(json, '$.fullyQualifiedName')),':','')
)
WHERE JSON_EXTRACT(json, '$.serviceType') = 'Dagster';
UPDATE dashboard_service_entity
SET json = JSON_INSERT(
JSON_REMOVE(json,'$.connection.config.username','$.connection.config.password','$.connection.config.provider'),
'$.connection.config.connection',
JSON_OBJECT(
'username',JSON_EXTRACT(json,'$.connection.config.username'),
'password',JSON_EXTRACT(json,'$.connection.config.password'),
'provider',JSON_EXTRACT(json,'$.connection.config.provider')
)
)
WHERE serviceType = 'Superset';

View File

@ -47,4 +47,15 @@ SET json = jsonb_set(
'{fullyQualifiedName}',
to_jsonb(replace(json ->> 'fullyQualifiedName',':',''))
)
WHERE json ->> 'serviceType' = 'Dagster';
WHERE json ->> 'serviceType' = 'Dagster';
UPDATE dashboard_service_entity
SET json = JSONB_SET(json::jsonb,
'{connection,config}',json::jsonb #>'{connection,config}' #- '{password}' #- '{username}' #- '{provider}'||
jsonb_build_object('connection',jsonb_build_object(
'username',json #>'{connection,config,username}',
'password',json #>'{connection,config,password}',
'provider',json #>'{connection,config,provider}'
)), true)
where servicetype = 'Superset';

View File

@ -5,8 +5,11 @@
"config": {
"type": "Superset",
"hostPort": "http://localhost:8088",
"username": "admin",
"password": "admin"
"connection":{
"username": "admin",
"password": "admin",
"provider": "db"
}
}
},
"sourceConfig": {

View File

@ -1,16 +1,24 @@
source:
type: superset
serviceName: local_superset
serviceName: local_superset_12
serviceConnection:
config:
hostPort: http://localhost:8080
username: admin
password: admin
hostPort: http://localhost:8088
connection:
type: Postgres
username: superset
password: superset
hostPort: localhost:5432
database: superset
# username: admin
# password: admin
# provider: db
type: Superset
sourceConfig:
config:
chartFilterPattern: {}
dashboardFilterPattern: {}
type: DashboardMetadata
sink:
type: metadata-rest
config: {}

View File

@ -0,0 +1,181 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Superset source module
"""
import traceback
from typing import Iterable, List, Optional
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.chart import ChartType
from metadata.generated.schema.entity.data.dashboard import (
Dashboard as Lineage_Dashboard,
)
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.source.dashboard.superset.mixin import SupersetSourceMixin
from metadata.utils import fqn
from metadata.utils.helpers import get_standard_chart_type
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class SupersetAPISource(SupersetSourceMixin):
"""
Superset API Source Class
"""
def prepare(self):
"""
Fetching all charts available in superset
this step is done because fetch_total_charts api fetches all
the required information which is not available in fetch_charts_with_id api
"""
current_page = 0
page_size = 25
total_charts = self.client.fetch_total_charts()
while current_page * page_size <= total_charts:
charts = self.client.fetch_charts(current_page, page_size)
current_page += 1
for index in range(len(charts["result"])):
self.all_charts[charts["ids"][index]] = charts["result"][index]
def get_dashboards_list(self) -> Optional[List[object]]:
"""
Get List of all dashboards
"""
current_page = 0
page_size = 25
total_dashboards = self.client.fetch_total_dashboards()
while current_page * page_size <= total_dashboards:
dashboards = self.client.fetch_dashboards(current_page, page_size)
current_page += 1
for dashboard in dashboards["result"]:
yield dashboard
def yield_dashboard(
self, dashboard_details: dict
) -> Iterable[CreateDashboardRequest]:
"""
Method to Get Dashboard Entity
"""
yield CreateDashboardRequest(
name=dashboard_details["id"],
displayName=dashboard_details["dashboard_title"],
description="",
dashboardUrl=dashboard_details["url"],
owner=self.get_owner_details(dashboard_details),
charts=[
EntityReference(id=chart.id.__root__, type="chart")
for chart in self.context.charts
],
service=EntityReference(
id=self.context.dashboard_service.id.__root__, type="dashboardService"
),
)
def yield_dashboard_lineage_details(
self, dashboard_details: dict, db_service_name: str
) -> Optional[Iterable[AddLineageRequest]]:
"""
Get lineage between dashboard and data sources
"""
for chart_id in self._get_charts_of_dashboard(dashboard_details):
chart_json = self.all_charts.get(chart_id)
if chart_json:
datasource_fqn = (
self._get_datasource_fqn(
chart_json.get("datasource_id"), db_service_name
)
if chart_json.get("datasource_id")
else None
)
if not datasource_fqn:
continue
from_entity = self.metadata.get_by_name(
entity=Table,
fqn=datasource_fqn,
)
try:
dashboard_fqn = fqn.build(
self.metadata,
entity_type=Lineage_Dashboard,
service_name=self.config.serviceName,
dashboard_name=str(dashboard_details["id"]),
)
to_entity = self.metadata.get_by_name(
entity=Lineage_Dashboard,
fqn=dashboard_fqn,
)
if from_entity and to_entity:
yield self._get_add_lineage_request(
to_entity=to_entity, from_entity=from_entity
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(
f"Error to yield dashboard lineage details for DB service name [{db_service_name}]: {exc}"
)
def yield_dashboard_chart(
self, dashboard_details: dict
) -> Optional[Iterable[CreateChartRequest]]:
"""
Metod to fetch charts linked to dashboard
"""
for chart_id in self._get_charts_of_dashboard(dashboard_details):
chart_json = self.all_charts.get(chart_id)
if not chart_json:
logger.warning(f"chart details for id: {chart_id} not found, skipped")
continue
chart = CreateChartRequest(
name=chart_json["id"],
displayName=chart_json.get("slice_name"),
description="",
chartType=get_standard_chart_type(
chart_json.get("viz_type", ChartType.Other.value)
),
chartUrl=chart_json.get("url"),
service=EntityReference(
id=self.context.dashboard_service.id.__root__,
type="dashboardService",
),
)
yield chart
def _get_datasource_fqn(
self, datasource_id: str, db_service_name: str
) -> Optional[str]:
if db_service_name:
try:
datasource_json = self.client.fetch_datasource(datasource_id)
database_json = self.client.fetch_database(
datasource_json["result"]["database"]["id"]
)
dataset_fqn = fqn.build(
self.metadata,
entity_type=Table,
table_name=datasource_json["result"]["table_name"],
schema_name=datasource_json["result"]["schema"],
database_name=database_json["result"]["parameters"]["database"],
service_name=db_service_name,
)
return dataset_fqn
except KeyError as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to fetch Datasource with id [{datasource_id}]: {err}"
)
return None

View File

@ -55,10 +55,10 @@ class SupersetAuthenticationProvider(AuthenticationProvider):
def _login_request(self) -> str:
auth_request = {
"username": self.service_connection.username,
"password": self.service_connection.password.get_secret_value(),
"username": self.service_connection.connection.username,
"password": self.service_connection.connection.password.get_secret_value(),
"refresh": True,
"provider": self.service_connection.provider,
"provider": self.service_connection.connection.provider.value,
}
return json.dumps(auth_request)

View File

@ -12,26 +12,57 @@
"""
Source connection handler
"""
from typing import Union
from sqlalchemy.engine import Engine
from metadata.generated.schema.entity.services.connections.dashboard.supersetConnection import (
SupersetConnection,
)
from metadata.ingestion.connections.test_connections import SourceConnectionException
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
MysqlConnection,
)
from metadata.generated.schema.entity.services.connections.database.postgresConnection import (
PostgresConnection,
)
from metadata.generated.schema.entity.utils.supersetApiConnection import (
SupersetAPIConnection,
)
from metadata.ingestion.connections.test_connections import (
SourceConnectionException,
test_connection_db_common,
)
from metadata.ingestion.source.dashboard.superset.client import SupersetAPIClient
from metadata.ingestion.source.database.mysql.connection import (
get_connection as mysql_get_connection,
)
from metadata.ingestion.source.database.postgres.connection import (
get_connection as pg_get_connection,
)
def get_connection(connection: SupersetConnection) -> SupersetAPIClient:
"""
Create connection
"""
return SupersetAPIClient(connection)
if isinstance(connection.connection, SupersetAPIConnection):
return SupersetAPIClient(connection)
if isinstance(connection.connection, PostgresConnection):
return pg_get_connection(connection=connection.connection)
if isinstance(connection.connection, MysqlConnection):
return mysql_get_connection(connection=connection.connection)
return None
def test_connection(client: SupersetAPIClient) -> None:
def test_connection(client: Union[SupersetAPIClient, Engine]) -> None:
"""
Test connection
"""
try:
client.fetch_menu()
if isinstance(client, SupersetAPIClient):
client.fetch_menu()
else:
test_connection_db_common(client)
except Exception as exc:
msg = f"Unknown error connecting with {client}: {exc}."
raise SourceConnectionException(msg)

View File

@ -0,0 +1,188 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Superset source module
"""
import traceback
from typing import Iterable, List, Optional
from sqlalchemy.engine import Engine
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.chart import ChartType
from metadata.generated.schema.entity.data.dashboard import (
Dashboard as Lineage_Dashboard,
)
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.source.dashboard.superset.mixin import SupersetSourceMixin
from metadata.ingestion.source.dashboard.superset.queries import (
FETCH_ALL_CHARTS,
FETCH_DASHBOARDS,
)
from metadata.utils import fqn
from metadata.utils.helpers import get_standard_chart_type
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class SupersetDBSource(SupersetSourceMixin):
"""
Superset DB Source Class
"""
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
super().__init__(config, metadata_config)
self.engine: Engine = self.client
def prepare(self):
"""
Fetching all charts available in superset
this step is done because fetch_total_charts api fetches all
the required information which is not available in fetch_charts_with_id api
"""
charts = self.engine.execute(FETCH_ALL_CHARTS)
for chart in charts:
self.all_charts[chart.id] = dict(chart)
def get_dashboards_list(self) -> Optional[List[object]]:
"""
Get List of all dashboards
"""
dashboards = self.engine.execute(FETCH_DASHBOARDS)
for dashboard in dashboards:
yield dict(dashboard)
def yield_dashboard(
self, dashboard_details: dict
) -> Iterable[CreateDashboardRequest]:
"""
Method to Get Dashboard Entity
"""
yield CreateDashboardRequest(
name=dashboard_details["id"],
displayName=dashboard_details["dashboard_title"],
description="",
dashboardUrl=f"/superset/dashboard/{dashboard_details['id']}",
owner=self.get_owner_details(dashboard_details),
charts=[
EntityReference(id=chart.id.__root__, type="chart")
for chart in self.context.charts
],
service=EntityReference(
id=self.context.dashboard_service.id.__root__, type="dashboardService"
),
)
def yield_dashboard_lineage_details(
self, dashboard_details: dict, db_service_name: str
) -> Optional[Iterable[AddLineageRequest]]:
"""
Get lineage between dashboard and data sources
"""
for chart_id in self._get_charts_of_dashboard(dashboard_details):
chart_json = self.all_charts.get(chart_id)
if chart_json:
datasource_fqn = (
self._get_datasource_fqn(chart_json, db_service_name)
if chart_json.get("table_name")
else None
)
if not datasource_fqn:
continue
from_entity = self.metadata.get_by_name(
entity=Table,
fqn=datasource_fqn,
)
try:
dashboard_fqn = fqn.build(
self.metadata,
entity_type=Lineage_Dashboard,
service_name=self.config.serviceName,
dashboard_name=str(dashboard_details["id"]),
)
to_entity = self.metadata.get_by_name(
entity=Lineage_Dashboard,
fqn=dashboard_fqn,
)
if from_entity and to_entity:
yield self._get_add_lineage_request(
to_entity=to_entity, from_entity=from_entity
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(
f"Error to yield dashboard lineage details for DB service name [{db_service_name}]: {exc}"
)
def yield_dashboard_chart(
self, dashboard_details: dict
) -> Optional[Iterable[CreateChartRequest]]:
"""
Metod to fetch charts linked to dashboard
"""
for chart_id in self._get_charts_of_dashboard(dashboard_details):
chart_json = self.all_charts.get(chart_id)
if not chart_json:
logger.warning(f"chart details for id: {chart_id} not found, skipped")
continue
chart = CreateChartRequest(
name=chart_json["id"],
displayName=chart_json.get("slice_name"),
description="",
chartType=get_standard_chart_type(
chart_json.get("viz_type", ChartType.Other.value)
),
chartUrl=f"/explore/?slice_id={chart_json['id']}",
service=EntityReference(
id=self.context.dashboard_service.id.__root__,
type="dashboardService",
),
)
yield chart
def _get_database_name(self, sqa_str: str) -> str:
if sqa_str:
return sqa_str.split("/")[-1]
return None
def _get_datasource_fqn(
self, chart_json: dict, db_service_name: str
) -> Optional[str]:
if db_service_name:
try:
dataset_fqn = fqn.build(
self.metadata,
entity_type=Table,
table_name=chart_json.get("table_name"),
database_name=self._get_database_name(
chart_json.get("sqlalchemy_uri")
),
schema_name=chart_json.get("schema"),
service_name=db_service_name,
)
return dataset_fqn
except KeyError as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to fetch Datasource with id [{chart_json.get('table_name')}]: {err}"
)
return None

View File

@ -11,94 +11,28 @@
"""
Superset source module
"""
import json
import traceback
from typing import Iterable, List, Optional
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.chart import ChartType
from metadata.generated.schema.entity.data.dashboard import (
Dashboard as Lineage_Dashboard,
)
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.dashboard.supersetConnection import (
SupersetConnection,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.dashboardService import (
DashboardServiceType,
from metadata.generated.schema.entity.utils.supersetApiConnection import (
SupersetAPIConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import InvalidSourceException, SourceStatus
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
from metadata.utils import fqn
from metadata.utils.helpers import get_standard_chart_type
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.dashboard.superset.api_source import SupersetAPISource
from metadata.ingestion.source.dashboard.superset.db_source import SupersetDBSource
def get_metric_name(metric):
"""
Get metric name
Args:
metric:
Returns:
"""
if not metric:
return ""
if isinstance(metric, str):
return metric
label = metric.get("label")
return label or None
def get_filter_name(filter_obj):
"""
Get filter name
Args:
filter_obj:
Returns:
str
"""
sql_expression = filter_obj.get("sqlExpression")
if sql_expression:
return sql_expression
clause = filter_obj.get("clause")
column = filter_obj.get("subject")
operator = filter_obj.get("operator")
comparator = filter_obj.get("comparator")
return f"{clause} {column} {operator} {comparator}"
class SupersetSource(DashboardServiceSource):
class SupersetSource:
"""
Superset Source Class
"""
config: WorkflowSource
metadata_config: OpenMetadataConnection
status: SourceStatus
platform = "superset"
service_type = DashboardServiceType.Superset.value
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
self.all_charts = {}
super().__init__(config, metadata_config)
@classmethod
def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection):
config = WorkflowSource.parse_obj(config_dict)
@ -107,181 +41,6 @@ class SupersetSource(DashboardServiceSource):
raise InvalidSourceException(
f"Expected SupersetConnection, but got {connection}"
)
return cls(config, metadata_config)
def prepare(self):
"""
Fetching all charts available in superset
this step is done because fetch_total_charts api fetches all
the required information which is not available in fetch_charts_with_id api
"""
current_page = 0
page_size = 25
total_charts = self.client.fetch_total_charts()
while current_page * page_size <= total_charts:
charts = self.client.fetch_charts(current_page, page_size)
current_page += 1
for index in range(len(charts["result"])):
self.all_charts[charts["ids"][index]] = charts["result"][index]
def get_dashboards_list(self) -> Optional[List[object]]:
"""
Get List of all dashboards
"""
current_page = 0
page_size = 25
total_dashboards = self.client.fetch_total_dashboards()
while current_page * page_size <= total_dashboards:
dashboards = self.client.fetch_dashboards(current_page, page_size)
current_page += 1
for dashboard in dashboards["result"]:
yield dashboard
def get_dashboard_name(self, dashboard: dict) -> str:
"""
Get Dashboard Name
"""
return dashboard["dashboard_title"]
def get_dashboard_details(self, dashboard: dict) -> dict:
"""
Get Dashboard Details
"""
return dashboard
def get_owner_details(self, dashboard_details: dict) -> EntityReference:
if dashboard_details.get("owners"):
owner = dashboard_details["owners"][0]
user = self.metadata.get_user_by_email(owner.get("email"))
if user:
return EntityReference(id=user.id.__root__, type="user")
return None
def yield_dashboard(
self, dashboard_details: dict
) -> Iterable[CreateDashboardRequest]:
"""
Method to Get Dashboard Entity
"""
yield CreateDashboardRequest(
name=dashboard_details["id"],
displayName=dashboard_details["dashboard_title"],
description="",
dashboardUrl=dashboard_details["url"],
owner=self.get_owner_details(dashboard_details),
charts=[
EntityReference(id=chart.id.__root__, type="chart")
for chart in self.context.charts
],
service=EntityReference(
id=self.context.dashboard_service.id.__root__, type="dashboardService"
),
)
def _get_charts_of_dashboard(self, dashboard_details: dict) -> List[str]:
"""
Method to fetch chart ids linked to dashboard
"""
raw_position_data = dashboard_details.get("position_json", {})
if raw_position_data:
position_data = json.loads(raw_position_data)
return [
value.get("meta", {}).get("chartId")
for key, value in position_data.items()
if key.startswith("CHART-") and value.get("meta", {}).get("chartId")
]
return []
def yield_dashboard_lineage_details(
self, dashboard_details: dict, db_service_name: str
) -> Optional[Iterable[AddLineageRequest]]:
"""
Get lineage between dashboard and data sources
"""
for chart_id in self._get_charts_of_dashboard(dashboard_details):
chart_json = self.all_charts.get(chart_id)
if chart_json:
datasource_fqn = (
self._get_datasource_fqn(
chart_json.get("datasource_id"), db_service_name
)
if chart_json.get("datasource_id")
else None
)
if not datasource_fqn:
continue
from_entity = self.metadata.get_by_name(
entity=Table,
fqn=datasource_fqn,
)
try:
dashboard_fqn = fqn.build(
self.metadata,
entity_type=Lineage_Dashboard,
service_name=self.config.serviceName,
dashboard_name=str(dashboard_details["id"]),
)
to_entity = self.metadata.get_by_name(
entity=Lineage_Dashboard,
fqn=dashboard_fqn,
)
if from_entity and to_entity:
yield self._get_add_lineage_request(
to_entity=to_entity, from_entity=from_entity
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(
f"Error to yield dashboard lineage details for DB service name [{db_service_name}]: {exc}"
)
def yield_dashboard_chart(
self, dashboard_details: dict
) -> Optional[Iterable[CreateChartRequest]]:
"""
Metod to fetch charts linked to dashboard
"""
for chart_id in self._get_charts_of_dashboard(dashboard_details):
chart_json = self.all_charts.get(chart_id)
if not chart_json:
logger.warning(f"chart details for id: {chart_id} not found, skipped")
continue
chart = CreateChartRequest(
name=chart_json["id"],
displayName=chart_json.get("slice_name"),
description="",
chartType=get_standard_chart_type(
chart_json.get("viz_type", ChartType.Other.value)
),
chartUrl=chart_json.get("url"),
service=EntityReference(
id=self.context.dashboard_service.id.__root__,
type="dashboardService",
),
)
yield chart
def _get_datasource_fqn(
self, datasource_id: str, db_service_name: str
) -> Optional[str]:
if db_service_name:
try:
datasource_json = self.client.fetch_datasource(datasource_id)
database_json = self.client.fetch_database(
datasource_json["result"]["database"]["id"]
)
dataset_fqn = fqn.build(
self.metadata,
entity_type=Table,
table_name=datasource_json["result"]["table_name"],
schema_name=datasource_json["result"]["schema"],
database_name=database_json["result"]["parameters"]["database"],
service_name=db_service_name,
)
return dataset_fqn
except KeyError as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to fetch Datasource with id [{datasource_id}]: {err}"
)
return None
if isinstance(connection.connection, SupersetAPIConnection):
return SupersetAPISource(config, metadata_config)
return SupersetDBSource(config, metadata_config)

View File

@ -0,0 +1,91 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Superset mixin module
"""
import json
from typing import List
from metadata.generated.schema.entity.services.connections.dashboard.supersetConnection import (
SupersetConnection,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.dashboardService import (
DashboardServiceType,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import InvalidSourceException, SourceStatus
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
class SupersetSourceMixin(DashboardServiceSource):
"""
Superset DB Source Class
"""
config: WorkflowSource
metadata_config: OpenMetadataConnection
status: SourceStatus
platform = "superset"
service_type = DashboardServiceType.Superset.value
service_connection: SupersetConnection
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
super().__init__(config, metadata_config)
self.all_charts = {}
@classmethod
def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection):
config = WorkflowSource.parse_obj(config_dict)
connection: SupersetConnection = config.serviceConnection.__root__.config
if not isinstance(connection, SupersetConnection):
raise InvalidSourceException(
f"Expected SupersetConnection, but got {connection}"
)
return cls(config, metadata_config)
def get_dashboard_name(self, dashboard: dict) -> str:
"""
Get Dashboard Name
"""
return dashboard["dashboard_title"]
def get_dashboard_details(self, dashboard: dict) -> dict:
"""
Get Dashboard Details
"""
return dashboard
def get_owner_details(self, dashboard_details: dict) -> EntityReference:
if dashboard_details.get("email"):
user = self.metadata.get_user_by_email(dashboard_details["email"])
if user:
return EntityReference(id=user.id.__root__, type="user")
return None
def _get_charts_of_dashboard(self, dashboard_details: dict) -> List[str]:
"""
Method to fetch chart ids linked to dashboard
"""
raw_position_data = dashboard_details.get("position_json", {})
if raw_position_data:
position_data = json.loads(raw_position_data)
return [
value.get("meta", {}).get("chartId")
for key, value in position_data.items()
if key.startswith("CHART-") and value.get("meta", {}).get("chartId")
]
return []

View File

@ -0,0 +1,43 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Queries to fetch data from superset
"""
FETCH_ALL_CHARTS = """
select
s.id,
s.slice_name,
t.table_name,
t.schema,
db.database_name,
db.sqlalchemy_uri
from
slices s left join "tables" t
on s.datasource_id = t.id and s.datasource_type = 'table'
left join "dbs" db
on db.id = t.database_id
"""
FETCH_DASHBOARDS = """
select
d.id,
d.dashboard_title,
d.position_json,
au.email
from
dashboards d
LEFT JOIN
ab_user au
ON
d.created_by_fk = au.id
"""

View File

@ -0,0 +1,14 @@
---
title: Superset
slug: /connectors/dashboard/superset/sso
---
# Superset with SSO
OpenMetadata utilizes [Superset REST APIs](https://superset.apache.org/docs/api/) to retrieve metadata from Superset. These APIs support two modes of authentication: `db` and `ldap`. At this time, `OAuth` authentication is not supported by these APIs.
Although the Superset REST APIs do not support OAuth authentication, there are still two ways for a user to authenticate through the API:
- **Using admin user credentials**: When a Superset instance is initialized, a default admin user is created with the username and password both set as "admin". This admin user can be used to authenticate to the Superset APIs via the "db" authentication mode.
- **Using database credentials**: You can fetch metadata from superset instance by providing the `mysql` or `postgres` database connection details.

View File

@ -378,6 +378,8 @@ site_menu:
url: /connectors/dashboard/superset/airflow
- category: Connectors / Dashboard / Superset / CLI
url: /connectors/dashboard/superset/cli
- category: Connectors / Dashboard / Superset / SSO
url: /connectors/dashboard/superset/sso
- category: Connectors / Dashboard / Metabase
url: /connectors/dashboard/metabase
- category: Connectors / Dashboard / Metabase / Airflow

View File

@ -259,7 +259,7 @@ public abstract class EntityResourceTest<T extends EntityInterface, K extends Cr
public static EntityReference GLOSSARY2_TERM1_REF;
public static TagLabel GLOSSARY2_TERM1_LABEL;
public static EntityReference SUPERSET_REFERENCE;
public static EntityReference METABASE_REFERENCE;
public static EntityReference LOOKER_REFERENCE;
public static List<EntityReference> CHART_REFERENCES;

View File

@ -55,7 +55,7 @@ public class ChartResourceTest extends EntityResourceTest<Chart, CreateChart> {
@Test
void post_chartWithDifferentService_200_ok(TestInfo test) throws IOException {
EntityReference[] differentServices = {SUPERSET_REFERENCE, LOOKER_REFERENCE};
EntityReference[] differentServices = {METABASE_REFERENCE, LOOKER_REFERENCE};
// Create chart for each service and test APIs
for (EntityReference service : differentServices) {
@ -99,7 +99,7 @@ public class ChartResourceTest extends EntityResourceTest<Chart, CreateChart> {
@Override
public EntityReference getContainer() {
return SUPERSET_REFERENCE;
return METABASE_REFERENCE;
}
@Override

View File

@ -68,7 +68,7 @@ public class DashboardResourceTest extends EntityResourceTest<Dashboard, CreateD
SUPERSET_INVALID_SERVICE_REFERENCE =
new EntityReference()
.withName("invalid_superset_service")
.withId(SUPERSET_REFERENCE.getId())
.withId(METABASE_REFERENCE.getId())
.withType("DashboardService1");
}
@ -94,7 +94,7 @@ public class DashboardResourceTest extends EntityResourceTest<Dashboard, CreateD
@Test
void post_DashboardWithDifferentService_200_ok(TestInfo test) throws IOException {
EntityReference[] differentServices = {SUPERSET_REFERENCE, LOOKER_REFERENCE};
EntityReference[] differentServices = {METABASE_REFERENCE, LOOKER_REFERENCE};
// Create Dashboard for each service and test APIs
for (EntityReference service : differentServices) {
@ -203,7 +203,7 @@ public class DashboardResourceTest extends EntityResourceTest<Dashboard, CreateD
@Override
public EntityReference getContainer() {
return SUPERSET_REFERENCE;
return METABASE_REFERENCE;
}
@Override

View File

@ -42,7 +42,7 @@ import org.openmetadata.schema.api.services.CreateDashboardService.DashboardServ
import org.openmetadata.schema.entity.data.Chart;
import org.openmetadata.schema.entity.services.DashboardService;
import org.openmetadata.schema.services.connections.dashboard.LookerConnection;
import org.openmetadata.schema.services.connections.dashboard.SupersetConnection;
import org.openmetadata.schema.services.connections.dashboard.MetabaseConnection;
import org.openmetadata.schema.type.ChangeDescription;
import org.openmetadata.schema.type.DashboardConnection;
import org.openmetadata.service.Entity;
@ -85,15 +85,15 @@ public class DashboardServiceResourceTest extends EntityResourceTest<DashboardSe
void post_validService_as_admin_200_ok(TestInfo test) throws IOException, URISyntaxException {
// Create dashboard service with different optional fields
Map<String, String> authHeaders = ADMIN_AUTH_HEADERS;
SupersetConnection supersetConnection =
new SupersetConnection()
MetabaseConnection metabaseConnection =
new MetabaseConnection()
.withHostPort(new URI("http://localhost:8080"))
.withUsername("user")
.withPassword("password");
createAndCheckEntity(createRequest(test, 1).withDescription(null), authHeaders);
createAndCheckEntity(createRequest(test, 2).withDescription("description"), authHeaders);
createAndCheckEntity(
createRequest(test, 3).withConnection(new DashboardConnection().withConfig(supersetConnection)), authHeaders);
createRequest(test, 3).withConnection(new DashboardConnection().withConfig(metabaseConnection)), authHeaders);
}
@Test
@ -102,7 +102,7 @@ public class DashboardServiceResourceTest extends EntityResourceTest<DashboardSe
DashboardConnection dashboardConnection =
new DashboardConnection()
.withConfig(
new SupersetConnection()
new MetabaseConnection()
.withHostPort(new URI("http://localhost:8080"))
.withUsername("user")
.withPassword(secretPassword));
@ -114,7 +114,7 @@ public class DashboardServiceResourceTest extends EntityResourceTest<DashboardSe
DashboardConnection dashboardConnection1 =
new DashboardConnection()
.withConfig(
new SupersetConnection()
new MetabaseConnection()
.withHostPort(new URI("http://localhost:9000"))
.withUsername("user1")
.withPassword(secretPassword));
@ -133,17 +133,17 @@ public class DashboardServiceResourceTest extends EntityResourceTest<DashboardSe
updatedService = getEntity(service.getId(), TEST_AUTH_HEADERS);
assertNotNull(updatedService.getConnection());
assertNotNull(
JsonUtils.readValue(JsonUtils.pojoToJson(updatedService.getConnection().getConfig()), SupersetConnection.class)
JsonUtils.readValue(JsonUtils.pojoToJson(updatedService.getConnection().getConfig()), MetabaseConnection.class)
.getHostPort());
assertNull(
JsonUtils.readValue(JsonUtils.pojoToJson(updatedService.getConnection().getConfig()), SupersetConnection.class)
JsonUtils.readValue(JsonUtils.pojoToJson(updatedService.getConnection().getConfig()), MetabaseConnection.class)
.getUsername());
SupersetConnection supersetConnection =
new SupersetConnection()
MetabaseConnection metabaseConnection =
new MetabaseConnection()
.withHostPort(new URI("http://localhost:8080"))
.withUsername("user")
.withPassword(secretPassword);
DashboardConnection dashboardConnection2 = new DashboardConnection().withConfig(supersetConnection);
DashboardConnection dashboardConnection2 = new DashboardConnection().withConfig(metabaseConnection);
update = createPutRequest(test).withDescription("description1").withConnection(dashboardConnection2);
fieldUpdated(change, "connection", dashboardConnection1, dashboardConnection2);
@ -158,11 +158,11 @@ public class DashboardServiceResourceTest extends EntityResourceTest<DashboardSe
try {
return new CreateDashboardService()
.withName(name)
.withServiceType(CreateDashboardService.DashboardServiceType.Superset)
.withServiceType(CreateDashboardService.DashboardServiceType.Metabase)
.withConnection(
new DashboardConnection()
.withConfig(
new SupersetConnection()
new MetabaseConnection()
.withHostPort(new URI("http://localhost:8080"))
.withUsername("admin")
.withPassword("admin")));
@ -178,11 +178,11 @@ public class DashboardServiceResourceTest extends EntityResourceTest<DashboardSe
try {
return new CreateDashboardService()
.withName(name)
.withServiceType(CreateDashboardService.DashboardServiceType.Superset)
.withServiceType(CreateDashboardService.DashboardServiceType.Metabase)
.withConnection(
new DashboardConnection()
.withConfig(
new SupersetConnection()
new MetabaseConnection()
.withHostPort(new URI("http://localhost:8080"))
.withUsername("admin")
.withPassword(Fernet.getInstance().encrypt(secretPassword.toLowerCase(Locale.ROOT)))));
@ -240,24 +240,23 @@ public class DashboardServiceResourceTest extends EntityResourceTest<DashboardSe
DashboardServiceType dashboardServiceType,
Map<String, String> authHeaders) {
if (expectedDashboardConnection != null && actualDashboardConnection != null) {
if (dashboardServiceType == CreateDashboardService.DashboardServiceType.Superset) {
SupersetConnection expectedSupersetConnection = (SupersetConnection) expectedDashboardConnection.getConfig();
SupersetConnection actualSupersetConnection;
if (actualDashboardConnection.getConfig() instanceof SupersetConnection) {
actualSupersetConnection = (SupersetConnection) actualDashboardConnection.getConfig();
if (dashboardServiceType == CreateDashboardService.DashboardServiceType.Metabase) {
MetabaseConnection expectedmetabaseConnection = (MetabaseConnection) expectedDashboardConnection.getConfig();
MetabaseConnection actualMetabaseConnection;
if (actualDashboardConnection.getConfig() instanceof MetabaseConnection) {
actualMetabaseConnection = (MetabaseConnection) actualDashboardConnection.getConfig();
} else {
actualSupersetConnection =
JsonUtils.convertValue(actualDashboardConnection.getConfig(), SupersetConnection.class);
actualMetabaseConnection =
JsonUtils.convertValue(actualDashboardConnection.getConfig(), MetabaseConnection.class);
}
assertEquals(expectedSupersetConnection.getHostPort(), actualSupersetConnection.getHostPort());
assertEquals(expectedSupersetConnection.getProvider(), actualSupersetConnection.getProvider());
assertEquals(expectedmetabaseConnection.getHostPort(), actualMetabaseConnection.getHostPort());
if (ADMIN_AUTH_HEADERS.equals(authHeaders) || INGESTION_BOT_AUTH_HEADERS.equals(authHeaders)) {
assertEquals(expectedSupersetConnection.getUsername(), actualSupersetConnection.getUsername());
assertTrue(actualSupersetConnection.getPassword().startsWith("secret:/openmetadata/dashboard/"));
assertTrue(actualSupersetConnection.getPassword().endsWith("/password"));
assertEquals(expectedmetabaseConnection.getUsername(), actualMetabaseConnection.getUsername());
assertTrue(actualMetabaseConnection.getPassword().startsWith("secret:/openmetadata/dashboard/"));
assertTrue(actualMetabaseConnection.getPassword().endsWith("/password"));
} else {
assertNull(actualSupersetConnection.getUsername());
assertNull(actualSupersetConnection.getPassword());
assertNull(actualMetabaseConnection.getUsername());
assertNull(actualMetabaseConnection.getPassword());
}
}
}
@ -266,18 +265,18 @@ public class DashboardServiceResourceTest extends EntityResourceTest<DashboardSe
public void setupDashboardServices(TestInfo test) throws HttpResponseException, URISyntaxException {
DashboardServiceResourceTest dashboardResourceTest = new DashboardServiceResourceTest();
CreateDashboardService createDashboardService =
dashboardResourceTest.createRequest("superset", "", "", null).withServiceType(DashboardServiceType.Superset);
dashboardResourceTest.createRequest("superset", "", "", null).withServiceType(DashboardServiceType.Metabase);
DashboardConnection dashboardConnection =
new DashboardConnection()
.withConfig(
new SupersetConnection()
new MetabaseConnection()
.withHostPort(new URI("http://localhost:8080"))
.withPassword("test")
.withUsername("admin"));
createDashboardService.withConnection(dashboardConnection);
DashboardService dashboardService =
new DashboardServiceResourceTest().createEntity(createDashboardService, ADMIN_AUTH_HEADERS);
SUPERSET_REFERENCE = dashboardService.getEntityReference();
METABASE_REFERENCE = dashboardService.getEntityReference();
CreateDashboardService lookerDashboardService =
dashboardResourceTest.createRequest("looker", "", "", null).withServiceType(DashboardServiceType.Looker);
@ -294,7 +293,7 @@ public class DashboardServiceResourceTest extends EntityResourceTest<DashboardSe
CHART_REFERENCES = new ArrayList<>();
ChartResourceTest chartResourceTest = new ChartResourceTest();
for (int i = 0; i < 3; i++) {
CreateChart createChart = chartResourceTest.createRequest(test, i).withService(SUPERSET_REFERENCE);
CreateChart createChart = chartResourceTest.createRequest(test, i).withService(METABASE_REFERENCE);
Chart chart = chartResourceTest.createEntity(createChart, ADMIN_AUTH_HEADERS);
CHART_REFERENCES.add(chart.getEntityReference());
}

View File

@ -334,7 +334,7 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
void put_IngestionPipelineForDashboardSourceUpdate_200(TestInfo test) throws IOException {
CreateIngestionPipeline request =
createRequest(test)
.withService(reduceEntityReference(SUPERSET_REFERENCE))
.withService(reduceEntityReference(METABASE_REFERENCE))
.withDescription("description")
.withSourceConfig(DASHBOARD_METADATA_CONFIG)
.withAirflowConfig(new AirflowConfig().withScheduleInterval("5 * * * *").withStartDate(START_DATE));
@ -353,7 +353,7 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
.withScheduleInterval(expectedScheduleInterval)
.withStartDate(startDate)),
ADMIN_AUTH_HEADERS);
String expectedFQN = FullyQualifiedName.build(SUPERSET_REFERENCE.getName(), ingestion.getName());
String expectedFQN = FullyQualifiedName.build(METABASE_REFERENCE.getName(), ingestion.getName());
assertEquals(startDate, ingestion.getAirflowConfig().getStartDate());
assertEquals(pipelineConcurrency, ingestion.getAirflowConfig().getConcurrency());
assertEquals(expectedFQN, ingestion.getFullyQualifiedName());

View File

@ -56,7 +56,7 @@ import org.openmetadata.schema.entity.services.MetadataConnection;
import org.openmetadata.schema.entity.teams.User;
import org.openmetadata.schema.entity.type.CustomProperty;
import org.openmetadata.schema.security.credentials.AWSCredentials;
import org.openmetadata.schema.services.connections.dashboard.SupersetConnection;
import org.openmetadata.schema.services.connections.dashboard.MetabaseConnection;
import org.openmetadata.schema.services.connections.database.BigQueryConnection;
import org.openmetadata.schema.services.connections.database.MysqlConnection;
import org.openmetadata.schema.services.connections.database.RedshiftConnection;
@ -100,7 +100,7 @@ public final class TestUtils {
public static PipelineConnection GLUE_CONNECTION;
public static MessagingConnection KAFKA_CONNECTION;
public static DashboardConnection SUPERSET_CONNECTION;
public static DashboardConnection METABASE_CONNECTION;
public static final MlModelConnection MLFLOW_CONNECTION;
public static MetadataConnection AMUNDSEN_CONNECTION;
@ -163,15 +163,15 @@ public final class TestUtils {
static {
try {
SUPERSET_CONNECTION =
METABASE_CONNECTION =
new DashboardConnection()
.withConfig(
new SupersetConnection()
new MetabaseConnection()
.withHostPort(new URI("http://localhost:8080"))
.withUsername("admin")
.withPassword("admin"));
} catch (URISyntaxException e) {
SUPERSET_CONNECTION = null;
METABASE_CONNECTION = null;
e.printStackTrace();
}
}

View File

@ -28,22 +28,20 @@
"format": "uri",
"default": "http://localhost:8088"
},
"username": {
"title": "Username",
"description": "Username for Superset.",
"type": "string"
},
"password": {
"title": "Password",
"description": "Password for Superset.",
"type": "string",
"format": "password"
},
"provider": {
"title": "Provider",
"description": "Authentication provider for the Superset service. For basic user/password authentication, the default value `db` can be used. This parameter is used internally to connect to Superset's REST API.",
"type": "string",
"default": "db"
"connection":{
"title": "Superset Connection",
"description": "Choose between API or database connection fetch metadata from superset.",
"oneOf": [
{
"$ref": "../../../utils/supersetApiConnection.json"
},
{
"$ref": "../database/postgresConnection.json"
},
{
"$ref": "../database/mysqlConnection.json"
}
]
},
"connectionOptions": {
"title": "Connection Options",
@ -56,5 +54,5 @@
}
},
"additionalProperties": false,
"required": ["hostPort", "username"]
"required": ["hostPort","connection"]
}

View File

@ -0,0 +1,37 @@
{
"$id": "https://open-metadata.org/schema/entity/services/connections/dashboard/supersetConnection.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "SupersetAPIConnection",
"description": "Superset API Connection Config",
"type": "object",
"definitions": {
"apiProvider": {
"title": "Provider",
"description": "Authentication provider for the Superset service. For basic user/password authentication, the default value `db` can be used. This parameter is used internally to connect to Superset's REST API.",
"type": "string",
"enum": ["db","ldap"],
"default": "db"
}
},
"properties": {
"provider": {
"title": "Provider",
"description": "Authentication provider for the Superset service. For basic user/password authentication, the default value `db` can be used. This parameter is used internally to connect to Superset's REST API.",
"$ref": "#/definitions/apiProvider",
"default": "db"
},
"username": {
"title": "Username",
"description": "Username for Superset.",
"type": "string"
},
"password": {
"title": "Password",
"description": "Password for Superset.",
"type": "string",
"format": "password"
}
},
"additionalProperties": false,
"required": ["provider","password","username"]
}

View File

@ -96,7 +96,7 @@ import {
} from '../constants/Services.constant';
import { PROMISE_STATE } from '../enums/common.enum';
import { ServiceCategory } from '../enums/service.enum';
import { ConnectionType } from '../generated/api/services/ingestionPipelines/testServiceConnection';
import { ConnectionTypeEnum } from '../generated/api/services/ingestionPipelines/testServiceConnection';
import { Database } from '../generated/entity/data/database';
import { MlModelServiceType } from '../generated/entity/data/mlmodel';
import {
@ -602,14 +602,14 @@ export const shouldTestConnection = (serviceType: string) => {
export const getTestConnectionType = (serviceCat: ServiceCategory) => {
switch (serviceCat) {
case ServiceCategory.MESSAGING_SERVICES:
return ConnectionType.Messaging;
return ConnectionTypeEnum.Messaging;
case ServiceCategory.DASHBOARD_SERVICES:
return ConnectionType.Dashboard;
return ConnectionTypeEnum.Dashboard;
case ServiceCategory.PIPELINE_SERVICES:
return ConnectionType.Pipeline;
return ConnectionTypeEnum.Pipeline;
case ServiceCategory.DATABASE_SERVICES:
default:
return ConnectionType.Database;
return ConnectionTypeEnum.Database;
}
};