mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-08 15:30:55 +00:00
feat(ingest): Add metabase database id to platform instance mapping (#8359)
This commit is contained in:
parent
e67f811034
commit
eec89a884a
@ -9,6 +9,14 @@ the underlying datasets in the `glue` platform, the following snippet can be use
|
|||||||
DataHub will try to determine database name from Metabase [api/database](https://www.metabase.com/docs/latest/api-documentation.html#database)
|
DataHub will try to determine database name from Metabase [api/database](https://www.metabase.com/docs/latest/api-documentation.html#database)
|
||||||
payload. However, the name can be overridden from `database_alias_map` for a given database connected to Metabase.
|
payload. However, the name can be overridden from `database_alias_map` for a given database connected to Metabase.
|
||||||
|
|
||||||
|
If several platform instances with the same platform (e.g. from several distinct clickhouse clusters) are present in DataHub,
|
||||||
|
the mapping between database id in Metabase and platform instance in DataHub may be configured with the following map:
|
||||||
|
```yml
|
||||||
|
database_id_to_instance_map:
|
||||||
|
"42": platform_instance_in_datahub
|
||||||
|
```
|
||||||
|
The key in this map must be string, not integer although Metabase API provides `id` as number.
|
||||||
|
If `database_id_to_instance_map` is not specified, `platform_instance_map` is used for platform instance mapping. If none of the above are specified, platform instance is not used when constructing `urn` when searching for dataset relations.
|
||||||
## Compatibility
|
## Compatibility
|
||||||
|
|
||||||
Metabase version [v0.41.2](https://www.metabase.com/start/oss/)
|
Metabase version [v0.41.2](https://www.metabase.com/start/oss/)
|
||||||
|
|||||||
@ -15,6 +15,8 @@ source:
|
|||||||
# Optional mapping of platform types to instance ids
|
# Optional mapping of platform types to instance ids
|
||||||
platform_instance_map: # optional
|
platform_instance_map: # optional
|
||||||
postgres: test_postgres # optional
|
postgres: test_postgres # optional
|
||||||
|
database_id_to_instance_map: # optional
|
||||||
|
"42": platform_instance_in_datahub # optional
|
||||||
|
|
||||||
sink:
|
sink:
|
||||||
# sink configs
|
# sink configs
|
||||||
@ -1,6 +1,6 @@
|
|||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from functools import lru_cache
|
from functools import lru_cache
|
||||||
from typing import Dict, Iterable, List, Optional
|
from typing import Dict, Iterable, List, Optional, Union
|
||||||
|
|
||||||
import dateutil.parser as dp
|
import dateutil.parser as dp
|
||||||
import pydantic
|
import pydantic
|
||||||
@ -60,6 +60,10 @@ class MetabaseConfig(DatasetLineageProviderConfigBase):
|
|||||||
default=None,
|
default=None,
|
||||||
description="Custom mappings between metabase database engines and DataHub platforms",
|
description="Custom mappings between metabase database engines and DataHub platforms",
|
||||||
)
|
)
|
||||||
|
database_id_to_instance_map: Optional[Dict[str, str]] = Field(
|
||||||
|
default=None,
|
||||||
|
description="Custom mappings between metabase database id and DataHub platform instance",
|
||||||
|
)
|
||||||
default_schema: str = Field(
|
default_schema: str = Field(
|
||||||
default="public",
|
default="public",
|
||||||
description="Default schema name to use when schema is not provided in an SQL query",
|
description="Default schema name to use when schema is not provided in an SQL query",
|
||||||
@ -122,7 +126,9 @@ class MetabaseSource(Source):
|
|||||||
super().__init__(ctx)
|
super().__init__(ctx)
|
||||||
self.config = config
|
self.config = config
|
||||||
self.report = SourceReport()
|
self.report = SourceReport()
|
||||||
|
self.setup_session()
|
||||||
|
|
||||||
|
def setup_session(self) -> None:
|
||||||
login_response = requests.post(
|
login_response = requests.post(
|
||||||
f"{self.config.connect_uri}/api/session",
|
f"{self.config.connect_uri}/api/session",
|
||||||
None,
|
None,
|
||||||
@ -272,6 +278,16 @@ class MetabaseSource(Source):
|
|||||||
user_info_response.raise_for_status()
|
user_info_response.raise_for_status()
|
||||||
user_details = user_info_response.json()
|
user_details = user_info_response.json()
|
||||||
except HTTPError as http_error:
|
except HTTPError as http_error:
|
||||||
|
if (
|
||||||
|
http_error.response is not None
|
||||||
|
and http_error.response.status_code == 404
|
||||||
|
):
|
||||||
|
self.report.report_warning(
|
||||||
|
key=f"metabase-user-{creator_id}",
|
||||||
|
reason=f"User {creator_id} is blocked in Metabase or missing",
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
# For cases when the error is not 404 but something else
|
||||||
self.report.report_failure(
|
self.report.report_failure(
|
||||||
key=f"metabase-user-{creator_id}",
|
key=f"metabase-user-{creator_id}",
|
||||||
reason=f"Unable to retrieve User info. " f"Reason: {str(http_error)}",
|
reason=f"Unable to retrieve User info. " f"Reason: {str(http_error)}",
|
||||||
@ -524,6 +540,36 @@ class MetabaseSource(Source):
|
|||||||
|
|
||||||
return None, None
|
return None, None
|
||||||
|
|
||||||
|
@lru_cache(maxsize=None)
|
||||||
|
def get_platform_instance(
|
||||||
|
self, platform: Union[str, None] = None, datasource_id: Union[int, None] = None
|
||||||
|
) -> Union[str, None]:
|
||||||
|
"""
|
||||||
|
Method will attempt to detect `platform_instance` by checking
|
||||||
|
`database_id_to_instance_map` and `platform_instance_map` mappings.
|
||||||
|
If `database_id_to_instance_map` is defined it is first checked for
|
||||||
|
`datasource_id` extracted from Metabase. If this mapping is not defined
|
||||||
|
or corresponding key is not found, `platform_instance_map` mapping
|
||||||
|
is checked for datasource platform. If no mapping found `None`
|
||||||
|
is returned.
|
||||||
|
:param str platform: DataHub platform name (e.g. `postgres` or `clickhouse`)
|
||||||
|
:param int datasource_id: Numeric datasource ID received from Metabase API
|
||||||
|
:return: platform instance name or None
|
||||||
|
"""
|
||||||
|
platform_instance = None
|
||||||
|
# For cases when metabase has several platform instances (e.g. several individual ClickHouse clusters)
|
||||||
|
if datasource_id is not None and self.config.database_id_to_instance_map:
|
||||||
|
platform_instance = self.config.database_id_to_instance_map.get(
|
||||||
|
str(datasource_id)
|
||||||
|
)
|
||||||
|
|
||||||
|
# If Metabase datasource ID is not mapped to platform instace, fall back to platform mapping
|
||||||
|
# Set platform_instance if configuration provides a mapping from platform name to instance
|
||||||
|
if platform and self.config.platform_instance_map and platform_instance is None:
|
||||||
|
platform_instance = self.config.platform_instance_map.get(platform)
|
||||||
|
|
||||||
|
return platform_instance
|
||||||
|
|
||||||
@lru_cache(maxsize=None)
|
@lru_cache(maxsize=None)
|
||||||
def get_datasource_from_id(self, datasource_id):
|
def get_datasource_from_id(self, datasource_id):
|
||||||
try:
|
try:
|
||||||
@ -564,11 +610,8 @@ class MetabaseSource(Source):
|
|||||||
reason=f"Platform was not found in DataHub. Using {platform} name as is",
|
reason=f"Platform was not found in DataHub. Using {platform} name as is",
|
||||||
)
|
)
|
||||||
|
|
||||||
# Set platform_instance if configuration provides a mapping from platform name to instance
|
platform_instance = self.get_platform_instance(
|
||||||
platform_instance = (
|
platform, dataset_json.get("id", None)
|
||||||
self.config.platform_instance_map.get(platform)
|
|
||||||
if self.config.platform_instance_map
|
|
||||||
else None
|
|
||||||
)
|
)
|
||||||
|
|
||||||
field_for_dbname_mapping = {
|
field_for_dbname_mapping = {
|
||||||
|
|||||||
42
metadata-ingestion/tests/unit/test_metabase_source.py
Normal file
42
metadata-ingestion/tests/unit/test_metabase_source.py
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
from datahub.ingestion.api.common import PipelineContext
|
||||||
|
from datahub.ingestion.api.source import SourceReport
|
||||||
|
from datahub.ingestion.source.metabase import MetabaseConfig, MetabaseSource
|
||||||
|
|
||||||
|
|
||||||
|
class TestMetabaseSource(MetabaseSource):
|
||||||
|
def __init__(self, ctx: PipelineContext, config: MetabaseConfig):
|
||||||
|
self.config = config
|
||||||
|
self.report = SourceReport()
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_platform_instance():
|
||||||
|
ctx = PipelineContext(run_id="test-metabase")
|
||||||
|
config = MetabaseConfig()
|
||||||
|
config.connect_uri = "http://localhost:3000"
|
||||||
|
# config.database_id_to_instance_map = {"42": "my_main_clickhouse"}
|
||||||
|
# config.platform_instance_map = {"clickhouse": "my_only_clickhouse"}
|
||||||
|
metabase = TestMetabaseSource(ctx, config)
|
||||||
|
|
||||||
|
# no mappings defined
|
||||||
|
assert metabase.get_platform_instance("clickhouse", 42) is None
|
||||||
|
|
||||||
|
# database_id_to_instance_map is defined, key is present
|
||||||
|
metabase.config.database_id_to_instance_map = {"42": "my_main_clickhouse"}
|
||||||
|
assert metabase.get_platform_instance(None, 42) == "my_main_clickhouse"
|
||||||
|
|
||||||
|
# database_id_to_instance_map is defined, key is missing
|
||||||
|
assert metabase.get_platform_instance(None, 999) is None
|
||||||
|
|
||||||
|
# database_id_to_instance_map is defined, key is missing, platform_instance_map is defined and key present
|
||||||
|
metabase.config.platform_instance_map = {"clickhouse": "my_only_clickhouse"}
|
||||||
|
assert metabase.get_platform_instance("clickhouse", 999) == "my_only_clickhouse"
|
||||||
|
|
||||||
|
# database_id_to_instance_map is defined, key is missing, platform_instance_map is defined and key missing
|
||||||
|
assert metabase.get_platform_instance("missing-platform", 999) is None
|
||||||
|
|
||||||
|
# database_id_to_instance_map is missing, platform_instance_map is defined and key present
|
||||||
|
metabase.config.database_id_to_instance_map = None
|
||||||
|
assert metabase.get_platform_instance("clickhouse", 999) == "my_only_clickhouse"
|
||||||
|
|
||||||
|
# database_id_to_instance_map is missing, platform_instance_map is defined and key missing
|
||||||
|
assert metabase.get_platform_instance("missing-platform", 999) is None
|
||||||
Loading…
x
Reference in New Issue
Block a user