# 166 lines
# 6.0 KiB
# Python

from datetime import timedelta
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Type, Union
from datahub._codegen.aspect import _Aspect
from datahub.emitter.mce_builder import Aspect
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import mcps_from_mce
from datahub.emitter.rest_emitter import EmitMode
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.sink.file import write_metadata_file
from datahub.ingestion.source.file import FileSourceConfig, GenericFileSource
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
MetadataChangeEvent,
MetadataChangeProposal,
)
from datahub.metadata.schema_classes import (
ASPECT_NAME_MAP,
DomainPropertiesClass,
SystemMetadataClass,
UsageAggregationClass,
)
class MockDataHubGraph(DataHubGraph):
    """In-memory stand-in for ``DataHubGraph``.

    Reads are answered from ``entity_graph`` (entity URN -> aspect name ->
    aspect) and every emitted item is appended to ``emitted`` rather than
    being sent anywhere.
    """

    def __init__(
        self, entity_graph: Optional[Dict[str, Dict[str, Any]]] = None
    ) -> None:
        """Set up the mock's state.

        Args:
            entity_graph: Optional mapping of entity URN to
                ``{aspect name: aspect}``. A falsy value (``None`` or an
                empty dict) is replaced by a fresh empty dict; a non-empty
                dict is stored by reference, not copied.
        """
        # NOTE(review): DataHubGraph.__init__ is not invoked here --
        # presumably so that no real client is set up; confirm.
        self.entity_graph = entity_graph or {}
        # Everything passed to emit / emit_mce / emit_mcp, in call order.
        self.emitted: List[
            Union[
                MetadataChangeEvent,
                MetadataChangeProposal,
                MetadataChangeProposalWrapper,
            ]
        ] = []

    def import_file(self, file: Path) -> None:
        """Load the MCE/MCP/MCPW records found in ``file`` into ``entity_graph``.

        Previously loaded data is not cleared (an aspect already stored under
        the same URN and aspect name is overwritten), so this can be called
        repeatedly on one instance to combine several files.

        Args:
            file: Path of the metadata file to read.

        Raises:
            Exception: If a work unit carries metadata that is not an MCE,
                MCP or MCPW.
            AssertionError: If a proposal has a falsy entity URN, aspect name
                or aspect.
        """
        pipeline_ctx = PipelineContext(run_id="test")
        source_config = FileSourceConfig(path=str(file))
        file_source: GenericFileSource = GenericFileSource(
            ctx=pipeline_ctx, config=source_config
        )

        for work_unit in file_source.get_workunits():
            # Only metadata work units are loaded; anything else is skipped.
            if not isinstance(work_unit, MetadataWorkUnit):
                continue

            metadata = work_unit.metadata
            proposals: Iterable[
                Union[
                    MetadataChangeProposal,
                    MetadataChangeProposalWrapper,
                ]
            ]
            if isinstance(metadata, MetadataChangeEvent):
                # An MCE is expanded into its individual proposals.
                proposals = mcps_from_mce(metadata)
            elif isinstance(
                metadata, (MetadataChangeProposal, MetadataChangeProposalWrapper)
            ):
                proposals = [metadata]
            else:
                raise Exception(
                    f"Unexpected metadata type {type(metadata)}. Was expecting MCE, MCP or MCPW"
                )

            for proposal in proposals:
                assert proposal.entityUrn
                assert proposal.aspectName
                assert proposal.aspect
                entity_aspects = self.entity_graph.setdefault(proposal.entityUrn, {})
                entity_aspects[proposal.aspectName] = proposal.aspect

    def get_aspect(
        self, entity_urn: str, aspect_type: Type[Aspect], version: int = 0
    ) -> Optional[Aspect]:
        """Return the stored aspect of ``aspect_type`` for ``entity_urn``.

        The aspect name is found by looking ``aspect_type`` up among the
        values of ``ASPECT_NAME_MAP``. A value stored as a plain ``dict`` is
        converted with ``aspect_type.from_obj``; anything else is returned
        as stored.

        Args:
            entity_urn: URN of the entity to look up.
            aspect_type: Aspect class to fetch.
            version: Accepted but not used by this mock.

        Returns:
            The aspect, or ``None`` when the entity or aspect is not present.

        Raises:
            IndexError: If ``aspect_type`` is not a value of
                ``ASPECT_NAME_MAP``.
        """
        matching_names = [
            name
            for name, registered_type in ASPECT_NAME_MAP.items()
            if registered_type == aspect_type
        ]
        aspect_name = matching_names[0]

        stored = self.entity_graph.get(entity_urn, {}).get(aspect_name, None)
        if isinstance(stored, dict):
            return aspect_type.from_obj(stored)
        return stored

    def get_domain_urn_by_name(self, domain_name: str) -> Optional[str]:
        """Return the URN of the first domain whose properties name matches.

        Only entities whose URN starts with ``urn:li:domain:`` and whose
        ``domainProperties`` aspect is a ``DomainPropertiesClass`` instance
        are considered.

        Args:
            domain_name: Name to compare against ``domainProperties.name``.

        Returns:
            The first matching URN in ``entity_graph`` order, or ``None``.
        """
        matching_urns: List[str] = []
        for urn, aspects in self.entity_graph.items():
            if not urn.startswith("urn:li:domain:"):
                continue
            if "domainProperties" not in aspects:
                continue
            properties = aspects["domainProperties"]
            if not isinstance(properties, DomainPropertiesClass):
                continue
            if properties.name == domain_name:
                matching_urns.append(urn)

        return matching_urns[0] if matching_urns else None

    def emit(
        self,
        item: Union[
            MetadataChangeEvent,
            MetadataChangeProposal,
            MetadataChangeProposalWrapper,
            UsageAggregationClass,
        ],
        callback: Union[Callable[[Exception, str], None], None] = None,
        emit_mode: EmitMode = EmitMode.ASYNC,
    ) -> None:
        """Record ``item`` in ``emitted``.

        ``callback`` and ``emit_mode`` are accepted but not used; the
        callback is never invoked by this mock.
        """
        self.emitted.append(item)  # type: ignore

    def emit_mce(self, mce: MetadataChangeEvent) -> None:
        """Record ``mce`` in ``emitted``."""
        self.emitted.append(mce)

    def emit_mcp(
        self,
        mcp: Union[MetadataChangeProposal, MetadataChangeProposalWrapper],
        async_flag: Optional[bool] = None,
        emit_mode: EmitMode = EmitMode.ASYNC,
        wait_timeout: Optional[timedelta] = timedelta(seconds=3600),
    ) -> None:
        """Record ``mcp`` in ``emitted``.

        ``async_flag``, ``emit_mode`` and ``wait_timeout`` are accepted but
        not used.
        """
        self.emitted.append(mcp)

    def get_entities(
        self,
        entity_name: str,
        urns: List[str],
        aspects: Optional[List[str]] = None,
        with_system_metadata: bool = False,
    ) -> Dict[str, Dict[str, Tuple[_Aspect, Optional[SystemMetadataClass]]]]:
        """Return the stored aspects for the requested URNs.

        Args:
            entity_name: Accepted but not used by this mock.
            urns: URNs to return; URNs missing from ``entity_graph`` are
                simply absent from the result.
            aspects: Aspect names to keep. ``None`` or an empty list keeps
                every stored aspect.
            with_system_metadata: Accepted but not used by this mock.

        Returns:
            ``{urn: {aspect name: (aspect, None)}}`` in ``entity_graph``
            order. The system-metadata slot is always ``None``.
        """
        return {
            urn: {
                aspect_name: (aspect, None)
                for aspect_name, aspect in entity.items()
                # A falsy filter means "no filtering".
                if not aspects or aspect_name in aspects
            }
            for urn, entity in self.entity_graph.items()
            if urn in urns
        }

    def get_emitted(
        self,
    ) -> List[
        Union[
            MetadataChangeEvent, MetadataChangeProposal, MetadataChangeProposalWrapper
        ]
    ]:
        """Return the list of recorded items (the same list object, not a copy)."""
        return self.emitted

    def sink_to_file(self, file: Path) -> None:
        """Pass ``file`` and the recorded items to ``write_metadata_file``."""
        write_metadata_file(file, self.emitted)