feat(ingest): infer aspectName from aspect type in MCP (#5566)

This commit is contained in:
Harshal Sheth 2022-08-07 14:52:58 +00:00 committed by GitHub
parent c7f477813c
commit 9790f3cefa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 142 additions and 56 deletions

View File

@@ -2,21 +2,20 @@ import json
import types
import unittest.mock
from pathlib import Path
from typing import Dict, Iterable, List, Union
from typing import Any, Dict, Iterable, List, Union
import avro.schema
import click
from avrogen import write_schema_files
def load_schema_file(schema_file: str) -> str:
def load_schema_file(schema_file: Union[str, Path]) -> dict:
raw_schema_text = Path(schema_file).read_text()
return json.dumps(json.loads(raw_schema_text), indent=2)
return json.loads(raw_schema_text)
def merge_schemas(schemas: List[str]) -> str:
def merge_schemas(schemas_obj: List[Any]) -> str:
# Combine schemas.
schemas_obj = [json.loads(schema) for schema in schemas]
merged = ["null"] + schemas_obj
# Patch add_name method to NOT complain about duplicate names
@@ -114,11 +113,85 @@ def make_load_schema_methods(schemas: Iterable[str]) -> str:
)
def annotate_aspects(aspects: List[dict], schema_class_file: Path) -> None:
schema_classes_lines = schema_class_file.read_text().splitlines()
line_lookup_table = {line: i for i, line in enumerate(schema_classes_lines)}
# Create the Aspect class.
# We ensure that it cannot be instantiated directly, as
# per https://stackoverflow.com/a/7989101/5004662.
schema_classes_lines[
line_lookup_table["__SCHEMAS: Dict[str, RecordSchema] = {}"]
] += """
class _Aspect(DictWrapper):
ASPECT_NAME: str = None # type: ignore
def __init__(self):
if type(self) is _Aspect:
raise TypeError("_Aspect is an abstract class, and cannot be instantiated directly.")
super().__init__()
@classmethod
def get_aspect_name(cls) -> str:
return cls.ASPECT_NAME # type: ignore
"""
for aspect in aspects:
className = f'{aspect["name"]}Class'
aspectName = aspect["Aspect"]["name"]
class_def_original = f"class {className}(DictWrapper):"
# Make the aspects inherit from the Aspect class.
class_def_line = line_lookup_table[class_def_original]
schema_classes_lines[class_def_line] = f"class {className}(_Aspect):"
# Define the ASPECT_NAME class attribute.
# There's always an empty line between the docstring and the RECORD_SCHEMA class attribute.
# We need to find it and insert our line of code there.
empty_line = class_def_line + 1
while not (
schema_classes_lines[empty_line].strip() == ""
and schema_classes_lines[empty_line + 1]
.strip()
.startswith("RECORD_SCHEMA = ")
):
empty_line += 1
schema_classes_lines[empty_line] = f"\n ASPECT_NAME = '{aspectName}'"
schema_class_file.write_text("\n".join(schema_classes_lines))
@click.command()
@click.argument("schema_files", type=click.Path(exists=True), nargs=-1, required=True)
@click.argument(
"schemas_path", type=click.Path(exists=True, file_okay=False), required=True
)
@click.argument("outdir", type=click.Path(), required=True)
def generate(schema_files: List[str], outdir: str) -> None:
schemas: Dict[str, str] = {}
def generate(schemas_path: str, outdir: str) -> None:
required_schema_files = {
"mxe/MetadataChangeEvent.avsc",
"mxe/MetadataChangeProposal.avsc",
"usage/UsageAggregation.avsc",
"mxe/MetadataChangeLog.avsc",
"mxe/PlatformEvent.avsc",
"platform/event/v1/EntityChangeEvent.avsc",
}
# Find all the aspect schemas / other important schemas.
aspect_file_stems: List[str] = []
schema_files: List[Path] = []
for schema_file in Path(schemas_path).glob("**/*.avsc"):
relative_path = schema_file.relative_to(schemas_path).as_posix()
if relative_path in required_schema_files:
schema_files.append(schema_file)
required_schema_files.remove(relative_path)
elif load_schema_file(schema_file).get("Aspect"):
aspect_file_stems.append(schema_file.stem)
schema_files.append(schema_file)
assert not required_schema_files, f"Schema files not found: {required_schema_files}"
schemas: Dict[str, dict] = {}
for schema_file in schema_files:
schema = load_schema_file(schema_file)
schemas[Path(schema_file).stem] = schema
@@ -130,12 +203,18 @@ def generate(schema_files: List[str], outdir: str) -> None:
# Schema files post-processing.
(Path(outdir) / "__init__.py").write_text("# This file is intentionally empty.\n")
add_avro_python3_warning(Path(outdir) / "schema_classes.py")
annotate_aspects(
[schemas[aspect_file_stem] for aspect_file_stem in aspect_file_stems],
Path(outdir) / "schema_classes.py",
)
# Save raw schema files in codegen as well.
schema_save_dir = Path(outdir) / "schemas"
schema_save_dir.mkdir()
for schema_out_file, schema in schemas.items():
(schema_save_dir / f"{schema_out_file}.avsc").write_text(schema)
(schema_save_dir / f"{schema_out_file}.avsc").write_text(
json.dumps(schema, indent=2)
)
# Add load_schema method.
with open(schema_save_dir / "__init__.py", "a") as schema_dir_init:

View File

@@ -6,24 +6,6 @@ OUTDIR=./src/datahub/metadata
# Note: this assumes that datahub has already been built with `./gradlew build`.
DATAHUB_ROOT=..
SCHEMAS_ROOT="$DATAHUB_ROOT/metadata-events/mxe-schemas/src/renamed/avro/com/linkedin"
FILES="$SCHEMAS_ROOT/mxe/MetadataChangeEvent.avsc $SCHEMAS_ROOT/mxe/MetadataChangeProposal.avsc $SCHEMAS_ROOT/usage/UsageAggregation.avsc $SCHEMAS_ROOT/mxe/MetadataChangeLog.avsc $SCHEMAS_ROOT/mxe/PlatformEvent.avsc $SCHEMAS_ROOT/platform/event/v1/EntityChangeEvent.avsc"
# Since we depend on jq, check if jq is installed
if ! which jq > /dev/null; then
echo "jq is not installed. Please install jq and rerun (https://stedolan.github.io/jq/)"
exit 1
fi
find $SCHEMAS_ROOT -name "*.avsc" | sort | while read file
do
# Add all other files that are aspects but not included in the above
if (jq '.Aspect' -e $file > /dev/null)
then
FILES="${FILES} ${file}"
fi
echo $FILES > /tmp/codegen_files.txt
done
FILES=$(cat /tmp/codegen_files.txt)
rm -r $OUTDIR || true
python scripts/avro_codegen.py $FILES $OUTDIR
rm -r $OUTDIR 2>/dev/null || true
python scripts/avro_codegen.py $SCHEMAS_ROOT $OUTDIR

View File

@@ -10,7 +10,6 @@ from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, Union
import click
import requests
import yaml
from avrogen.dict_wrapper import DictWrapper
from pydantic import BaseModel, ValidationError
from requests.models import Response
from requests.sessions import Session
@@ -54,6 +53,7 @@ from datahub.metadata.schema_classes import (
SubTypesClass,
UpstreamLineageClass,
ViewPropertiesClass,
_Aspect,
)
from datahub.utilities.urns.urn import Urn
@@ -712,7 +712,7 @@ def get_aspects_for_entity(
aspects: List[str] = [],
typed: bool = False,
cached_session_host: Optional[Tuple[Session, str]] = None,
) -> Dict[str, Union[dict, DictWrapper]]:
) -> Dict[str, Union[dict, _Aspect]]:
# Process non-timeseries aspects
non_timeseries_aspects: List[str] = [
a for a in aspects if a not in timeseries_class_to_aspect_name_map.values()
@@ -745,7 +745,7 @@ def get_aspects_for_entity(
aspect_cls.RECORD_SCHEMA.fullname.replace("pegasus2avro.", "")
] = aspect_value
aspect_map: Dict[str, Union[dict, DictWrapper]] = {}
aspect_map: Dict[str, Union[dict, _Aspect]] = {}
for a in aspect_list.values():
aspect_name = a["name"]
aspect_py_class: Optional[Type[Any]] = _get_pydantic_class_from_aspect_name(

View File

@@ -5,6 +5,7 @@ from typing import Dict, Iterable, List
from avrogen.dict_wrapper import DictWrapper
from datahub.cli import cli_utils
from datahub.emitter.mce_builder import Aspect
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import (
ChangeTypeClass,
@@ -212,11 +213,11 @@ class UrnListModifier:
def modify_urn_list_for_aspect(
aspect_name: str,
aspect: DictWrapper,
aspect: Aspect,
relationship_type: str,
old_urn: str,
new_urn: str,
) -> DictWrapper:
) -> Aspect:
if hasattr(UrnListModifier, f"{aspect_name}_modifier"):
modifier = getattr(UrnListModifier, f"{aspect_name}_modifier")

View File

@@ -9,7 +9,6 @@ from hashlib import md5
from typing import Any, List, Optional, Type, TypeVar, Union, cast, get_type_hints
import typing_inspect
from avrogen.dict_wrapper import DictWrapper
from datahub.configuration.source_common import DEFAULT_ENV as DEFAULT_ENV_CONFIGURATION
from datahub.emitter.serialization_helper import pre_json_transform
@@ -33,9 +32,11 @@ from datahub.metadata.schema_classes import (
UpstreamClass,
UpstreamLineageClass,
)
from datahub.metadata.schema_classes import _Aspect as AspectAbstract
from datahub.utilities.urns.dataset_urn import DatasetUrn
logger = logging.getLogger(__name__)
Aspect = TypeVar("Aspect", bound=AspectAbstract)
DEFAULT_ENV = DEFAULT_ENV_CONFIGURATION
DEFAULT_FLOW_CLUSTER = "prod"
@@ -290,10 +291,6 @@ def make_lineage_mce(
return mce
# This bound isn't tight, but it's better than nothing.
Aspect = TypeVar("Aspect", bound=DictWrapper)
def can_add_aspect(mce: MetadataChangeEventClass, AspectType: Type[Aspect]) -> bool:
SnapshotType = type(mce.proposedSnapshot)

View File

@@ -10,6 +10,7 @@ from datahub.metadata.schema_classes import (
KafkaAuditHeaderClass,
MetadataChangeProposalClass,
SystemMetadataClass,
_Aspect,
)
@@ -23,15 +24,32 @@ def _make_generic_aspect(codegen_obj: DictWrapper) -> GenericAspectClass:
@dataclasses.dataclass
class MetadataChangeProposalWrapper:
# TODO: remove manually aspectName from the codebase
# TODO: (after) remove aspectName field from this class
# TODO: infer entityType from entityUrn
# TODO: set changeType's default to UPSERT
entityType: str
changeType: Union[str, ChangeTypeClass]
entityUrn: Union[None, str] = None
entityKeyAspect: Union[None, DictWrapper] = None
entityKeyAspect: Union[None, _Aspect] = None
auditHeader: Union[None, KafkaAuditHeaderClass] = None
aspectName: Union[None, str] = None
aspect: Union[None, DictWrapper] = None
aspect: Union[None, _Aspect] = None
systemMetadata: Union[None, SystemMetadataClass] = None
def __post_init__(self) -> None:
if not self.aspectName and self.aspect:
self.aspectName = self.aspect.get_aspect_name()
elif (
self.aspectName
and self.aspect
and self.aspectName != self.aspect.get_aspect_name()
):
raise ValueError(
f"aspectName {self.aspectName} does not match aspect type {type(self.aspect)} with name {self.aspect.get_aspect_name()}"
)
def make_mcp(self) -> MetadataChangeProposalClass:
serializedEntityKeyAspect: Union[None, GenericAspectClass] = None
if isinstance(self.entityKeyAspect, DictWrapper):

View File

@@ -85,7 +85,6 @@ def add_domain_to_entity_wu(
entityType=entity_type,
changeType=ChangeTypeClass.UPSERT,
entityUrn=f"{entity_urn}",
aspectName="domains",
aspect=DomainsClass(domains=[domain_urn]),
)
wu = MetadataWorkUnit(id=f"{domain_urn}-to-{entity_urn}", mcp=mcp)
@@ -99,7 +98,6 @@ def add_owner_to_entity_wu(
entityType=entity_type,
changeType=ChangeTypeClass.UPSERT,
entityUrn=f"{entity_urn}",
aspectName="ownership",
aspect=OwnershipClass(
owners=[
OwnerClass(
@@ -120,7 +118,6 @@ def add_tags_to_entity_wu(
entityType=entity_type,
changeType=ChangeTypeClass.UPSERT,
entityUrn=f"{entity_urn}",
aspectName="globalTags",
aspect=GlobalTagsClass(
tags=[TagAssociationClass(f"urn:li:tag:{tag}") for tag in tags]
),
@@ -148,7 +145,6 @@ def gen_containers(
changeType=ChangeTypeClass.UPSERT,
entityUrn=f"{container_urn}",
# entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()),
aspectName="containerProperties",
aspect=ContainerProperties(
name=name,
description=description,
@@ -164,7 +160,6 @@ def gen_containers(
changeType=ChangeTypeClass.UPSERT,
entityUrn=f"{container_urn}",
# entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()),
aspectName="dataPlatformInstance",
aspect=DataPlatformInstance(
platform=f"{make_data_platform_urn(container_key.platform)}",
),
@@ -180,7 +175,6 @@ def gen_containers(
changeType=ChangeTypeClass.UPSERT,
entityUrn=f"{container_urn}",
# entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()),
aspectName="subTypes",
aspect=SubTypesClass(typeNames=sub_types),
)
wu = MetadataWorkUnit(
@@ -220,7 +214,6 @@ def gen_containers(
changeType=ChangeTypeClass.UPSERT,
entityUrn=f"{container_urn}",
# entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()),
aspectName="container",
aspect=ContainerClass(container=parent_container_urn),
# aspect=ContainerKeyClass(guid=database_container_key.guid())
)
@@ -245,7 +238,6 @@ def add_dataset_to_container(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=f"{dataset_urn}",
aspectName="container",
aspect=ContainerClass(container=f"{container_urn}"),
# aspect=ContainerKeyClass(guid=schema_container_key.guid())
)
@@ -263,7 +255,6 @@ def add_entity_to_container(
entityType=entity_type,
changeType=ChangeTypeClass.UPSERT,
entityUrn=entity_urn,
aspectName="container",
aspect=ContainerClass(container=f"{container_urn}"),
)
wu = MetadataWorkUnit(id=f"container-{container_urn}-to-{entity_urn}", mcp=mcp)

View File

@@ -38,7 +38,6 @@ from datahub.metadata.schema_classes import (
DatasetProfileClass,
DatasetPropertiesClass,
DateTypeClass,
DictWrapper,
EnumTypeClass,
ForeignKeyConstraintClass,
GlobalTagsClass,
@@ -711,7 +710,7 @@ class SalesforceSource(Source):
)
def wrap_aspect_as_workunit(
self, entityName: str, entityUrn: str, aspectName: str, aspect: DictWrapper
self, entityName: str, entityUrn: str, aspectName: str, aspect: builder.Aspect
) -> WorkUnit:
wu = MetadataWorkUnit(
id=f"{aspectName}-for-{entityUrn}",

View File

@@ -1,6 +1,6 @@
import logging
from abc import ABCMeta, abstractmethod
from typing import Any, Dict, Iterable, List, Optional, Type, Union
from typing import Any, Dict, Iterable, List, Optional, Type, Union, cast
import datahub.emitter.mce_builder
from datahub.emitter.mce_builder import Aspect
@@ -30,6 +30,7 @@ from datahub.metadata.schema_classes import (
StatusClass,
UpstreamLineageClass,
ViewPropertiesClass,
_Aspect,
)
from datahub.utilities.urns.urn import Urn
@@ -217,7 +218,7 @@ class BaseTransformer(Transformer, metaclass=ABCMeta):
transformed_aspect = self.transform_aspect(
entity_urn=envelope.record.entityUrn,
aspect_name=envelope.record.aspectName,
aspect=envelope.record.aspect,
aspect=cast(_Aspect, envelope.record.aspect),
)
self._mark_processed(envelope.record.entityUrn)
if transformed_aspect is None:

View File

@@ -12,7 +12,11 @@ from datahub.cli.json_file import check_mce_file
from datahub.emitter import mce_builder
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.file import iterate_mce_file
from datahub.metadata.schema_classes import MetadataChangeEventClass
from datahub.metadata.schema_classes import (
MetadataChangeEventClass,
OwnershipClass,
_Aspect,
)
from datahub.metadata.schemas import getMetadataChangeEventSchema
from tests.test_helpers import mce_helpers
from tests.test_helpers.click_helpers import run_datahub_cmd
@@ -21,6 +25,18 @@ from tests.test_helpers.type_helpers import PytestConfig
FROZEN_TIME = "2021-07-22 18:54:06"
def test_codegen_aspect_name():
assert issubclass(OwnershipClass, _Aspect)
assert OwnershipClass.ASPECT_NAME == "ownership"
assert OwnershipClass.get_aspect_name() == "ownership"
def test_cannot_instantiated_codegen_aspect():
with pytest.raises(TypeError, match="instantiate"):
_Aspect()
@freeze_time(FROZEN_TIME)
@pytest.mark.parametrize(
"json_filename",

View File

@@ -50,6 +50,7 @@ def create_mocked_csv_enricher_source() -> CSVEnricherSource:
["oldtag1", "oldtag2"]
)
graph.get_aspect_v2.return_value = None
graph.get_domain.return_value = None
ctx.graph = graph
return CSVEnricherSource(
CSVEnricherConfig(**create_base_csv_enricher_config()), ctx

View File

@@ -1392,7 +1392,8 @@ def test_old_transformers_working_as_before(mock_time):
dataset_mcps = [
make_generic_dataset_mcp(),
make_generic_dataset_mcp(
aspect=DatasetPropertiesClass(description="Another test MCP")
aspect_name="datasetProperties",
aspect=DatasetPropertiesClass(description="Another test MCP"),
),
EndOfStream(),
]