166 lines
6.0 KiB
Python
Raw Permalink Normal View History

from datetime import timedelta
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Type, Union
from datahub._codegen.aspect import _Aspect
from datahub.emitter.mce_builder import Aspect
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import mcps_from_mce
from datahub.emitter.rest_emitter import EmitMode
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.sink.file import write_metadata_file
from datahub.ingestion.source.file import FileSourceConfig, GenericFileSource
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
MetadataChangeEvent,
MetadataChangeProposal,
)
2023-05-17 08:17:52 -07:00
from datahub.metadata.schema_classes import (
ASPECT_NAME_MAP,
DomainPropertiesClass,
SystemMetadataClass,
2023-05-17 08:17:52 -07:00
UsageAggregationClass,
)
class MockDataHubGraph(DataHubGraph):
def __init__(
self, entity_graph: Optional[Dict[str, Dict[str, Any]]] = None
) -> None:
self.emitted: List[
Union[
MetadataChangeEvent,
MetadataChangeProposal,
MetadataChangeProposalWrapper,
]
] = []
self.entity_graph = entity_graph or {}
def import_file(self, file: Path) -> None:
"""Imports metadata from any MCE/MCP file. Does not clear prior loaded data.
This function can be called repeatedly on the same
Mock instance to load up metadata from multiple files."""
file_source: GenericFileSource = GenericFileSource(
ctx=PipelineContext(run_id="test"), config=FileSourceConfig(path=str(file))
)
for wu in file_source.get_workunits():
if isinstance(wu, MetadataWorkUnit):
metadata = wu.metadata
mcps: Iterable[
Union[
MetadataChangeProposal,
MetadataChangeProposalWrapper,
]
]
if isinstance(metadata, MetadataChangeEvent):
mcps = mcps_from_mce(metadata)
elif isinstance(
metadata, (MetadataChangeProposal, MetadataChangeProposalWrapper)
):
mcps = [metadata]
else:
raise Exception(
f"Unexpected metadata type {type(metadata)}. Was expecting MCE, MCP or MCPW"
)
for mcp in mcps:
assert mcp.entityUrn
assert mcp.aspectName
assert mcp.aspect
if mcp.entityUrn not in self.entity_graph:
self.entity_graph[mcp.entityUrn] = {}
self.entity_graph[mcp.entityUrn][mcp.aspectName] = mcp.aspect
def get_aspect(
self, entity_urn: str, aspect_type: Type[Aspect], version: int = 0
) -> Optional[Aspect]:
aspect_name = [v for v in ASPECT_NAME_MAP if ASPECT_NAME_MAP[v] == aspect_type][
0
]
result = self.entity_graph.get(entity_urn, {}).get(aspect_name, None)
if result is not None and isinstance(result, dict):
return aspect_type.from_obj(result)
else:
return result
def get_domain_urn_by_name(self, domain_name: str) -> Optional[str]:
domain_metadata = {
urn: metadata
for urn, metadata in self.entity_graph.items()
if urn.startswith("urn:li:domain:")
}
domain_properties_metadata = {
urn: metadata["domainProperties"].name
for urn, metadata in domain_metadata.items()
if "domainProperties" in metadata
and isinstance(metadata["domainProperties"], DomainPropertiesClass)
}
urn_match = [
urn
for urn, name in domain_properties_metadata.items()
if name == domain_name
]
if urn_match:
return urn_match[0]
else:
return None
def emit(
self,
item: Union[
MetadataChangeEvent,
MetadataChangeProposal,
MetadataChangeProposalWrapper,
2023-05-17 08:17:52 -07:00
UsageAggregationClass,
],
callback: Union[Callable[[Exception, str], None], None] = None,
emit_mode: EmitMode = EmitMode.ASYNC,
) -> None:
2023-05-17 08:17:52 -07:00
self.emitted.append(item) # type: ignore
def emit_mce(self, mce: MetadataChangeEvent) -> None:
self.emitted.append(mce)
def emit_mcp(
self,
mcp: Union[MetadataChangeProposal, MetadataChangeProposalWrapper],
async_flag: Optional[bool] = None,
emit_mode: EmitMode = EmitMode.ASYNC,
wait_timeout: Optional[timedelta] = timedelta(seconds=3600),
) -> None:
self.emitted.append(mcp)
def get_entities(
self,
entity_name: str,
urns: List[str],
aspects: Optional[List[str]] = None,
with_system_metadata: bool = False,
) -> Dict[str, Dict[str, Tuple[_Aspect, Optional[SystemMetadataClass]]]]:
result: Dict[str, Dict[str, Tuple[_Aspect, Optional[SystemMetadataClass]]]] = {}
for urn, entity in self.entity_graph.items():
if urn not in urns:
continue
if urn not in result:
result[urn] = {}
for aspect_name, aspect in entity.items():
if aspects and aspect_name not in aspects:
continue
# Mock implementation always returns None for system metadata
system_metadata = None
result[urn][aspect_name] = (aspect, system_metadata)
return result
def get_emitted(
self,
) -> List[
Union[
2023-05-17 08:17:52 -07:00
MetadataChangeEvent, MetadataChangeProposal, MetadataChangeProposalWrapper
]
]:
return self.emitted
def sink_to_file(self, file: Path) -> None:
write_metadata_file(file, self.emitted)