fix(ingest/dremio): simplify and fix build source map (#12908)

This commit is contained in:
Mayuri Nehate 2025-03-21 11:31:16 +05:30 committed by GitHub
parent 56d92f3e66
commit 8323bc3910
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 2350 additions and 2506 deletions

View File

@ -66,7 +66,7 @@ class DremioToDataHubSourceTypeMapping:
}
@staticmethod
def get_datahub_source_type(dremio_source_type: str) -> str:
def get_datahub_platform(dremio_source_type: str) -> str:
"""
Return the DataHub source type.
"""

View File

@ -294,7 +294,7 @@ class DremioContainer:
)
class DremioSource(DremioContainer):
class DremioSourceContainer(DremioContainer):
subclass: str = "Dremio Source"
dremio_source_type: str
root_path: Optional[str]
@ -337,7 +337,7 @@ class DremioCatalog:
self.dremio_api = dremio_api
self.edition = dremio_api.edition
self.datasets: Deque[DremioDataset] = deque()
self.sources: Deque[DremioSource] = deque()
self.sources: Deque[DremioSourceContainer] = deque()
self.spaces: Deque[DremioSpace] = deque()
self.folders: Deque[DremioFolder] = deque()
self.glossary_terms: Deque[DremioGlossaryTerm] = deque()
@ -380,12 +380,13 @@ class DremioCatalog:
container_type = container.get("container_type")
if container_type == DremioEntityContainerType.SOURCE:
self.sources.append(
DremioSource(
DremioSourceContainer(
container_name=container.get("name"),
location_id=container.get("id"),
path=[],
api_operations=self.dremio_api,
dremio_source_type=container.get("source_type"),
dremio_source_type=container.get("source_type")
or "unknown",
root_path=container.get("root_path"),
database_name=container.get("database_name"),
)
@ -426,7 +427,7 @@ class DremioCatalog:
self.set_containers()
return deque(itertools.chain(self.sources, self.spaces, self.folders))
def get_sources(self) -> Deque[DremioSource]:
def get_sources(self) -> Deque[DremioSourceContainer]:
self.set_containers()
return self.sources

View File

@ -1,7 +1,6 @@
import logging
import re
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional
from datahub.emitter.mce_builder import (
@ -28,7 +27,10 @@ from datahub.ingestion.source.dremio.dremio_api import (
DremioEdition,
)
from datahub.ingestion.source.dremio.dremio_aspects import DremioAspects
from datahub.ingestion.source.dremio.dremio_config import DremioSourceConfig
from datahub.ingestion.source.dremio.dremio_config import (
DremioSourceConfig,
DremioSourceMapping,
)
from datahub.ingestion.source.dremio.dremio_datahub_source_mapping import (
DremioToDataHubSourceTypeMapping,
)
@ -39,6 +41,7 @@ from datahub.ingestion.source.dremio.dremio_entities import (
DremioDatasetType,
DremioGlossaryTerm,
DremioQuery,
DremioSourceContainer,
)
from datahub.ingestion.source.dremio.dremio_profiling import DremioProfiler
from datahub.ingestion.source.dremio.dremio_reporting import DremioSourceReport
@ -65,6 +68,17 @@ from datahub.sql_parsing.sql_parsing_aggregator import (
logger = logging.getLogger(__name__)
@dataclass
class DremioSourceMapEntry:
platform: str
source_name: str
dremio_source_category: str
root_path: str = ""
database_name: str = ""
platform_instance: Optional[str] = None
env: Optional[str] = None
@platform_name("Dremio")
@config_class(DremioSourceConfig)
@support_status(SupportStatus.CERTIFIED)
@ -112,7 +126,7 @@ class DremioSource(StatefulIngestionSourceBase):
self.default_db = "dremio"
self.config = config
self.report = DremioSourceReport()
self.source_map: Dict[str, Dict] = defaultdict()
self.source_map: Dict[str, DremioSourceMapEntry] = dict()
# Initialize API operations
dremio_api = DremioAPIOperations(self.config, self.report)
@ -152,111 +166,12 @@ class DremioSource(StatefulIngestionSourceBase):
def get_platform(self) -> str:
return "dremio"
def _build_source_map(self) -> Dict[str, Dict]:
"""
Builds a source mapping dictionary to support external lineage generation across
multiple Dremio sources, based on provided configuration mappings.
This method operates as follows:
1. If a source mapping is present in the config:
- For each source in the Dremio catalog, if the mapping's `source_name` matches
the `dremio_source_type`, `root_path` and `database_name` are added to the mapping
information, along with the platform, platform instance, and environment if they exist.
This allows constructing the full URN for upstream lineage.
2. If a source mapping is absent in the configuration:
- Default mappings are created for each source name, setting `env` and `platform_instance`
to default values and classifying the source type. This ensures all sources have a
mapping, even if specific configuration details are missing.
Returns:
Dict[str, Dict]: A dictionary (`source_map`) where each key is a source name
(lowercased) and each value is another dictionary containing:
- `platform`: The source platform.
- `source_name`: The source name.
- `dremio_source_type`: The type mapped to DataHub,
e.g., "database", "folder".
- Optional `root_path`, `database_name`, `platform_instance`,
and `env` if provided in the configuration.
Example:
This method is used internally within the class to generate mappings before
creating cross-platform lineage.
"""
source_map = {}
def _build_source_map(self) -> Dict[str, DremioSourceMapEntry]:
dremio_sources = self.dremio_catalog.get_sources()
source_mappings_config = self.config.source_mappings or []
for source in dremio_sources:
source_name = source.container_name
if isinstance(source.dremio_source_type, str):
source_type = source.dremio_source_type.lower()
root_path = source.root_path.lower() if source.root_path else ""
database_name = (
source.database_name.lower() if source.database_name else ""
)
source_present = False
source_platform_name = source_name
for mapping in self.config.source_mappings or []:
if re.search(mapping.source_name, source_type, re.IGNORECASE):
source_platform_name = mapping.source_name.lower()
datahub_source_type = (
DremioToDataHubSourceTypeMapping.get_datahub_source_type(
source_type
)
)
if re.search(mapping.platform, datahub_source_type, re.IGNORECASE):
source_platform_name = source_platform_name.lower()
source_map[source_platform_name] = {
"platform": mapping.platform,
"source_name": mapping.source_name,
"dremio_source_type": DremioToDataHubSourceTypeMapping.get_category(
source_type,
),
"root_path": root_path,
"database_name": database_name,
"platform_instance": mapping.platform_instance,
"env": mapping.env,
}
source_present = True
break
if not source_present:
try:
dremio_source_type = (
DremioToDataHubSourceTypeMapping.get_category(source_type)
)
except Exception as exc:
logger.info(
f"Source {source_type} is not a standard Dremio source type. "
f"Adding source_type {source_type} to mapping as database. Error: {exc}"
)
DremioToDataHubSourceTypeMapping.add_mapping(
source_type, source_name
)
dremio_source_type = (
DremioToDataHubSourceTypeMapping.get_category(source_type)
)
source_map[source_platform_name.lower()] = {
"platform": source_type,
"source_name": source_name,
"dremio_source_type": dremio_source_type,
}
else:
logger.error(
f'Source "{source.container_name}" is broken. Containers will not be created for source.'
)
logger.error(
f'No new cross-platform lineage will be emitted for source "{source.container_name}".'
)
logger.error("Fix this source in Dremio to fix this issue.")
source_map = build_dremio_source_map(dremio_sources, source_mappings_config)
logger.info(f"Full source map: {source_map}")
return source_map
@ -431,6 +346,7 @@ class DremioSource(StatefulIngestionSourceBase):
dremio_path=dataset_info.path,
dremio_dataset=dataset_info.resource_name,
)
logger.debug(f"Upstream dataset for {dataset_urn}: {upstream_urn}")
if upstream_urn:
upstream_lineage = UpstreamLineage(
@ -596,25 +512,23 @@ class DremioSource(StatefulIngestionSourceBase):
if not mapping:
return None
platform = mapping.get("platform")
platform = mapping.platform
if not platform:
return None
platform_instance = mapping.get(
"platform_instance", self.config.platform_instance
)
env = mapping.get("env", self.config.env)
platform_instance = mapping.platform_instance
env = mapping.env or self.config.env
root_path = ""
database_name = ""
if mapping.get("dremio_source_type") == "file_object_storage":
if mapping.get("root_path"):
root_path = f"{mapping['root_path'][1:]}/"
if mapping.dremio_source_category == "file_object_storage":
if mapping.root_path:
root_path = f"{mapping.root_path[1:]}/"
dremio_dataset = f"{root_path}{'/'.join(dremio_path[1:])}/{dremio_dataset}"
else:
if mapping.get("database_name"):
database_name = f"{mapping['database_name']}."
if mapping.database_name:
database_name = f"{mapping.database_name}."
dremio_dataset = (
f"{database_name}{'.'.join(dremio_path[1:])}.{dremio_dataset}"
)
@ -639,3 +553,68 @@ class DremioSource(StatefulIngestionSourceBase):
Get the source report.
"""
return self.report
def build_dremio_source_map(
dremio_sources: Iterable[DremioSourceContainer],
source_mappings_config: List[DremioSourceMapping],
) -> Dict[str, DremioSourceMapEntry]:
"""
Builds a source mapping dictionary to support external lineage generation across
multiple Dremio sources, based on provided configuration mappings.
This method operates as follows:
Returns:
Dict[str, Dict]: A dictionary (`source_map`) where each key is a source name
(lowercased) and each value is another entry containing:
- `platform`: The source platform.
- `source_name`: The source name.
- `dremio_source_category`: The type mapped to DataHub,
e.g., "database", "folder".
- Optional `root_path`, `database_name`, `platform_instance`,
and `env` if provided in the configuration.
Example:
This method is used internally within the class to generate mappings before
creating cross-platform lineage.
"""
source_map = {}
for source in dremio_sources:
current_source_name = source.container_name
source_type = source.dremio_source_type.lower()
source_category = DremioToDataHubSourceTypeMapping.get_category(source_type)
datahub_platform = DremioToDataHubSourceTypeMapping.get_datahub_platform(
source_type
)
root_path = source.root_path.lower() if source.root_path else ""
database_name = source.database_name.lower() if source.database_name else ""
source_present = False
for mapping in source_mappings_config:
if mapping.source_name.lower() == current_source_name.lower():
source_map[current_source_name.lower()] = DremioSourceMapEntry(
platform=mapping.platform,
source_name=mapping.source_name,
dremio_source_category=source_category,
root_path=root_path,
database_name=database_name,
platform_instance=mapping.platform_instance,
env=mapping.env,
)
source_present = True
break
if not source_present:
source_map[current_source_name.lower()] = DremioSourceMapEntry(
platform=datahub_platform,
source_name=current_source_name,
dremio_source_category=source_category,
root_path=root_path,
database_name=database_name,
platform_instance=None,
env=None,
)
return source_map

View File

@ -19,6 +19,9 @@ source:
- platform: s3
source_name: samples
platform_instance: s3_test_samples
- platform: s3
source_name: s3
platform_instance: s3_test_samples
sink:
type: file

View File

@ -17,6 +17,9 @@ source:
- platform: s3
source_name: samples
platform_instance: s3_test_samples
- platform: s3
source_name: s3
platform_instance: s3_test_samples
schema_pattern:
allow:

View File

@ -17,6 +17,9 @@ source:
- platform: s3
source_name: samples
platform_instance: s3_test_samples
- platform: s3
source_name: s3
platform_instance: s3_test_samples
schema_pattern:
allow:
@ -26,7 +29,9 @@ source:
enabled: true
include_field_min_value: false
include_field_max_value: false
profile_pattern:
deny:
- ".*samples.*"
sink:
type: file

View File

@ -0,0 +1,140 @@
from typing import List
from unittest.mock import MagicMock
from datahub.ingestion.source.dremio.dremio_config import DremioSourceMapping
from datahub.ingestion.source.dremio.dremio_entities import DremioSourceContainer
from datahub.ingestion.source.dremio.dremio_source import (
DremioSourceMapEntry,
build_dremio_source_map,
)
def test_build_source_map_simple():
# write unit test
config_mapping: List[DremioSourceMapping] = [
DremioSourceMapping(source_name="source1", platform="S3", env="PROD"),
DremioSourceMapping(source_name="source2", platform="redshift", env="DEV"),
]
sources: List[DremioSourceContainer] = [
DremioSourceContainer(
container_name="source1",
location_id="xxx",
path=[],
api_operations=MagicMock(), # type:ignore
dremio_source_type="S3",
root_path="/",
database_name=None,
),
DremioSourceContainer(
container_name="source2",
location_id="yyy",
path=[],
api_operations=MagicMock(), # type:ignore
dremio_source_type="REDSHIFT",
root_path="/",
database_name="redshiftdb",
),
]
source_map = build_dremio_source_map(sources, config_mapping)
assert source_map == {
"source1": DremioSourceMapEntry(
source_name="source1",
platform="S3",
env="PROD",
dremio_source_category="file_object_storage",
root_path="/",
database_name="",
),
"source2": DremioSourceMapEntry(
source_name="source2",
platform="redshift",
env="DEV",
dremio_source_category="database",
root_path="/",
database_name="redshiftdb",
),
}
def test_build_source_map_same_platform_multiple_sources():
# write unit test
config_mapping: List[DremioSourceMapping] = [
DremioSourceMapping(source_name="source1", platform="S3", env="PROD"),
DremioSourceMapping(source_name="source2", platform="redshift", env="DEV"),
DremioSourceMapping(source_name="source2", platform="redshift", env="PROD"),
]
sources: List[DremioSourceContainer] = [
DremioSourceContainer(
container_name="source1",
location_id="xxx",
path=[],
api_operations=MagicMock(), # type:ignore
dremio_source_type="S3",
root_path="/",
database_name=None,
),
DremioSourceContainer(
container_name="source2",
location_id="yyy",
path=[],
api_operations=MagicMock(), # type:ignore
dremio_source_type="REDSHIFT",
root_path="/",
database_name="redshiftdb",
),
DremioSourceContainer(
container_name="source3",
location_id="tt",
path=[],
api_operations=MagicMock(), # type:ignore
dremio_source_type="REDSHIFT",
root_path="/",
database_name="redshiftproddb",
),
DremioSourceContainer(
container_name="Source4",
location_id="zz",
path=[],
api_operations=MagicMock(), # type:ignore
dremio_source_type="NEWSOURCE",
root_path="/",
database_name="somedb",
),
]
source_map = build_dremio_source_map(sources, config_mapping)
assert source_map == {
"source1": DremioSourceMapEntry(
source_name="source1",
platform="S3",
env="PROD",
dremio_source_category="file_object_storage",
root_path="/",
database_name="",
),
"source2": DremioSourceMapEntry(
source_name="source2",
platform="redshift",
env="DEV",
dremio_source_category="database",
root_path="/",
database_name="redshiftdb",
),
"source3": DremioSourceMapEntry(
source_name="source3",
platform="redshift",
env=None,
dremio_source_category="database",
root_path="/",
database_name="redshiftproddb",
),
"source4": DremioSourceMapEntry(
source_name="Source4",
platform="newsource",
env=None,
dremio_source_category="unknown",
root_path="/",
database_name="somedb",
),
}