diff --git a/metadata-ingestion/scripts/avro_codegen.py b/metadata-ingestion/scripts/avro_codegen.py
index 0a6b1b730a..d9b335fdc8 100644
--- a/metadata-ingestion/scripts/avro_codegen.py
+++ b/metadata-ingestion/scripts/avro_codegen.py
@@ -2,21 +2,20 @@ import json
 import types
 import unittest.mock
 from pathlib import Path
-from typing import Dict, Iterable, List, Union
+from typing import Any, Dict, Iterable, List, Union
 
 import avro.schema
 import click
 from avrogen import write_schema_files
 
 
-def load_schema_file(schema_file: str) -> str:
+def load_schema_file(schema_file: Union[str, Path]) -> dict:
     raw_schema_text = Path(schema_file).read_text()
-    return json.dumps(json.loads(raw_schema_text), indent=2)
+    return json.loads(raw_schema_text)
 
 
-def merge_schemas(schemas: List[str]) -> str:
+def merge_schemas(schemas_obj: List[Any]) -> str:
     # Combine schemas.
-    schemas_obj = [json.loads(schema) for schema in schemas]
     merged = ["null"] + schemas_obj
 
     # Patch add_name method to NOT complain about duplicate names
@@ -114,11 +113,85 @@ def make_load_schema_methods(schemas: Iterable[str]) -> str:
     )
 
 
+def annotate_aspects(aspects: List[dict], schema_class_file: Path) -> None:
+    schema_classes_lines = schema_class_file.read_text().splitlines()
+    line_lookup_table = {line: i for i, line in enumerate(schema_classes_lines)}
+
+    # Create the Aspect class.
+    # We ensure that it cannot be instantiated directly, as
+    # per https://stackoverflow.com/a/7989101/5004662.
+    schema_classes_lines[
+        line_lookup_table["__SCHEMAS: Dict[str, RecordSchema] = {}"]
+    ] += """
+
+class _Aspect(DictWrapper):
+    ASPECT_NAME: str = None  # type: ignore
+
+    def __init__(self):
+        if type(self) is _Aspect:
+            raise TypeError("_Aspect is an abstract class, and cannot be instantiated directly.")
+        super().__init__()
+
+    @classmethod
+    def get_aspect_name(cls) -> str:
+        return cls.ASPECT_NAME  # type: ignore
+"""
+
+    for aspect in aspects:
+        className = f'{aspect["name"]}Class'
+        aspectName = aspect["Aspect"]["name"]
+        class_def_original = f"class {className}(DictWrapper):"
+
+        # Make the aspects inherit from the Aspect class.
+        class_def_line = line_lookup_table[class_def_original]
+        schema_classes_lines[class_def_line] = f"class {className}(_Aspect):"
+
+        # Define the ASPECT_NAME class attribute.
+        # There's always an empty line between the docstring and the RECORD_SCHEMA class attribute.
+        # We need to find it and insert our line of code there.
+        empty_line = class_def_line + 1
+        while not (
+            schema_classes_lines[empty_line].strip() == ""
+            and schema_classes_lines[empty_line + 1]
+            .strip()
+            .startswith("RECORD_SCHEMA = ")
+        ):
+            empty_line += 1
+        schema_classes_lines[empty_line] = f"\n    ASPECT_NAME = '{aspectName}'"
+
+    schema_class_file.write_text("\n".join(schema_classes_lines))
+
+
 @click.command()
-@click.argument("schema_files", type=click.Path(exists=True), nargs=-1, required=True)
+@click.argument(
+    "schemas_path", type=click.Path(exists=True, file_okay=False), required=True
+)
 @click.argument("outdir", type=click.Path(), required=True)
-def generate(schema_files: List[str], outdir: str) -> None:
-    schemas: Dict[str, str] = {}
+def generate(schemas_path: str, outdir: str) -> None:
+    required_schema_files = {
+        "mxe/MetadataChangeEvent.avsc",
+        "mxe/MetadataChangeProposal.avsc",
+        "usage/UsageAggregation.avsc",
+        "mxe/MetadataChangeLog.avsc",
+        "mxe/PlatformEvent.avsc",
+        "platform/event/v1/EntityChangeEvent.avsc",
+    }
+
+    # Find all the aspect schemas / other important schemas.
+    aspect_file_stems: List[str] = []
+    schema_files: List[Path] = []
+    for schema_file in Path(schemas_path).glob("**/*.avsc"):
+        relative_path = schema_file.relative_to(schemas_path).as_posix()
+        if relative_path in required_schema_files:
+            schema_files.append(schema_file)
+            required_schema_files.remove(relative_path)
+        elif load_schema_file(schema_file).get("Aspect"):
+            aspect_file_stems.append(schema_file.stem)
+            schema_files.append(schema_file)
+
+    assert not required_schema_files, f"Schema files not found: {required_schema_files}"
+
+    schemas: Dict[str, dict] = {}
     for schema_file in schema_files:
         schema = load_schema_file(schema_file)
         schemas[Path(schema_file).stem] = schema
@@ -130,12 +203,18 @@ def generate(schema_files: List[str], outdir: str) -> None:
     # Schema files post-processing.
     (Path(outdir) / "__init__.py").write_text("# This file is intentionally empty.\n")
     add_avro_python3_warning(Path(outdir) / "schema_classes.py")
+    annotate_aspects(
+        [schemas[aspect_file_stem] for aspect_file_stem in aspect_file_stems],
+        Path(outdir) / "schema_classes.py",
+    )
 
     # Save raw schema files in codegen as well.
     schema_save_dir = Path(outdir) / "schemas"
     schema_save_dir.mkdir()
     for schema_out_file, schema in schemas.items():
-        (schema_save_dir / f"{schema_out_file}.avsc").write_text(schema)
+        (schema_save_dir / f"{schema_out_file}.avsc").write_text(
+            json.dumps(schema, indent=2)
+        )
 
     # Add load_schema method.
     with open(schema_save_dir / "__init__.py", "a") as schema_dir_init:
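Not part of the patch: after the codegen runs with this change, every generated aspect class in schema_classes.py subclasses the injected `_Aspect` base and records its aspect name. A quick sanity check, assuming the package has been regenerated with this patch (it mirrors the new unit tests in test_serde.py further down):

```python
# Assumes the datahub package has been regenerated (via codegen.sh) with this patch.
from datahub.metadata.schema_classes import OwnershipClass, _Aspect

assert issubclass(OwnershipClass, _Aspect)
assert OwnershipClass.ASPECT_NAME == "ownership"
assert OwnershipClass.get_aspect_name() == "ownership"
```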
diff --git a/metadata-ingestion/scripts/codegen.sh b/metadata-ingestion/scripts/codegen.sh
index 41e933376f..5e41a0a2c9 100755
--- a/metadata-ingestion/scripts/codegen.sh
+++ b/metadata-ingestion/scripts/codegen.sh
@@ -6,24 +6,6 @@ OUTDIR=./src/datahub/metadata
 
 # Note: this assumes that datahub has already been built with `./gradlew build`.
 DATAHUB_ROOT=..
 SCHEMAS_ROOT="$DATAHUB_ROOT/metadata-events/mxe-schemas/src/renamed/avro/com/linkedin"
-FILES="$SCHEMAS_ROOT/mxe/MetadataChangeEvent.avsc $SCHEMAS_ROOT/mxe/MetadataChangeProposal.avsc $SCHEMAS_ROOT/usage/UsageAggregation.avsc $SCHEMAS_ROOT/mxe/MetadataChangeLog.avsc $SCHEMAS_ROOT/mxe/PlatformEvent.avsc $SCHEMAS_ROOT/platform/event/v1/EntityChangeEvent.avsc"
-# Since we depend on jq, check if jq is installed
-if ! which jq > /dev/null; then
-    echo "jq is not installed. Please install jq and rerun (https://stedolan.github.io/jq/)"
-    exit 1
-fi
-find $SCHEMAS_ROOT -name "*.avsc" | sort | while read file
-do
-# Add all other files that are aspects but not included in the above
-    if (jq '.Aspect' -e $file > /dev/null)
-    then
-        FILES="${FILES} ${file}"
-    fi
-    echo $FILES > /tmp/codegen_files.txt
-done
-
-FILES=$(cat /tmp/codegen_files.txt)
-
-rm -r $OUTDIR || true
-python scripts/avro_codegen.py $FILES $OUTDIR
+rm -r $OUTDIR 2>/dev/null || true
+python scripts/avro_codegen.py $SCHEMAS_ROOT $OUTDIR
diff --git a/metadata-ingestion/src/datahub/cli/cli_utils.py b/metadata-ingestion/src/datahub/cli/cli_utils.py
index 8a82f24631..59f78167de 100644
--- a/metadata-ingestion/src/datahub/cli/cli_utils.py
+++ b/metadata-ingestion/src/datahub/cli/cli_utils.py
@@ -10,7 +10,6 @@ from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, Union
 import click
 import requests
 import yaml
-from avrogen.dict_wrapper import DictWrapper
 from pydantic import BaseModel, ValidationError
 from requests.models import Response
 from requests.sessions import Session
@@ -54,6 +53,7 @@ from datahub.metadata.schema_classes import (
     SubTypesClass,
     UpstreamLineageClass,
     ViewPropertiesClass,
+    _Aspect,
 )
 from datahub.utilities.urns.urn import Urn
 
@@ -712,7 +712,7 @@ def get_aspects_for_entity(
     aspects: List[str] = [],
     typed: bool = False,
     cached_session_host: Optional[Tuple[Session, str]] = None,
-) -> Dict[str, Union[dict, DictWrapper]]:
+) -> Dict[str, Union[dict, _Aspect]]:
     # Process non-timeseries aspects
     non_timeseries_aspects: List[str] = [
         a for a in aspects if a not in timeseries_class_to_aspect_name_map.values()
     ]
@@ -745,7 +745,7 @@ def get_aspects_for_entity(
             aspect_cls.RECORD_SCHEMA.fullname.replace("pegasus2avro.", "")
         ] = aspect_value
 
-    aspect_map: Dict[str, Union[dict, DictWrapper]] = {}
+    aspect_map: Dict[str, Union[dict, _Aspect]] = {}
     for a in aspect_list.values():
         aspect_name = a["name"]
         aspect_py_class: Optional[Type[Any]] = _get_pydantic_class_from_aspect_name(
diff --git a/metadata-ingestion/src/datahub/cli/migration_utils.py b/metadata-ingestion/src/datahub/cli/migration_utils.py
index 79546e07ac..e40e19dbd3 100644
--- a/metadata-ingestion/src/datahub/cli/migration_utils.py
+++ b/metadata-ingestion/src/datahub/cli/migration_utils.py
@@ -5,6 +5,7 @@ from typing import Dict, Iterable, List
 from avrogen.dict_wrapper import DictWrapper
 
 from datahub.cli import cli_utils
+from datahub.emitter.mce_builder import Aspect
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.metadata.schema_classes import (
     ChangeTypeClass,
@@ -212,11 +213,11 @@ class UrnListModifier:
 
 def modify_urn_list_for_aspect(
     aspect_name: str,
-    aspect: DictWrapper,
+    aspect: Aspect,
     relationship_type: str,
     old_urn: str,
     new_urn: str,
-) -> DictWrapper:
+) -> Aspect:
 
     if hasattr(UrnListModifier, f"{aspect_name}_modifier"):
         modifier = getattr(UrnListModifier, f"{aspect_name}_modifier")
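Not part of the patch: because the `Aspect` TypeVar is now bound to `_Aspect`, helpers that take and return an aspect preserve the caller's concrete class instead of widening to `DictWrapper`. A minimal illustrative sketch; `passthrough` is a hypothetical stand-in for helpers like `modify_urn_list_for_aspect`:

```python
from datahub.emitter.mce_builder import Aspect  # TypeVar("Aspect", bound=_Aspect)
from datahub.metadata.schema_classes import StatusClass


def passthrough(aspect: Aspect) -> Aspect:
    # Hypothetical stand-in for a helper that returns the same aspect type it was given.
    return aspect


status: StatusClass = passthrough(StatusClass(removed=False))  # type-checks as StatusClass
```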
diff --git a/metadata-ingestion/src/datahub/emitter/mce_builder.py b/metadata-ingestion/src/datahub/emitter/mce_builder.py
index cdbde24ac4..3d8208f508 100644
--- a/metadata-ingestion/src/datahub/emitter/mce_builder.py
+++ b/metadata-ingestion/src/datahub/emitter/mce_builder.py
@@ -9,7 +9,6 @@ from hashlib import md5
 from typing import Any, List, Optional, Type, TypeVar, Union, cast, get_type_hints
 
 import typing_inspect
-from avrogen.dict_wrapper import DictWrapper
 
 from datahub.configuration.source_common import DEFAULT_ENV as DEFAULT_ENV_CONFIGURATION
 from datahub.emitter.serialization_helper import pre_json_transform
@@ -33,9 +32,11 @@ from datahub.metadata.schema_classes import (
     UpstreamClass,
     UpstreamLineageClass,
 )
+from datahub.metadata.schema_classes import _Aspect as AspectAbstract
 from datahub.utilities.urns.dataset_urn import DatasetUrn
 
 logger = logging.getLogger(__name__)
+Aspect = TypeVar("Aspect", bound=AspectAbstract)
 
 DEFAULT_ENV = DEFAULT_ENV_CONFIGURATION
 DEFAULT_FLOW_CLUSTER = "prod"
@@ -290,10 +291,6 @@ def make_lineage_mce(
     return mce
 
 
-# This bound isn't tight, but it's better than nothing.
-Aspect = TypeVar("Aspect", bound=DictWrapper)
-
-
 def can_add_aspect(mce: MetadataChangeEventClass, AspectType: Type[Aspect]) -> bool:
     SnapshotType = type(mce.proposedSnapshot)
 
diff --git a/metadata-ingestion/src/datahub/emitter/mcp.py b/metadata-ingestion/src/datahub/emitter/mcp.py
index 1d410616fb..f80609b114 100644
--- a/metadata-ingestion/src/datahub/emitter/mcp.py
+++ b/metadata-ingestion/src/datahub/emitter/mcp.py
@@ -10,6 +10,7 @@ from datahub.metadata.schema_classes import (
     KafkaAuditHeaderClass,
     MetadataChangeProposalClass,
     SystemMetadataClass,
+    _Aspect,
 )
 
 
@@ -23,15 +24,32 @@ def _make_generic_aspect(codegen_obj: DictWrapper) -> GenericAspectClass:
 
 @dataclasses.dataclass
 class MetadataChangeProposalWrapper:
+    # TODO: remove manually aspectName from the codebase
+    # TODO: (after) remove aspectName field from this class
+    # TODO: infer entityType from entityUrn
+    # TODO: set changeType's default to UPSERT
+
     entityType: str
     changeType: Union[str, ChangeTypeClass]
     entityUrn: Union[None, str] = None
-    entityKeyAspect: Union[None, DictWrapper] = None
+    entityKeyAspect: Union[None, _Aspect] = None
     auditHeader: Union[None, KafkaAuditHeaderClass] = None
    aspectName: Union[None, str] = None
-    aspect: Union[None, DictWrapper] = None
+    aspect: Union[None, _Aspect] = None
     systemMetadata: Union[None, SystemMetadataClass] = None
 
+    def __post_init__(self) -> None:
+        if not self.aspectName and self.aspect:
+            self.aspectName = self.aspect.get_aspect_name()
+        elif (
+            self.aspectName
+            and self.aspect
+            and self.aspectName != self.aspect.get_aspect_name()
+        ):
+            raise ValueError(
+                f"aspectName {self.aspectName} does not match aspect type {type(self.aspect)} with name {self.aspect.get_aspect_name()}"
+            )
+
     def make_mcp(self) -> MetadataChangeProposalClass:
         serializedEntityKeyAspect: Union[None, GenericAspectClass] = None
         if isinstance(self.entityKeyAspect, DictWrapper):
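Not part of the patch: a minimal sketch of how callers can now build an MCP without spelling out `aspectName` — `__post_init__` fills it in from the aspect itself. The dataset URN below is purely illustrative:

```python
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import ChangeTypeClass, StatusClass

mcp = MetadataChangeProposalWrapper(
    entityType="dataset",
    changeType=ChangeTypeClass.UPSERT,
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,example.table,PROD)",  # illustrative
    aspect=StatusClass(removed=False),
)
assert mcp.aspectName == "status"  # inferred via StatusClass.get_aspect_name()
```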
diff --git a/metadata-ingestion/src/datahub/emitter/mcp_builder.py b/metadata-ingestion/src/datahub/emitter/mcp_builder.py
index 055db3c6a4..ecb9146a21 100644
--- a/metadata-ingestion/src/datahub/emitter/mcp_builder.py
+++ b/metadata-ingestion/src/datahub/emitter/mcp_builder.py
@@ -85,7 +85,6 @@ def add_domain_to_entity_wu(
         entityType=entity_type,
         changeType=ChangeTypeClass.UPSERT,
         entityUrn=f"{entity_urn}",
-        aspectName="domains",
         aspect=DomainsClass(domains=[domain_urn]),
     )
     wu = MetadataWorkUnit(id=f"{domain_urn}-to-{entity_urn}", mcp=mcp)
@@ -99,7 +98,6 @@ def add_owner_to_entity_wu(
         entityType=entity_type,
         changeType=ChangeTypeClass.UPSERT,
         entityUrn=f"{entity_urn}",
-        aspectName="ownership",
         aspect=OwnershipClass(
             owners=[
                 OwnerClass(
@@ -120,7 +118,6 @@ def add_tags_to_entity_wu(
         entityType=entity_type,
         changeType=ChangeTypeClass.UPSERT,
         entityUrn=f"{entity_urn}",
-        aspectName="globalTags",
         aspect=GlobalTagsClass(
             tags=[TagAssociationClass(f"urn:li:tag:{tag}") for tag in tags]
         ),
@@ -148,7 +145,6 @@ def gen_containers(
         changeType=ChangeTypeClass.UPSERT,
         entityUrn=f"{container_urn}",
         # entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()),
-        aspectName="containerProperties",
         aspect=ContainerProperties(
             name=name,
             description=description,
@@ -164,7 +160,6 @@ def gen_containers(
         changeType=ChangeTypeClass.UPSERT,
         entityUrn=f"{container_urn}",
         # entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()),
-        aspectName="dataPlatformInstance",
         aspect=DataPlatformInstance(
             platform=f"{make_data_platform_urn(container_key.platform)}",
         ),
@@ -180,7 +175,6 @@ def gen_containers(
         changeType=ChangeTypeClass.UPSERT,
         entityUrn=f"{container_urn}",
         # entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()),
-        aspectName="subTypes",
         aspect=SubTypesClass(typeNames=sub_types),
     )
     wu = MetadataWorkUnit(
@@ -220,7 +214,6 @@ def gen_containers(
         changeType=ChangeTypeClass.UPSERT,
         entityUrn=f"{container_urn}",
         # entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()),
-        aspectName="container",
         aspect=ContainerClass(container=parent_container_urn),
         # aspect=ContainerKeyClass(guid=database_container_key.guid())
     )
@@ -245,7 +238,6 @@ def add_dataset_to_container(
         entityType="dataset",
         changeType=ChangeTypeClass.UPSERT,
         entityUrn=f"{dataset_urn}",
-        aspectName="container",
         aspect=ContainerClass(container=f"{container_urn}"),
         # aspect=ContainerKeyClass(guid=schema_container_key.guid())
     )
@@ -263,7 +255,6 @@ def add_entity_to_container(
         entityType=entity_type,
         changeType=ChangeTypeClass.UPSERT,
         entityUrn=entity_urn,
-        aspectName="container",
         aspect=ContainerClass(container=f"{container_urn}"),
     )
     wu = MetadataWorkUnit(id=f"container-{container_urn}-to-{entity_urn}", mcp=mcp)
diff --git a/metadata-ingestion/src/datahub/ingestion/source/salesforce.py b/metadata-ingestion/src/datahub/ingestion/source/salesforce.py
index 7afbbe33d1..cfbff4b018 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/salesforce.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/salesforce.py
@@ -38,7 +38,6 @@ from datahub.metadata.schema_classes import (
     DatasetProfileClass,
     DatasetPropertiesClass,
     DateTypeClass,
-    DictWrapper,
     EnumTypeClass,
     ForeignKeyConstraintClass,
     GlobalTagsClass,
@@ -711,7 +710,7 @@ class SalesforceSource(Source):
         )
 
     def wrap_aspect_as_workunit(
-        self, entityName: str, entityUrn: str, aspectName: str, aspect: DictWrapper
+        self, entityName: str, entityUrn: str, aspectName: str, aspect: builder.Aspect
     ) -> WorkUnit:
         wu = MetadataWorkUnit(
             id=f"{aspectName}-for-{entityUrn}",
diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py
index a157d0d955..a2b1c13502 100644
--- a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py
+++ b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py
@@ -1,6 +1,6 @@
 import logging
 from abc import ABCMeta, abstractmethod
-from typing import Any, Dict, Iterable, List, Optional, Type, Union
+from typing import Any, Dict, Iterable, List, Optional, Type, Union, cast
 
 import datahub.emitter.mce_builder
 from datahub.emitter.mce_builder import Aspect
@@ -30,6 +30,7 @@ from datahub.metadata.schema_classes import (
     StatusClass,
     UpstreamLineageClass,
     ViewPropertiesClass,
+    _Aspect,
 )
 from datahub.utilities.urns.urn import Urn
 
@@ -217,7 +218,7 @@ class BaseTransformer(Transformer, metaclass=ABCMeta):
                 transformed_aspect = self.transform_aspect(
                     entity_urn=envelope.record.entityUrn,
                     aspect_name=envelope.record.aspectName,
-                    aspect=envelope.record.aspect,
+                    aspect=cast(_Aspect, envelope.record.aspect),
                 )
                 self._mark_processed(envelope.record.entityUrn)
                 if transformed_aspect is None:
diff --git a/metadata-ingestion/tests/unit/serde/test_serde.py b/metadata-ingestion/tests/unit/serde/test_serde.py
index 8e0b1ce747..e40ff55e93 100644
--- a/metadata-ingestion/tests/unit/serde/test_serde.py
+++ b/metadata-ingestion/tests/unit/serde/test_serde.py
@@ -12,7 +12,11 @@ from datahub.cli.json_file import check_mce_file
 from datahub.emitter import mce_builder
 from datahub.ingestion.run.pipeline import Pipeline
 from datahub.ingestion.source.file import iterate_mce_file
-from datahub.metadata.schema_classes import MetadataChangeEventClass
+from datahub.metadata.schema_classes import (
+    MetadataChangeEventClass,
+    OwnershipClass,
+    _Aspect,
+)
 from datahub.metadata.schemas import getMetadataChangeEventSchema
 from tests.test_helpers import mce_helpers
 from tests.test_helpers.click_helpers import run_datahub_cmd
@@ -21,6 +25,18 @@ from tests.test_helpers.type_helpers import PytestConfig
 
 FROZEN_TIME = "2021-07-22 18:54:06"
 
+def test_codegen_aspect_name():
+    assert issubclass(OwnershipClass, _Aspect)
+
+    assert OwnershipClass.ASPECT_NAME == "ownership"
+    assert OwnershipClass.get_aspect_name() == "ownership"
+
+
+def test_cannot_instantiated_codegen_aspect():
+    with pytest.raises(TypeError, match="instantiate"):
+        _Aspect()
+
+
 @freeze_time(FROZEN_TIME)
 @pytest.mark.parametrize(
     "json_filename",
diff --git a/metadata-ingestion/tests/unit/test_csv_enricher_source.py b/metadata-ingestion/tests/unit/test_csv_enricher_source.py
index c294605f72..efbbbf1fa4 100644
--- a/metadata-ingestion/tests/unit/test_csv_enricher_source.py
+++ b/metadata-ingestion/tests/unit/test_csv_enricher_source.py
@@ -50,6 +50,7 @@ def create_mocked_csv_enricher_source() -> CSVEnricherSource:
         ["oldtag1", "oldtag2"]
     )
     graph.get_aspect_v2.return_value = None
+    graph.get_domain.return_value = None
     ctx.graph = graph
     return CSVEnricherSource(
         CSVEnricherConfig(**create_base_csv_enricher_config()), ctx
diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py
index b44b629643..f3c02ef9ba 100644
--- a/metadata-ingestion/tests/unit/test_transform_dataset.py
+++ b/metadata-ingestion/tests/unit/test_transform_dataset.py
@@ -1392,7 +1392,8 @@ def test_old_transformers_working_as_before(mock_time):
     dataset_mcps = [
         make_generic_dataset_mcp(),
         make_generic_dataset_mcp(
-            aspect=DatasetPropertiesClass(description="Another test MCP")
+            aspect_name="datasetProperties",
+            aspect=DatasetPropertiesClass(description="Another test MCP"),
         ),
         EndOfStream(),
     ]
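Not part of the patch: a sketch of the new guardrails, mirroring the unit tests above — `_Aspect` refuses direct instantiation, and an explicit `aspectName` that disagrees with the aspect's registered name now fails fast. The dataset URN is illustrative:

```python
import pytest

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import ChangeTypeClass, StatusClass, _Aspect

# The generated base class is abstract and cannot be instantiated directly.
with pytest.raises(TypeError, match="instantiate"):
    _Aspect()

# A mismatched aspectName is rejected in __post_init__.
with pytest.raises(ValueError):
    MetadataChangeProposalWrapper(
        entityType="dataset",
        changeType=ChangeTypeClass.UPSERT,
        entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,example.table,PROD)",  # illustrative
        aspectName="ownership",  # deliberately wrong; StatusClass registers "status"
        aspect=StatusClass(removed=False),
    )
```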