Pere Menal-Ferrer 6ea630d7ef
DevEx: Ingestion development improvement (focus on unit testing) (#21362)
* Fix test_amundsen: missing None

* Fix custom_basemodel_validation to check model_fields on type(values) to prevent noisy warnings

* Refactor referencedByQueries validation to use field_validator as per deprecation warning

* Update ColumnJson to use model_rebuild as a replacement for forward reference updates, as per deprecation warning

* Move superset test to integration test as they are using testcontainers

* Add install_dev_env target to Makefile for development dependencies

* Add test-unit as extra in setup.py

* Skip failing IT test. Requires further investigation.
2025-05-26 10:38:17 +02:00

782 lines
30 KiB
Python

# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test superset source
"""
import json
import uuid
from pathlib import Path
from unittest import TestCase
from unittest.mock import patch
import sqlalchemy
from collate_sqllineage.core.models import Column, Schema, SubQuery, Table
from testcontainers.core.generic import DockerContainer
from testcontainers.postgres import PostgresContainer
from _openmetadata_testutils.postgres.conftest import postgres_container
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.entity.data.chart import Chart, ChartType
from metadata.generated.schema.entity.data.dashboard import DashboardType
from metadata.generated.schema.entity.services.connections.database.common.basicAuth import (
BasicAuth,
)
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
MysqlConnection,
)
from metadata.generated.schema.entity.services.connections.database.postgresConnection import (
PostgresConnection,
)
from metadata.generated.schema.entity.services.dashboardService import (
DashboardConnection,
DashboardService,
DashboardServiceType,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseConnection,
DatabaseService,
DatabaseServiceType,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.basic import (
EntityName,
FullyQualifiedEntityName,
SourceUrl,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.entityReferenceList import EntityReferenceList
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.superset.api_source import SupersetAPISource
from metadata.ingestion.source.dashboard.superset.db_source import SupersetDBSource
from metadata.ingestion.source.dashboard.superset.metadata import SupersetSource
from metadata.ingestion.source.dashboard.superset.models import (
FetchChart,
FetchColumn,
FetchDashboard,
SupersetChart,
SupersetDashboardCount,
)
# Load the canned Superset API/DB payloads used as fixtures throughout these tests.
mock_file_path = Path(__file__).parent / "resources/superset_dataset.json"
with open(mock_file_path, encoding="UTF-8") as file:
    mock_data: dict = json.load(file)

# API-shaped fixtures: responses as returned by Superset's REST API.
MOCK_DASHBOARD_RESP = SupersetDashboardCount(**mock_data["dashboard"])
MOCK_DASHBOARD = MOCK_DASHBOARD_RESP.result[0]
# Expected counts/names for the example dashboards loaded into the Superset container.
PUBLISHED_DASHBOARD_COUNT = 9
PUBLISHED_DASHBOARD_NAME = "Unicode Test"
MOCK_CHART_RESP = SupersetChart(**mock_data["chart"])
MOCK_CHART = MOCK_CHART_RESP.result[0]
# DB-shaped fixtures: rows as read directly from Superset's metadata database.
MOCK_CHART_DB = FetchChart(**mock_data["chart-db"][0])
MOCK_CHART_DB_2 = FetchChart(**mock_data["chart-db"][1])
MOCK_DASHBOARD_DB = FetchDashboard(**mock_data["dashboard-db"])
# Dashboard service fixture. NOTE: "test_supserset" (sic) is used consistently
# as the service name throughout this module, including the workflow configs
# built in SupersetUnitTest.__init__ — do not "fix" the spelling in isolation.
EXPECTED_DASH_SERVICE = DashboardService(
    id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb",
    fullyQualifiedName=FullyQualifiedEntityName("test_supserset"),
    name="test_supserset",
    connection=DashboardConnection(),
    serviceType=DashboardServiceType.Superset,
)
# Owner reference expected on ingested dashboards.
EXPECTED_USER = EntityReferenceList(
    root=[EntityReference(id="81af89aa-1bab-41aa-a567-5e68f78acdc0", type="user")]
)
# MySQL service WITHOUT an explicit databaseName — exercises the
# "default" database-name fallback in test_db_get_database_name.
MOCK_DB_MYSQL_SERVICE_1 = DatabaseService(
    id="c3eb265f-5445-4ad3-ba5e-797d3a307122",
    fullyQualifiedName=FullyQualifiedEntityName("test_mysql"),
    name="test_mysql",
    connection=DatabaseConnection(
        config=MysqlConnection(
            username="user",
            authType=BasicAuth(password="pass"),
            hostPort="localhost:3306",
        )
    ),
    serviceType=DatabaseServiceType.Mysql,
)
# Same MySQL service WITH an explicit databaseName ("DUMMY_DB") — exercises the
# configured-database branch in test_db_get_database_name.
MOCK_DB_MYSQL_SERVICE_2 = DatabaseService(
    id="c3eb265f-5445-4ad3-ba5e-797d3a307122",
    fullyQualifiedName=FullyQualifiedEntityName("test_mysql"),
    name="test_mysql",
    connection=DatabaseConnection(
        config=MysqlConnection(
            username="user",
            authType=BasicAuth(password="pass"),
            hostPort="localhost:3306",
            databaseName="DUMMY_DB",
        )
    ),
    serviceType=DatabaseServiceType.Mysql,
)
# Raw dashboard payload, including Superset's position_json layout blob
# (which is where chart ids such as 37 are discovered during ingestion).
MOCK_DASHBOARD_INPUT = {
    "certification_details": "sample certificate details",
    "certified_by": "certified by unknown",
    "css": "css",
    "dashboard_title": "Top trades",
    "external_url": "external url",
    "slug": "top-trades",
    "published": True,
    "position_json": '{"CHART-dwSXo_0t5X":{"children":[],"id":"CHART-dwSXo_0t5X","meta":{"chartId":37,"height":50,"sliceName":"% Rural","uuid":"8f663401-854a-4da7-8e50-4b8e4ebb4f22","width":4},"parents":["ROOT_ID","GRID_ID","ROW-z_7odBWenK"],"type":"CHART"},"DASHBOARD_VERSION_KEY":"v2","GRID_ID":{"children":["ROW-z_7odBWenK"],"id":"GRID_ID","parents":["ROOT_ID"],"type":"GRID"},"HEADER_ID":{"id":"HEADER_ID","meta":{"text":"My DASH"},"type":"HEADER"},"ROOT_ID":{"children":["GRID_ID"],"id":"ROOT_ID","type":"ROOT"},"ROW-z_7odBWenK":{"children":["CHART-dwSXo_0t5X"],"id":"ROW-z_7odBWenK","meta":{"background":"BACKGROUND_TRANSPARENT"},"parents":["ROOT_ID","GRID_ID"],"type":"ROW"}}',
}
# Postgres service used when resolving datasource FQNs for lineage.
MOCK_DB_POSTGRES_SERVICE = DatabaseService(
    id="c3eb265f-5445-4ad3-ba5e-797d3a307122",
    fullyQualifiedName=FullyQualifiedEntityName("test_postgres"),
    name="test_postgres",
    connection=DatabaseConnection(
        config=PostgresConnection(
            username="user",
            authType=BasicAuth(password="pass"),
            hostPort="localhost:5432",
            database="postgres",
        )
    ),
    serviceType=DatabaseServiceType.Postgres,
)
# Chart entity already "ingested" — its FQN is fed into dashboard requests below.
EXPECTED_CHART_ENTITY = [
    Chart(
        id=uuid.uuid4(),
        name="37",
        fullyQualifiedName=FullyQualifiedEntityName("test_supserset.37"),
        service=EntityReference(
            id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb", type="dashboardService"
        ),
    )
]
# Expected dashboard request produced by the DB source; sourceUrl and owners
# are patched in test_yield_dashboard against the live container before comparing.
EXPECTED_DASH = CreateDashboardRequest(
    name="14",
    displayName="My DASH",
    sourceUrl="https://my-superset.com/superset/dashboard/14/",
    charts=[chart.fullyQualifiedName for chart in EXPECTED_CHART_ENTITY],
    service=EXPECTED_DASH_SERVICE.fullyQualifiedName,
    owners=EXPECTED_USER,
)
# Expected dashboard request produced by the API source; the hard-coded port in
# sourceUrl is likewise overwritten at test time with the container's mapped port.
EXPECTED_API_DASHBOARD = CreateDashboardRequest(
    name=EntityName("10"),
    displayName="Unicode Test",
    description=None,
    dashboardType=DashboardType.Dashboard.value,
    sourceUrl=SourceUrl("http://localhost:54510/superset/dashboard/unicode-test/"),
    project=None,
    charts=[],
    dataModels=None,
    tags=None,
    owners=None,
    service=FullyQualifiedEntityName("test_supserset"),
    extension=None,
    domain=None,
    dataProducts=None,
    lifeCycle=None,
    sourceHash=None,
)
# Expected chart request from the DB source (matches the 'slices' row inserted
# by setup_sample_data).
EXPECTED_CHART = CreateChartRequest(
    name="1",
    displayName="Rural",
    description="desc",
    chartType=ChartType.Other.value,
    sourceUrl="https://my-superset.com/explore/?slice_id=1",
    service=EXPECTED_DASH_SERVICE.fullyQualifiedName,
)
# Expected chart request from the API source; name/displayName/chartType/sourceUrl
# are normalized against the live response in x_test_yield_dashboard_chart.
EXPECTED_CHART_2 = CreateChartRequest(
    name=EntityName("69"),
    displayName="Unicode Cloud",
    description=None,
    chartType=ChartType.Other.value,
    sourceUrl=SourceUrl("http://localhost:54510/explore/?slice_id=69"),
    tags=None,
    owners=None,
    service=FullyQualifiedEntityName("test_supserset"),
    domain=None,
    dataProducts=None,
    lifeCycle=None,
    sourceHash=None,
)
# Column with "()" in its datatype — exercises broken-type parsing.
MOCK_DATASOURCE = [
    FetchColumn(
        id=11, type="INT()", column_name="Population", table_name="sample_table"
    )
]
# EXPECTED_ALL_CHARTS = {37: MOCK_CHART}
# EXPECTED_ALL_CHARTS_DB = {37: MOCK_CHART_DB}
EXPECTED_ALL_CHARTS_DB = {1: MOCK_CHART_DB_2}
NOT_FOUND_RESP = {"message": "Not found"}
# "*" database segment: the API source cannot resolve the database name.
EXPECTED_API_DATASET_FQN = "test_postgres.*.main.wb_health_population"
EXPECTED_DATASET_FQN = "test_postgres.examples.main.wb_health_population"
def setup_sample_data(postgres_container):
    """Create and populate a minimal copy of Superset's metadata schema.

    Builds just enough of Superset's internal tables (ab_user, dashboards,
    slices, dbs, tables, table_columns) inside the given Postgres
    testcontainer for the DB-source tests to query.
    """
    # Statements run in order inside one transaction; each CREATE precedes
    # the INSERT that depends on it (dashboards has an FK onto ab_user).
    sql_statements = (
        """
        CREATE TABLE ab_user (
            id INT PRIMARY KEY,
            username VARCHAR(50));
        """,
        """
        INSERT INTO ab_user (id, username)
        VALUES (1, 'test_user');
        """,
        """
        CREATE TABLE dashboards (
            id INT PRIMARY KEY,
            created_by_fk INT,
            FOREIGN KEY (created_by_fk) REFERENCES ab_user(id));
        """,
        """
        INSERT INTO dashboards (id, created_by_fk)
        VALUES (1, 1);
        """,
        """
        CREATE TABLE slices (
            id INTEGER PRIMARY KEY,
            slice_name VARCHAR(255),
            description TEXT,
            datasource_id INTEGER,
            viz_type VARCHAR(255),
            datasource_type VARCHAR(255)
        )
        """,
        """
        INSERT INTO slices(id, slice_name, description, datasource_id, viz_type, datasource_type)
        VALUES (1, 'Rural', 'desc', 99, 'bar_chart', 'table');
        """,
        """
        CREATE TABLE dbs (
            id INTEGER PRIMARY KEY,
            database_name VARCHAR(255),
            sqlalchemy_uri TEXT
        )
        """,
        """
        INSERT INTO dbs(id, database_name, sqlalchemy_uri)
        VALUES (5, 'test_db', 'postgres://user:pass@localhost:5432/examples');
        """,
        """
        CREATE TABLE tables (
            id INTEGER PRIMARY KEY,
            table_name VARCHAR(255),
            schema VARCHAR(255),
            database_id INTEGER,
            sql VARCHAR(4000)
        );
        """,
        """
        INSERT INTO tables(id, table_name, schema, database_id)
        VALUES (99, 'sample_table', 'main', 5);
        """,
        """
        CREATE TABLE table_columns (
            id INTEGER PRIMARY KEY,
            table_name VARCHAR(255),
            table_id INTEGER,
            column_name VARCHAR(255),
            type VARCHAR(255),
            description VARCHAR(255)
        );
        """,
        """
        INSERT INTO
            table_columns(id, table_name, table_id, column_name, type, description)
        VALUES
            (1099, 'sample_table', 99, 'id', 'VARCHAR', 'dummy description'),
            (1199, 'sample_table', 99, 'timestamp', 'VARCHAR', 'dummy description'),
            (1299, 'sample_table', 99, 'price', 'VARCHAR', 'dummy description');
        """,
    )
    engine = sqlalchemy.create_engine(postgres_container.get_connection_url())
    with engine.begin() as connection:
        for statement in sql_statements:
            connection.execute(sqlalchemy.text(statement))
# One-shot flag: containers are started on the first SupersetUnitTest
# instantiation and reused by every subsequent test method.
INITIAL_SETUP = True
# NOTE(review): this rebinding shadows the `postgres_container` fixture imported
# from _openmetadata_testutils above — confirm the import is still needed.
superset_container = postgres_container = None
def set_testcontainers():
    """Start (once) and return the shared Postgres and Superset containers.

    On the first call this boots a Postgres container seeded with the mock
    Superset metadata plus a Superset container bootstrapped with an admin
    user and the bundled example dashboards; later calls reuse both.
    """
    global INITIAL_SETUP, superset_container, postgres_container
    # Guard clause: containers already running, hand them back as-is.
    if not INITIAL_SETUP:
        return superset_container, postgres_container
    # Postgres test container holding the mocked Superset metadata DB.
    postgres_container = PostgresContainer("postgres:16-alpine")
    postgres_container.start()
    setup_sample_data(postgres_container)
    # Superset test container serving the REST API.
    superset_container = DockerContainer(image="apache/superset:3.1.2")
    superset_container.with_env("SUPERSET_SECRET_KEY", "&3brfbcf192T!)$sabqbie")
    superset_container.with_env("WTF_CSRF_ENABLED", False)
    superset_container.with_exposed_ports(8088)
    superset_container.start()
    # Bootstrap an admin login and the example dashboards inside the container.
    superset_container.exec(
        "superset fab create-admin --username admin --firstname Superset --lastname Admin --email admin@superset.com --password admin"
    )
    superset_container.exec("superset db upgrade")
    superset_container.exec("superset init")
    superset_container.exec("superset load-examples")
    INITIAL_SETUP = False
    return superset_container, postgres_container
class SupersetUnitTest(TestCase):
"""
Validate how we work with Superset metadata
"""
@classmethod
def teardown_class(cls):
"""Teardown class"""
# stop containers
superset_container.stop()
postgres_container.stop()
    def __init__(self, methodName) -> None:
        """Build both a Superset API source and a DB source against the
        shared test containers, so each test method can exercise either path.
        """
        super().__init__(methodName)
        # NOTE(review): these are locals, not the module globals of the same
        # name — the globals are assigned inside set_testcontainers itself.
        superset_container, postgres_container = set_testcontainers()
        # Workflow config pointing the API source at the live Superset container.
        MOCK_SUPERSET_API_CONFIG = {
            "source": {
                "type": "superset",
                "serviceName": "test_supserset",
                "serviceConnection": {
                    "config": {
                        "hostPort": f"http://{superset_container.get_container_host_ip()}:{superset_container.get_exposed_port(8088)}",
                        "type": "Superset",
                        "connection": {
                            "username": "admin",
                            "password": "admin",
                            "provider": "db",
                        },
                    }
                },
                "sourceConfig": {
                    "config": {
                        "type": "DashboardMetadata",
                        "includeDraftDashboard": False,
                    }
                },
            },
            "sink": {"type": "metadata-rest", "config": {}},
            "workflowConfig": {
                "openMetadataServerConfig": {
                    "hostPort": "http://localhost:8585/api",
                    "authProvider": "openmetadata",
                    "securityConfig": {
                        "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
                    },
                },
            },
        }
        # Workflow config pointing the DB source straight at the Postgres
        # container seeded by setup_sample_data.
        MOCK_SUPERSET_DB_CONFIG = {
            "source": {
                "type": "superset",
                "serviceName": "test_supserset",
                "serviceConnection": {
                    "config": {
                        "hostPort": f"http://{superset_container.get_container_host_ip()}:{superset_container.get_exposed_port(8088)}",
                        "type": "Superset",
                        "connection": {
                            "type": "Postgres",
                            "hostPort": f"{postgres_container.get_container_host_ip()}:{postgres_container.get_exposed_port(5432)}",
                            "username": postgres_container.env.get("POSTGRES_USER"),
                            "authType": {
                                "password": postgres_container.env.get(
                                    "POSTGRES_PASSWORD"
                                )
                            },
                            "database": postgres_container.env.get("POSTGRES_DB"),
                        },
                    }
                },
                "sourceConfig": {
                    "config": {
                        "type": "DashboardMetadata",
                    }
                },
            },
            "sink": {"type": "metadata-rest", "config": {}},
            "workflowConfig": {
                "openMetadataServerConfig": {
                    "hostPort": "http://localhost:8585/api",
                    "authProvider": "openmetadata",
                    "securityConfig": {"jwtToken": "token"},
                },
            },
        }
        self.config = OpenMetadataWorkflowConfig.model_validate(
            MOCK_SUPERSET_API_CONFIG
        )
        # SupersetSource.create dispatches on the connection type: an API
        # connection must yield a SupersetAPISource...
        self.superset_api: SupersetSource = SupersetSource.create(
            MOCK_SUPERSET_API_CONFIG["source"],
            OpenMetadata(self.config.workflowConfig.openMetadataServerConfig),
        )
        self.assertEqual(type(self.superset_api), SupersetAPISource)
        # Pre-seed the ingestion context with the dashboard service name,
        # bypassing the normal service-creation step.
        self.superset_api.context.get().__dict__[
            "dashboard_service"
        ] = EXPECTED_DASH_SERVICE.fullyQualifiedName.root
        # ...and a Postgres connection must yield a SupersetDBSource.
        self.superset_db: SupersetSource = SupersetSource.create(
            MOCK_SUPERSET_DB_CONFIG["source"],
            OpenMetadata(self.config.workflowConfig.openMetadataServerConfig),
        )
        self.assertEqual(type(self.superset_db), SupersetDBSource)
        self.superset_db.context.get().__dict__[
            "dashboard_service"
        ] = EXPECTED_DASH_SERVICE.fullyQualifiedName.root
def test_create(self):
"""
An invalid config raises an error
"""
not_superset_source = {
"type": "mysql",
"serviceName": "mysql_local",
"serviceConnection": {
"config": {
"type": "Mysql",
"username": "openmetadata_user",
"authType": {
"password": "openmetadata_password",
},
"hostPort": "localhost:3306",
"databaseSchema": "openmetadata_db",
}
},
"sourceConfig": {
"config": {
"type": "DatabaseMetadata",
}
},
}
self.assertRaises(
InvalidSourceException,
SupersetSource.create,
not_superset_source,
self.config.workflowConfig.openMetadataServerConfig,
)
# disabled due to container being flaky
def x_test_api_get_dashboards_list(self):
"""
Mock the client and check that we get a list
"""
dashboard_list = list(self.superset_api.get_dashboards_list())
self.assertEqual(len(dashboard_list), PUBLISHED_DASHBOARD_COUNT)
def test_charts_of_dashboard(self):
"""
Mock the client and check that we get a list
"""
result = self.superset_api._get_charts_of_dashboard( # pylint: disable=protected-access
MOCK_DASHBOARD
)
self.assertEqual(result, [69])
# disabled due to container being flaky
def x_test_datamodels_of_dashboard(self):
"""
Mock the client and check that we get a list
"""
self.superset_api.prepare()
result = self.superset_api.yield_datamodel(MOCK_DASHBOARD)
self.assertEqual(len(list(result)), 1)
def test_datamodels_of_db_dashboard(self):
"""
Mock the db client and check that we get a list
"""
self.superset_db.prepare()
result = self.superset_db.yield_datamodel(MOCK_DASHBOARD_DB)
self.assertEqual(len(list(result)), 1)
def test_fetch_chart_db(self):
"""
test fetch chart method of db source
"""
self.superset_db.prepare()
self.assertEqual(EXPECTED_ALL_CHARTS_DB, self.superset_db.all_charts)
def test_dashboard_name(self):
dashboard_name = self.superset_api.get_dashboard_name(MOCK_DASHBOARD)
self.assertEqual(dashboard_name, MOCK_DASHBOARD.dashboard_title)
    def test_yield_dashboard(self):
        """Dashboards yielded by the API and DB sources match the fixtures."""
        # TEST API SOURCE
        dashboard = next(self.superset_api.yield_dashboard(MOCK_DASHBOARD)).right
        # The container publishes a random host port, so rebuild the expected
        # source URL from the live container before comparing.
        EXPECTED_API_DASHBOARD.sourceUrl = SourceUrl(
            f"http://{superset_container.get_container_host_ip()}:{superset_container.get_exposed_port(8088)}{MOCK_DASHBOARD.url}"
        )
        self.assertEqual(dashboard, EXPECTED_API_DASHBOARD)
        # TEST DB SOURCE
        # Seed the ingestion context with the already-"ingested" chart names.
        self.superset_db.context.get().__dict__["charts"] = [
            chart.name.root for chart in EXPECTED_CHART_ENTITY
        ]
        dashboard = next(self.superset_db.yield_dashboard(MOCK_DASHBOARD_DB)).right
        EXPECTED_DASH.sourceUrl = SourceUrl(
            f"http://{superset_container.get_container_host_ip()}:{superset_container.get_exposed_port(8088)}/superset/dashboard/14/"
        )
        # Owners come from the live lookup; copy them over so equality only
        # checks the remaining fields.
        EXPECTED_DASH.owners = dashboard.owners
        self.assertEqual(dashboard, EXPECTED_DASH)
    # disabled due to container being flaky
    def x_test_yield_dashboard_chart(self):
        """Charts yielded by the API and DB sources match the fixtures."""
        # TEST API SOURCE
        self.superset_api.prepare()
        dashboard_chart = next(
            self.superset_api.yield_dashboard_chart(MOCK_DASHBOARD)
        ).right
        # Normalize the environment-dependent fields (mapped port, whichever
        # example chart the container returns) before comparing the rest.
        EXPECTED_CHART_2.sourceUrl = SourceUrl(
            f"http://{superset_container.get_container_host_ip()}:{superset_container.get_exposed_port(8088)}/explore/?slice_id={dashboard_chart.name.root}"
        )
        EXPECTED_CHART_2.displayName = dashboard_chart.displayName
        EXPECTED_CHART_2.chartType = dashboard_chart.chartType
        EXPECTED_CHART_2.name = dashboard_chart.name
        self.assertEqual(dashboard_chart, EXPECTED_CHART_2)
        # TEST DB SOURCE
        self.superset_db.prepare()
        dashboard_charts = next(
            self.superset_db.yield_dashboard_chart(MOCK_DASHBOARD_DB)
        ).right
        EXPECTED_CHART.sourceUrl = SourceUrl(
            f"http://{superset_container.get_container_host_ip()}:{superset_container.get_exposed_port(8088)}/explore/?slice_id=1"
        )
        self.assertEqual(dashboard_charts, EXPECTED_CHART)
def test_api_get_datasource_fqn(self):
with patch.object(
OpenMetadata, "get_by_name", return_value=MOCK_DB_POSTGRES_SERVICE
):
"""
Test generated datasource fqn for api source
"""
fqn = self.superset_api._get_datasource_fqn( # pylint: disable=protected-access
1, MOCK_DB_POSTGRES_SERVICE.name.root
)
self.assertEqual(fqn, EXPECTED_API_DATASET_FQN)
def test_db_get_datasource_fqn_for_lineage(self):
with patch.object(
OpenMetadata, "get_by_name", return_value=MOCK_DB_POSTGRES_SERVICE
):
fqn = self.superset_db._get_datasource_fqn_for_lineage( # pylint: disable=protected-access
MOCK_CHART_DB, MOCK_DB_POSTGRES_SERVICE.name.root
)
self.assertEqual(fqn, EXPECTED_DATASET_FQN)
def test_db_get_database_name(self):
sqa_str1 = "postgres://user:pass@localhost:8888/database"
self.assertEqual(
self.superset_db._get_database_name( # pylint: disable=protected-access
sqa_str1, MOCK_DB_POSTGRES_SERVICE
),
"database",
)
sqa_str2 = "postgres://user:pass@localhost:8888/database?ssl=required"
self.assertEqual(
self.superset_db._get_database_name( # pylint: disable=protected-access
sqa_str2, MOCK_DB_POSTGRES_SERVICE
),
"database",
)
sqa_str3 = "postgres://user:pass@localhost:8888/openmetadata_db"
self.assertEqual(
self.superset_db._get_database_name( # pylint: disable=protected-access
sqa_str3, MOCK_DB_MYSQL_SERVICE_1
),
"default",
)
sqa_str4 = "postgres://user:pass@localhost:8888/openmetadata_db"
self.assertEqual(
self.superset_db._get_database_name( # pylint: disable=protected-access
sqa_str4, MOCK_DB_MYSQL_SERVICE_2
),
"DUMMY_DB",
)
sqa_str2 = "sqlite:////app/superset_home/superset.db"
self.assertEqual(
self.superset_db._get_database_name( # pylint: disable=protected-access
sqa_str2, MOCK_DB_POSTGRES_SERVICE
),
"/app/superset_home/superset.db",
)
def test_broken_column_type_in_datamodel(self):
"""
Test column parsing with column containing () in datatype
"""
self.superset_db.prepare()
parsed_datasource = self.superset_db.get_column_info(MOCK_DATASOURCE)
assert parsed_datasource[0].dataType.value == "INT"
def test_is_table_to_table_lineage(self):
table = Table(name="table_name", schema=Schema(name="schema_name"))
for test_case in [
(
(
Column(name="col_name"),
Table(name="table_name", schema=Schema(name="schema_name")),
Column(name="col_name"),
Table(name="dataset_name", schema=Schema(name="schema_name")),
),
True,
),
(
(
Column(name="col_name"),
Table(name="table_name", schema=Schema(name=Schema.unknown)),
Column(name="col_name"),
Table(name="dataset_name", schema=Schema(name="schema_name")),
),
False,
),
(
(
Column(name="col_name"),
Table(name="other_table_name", schema=Schema(name="schema_name")),
Column(name="col_name"),
Table(name="dataset_name", schema=Schema(name="schema_name")),
),
False,
),
(
(
Column(name="col_name"),
Table(name="table_name", schema=Schema(name="schema_name")),
Column(name="col_name"),
SubQuery(
subquery="select * from 1",
subquery_raw="select * from 1",
alias="dummy_subquery",
),
),
False,
),
]:
_columns, expected = test_case
column_from, column_from_parent, column_to, column_to_parent = _columns
column_from._parent.add(column_from_parent)
column_to._parent.add(column_to_parent)
columns = (column_from, column_to)
self.assertEqual(
self.superset_db._is_table_to_table_lineage(columns, table), expected
)
def test_append_value_to_dict_list(self):
init_dict = {1: [2]}
self.superset_db._append_value_to_dict_list(init_dict, 1, 3)
self.assertListEqual(init_dict[1], [2, 3])
self.superset_db._append_value_to_dict_list(init_dict, 2, 1)
self.assertListEqual(init_dict[2], [1])
def test_get_table_schema(self):
for test_case in [
(
Table(name="test_table", schema=Schema(name=Schema.unknown)),
FetchChart(schema="chart_table_schema"),
"chart_table_schema",
),
(
Table(name="test_table", schema=Schema(name="test_schema")),
FetchChart(schema="chart_table_schema"),
"test_schema",
),
]:
table, chart, expected = test_case
self.assertEqual(self.superset_db._get_table_schema(table, chart), expected)
def test_create_column_lineage_mapping_no_wildcard(self):
sql = """
INSERT INTO dummy_table SELECT id, timestamp FROM input_table;
"""
parser = LineageParser(sql)
table = Table(name="input_table", schema=Schema(name=Schema.unknown))
chart = FetchChart(table_name="sample_table", table_schema="main", table_id=99)
expected = {"id": ["id"], "timestamp": ["timestamp"]}
self.assertDictEqual(
self.superset_db._create_column_lineage_mapping(parser, table, chart),
expected,
)
def test_create_column_lineage_mapping_with_wildcard(self):
sql = """
INSERT INTO dummy_table SELECT * FROM input_table;
"""
parser = LineageParser(sql)
table = Table(name="input_table", schema=Schema(name=Schema.unknown))
chart = FetchChart(table_name="sample_table", table_schema="main", table_id=99)
expected = {"id": ["id"], "timestamp": ["timestamp"], "price": ["price"]}
self.assertDictEqual(
self.superset_db._create_column_lineage_mapping(parser, table, chart),
expected,
)
def test_get_input_tables_from_dataset_sql(self):
sql = """SELECT id, timestamp FROM sample_table"""
chart = FetchChart(
sql=sql, table_name="sample_table", table_schema="main", table_id=99
)
result = self.superset_db._get_input_tables(chart)[0]
self.assertSetEqual({"id", "timestamp"}, set(result[1]))
def test_get_input_tables_when_table_has_no_sql(self):
chart = FetchChart(table_name="sample_table", table_schema="main", table_id=99)
result = self.superset_db._get_input_tables(chart)[0]
self.assertSetEqual({"id", "timestamp", "price"}, set(result[1]))