Superset Unit Tests (#10078)

This commit is contained in:
Mayur Singal 2023-02-09 19:45:24 +05:30 committed by GitHub
parent 9bb99407e5
commit 0bea02c202
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 1054 additions and 94 deletions

View File

@ -17,11 +17,7 @@ from typing import Iterable, List, Optional
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.chart import ChartType
from metadata.generated.schema.entity.data.dashboard import (
Dashboard as Lineage_Dashboard,
)
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.source.dashboard.superset.mixin import SupersetSourceMixin
@ -86,48 +82,12 @@ class SupersetAPISource(SupersetSourceMixin):
),
)
def yield_dashboard_lineage_details(
    self, dashboard_details: dict, db_service_name: str
) -> Optional[Iterable[AddLineageRequest]]:
    """
    Yield table -> dashboard lineage requests for the charts of a dashboard.

    A request is produced for a chart only when a datasource FQN could be
    resolved from its "datasource_id" and both entity lookups through
    self.metadata.get_by_name return a truthy value.
    """
    for chart_id in self._get_charts_of_dashboard(dashboard_details):
        chart_json = self.all_charts.get(chart_id)
        if not chart_json:
            # Chart is not in the cache built for this source
            continue

        datasource_fqn = None
        if chart_json.get("datasource_id"):
            datasource_fqn = self._get_datasource_fqn(
                chart_json.get("datasource_id"), db_service_name
            )
        if not datasource_fqn:
            continue

        from_entity = self.metadata.get_by_name(entity=Table, fqn=datasource_fqn)
        try:
            target_fqn = fqn.build(
                self.metadata,
                entity_type=Lineage_Dashboard,
                service_name=self.config.serviceName,
                dashboard_name=str(dashboard_details["id"]),
            )
            to_entity = self.metadata.get_by_name(
                entity=Lineage_Dashboard, fqn=target_fqn
            )
            if from_entity and to_entity:
                yield self._get_add_lineage_request(
                    to_entity=to_entity, from_entity=from_entity
                )
        except Exception as exc:
            # Log and move on to the next chart
            logger.debug(traceback.format_exc())
            logger.error(
                f"Error to yield dashboard lineage details for DB service name [{db_service_name}]: {exc}"
            )
def _get_datasource_fqn_for_lineage(self, chart_json, db_service_name):
return (
self._get_datasource_fqn(chart_json.get("datasource_id"), db_service_name)
if chart_json.get("datasource_id")
else None
)
def yield_dashboard_chart(
self, dashboard_details: dict

View File

@ -19,11 +19,7 @@ from sqlalchemy.engine import Engine
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.chart import ChartType
from metadata.generated.schema.entity.data.dashboard import (
Dashboard as Lineage_Dashboard,
)
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
@ -61,7 +57,7 @@ class SupersetDBSource(SupersetSourceMixin):
"""
charts = self.engine.execute(FETCH_ALL_CHARTS)
for chart in charts:
self.all_charts[chart.id] = dict(chart)
self.all_charts[chart["id"]] = dict(chart)
def get_dashboards_list(self) -> Optional[List[object]]:
"""
@ -81,7 +77,7 @@ class SupersetDBSource(SupersetSourceMixin):
name=dashboard_details["id"],
displayName=dashboard_details["dashboard_title"],
description="",
dashboardUrl=f"/superset/dashboard/{dashboard_details['id']}",
dashboardUrl=f"/superset/dashboard/{dashboard_details['id']}/",
owner=self.get_owner_details(dashboard_details),
charts=[
EntityReference(id=chart.id.__root__, type="chart")
@ -92,46 +88,12 @@ class SupersetDBSource(SupersetSourceMixin):
),
)
def yield_dashboard_lineage_details(
    self, dashboard_details: dict, db_service_name: str
) -> Optional[Iterable[AddLineageRequest]]:
    """
    Yield table -> dashboard lineage requests for the charts of a dashboard.

    Charts without a "table_name", or whose datasource FQN cannot be
    resolved, are skipped. A request is only yielded when both entity
    lookups through self.metadata.get_by_name return a truthy value.
    """
    for chart_id in self._get_charts_of_dashboard(dashboard_details):
        chart_json = self.all_charts.get(chart_id)
        if not chart_json:
            continue

        if chart_json.get("table_name"):
            datasource_fqn = self._get_datasource_fqn(chart_json, db_service_name)
        else:
            datasource_fqn = None
        if not datasource_fqn:
            continue

        from_entity = self.metadata.get_by_name(
            entity=Table,
            fqn=datasource_fqn,
        )
        try:
            lineage_dashboard_fqn = fqn.build(
                self.metadata,
                entity_type=Lineage_Dashboard,
                service_name=self.config.serviceName,
                dashboard_name=str(dashboard_details["id"]),
            )
            to_entity = self.metadata.get_by_name(
                entity=Lineage_Dashboard,
                fqn=lineage_dashboard_fqn,
            )
            if from_entity and to_entity:
                yield self._get_add_lineage_request(
                    to_entity=to_entity, from_entity=from_entity
                )
        except Exception as exc:
            # Log and move on to the next chart
            logger.debug(traceback.format_exc())
            logger.error(
                f"Error to yield dashboard lineage details for DB service name [{db_service_name}]: {exc}"
            )
def _get_datasource_fqn_for_lineage(self, chart_json, db_service_name):
return (
self._get_datasource_fqn(chart_json, db_service_name)
if chart_json.get("table_name")
else None
)
def yield_dashboard_chart(
self, dashboard_details: dict
@ -180,7 +142,7 @@ class SupersetDBSource(SupersetSourceMixin):
service_name=db_service_name,
)
return dataset_fqn
except KeyError as err:
except Exception as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to fetch Datasource with id [{chart_json.get('table_name')}]: {err}"

View File

@ -12,8 +12,14 @@
Superset mixin module
"""
import json
from typing import List
import traceback
from typing import Iterable, List, Optional
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.dashboard import (
Dashboard as Lineage_Dashboard,
)
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.dashboard.supersetConnection import (
SupersetConnection,
)
@ -29,6 +35,10 @@ from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import InvalidSourceException, SourceStatus
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class SupersetSourceMixin(DashboardServiceSource):
@ -102,3 +112,42 @@ class SupersetSourceMixin(DashboardServiceSource):
if key.startswith("CHART-") and value.get("meta", {}).get("chartId")
]
return []
def yield_dashboard_lineage_details(
    self, dashboard_details: dict, db_service_name: str
) -> Optional[Iterable[AddLineageRequest]]:
    """
    Yield table -> dashboard lineage requests for the charts of a dashboard.

    The datasource FQN of each chart comes from
    self._get_datasource_fqn_for_lineage. A request is only yielded when
    both entity lookups through self.metadata.get_by_name return a truthy
    value.
    """
    for chart_id in self._get_charts_of_dashboard(dashboard_details):
        chart_json = self.all_charts.get(chart_id)
        # Unknown/empty charts and charts without a resolvable datasource
        # are both skipped
        datasource_fqn = (
            self._get_datasource_fqn_for_lineage(chart_json, db_service_name)
            if chart_json
            else None
        )
        if not datasource_fqn:
            continue

        from_entity = self.metadata.get_by_name(entity=Table, fqn=datasource_fqn)
        try:
            dashboard_entity_fqn = fqn.build(
                self.metadata,
                entity_type=Lineage_Dashboard,
                service_name=self.config.serviceName,
                dashboard_name=str(dashboard_details["id"]),
            )
            to_entity = self.metadata.get_by_name(
                entity=Lineage_Dashboard, fqn=dashboard_entity_fqn
            )
            if from_entity and to_entity:
                yield self._get_add_lineage_request(
                    to_entity=to_entity, from_entity=from_entity
                )
        except Exception as exc:
            # Log and move on to the next chart
            logger.debug(traceback.format_exc())
            logger.error(
                f"Error to yield dashboard lineage details for DB service name [{db_service_name}]: {exc}"
            )

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,352 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test superset source
"""
import json
import uuid
from pathlib import Path
from unittest import TestCase
from unittest.mock import patch
from sqlalchemy.engine import Engine
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.entity.data.chart import Chart, ChartType
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.ometa.mixins.server_mixin import OMetaServerMixin
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
from metadata.ingestion.source.dashboard.superset.api_source import SupersetAPISource
from metadata.ingestion.source.dashboard.superset.client import SupersetAPIClient
from metadata.ingestion.source.dashboard.superset.db_source import SupersetDBSource
from metadata.ingestion.source.dashboard.superset.metadata import SupersetSource
# JSON fixture with canned Superset responses, located relative to this
# test module.
mock_file_path = Path(__file__).parent.parent.parent.joinpath(
    "resources/datasets/superset_dataset.json"
)
with mock_file_path.open(encoding="UTF-8") as file:
    mock_data: dict = json.load(file)

# Payloads in the shape used for the API-backed source
MOCK_DASHBOARD_RESP = mock_data["dashboard"]
MOCK_DASHBOARD = MOCK_DASHBOARD_RESP["result"][0]
MOCK_CHART_RESP = mock_data["chart"]
MOCK_CHART = MOCK_CHART_RESP["result"][0]

# Payloads in the shape used for the database-backed source
MOCK_CHART_DB = mock_data["chart-db"][0]
MOCK_DASHBOARD_DB = mock_data["dashboard-db"]
# Workflow config used to build the API-backed Superset source in the tests
MOCK_SUPERSET_API_CONFIG = {
    "source": {
        "type": "superset",
        "serviceName": "test_supserset",
        "serviceConnection": {
            "config": {
                "hostPort": "https://my-superset.com",
                "type": "Superset",
                # Username/password/provider connection block
                "connection": {
                    "username": "admin",
                    "password": "admin",
                    "provider": "db",
                },
            }
        },
        "sourceConfig": {
            "config": {
                "type": "DashboardMetadata",
            }
        },
    },
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
        "openMetadataServerConfig": {
            "hostPort": "http://localhost:8585/api",
            "authProvider": "openmetadata",
            "securityConfig": {"jwtToken": "token"},
        },
    },
}
# Workflow config used to build the database-backed Superset source in the
# tests
MOCK_SUPERSET_DB_CONFIG = {
    "source": {
        "type": "superset",
        "serviceName": "test_supserset",
        "serviceConnection": {
            "config": {
                "hostPort": "https://my-superset.com",
                "type": "Superset",
                # Postgres connection block
                "connection": {
                    "type": "Postgres",
                    "username": "superset",
                    "password": "superset",
                    "hostPort": "localhost:5432",
                    "database": "superset",
                },
            }
        },
        "sourceConfig": {
            "config": {
                "type": "DashboardMetadata",
            }
        },
    },
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
        "openMetadataServerConfig": {
            "hostPort": "http://localhost:8585/api",
            "authProvider": "openmetadata",
            "securityConfig": {"jwtToken": "token"},
        },
    },
}
# Entity references shared by the expected requests below
EXPECTED_DASH_SERVICE = EntityReference(id=uuid.uuid4(), type="dashboardService")
EXPECTED_USER = EntityReference(id=uuid.uuid4(), type="user")

# Chart entities placed in the source context before yielding a dashboard
EXPECTED_CHATRT_ENTITY = [
    Chart(
        id=uuid.uuid4(),
        name=37,
        service=EXPECTED_DASH_SERVICE,
    )
]

# Dashboard request both sources are expected to yield
EXPECTED_DASH = CreateDashboardRequest(
    name=14,
    displayName="My DASH",
    description="",
    dashboardUrl="/superset/dashboard/14/",
    owner=EXPECTED_USER,
    charts=[
        EntityReference(id=chart_entity.id.__root__, type="chart")
        for chart_entity in EXPECTED_CHATRT_ENTITY
    ],
    service=EXPECTED_DASH_SERVICE,
)

# Chart request both sources are expected to yield
EXPECTED_CHART = CreateChartRequest(
    name=37,
    displayName="% Rural",
    description="TEST DESCRIPTION",
    chartType=ChartType.Other.value,
    chartUrl="/explore/?slice_id=37",
    service=EXPECTED_DASH_SERVICE,
)

# Expected chart caches after prepare(), keyed by chart id
EXPECTED_ALL_CHARTS = {37: MOCK_CHART}
EXPECTED_ALL_CHARTS_DB = {37: MOCK_CHART_DB}

# Response used to simulate a failed database lookup
NOT_FOUND_RESP = {"message": "Not found"}

EXPECTED_DATASET_FQN = "demo.examples.main.wb_health_population"
class SupersetUnitTest(TestCase):
    """
    Validate how we work with Superset metadata

    Every test case instance builds two sources from the mock configs: one
    expected to be a SupersetAPISource and one expected to be a
    SupersetDBSource. The Superset client and the SQLAlchemy engine are
    patched so that the JSON fixture data is returned instead.
    """

    def __init__(self, methodName="runTest") -> None:
        # Same default as unittest.TestCase, so the class can also be
        # instantiated without an explicit method name.
        super().__init__(methodName)
        self.config = OpenMetadataWorkflowConfig.parse_obj(MOCK_SUPERSET_API_CONFIG)

        # API-backed source
        with patch.object(
            DashboardServiceSource, "test_connection", return_value=False
        ), patch.object(OMetaServerMixin, "validate_versions", return_value=True):
            # This already validates that the source can be initialized
            self.superset_api: SupersetSource = SupersetSource.create(
                MOCK_SUPERSET_API_CONFIG["source"],
                self.config.workflowConfig.openMetadataServerConfig,
            )
            # Exact type check: the config must select the API implementation
            self.assertIs(type(self.superset_api), SupersetAPISource)
            self.superset_api.context.__dict__[
                "dashboard_service"
            ] = EXPECTED_DASH_SERVICE

            with patch.object(
                SupersetAPIClient, "fetch_total_charts", return_value=1
            ), patch.object(
                SupersetAPIClient, "fetch_charts", return_value=MOCK_CHART_RESP
            ):
                self.superset_api.prepare()
                self.assertEqual(EXPECTED_ALL_CHARTS, self.superset_api.all_charts)

        # Database-backed source
        with patch.object(
            DashboardServiceSource, "test_connection", return_value=False
        ), patch.object(OMetaServerMixin, "validate_versions", return_value=True):
            # This already validates that the source can be initialized
            self.superset_db: SupersetSource = SupersetSource.create(
                MOCK_SUPERSET_DB_CONFIG["source"],
                self.config.workflowConfig.openMetadataServerConfig,
            )
            # Exact type check: the config must select the DB implementation
            self.assertIs(type(self.superset_db), SupersetDBSource)
            self.superset_db.context.__dict__[
                "dashboard_service"
            ] = EXPECTED_DASH_SERVICE

            with patch.object(Engine, "execute", return_value=mock_data["chart-db"]):
                self.superset_db.prepare()
                self.assertEqual(EXPECTED_ALL_CHARTS_DB, self.superset_db.all_charts)

    def test_create(self):
        """
        An invalid config raises an error
        """
        not_superset_source = {
            "type": "mysql",
            "serviceName": "mysql_local",
            "serviceConnection": {
                "config": {
                    "type": "Mysql",
                    "username": "openmetadata_user",
                    "password": "openmetadata_password",
                    "hostPort": "localhost:3306",
                    "databaseSchema": "openmetadata_db",
                }
            },
            "sourceConfig": {
                "config": {
                    "type": "DatabaseMetadata",
                }
            },
        }

        self.assertRaises(
            InvalidSourceException,
            SupersetSource.create,
            not_superset_source,
            self.config.workflowConfig.openMetadataServerConfig,
        )

    def test_api_perpare(self):
        """
        prepare() is executed in __init__; the chart caches of both sources
        must match the fixture data
        """
        # Method name (including its spelling) is kept so test ids stay stable
        self.assertEqual(self.superset_api.all_charts, EXPECTED_ALL_CHARTS)
        self.assertEqual(self.superset_db.all_charts, EXPECTED_ALL_CHARTS_DB)

    def test_api_get_dashboards_list(self):
        """
        Mock the client and check that we get a list
        """
        with patch.object(
            SupersetAPIClient, "fetch_total_dashboards", return_value=1
        ), patch.object(
            SupersetAPIClient, "fetch_dashboards", return_value=MOCK_DASHBOARD_RESP
        ):
            dashboard_list = self.superset_api.get_dashboards_list()
            # Consumed while the patches are active in case the result is lazy
            self.assertEqual(list(dashboard_list), [MOCK_DASHBOARD])

    def test_charts_of_dashboard(self):
        """
        Chart ids are extracted from the mocked dashboard payload
        """
        result = self.superset_api._get_charts_of_dashboard(  # pylint: disable=protected-access
            MOCK_DASHBOARD
        )
        self.assertEqual(result, [37])

    def test_dashboard_name(self):
        """
        The dashboard name is taken from the "dashboard_title" field
        """
        dashboard_name = self.superset_api.get_dashboard_name(MOCK_DASHBOARD)
        self.assertEqual(dashboard_name, MOCK_DASHBOARD["dashboard_title"])

    def test_yield_dashboard(self):
        """
        Both sources yield the expected dashboard request
        """
        # TEST API SOURCE
        with patch.object(
            SupersetAPISource, "_get_user_by_email", return_value=EXPECTED_USER
        ):
            self.superset_api.context.__dict__["charts"] = EXPECTED_CHATRT_ENTITY
            dashboard = self.superset_api.yield_dashboard(MOCK_DASHBOARD)
            self.assertEqual(list(dashboard), [EXPECTED_DASH])

        # TEST DB SOURCE
        with patch.object(
            SupersetDBSource, "_get_user_by_email", return_value=EXPECTED_USER
        ):
            self.superset_db.context.__dict__["charts"] = EXPECTED_CHATRT_ENTITY
            dashboard = self.superset_db.yield_dashboard(MOCK_DASHBOARD_DB)
            self.assertEqual(list(dashboard), [EXPECTED_DASH])

    def test_yield_dashboard_chart(self):
        """
        Both sources yield the expected chart request
        """
        # TEST API SOURCE
        dashboard_charts = self.superset_api.yield_dashboard_chart(MOCK_DASHBOARD)
        self.assertEqual(list(dashboard_charts), [EXPECTED_CHART])

        # TEST DB SOURCE
        dashboard_charts = self.superset_db.yield_dashboard_chart(MOCK_DASHBOARD_DB)
        self.assertEqual(list(dashboard_charts), [EXPECTED_CHART])

    def test_api_get_datasource_fqn(self):
        """
        Test generated datasource fqn for api source
        """
        # Datasource and database are both returned by the client
        with patch.object(
            OpenMetadata, "es_search_from_fqn", return_value=None
        ), patch.object(
            SupersetAPIClient,
            "fetch_datasource",
            return_value=mock_data.get("datasource"),
        ), patch.object(
            SupersetAPIClient, "fetch_database", return_value=mock_data.get("database")
        ):
            fqn = self.superset_api._get_datasource_fqn(  # pylint: disable=protected-access
                1, "demo"
            )
            self.assertEqual(fqn, EXPECTED_DATASET_FQN)

        # The database lookup answers with a "Not found" payload
        with patch.object(
            OpenMetadata, "es_search_from_fqn", return_value=None
        ), patch.object(
            SupersetAPIClient,
            "fetch_datasource",
            return_value=mock_data.get("datasource"),
        ), patch.object(
            SupersetAPIClient, "fetch_database", return_value=NOT_FOUND_RESP
        ):
            fqn = self.superset_api._get_datasource_fqn(  # pylint: disable=protected-access
                1, "demo"
            )
            self.assertIsNone(fqn)

    def test_db_get_datasource_fqn_for_lineage(self):
        """
        Test generated datasource fqn for db source
        """
        fqn = self.superset_db._get_datasource_fqn_for_lineage(  # pylint: disable=protected-access
            MOCK_CHART_DB, "demo"
        )
        self.assertEqual(fqn, EXPECTED_DATASET_FQN)

    def test_db_get_database_name(self):
        """
        The database name is extracted from a SQLAlchemy connection string
        """
        sqa_str1 = "postgres://user:pass@localhost:8888/database"
        self.assertEqual(
            self.superset_db._get_database_name(  # pylint: disable=protected-access
                sqa_str1
            ),
            "database",
        )

        sqa_str2 = "sqlite:////app/superset_home/superset.db"
        self.assertEqual(
            self.superset_db._get_database_name(  # pylint: disable=protected-access
                sqa_str2
            ),
            "superset.db",
        )