From cb12910b6b6f66419a43f4ac40a56c1467ae143c Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 17 Jan 2023 19:41:43 -0800 Subject: [PATCH] feat(ingest): add entity registry in codegen (#6984) Co-authored-by: Pedro Silva --- metadata-ingestion/scripts/avro_codegen.py | 122 ++++++++++++++---- metadata-ingestion/scripts/codegen.sh | 3 +- .../src/datahub/emitter/mce_builder.py | 16 --- .../source/looker/looker_lib_wrapper.py | 2 +- .../tests/unit/serde/test_serde.py | 8 ++ 5 files changed, 105 insertions(+), 46 deletions(-) diff --git a/metadata-ingestion/scripts/avro_codegen.py b/metadata-ingestion/scripts/avro_codegen.py index d6c61cd4ff..59db910050 100644 --- a/metadata-ingestion/scripts/avro_codegen.py +++ b/metadata-ingestion/scripts/avro_codegen.py @@ -2,18 +2,70 @@ import json import types import unittest.mock from pathlib import Path -from typing import Any, Dict, Iterable, List, Union +from typing import Any, Dict, Iterable, List, Optional, Union import avro.schema import click +import pydantic +import yaml from avrogen import write_schema_files +ENTITY_CATEGORY_UNSET = "_unset_" + + +class EntityType(pydantic.BaseModel): + name: str + doc: Optional[str] = None + category: str = ENTITY_CATEGORY_UNSET + + keyAspect: str + aspects: List[str] + + +def load_entity_registry(entity_registry_file: Path) -> List[EntityType]: + with entity_registry_file.open() as f: + raw_entity_registry = yaml.safe_load(f) + + entities = pydantic.parse_obj_as(List[EntityType], raw_entity_registry["entities"]) + return entities + def load_schema_file(schema_file: Union[str, Path]) -> dict: raw_schema_text = Path(schema_file).read_text() return json.loads(raw_schema_text) +def load_schemas(schemas_path: str) -> Dict[str, dict]: + required_schema_files = { + "mxe/MetadataChangeEvent.avsc", + "mxe/MetadataChangeProposal.avsc", + "usage/UsageAggregation.avsc", + "mxe/MetadataChangeLog.avsc", + "mxe/PlatformEvent.avsc", + "platform/event/v1/EntityChangeEvent.avsc", + "metadata/query/filter/Filter.avsc", # temporarily added to test reserved keywords support + } + + # Find all the aspect schemas / other important schemas. + schema_files: List[Path] = [] + for schema_file in Path(schemas_path).glob("**/*.avsc"): + relative_path = schema_file.relative_to(schemas_path).as_posix() + if relative_path in required_schema_files: + schema_files.append(schema_file) + required_schema_files.remove(relative_path) + elif load_schema_file(schema_file).get("Aspect"): + schema_files.append(schema_file) + + assert not required_schema_files, f"Schema files not found: {required_schema_files}" + + schemas: Dict[str, dict] = {} + for schema_file in schema_files: + schema = load_schema_file(schema_file) + schemas[Path(schema_file).stem] = schema + + return schemas + + def merge_schemas(schemas_obj: List[Any]) -> str: # Combine schemas. merged = ["null"] + schemas_obj @@ -127,6 +179,7 @@ def annotate_aspects(aspects: List[dict], schema_class_file: Path) -> None: class _Aspect(DictWrapper): ASPECT_NAME: str = None # type: ignore ASPECT_TYPE: str = "default" + ASPECT_INFO: dict = None # type: ignore def __init__(self): if type(self) is _Aspect: @@ -140,6 +193,10 @@ class _Aspect(DictWrapper): @classmethod def get_aspect_type(cls) -> str: return cls.ASPECT_TYPE + + @classmethod + def get_aspect_info(cls) -> dict: + return cls.ASPECT_INFO """ for aspect in aspects: @@ -168,6 +225,9 @@ class _Aspect(DictWrapper): schema_classes_lines[ empty_line ] += f"\n ASPECT_TYPE = '{aspect['Aspect']['type']}'" + schema_classes_lines[empty_line] += f"\n ASPECT_INFO = {aspect['Aspect']}" + + schema_classes_lines[empty_line + 1] += "\n" # Finally, generate a big list of all available aspects. newline = "\n" @@ -178,6 +238,10 @@ from typing import Type ASPECT_CLASSES: List[Type[_Aspect]] = [ {f',{newline} '.join(f"{aspect['name']}Class" for aspect in aspects)} ] + +KEY_ASPECTS: Dict[str, Type[_Aspect]] = {{ + {f',{newline} '.join(f"'{aspect['Aspect']['keyForEntity']}': {aspect['name']}Class" for aspect in aspects if aspect['Aspect'].get('keyForEntity'))} +}} """ ) @@ -185,49 +249,51 @@ ASPECT_CLASSES: List[Type[_Aspect]] = [ @click.command() +@click.argument( + "entity_registry", type=click.Path(exists=True, dir_okay=False), required=True +) @click.argument( "schemas_path", type=click.Path(exists=True, file_okay=False), required=True ) @click.argument("outdir", type=click.Path(), required=True) -def generate(schemas_path: str, outdir: str) -> None: - required_schema_files = { - "mxe/MetadataChangeEvent.avsc", - "mxe/MetadataChangeProposal.avsc", - "usage/UsageAggregation.avsc", - "mxe/MetadataChangeLog.avsc", - "mxe/PlatformEvent.avsc", - "platform/event/v1/EntityChangeEvent.avsc", - "metadata/query/filter/Filter.avsc", # temporarily added to test reserved keywords support +def generate(entity_registry: str, schemas_path: str, outdir: str) -> None: + entities = load_entity_registry(Path(entity_registry)) + schemas = load_schemas(schemas_path) + + # Special handling for aspects. + aspects = { + schema["Aspect"]["name"]: schema + for schema in schemas.values() + if schema.get("Aspect") } - # Find all the aspect schemas / other important schemas. - aspect_file_stems: List[str] = [] - schema_files: List[Path] = [] - for schema_file in Path(schemas_path).glob("**/*.avsc"): - relative_path = schema_file.relative_to(schemas_path).as_posix() - if relative_path in required_schema_files: - schema_files.append(schema_file) - required_schema_files.remove(relative_path) - elif load_schema_file(schema_file).get("Aspect"): - aspect_file_stems.append(schema_file.stem) - schema_files.append(schema_file) + for entity in entities: + # This implicitly requires that all keyAspects are resolvable. + aspect = aspects[entity.keyAspect] - assert not required_schema_files, f"Schema files not found: {required_schema_files}" + # This requires that entities cannot share a keyAspect. + assert "keyForEntity" not in aspect["Aspect"] - schemas: Dict[str, dict] = {} - for schema_file in schema_files: - schema = load_schema_file(schema_file) - schemas[Path(schema_file).stem] = schema + aspect["Aspect"]["keyForEntity"] = entity.name + aspect["Aspect"]["entityCategory"] = entity.category + aspect["Aspect"]["entityAspects"] = entity.aspects + if entity.doc is not None: + aspect["Aspect"]["entityDoc"] = entity.doc + + # Check for unused aspects. We currently have quite a few. + # unused_aspects = set(aspects.keys()) - set().union( + # {entity.keyAspect for entity in entities}, + # *(set(entity.aspects) for entity in entities), + # ) merged_schema = merge_schemas(list(schemas.values())) - write_schema_files(merged_schema, outdir) # Schema files post-processing. (Path(outdir) / "__init__.py").write_text("# This file is intentionally empty.\n") add_avro_python3_warning(Path(outdir) / "schema_classes.py") annotate_aspects( - [schemas[aspect_file_stem] for aspect_file_stem in aspect_file_stems], + list(aspects.values()), Path(outdir) / "schema_classes.py", ) diff --git a/metadata-ingestion/scripts/codegen.sh b/metadata-ingestion/scripts/codegen.sh index 5e41a0a2c9..1124352a09 100755 --- a/metadata-ingestion/scripts/codegen.sh +++ b/metadata-ingestion/scripts/codegen.sh @@ -6,6 +6,7 @@ OUTDIR=./src/datahub/metadata # Note: this assumes that datahub has already been built with `./gradlew build`. DATAHUB_ROOT=.. SCHEMAS_ROOT="$DATAHUB_ROOT/metadata-events/mxe-schemas/src/renamed/avro/com/linkedin" +ENTITY_REGISTRY="$DATAHUB_ROOT/metadata-models/src/main/resources/entity-registry.yml" rm -r $OUTDIR 2>/dev/null || true -python scripts/avro_codegen.py $SCHEMAS_ROOT $OUTDIR +python scripts/avro_codegen.py $ENTITY_REGISTRY $SCHEMAS_ROOT $OUTDIR diff --git a/metadata-ingestion/src/datahub/emitter/mce_builder.py b/metadata-ingestion/src/datahub/emitter/mce_builder.py index efa867bfe4..1c53ffd9fa 100644 --- a/metadata-ingestion/src/datahub/emitter/mce_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mce_builder.py @@ -131,22 +131,6 @@ def dataset_key_to_urn(key: DatasetKeyClass) -> str: ) -def make_container_new_urn(guid: str) -> str: - return f"urn:dh:container:0:({guid})" - - -def container_new_urn_to_key(dataset_urn: str) -> Optional[ContainerKeyClass]: - pattern = r"urn:dh:container:0:\((.*)\)" - results = re.search(pattern, dataset_urn) - if results is not None: - return ContainerKeyClass(guid=results[1]) - return None - - -# def make_container_urn(platform: str, name: str, env: str = DEFAULT_ENV) -> str: -# return f"urn:li:container:({make_data_platform_urn(platform)},{env},{name})" - - def make_container_urn(guid: str) -> str: return f"urn:li:container:{guid}" diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py index 35eb64b4ea..498cdded8f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py @@ -84,7 +84,7 @@ class LookerAPI: ) except SDKError as e: raise ConfigurationError( - "Failed to initialize Looker client. Please check your configuration." + f"Failed to connect/authenticate with looker - check your configuration: {e}" ) from e self.client_stats = LookerAPIStats() diff --git a/metadata-ingestion/tests/unit/serde/test_serde.py b/metadata-ingestion/tests/unit/serde/test_serde.py index 6e59865625..9c5d70b8d0 100644 --- a/metadata-ingestion/tests/unit/serde/test_serde.py +++ b/metadata-ingestion/tests/unit/serde/test_serde.py @@ -15,6 +15,8 @@ from datahub.emitter import mce_builder from datahub.ingestion.run.pipeline import Pipeline from datahub.ingestion.source.file import FileSourceConfig, GenericFileSource from datahub.metadata.schema_classes import ( + ASPECT_CLASSES, + KEY_ASPECTS, MetadataChangeEventClass, OwnershipClass, _Aspect, @@ -34,6 +36,12 @@ def test_codegen_aspect_name(): assert OwnershipClass.get_aspect_name() == "ownership" +def test_codegen_aspects(): + # These bounds are extremely loose, and mainly verify that the lists aren't empty. + assert len(ASPECT_CLASSES) > 30 + assert len(KEY_ASPECTS) > 10 + + def test_cannot_instantiated_codegen_aspect(): with pytest.raises(TypeError, match="instantiate"): _Aspect()