mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-04 15:50:14 +00:00
165 lines
5.9 KiB
Python
165 lines
5.9 KiB
Python
from datetime import timedelta
|
|
from pathlib import Path
|
|
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Type, Union
|
|
|
|
from datahub._codegen.aspect import _Aspect
|
|
from datahub.emitter.mce_builder import Aspect
|
|
from datahub.emitter.mcp import MetadataChangeProposalWrapper
|
|
from datahub.emitter.mcp_builder import mcps_from_mce
|
|
from datahub.ingestion.api.common import PipelineContext
|
|
from datahub.ingestion.api.workunit import MetadataWorkUnit
|
|
from datahub.ingestion.graph.client import DataHubGraph
|
|
from datahub.ingestion.sink.file import write_metadata_file
|
|
from datahub.ingestion.source.file import FileSourceConfig, GenericFileSource
|
|
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
|
|
MetadataChangeEvent,
|
|
MetadataChangeProposal,
|
|
)
|
|
from datahub.metadata.schema_classes import (
|
|
ASPECT_NAME_MAP,
|
|
DomainPropertiesClass,
|
|
SystemMetadataClass,
|
|
UsageAggregationClass,
|
|
)
|
|
|
|
|
|
class MockDataHubGraph(DataHubGraph):
|
|
def __init__(
|
|
self, entity_graph: Optional[Dict[str, Dict[str, Any]]] = None
|
|
) -> None:
|
|
self.emitted: List[
|
|
Union[
|
|
MetadataChangeEvent,
|
|
MetadataChangeProposal,
|
|
MetadataChangeProposalWrapper,
|
|
]
|
|
] = []
|
|
self.entity_graph = entity_graph or {}
|
|
|
|
def import_file(self, file: Path) -> None:
|
|
"""Imports metadata from any MCE/MCP file. Does not clear prior loaded data.
|
|
This function can be called repeatedly on the same
|
|
Mock instance to load up metadata from multiple files."""
|
|
file_source: GenericFileSource = GenericFileSource(
|
|
ctx=PipelineContext(run_id="test"), config=FileSourceConfig(path=str(file))
|
|
)
|
|
for wu in file_source.get_workunits():
|
|
if isinstance(wu, MetadataWorkUnit):
|
|
metadata = wu.metadata
|
|
mcps: Iterable[
|
|
Union[
|
|
MetadataChangeProposal,
|
|
MetadataChangeProposalWrapper,
|
|
]
|
|
]
|
|
if isinstance(metadata, MetadataChangeEvent):
|
|
mcps = mcps_from_mce(metadata)
|
|
elif isinstance(
|
|
metadata, (MetadataChangeProposal, MetadataChangeProposalWrapper)
|
|
):
|
|
mcps = [metadata]
|
|
else:
|
|
raise Exception(
|
|
f"Unexpected metadata type {type(metadata)}. Was expecting MCE, MCP or MCPW"
|
|
)
|
|
|
|
for mcp in mcps:
|
|
assert mcp.entityUrn
|
|
assert mcp.aspectName
|
|
assert mcp.aspect
|
|
if mcp.entityUrn not in self.entity_graph:
|
|
self.entity_graph[mcp.entityUrn] = {}
|
|
self.entity_graph[mcp.entityUrn][mcp.aspectName] = mcp.aspect
|
|
|
|
def get_aspect(
|
|
self, entity_urn: str, aspect_type: Type[Aspect], version: int = 0
|
|
) -> Optional[Aspect]:
|
|
aspect_name = [v for v in ASPECT_NAME_MAP if ASPECT_NAME_MAP[v] == aspect_type][
|
|
0
|
|
]
|
|
result = self.entity_graph.get(entity_urn, {}).get(aspect_name, None)
|
|
if result is not None and isinstance(result, dict):
|
|
return aspect_type.from_obj(result)
|
|
else:
|
|
return result
|
|
|
|
def get_domain_urn_by_name(self, domain_name: str) -> Optional[str]:
|
|
domain_metadata = {
|
|
urn: metadata
|
|
for urn, metadata in self.entity_graph.items()
|
|
if urn.startswith("urn:li:domain:")
|
|
}
|
|
domain_properties_metadata = {
|
|
urn: metadata["domainProperties"].name
|
|
for urn, metadata in domain_metadata.items()
|
|
if "domainProperties" in metadata
|
|
and isinstance(metadata["domainProperties"], DomainPropertiesClass)
|
|
}
|
|
urn_match = [
|
|
urn
|
|
for urn, name in domain_properties_metadata.items()
|
|
if name == domain_name
|
|
]
|
|
if urn_match:
|
|
return urn_match[0]
|
|
else:
|
|
return None
|
|
|
|
def emit(
|
|
self,
|
|
item: Union[
|
|
MetadataChangeEvent,
|
|
MetadataChangeProposal,
|
|
MetadataChangeProposalWrapper,
|
|
UsageAggregationClass,
|
|
],
|
|
callback: Union[Callable[[Exception, str], None], None] = None,
|
|
async_flag: Optional[bool] = None,
|
|
) -> None:
|
|
self.emitted.append(item) # type: ignore
|
|
|
|
def emit_mce(self, mce: MetadataChangeEvent) -> None:
|
|
self.emitted.append(mce)
|
|
|
|
def emit_mcp(
|
|
self,
|
|
mcp: Union[MetadataChangeProposal, MetadataChangeProposalWrapper],
|
|
async_flag: Optional[bool] = None,
|
|
trace_flag: Optional[bool] = None,
|
|
trace_timeout: Optional[timedelta] = timedelta(seconds=3600),
|
|
) -> None:
|
|
self.emitted.append(mcp)
|
|
|
|
def get_entities(
|
|
self,
|
|
entity_name: str,
|
|
urns: List[str],
|
|
aspects: Optional[List[str]] = None,
|
|
with_system_metadata: bool = False,
|
|
) -> Dict[str, Dict[str, Tuple[_Aspect, Optional[SystemMetadataClass]]]]:
|
|
result: Dict[str, Dict[str, Tuple[_Aspect, Optional[SystemMetadataClass]]]] = {}
|
|
for urn, entity in self.entity_graph.items():
|
|
if urn not in urns:
|
|
continue
|
|
if urn not in result:
|
|
result[urn] = {}
|
|
for aspect_name, aspect in entity.items():
|
|
if aspects and aspect_name not in aspects:
|
|
continue
|
|
# Mock implementation always returns None for system metadata
|
|
system_metadata = None
|
|
result[urn][aspect_name] = (aspect, system_metadata)
|
|
return result
|
|
|
|
def get_emitted(
|
|
self,
|
|
) -> List[
|
|
Union[
|
|
MetadataChangeEvent, MetadataChangeProposal, MetadataChangeProposalWrapper
|
|
]
|
|
]:
|
|
return self.emitted
|
|
|
|
def sink_to_file(self, file: Path) -> None:
|
|
write_metadata_file(file, self.emitted)
|