datahub/metadata-ingestion/tests/unit/test_metabase_source.py

43 lines
2.0 KiB
Python
Raw Normal View History

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.source.metabase import MetabaseConfig, MetabaseSource
class TestMetabaseSource(MetabaseSource):
def __init__(self, ctx: PipelineContext, config: MetabaseConfig):
self.config = config
self.report = SourceReport()
def test_get_platform_instance():
ctx = PipelineContext(run_id="test-metabase")
config = MetabaseConfig()
config.connect_uri = "http://localhost:3000"
# config.database_id_to_instance_map = {"42": "my_main_clickhouse"}
# config.platform_instance_map = {"clickhouse": "my_only_clickhouse"}
metabase = TestMetabaseSource(ctx, config)
# no mappings defined
assert metabase.get_platform_instance("clickhouse", 42) is None
# database_id_to_instance_map is defined, key is present
metabase.config.database_id_to_instance_map = {"42": "my_main_clickhouse"}
assert metabase.get_platform_instance(None, 42) == "my_main_clickhouse"
# database_id_to_instance_map is defined, key is missing
assert metabase.get_platform_instance(None, 999) is None
# database_id_to_instance_map is defined, key is missing, platform_instance_map is defined and key present
metabase.config.platform_instance_map = {"clickhouse": "my_only_clickhouse"}
assert metabase.get_platform_instance("clickhouse", 999) == "my_only_clickhouse"
# database_id_to_instance_map is defined, key is missing, platform_instance_map is defined and key missing
assert metabase.get_platform_instance("missing-platform", 999) is None
# database_id_to_instance_map is missing, platform_instance_map is defined and key present
metabase.config.database_id_to_instance_map = None
assert metabase.get_platform_instance("clickhouse", 999) == "my_only_clickhouse"
# database_id_to_instance_map is missing, platform_instance_map is defined and key missing
assert metabase.get_platform_instance("missing-platform", 999) is None