feat(ingest): add urn modification helper (#7440)

Co-authored-by: Tamas Nemeth <treff7es@gmail.com>
Author: Harshal Sheth, 2023-03-16 16:27:08 -04:00 (committed by GitHub)
parent 145cbd4d9c
commit 89734587f7
11 changed files with 442 additions and 88 deletions
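
For orientation, here is a minimal usage sketch of the helper this commit adds (based on the new urn_iter module and its tests in the diff below; the table name and the env-rewrite lambda are illustrative values, not taken from this commit):

import datahub.emitter.mce_builder as builder
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    DatasetLineageTypeClass,
    Upstream,
    UpstreamLineage,
)
from datahub.utilities.urns.urn_iter import lowercase_dataset_urns, transform_urns

# Build a small lineage aspect with a mixed-case dataset name.
aspect = UpstreamLineage(
    upstreams=[
        Upstream(
            dataset=builder.make_dataset_urn("bigquery", "MyProject.MyTable", "PROD"),
            type=DatasetLineageTypeClass.TRANSFORMED,
        )
    ]
)

# Rewrite every dataset (and schemaField) urn in the aspect to lowercase, in place.
lowercase_dataset_urns(aspect)

# transform_urns applies an arbitrary str -> str function to every urn in the object.
transform_urns(aspect, lambda urn: urn.replace(",PROD)", ",DEV)"))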

View File

@ -1,8 +1,7 @@
import json
import types
import unittest.mock
import re
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Union
from typing import Dict, Iterable, List, Optional, Tuple, Union
import avro.schema
import click
@ -66,29 +65,89 @@ def load_schemas(schemas_path: str) -> Dict[str, dict]:
return schemas
def merge_schemas(schemas_obj: List[Any]) -> str:
# Combine schemas.
def patch_schemas(schemas: Dict[str, dict], pdl_path: Path) -> Dict[str, dict]:
# We can easily find normal urn types using the generated avro schema,
# but for arrays of urns there's nothing in the avro schema and hence
# we have to look in the PDL files instead.
urn_arrays: Dict[
str, List[Tuple[str, str]]
] = {} # schema name -> list of (field name, type)
# First, we need to load the PDL files and find all urn arrays.
for pdl_file in Path(pdl_path).glob("**/*.pdl"):
pdl_text = pdl_file.read_text()
# TRICKY: We assume that all urn types end with "Urn".
arrays = re.findall(
r"^\s*(\w+)\s*:\s*(?:optional\s+)?array\[(\w*Urn)\]",
pdl_text,
re.MULTILINE,
)
if arrays:
schema_name = pdl_file.stem
urn_arrays[schema_name] = [(item[0], item[1]) for item in arrays]
# Then, we can patch each schema.
patched_schemas = {}
for name, schema in schemas.items():
patched_schemas[name] = patch_schema(schema, urn_arrays)
return patched_schemas
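# Illustration of what the regex above captures, run against a hypothetical PDL
# snippet (the field names here are made up, not taken from the repo):
#
#   pdl_text = """
#   owners: array[CorpuserUrn]
#   inputs: optional array[DatasetUrn]
#   name: string
#   """
#   re.findall(
#       r"^\s*(\w+)\s*:\s*(?:optional\s+)?array\[(\w*Urn)\]", pdl_text, re.MULTILINE
#   )
#   # -> [("owners", "CorpuserUrn"), ("inputs", "DatasetUrn")]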
def patch_schema(schema: dict, urn_arrays: Dict[str, List[Tuple[str, str]]]) -> dict:
"""
This method patches the schema to add an "Urn" property to all urn fields.
Because the inner type in an array is not a named Avro schema, for urn arrays
we annotate the array field and add an "urn_is_array" property.
"""
# We're using Names() to generate a full list of embedded schemas.
all_schemas = avro.schema.Names()
patched = avro.schema.make_avsc_object(schema, names=all_schemas)
for nested in all_schemas.names.values():
if isinstance(nested, (avro.schema.EnumSchema, avro.schema.FixedSchema)):
continue
assert isinstance(nested, avro.schema.RecordSchema)
# Patch normal urn types.
field: avro.schema.Field
for field in nested.fields:
java_class: Optional[str] = field.props.get("java", {}).get("class")
if java_class and java_class.startswith(
"com.linkedin.pegasus2avro.common.urn."
):
field.set_prop("Urn", java_class.split(".")[-1])
# Patch array urn types.
if nested.name in urn_arrays:
mapping = urn_arrays[nested.name]
for field_name, type in mapping:
field = nested.fields_dict[field_name]
field.set_prop("Urn", type)
field.set_prop("urn_is_array", True)
return patched.to_json()
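# After codegen, these props are visible on the generated classes (exercised by the
# new test_urn_annotation test further down), e.g.:
#   UpstreamClass.RECORD_SCHEMA.fields_dict["dataset"].get_prop("Urn") == "DatasetUrn"
#   FineGrainedLineageClass.RECORD_SCHEMA.fields_dict["upstreams"].get_prop("urn_is_array") is truthy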
def merge_schemas(schemas_obj: List[dict]) -> str:
# Combine schemas as a "union" of all of the types.
merged = ["null"] + schemas_obj
# Patch add_name method to NOT complain about duplicate names
def add_name(self, name_attr, space_attr, new_schema):
to_add = avro.schema.Name(name_attr, space_attr, self.default_namespace)
# Patch add_name method to NOT complain about duplicate names.
class NamesWithDups(avro.schema.Names):
def add_name(self, name_attr, space_attr, new_schema):
to_add = avro.schema.Name(name_attr, space_attr, self.default_namespace)
self.names[to_add.fullname] = new_schema
return to_add
self.names[to_add.fullname] = new_schema
return to_add
with unittest.mock.patch("avro.schema.Names.add_name", add_name):
cleaned_schema = avro.schema.make_avsc_object(merged)
cleaned_schema = avro.schema.make_avsc_object(merged, names=NamesWithDups())
# Convert back to an Avro schema JSON representation.
class MappingProxyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, types.MappingProxyType):
return dict(obj)
return json.JSONEncoder.default(self, obj)
out_schema = cleaned_schema.to_json()
encoded = json.dumps(out_schema, cls=MappingProxyEncoder, indent=2)
encoded = json.dumps(out_schema, indent=2)
return encoded
@ -149,11 +208,11 @@ load_schema_method = """
import functools
import pathlib
@functools.lru_cache(maxsize=None)
def _load_schema(schema_name: str) -> str:
return (pathlib.Path(__file__).parent / f"{schema_name}.avsc").read_text()
"""
individual_schema_method = """
@functools.lru_cache(maxsize=None)
def get{schema_name}Schema() -> str:
return _load_schema("{schema_name}")
"""
@ -165,6 +224,17 @@ def make_load_schema_methods(schemas: Iterable[str]) -> str:
)
def save_raw_schemas(schema_save_dir: Path, schemas: Dict[str, dict]) -> None:
# Save raw avsc files.
schema_save_dir.mkdir()
for name, schema in schemas.items():
(schema_save_dir / f"{name}.avsc").write_text(json.dumps(schema, indent=2))
# Add getXSchema methods.
with open(schema_save_dir / "__init__.py", "w") as schema_dir_init:
schema_dir_init.write(make_load_schema_methods(schemas.keys()))
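# The generated schemas/__init__.py then exposes one accessor per kept avsc file,
# e.g. (as used in the serde tests below):
#   from datahub.metadata.schemas import getMetadataChangeEventSchema
#   raw_avsc: str = getMetadataChangeEventSchema()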
def annotate_aspects(aspects: List[dict], schema_class_file: Path) -> None:
schema_classes_lines = schema_class_file.read_text().splitlines()
line_lookup_table = {line: i for i, line in enumerate(schema_classes_lines)}
@ -177,9 +247,9 @@ def annotate_aspects(aspects: List[dict], schema_class_file: Path) -> None:
] += """
class _Aspect(DictWrapper):
ASPECT_NAME: str = None # type: ignore
ASPECT_TYPE: str = "default"
ASPECT_INFO: dict = None # type: ignore
ASPECT_NAME: ClassVar[str] = None # type: ignore
ASPECT_TYPE: ClassVar[str] = "default"
ASPECT_INFO: ClassVar[dict] = None # type: ignore
def __init__(self):
if type(self) is _Aspect:
@ -225,7 +295,11 @@ class _Aspect(DictWrapper):
schema_classes_lines[
empty_line
] += f"\n ASPECT_TYPE = '{aspect['Aspect']['type']}'"
schema_classes_lines[empty_line] += f"\n ASPECT_INFO = {aspect['Aspect']}"
aspect_info = {
k: v for k, v in aspect["Aspect"].items() if k not in {"name", "type"}
}
schema_classes_lines[empty_line] += f"\n ASPECT_INFO = {aspect_info}"
schema_classes_lines[empty_line + 1] += "\n"
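# On the generated aspect classes this shows up as class-level metadata, e.g.
# (from the new codegen tests below):
#   OwnershipClass.ASPECT_NAME == "ownership"
#   TelemetryKeyClass.ASPECT_INFO == {
#       "keyForEntity": "telemetry",
#       "entityCategory": "internal",
#       "entityAspects": ["telemetryClientId"],
#   }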
@ -233,8 +307,6 @@ class _Aspect(DictWrapper):
newline = "\n"
schema_classes_lines.append(
f"""
from typing import Type
ASPECT_CLASSES: List[Type[_Aspect]] = [
{f',{newline} '.join(f"{aspect['name']}Class" for aspect in aspects)}
]
@ -252,14 +324,22 @@ KEY_ASPECTS: Dict[str, Type[_Aspect]] = {{
@click.argument(
"entity_registry", type=click.Path(exists=True, dir_okay=False), required=True
)
@click.argument(
"pdl_path", type=click.Path(exists=True, file_okay=False), required=True
)
@click.argument(
"schemas_path", type=click.Path(exists=True, file_okay=False), required=True
)
@click.argument("outdir", type=click.Path(), required=True)
def generate(entity_registry: str, schemas_path: str, outdir: str) -> None:
def generate(
entity_registry: str, pdl_path: str, schemas_path: str, outdir: str
) -> None:
entities = load_entity_registry(Path(entity_registry))
schemas = load_schemas(schemas_path)
# Patch the avsc files.
schemas = patch_schemas(schemas, Path(pdl_path))
# Special handling for aspects.
aspects = {
schema["Aspect"]["name"]: schema
@ -288,8 +368,8 @@ def generate(entity_registry: str, schemas_path: str, outdir: str) -> None:
# Check for unused aspects. We currently have quite a few.
# unused_aspects = set(aspects.keys()) - set().union(
# {entity.keyAspect for entity in entities},
# *(set(entity.aspects) for entity in entities),
# {entity.keyAspect for entity in entities},
# *(set(entity.aspects) for entity in entities),
# )
merged_schema = merge_schemas(list(schemas.values()))
@ -303,17 +383,17 @@ def generate(entity_registry: str, schemas_path: str, outdir: str) -> None:
Path(outdir) / "schema_classes.py",
)
# Save raw schema files in codegen as well.
# Keep a copy of a few raw avsc files.
required_avsc_schemas = {"MetadataChangeEvent", "MetadataChangeProposal"}
schema_save_dir = Path(outdir) / "schemas"
schema_save_dir.mkdir()
for schema_out_file, schema in schemas.items():
(schema_save_dir / f"{schema_out_file}.avsc").write_text(
json.dumps(schema, indent=2)
)
# Add load_schema method.
with open(schema_save_dir / "__init__.py", "a") as schema_dir_init:
schema_dir_init.write(make_load_schema_methods(schemas.keys()))
save_raw_schemas(
schema_save_dir,
{
name: schema
for name, schema in schemas.items()
if name in required_avsc_schemas
},
)
# Add headers for all generated files
generated_files = Path(outdir).glob("**/*.py")

View File

@ -5,8 +5,10 @@ OUTDIR=./src/datahub/metadata
# Note: this assumes that datahub has already been built with `./gradlew build`.
DATAHUB_ROOT=..
SCHEMAS_ROOT="$DATAHUB_ROOT/metadata-events/mxe-schemas/src/renamed/avro/com/linkedin"
SCHEMAS_PDL="$DATAHUB_ROOT/metadata-models/src/main/pegasus/com/linkedin"
SCHEMAS_AVSC="$DATAHUB_ROOT/metadata-events/mxe-schemas/src/renamed/avro/com/linkedin"
ENTITY_REGISTRY="$DATAHUB_ROOT/metadata-models/src/main/resources/entity-registry.yml"
rm -r $OUTDIR 2>/dev/null || true
python scripts/avro_codegen.py $ENTITY_REGISTRY $SCHEMAS_ROOT $OUTDIR
python scripts/avro_codegen.py $ENTITY_REGISTRY $SCHEMAS_PDL $SCHEMAS_AVSC $OUTDIR

View File

@ -37,7 +37,7 @@ framework_common = {
"entrypoints",
"docker",
"expandvars>=0.6.5",
"avro-gen3==0.7.8",
"avro-gen3==0.7.10",
# "avro-gen3 @ git+https://github.com/acryldata/avro_gen@master#egg=avro-gen3",
"avro>=1.10.2,<1.11",
"python-dateutil>=2.8.0",

View File

@ -144,9 +144,7 @@ class DataHubGraph(DatahubRestEmitter):
response_json = response.json()
# Figure out what field to look in.
record_schema: RecordSchema = aspect_type.__getattribute__(
aspect_type, "RECORD_SCHEMA"
)
record_schema: RecordSchema = aspect_type.RECORD_SCHEMA
aspect_type_name = record_schema.fullname.replace(".pegasus2avro", "")
# Deserialize the aspect json into the aspect type.
@ -335,15 +333,9 @@ class DataHubGraph(DatahubRestEmitter):
result: Dict[str, Optional[Aspect]] = {}
for aspect_type in aspect_types:
record_schema: RecordSchema = aspect_type.__getattribute__(
aspect_type, "RECORD_SCHEMA"
)
if not record_schema:
logger.warning(
f"Failed to infer type name of the aspect from the aspect type class {aspect_type}. Continuing, but this will fail."
)
else:
aspect_type_name = record_schema.props["Aspect"]["name"]
record_schema = aspect_type.RECORD_SCHEMA
aspect_type_name = record_schema.props["Aspect"]["name"]
aspect_json = response_json.get("aspects", {}).get(aspect_type_name)
if aspect_json:
# need to apply a transform to the response to match rest.li and avro serialization

View File

@ -13,8 +13,3 @@ def _check_sink_classes(cls: Type[Sink]) -> None:
sink_registry = PluginRegistry[Sink](extra_cls_check=_check_sink_classes)
sink_registry.register_from_entrypoint("datahub.ingestion.sink.plugins")
# These sinks are always enabled
assert sink_registry.get("console")
assert sink_registry.get("file")
assert sink_registry.get("blackhole")

View File

@ -0,0 +1,96 @@
from typing import Callable, List, Tuple, Union
from avro.schema import Field, RecordSchema
from datahub.metadata.schema_classes import DictWrapper
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub.utilities.urns.urn import Urn, guess_entity_type
_Path = List[Union[str, int]]
def list_urns_with_path(model: DictWrapper) -> List[Tuple[str, _Path]]:
"""List urns in the given model with their paths.
Args:
model: The model to list urns from.
Returns:
A list of tuples of the form (urn, path), where path is a list of keys.
"""
schema: RecordSchema = model.RECORD_SCHEMA
urns: List[Tuple[str, _Path]] = []
for key, value in model.items():
if not value:
continue
field_schema: Field = schema.fields_dict[key]
is_urn = field_schema.get_prop("Urn") is not None
if isinstance(value, DictWrapper):
for urn, path in list_urns_with_path(value):
urns.append((urn, [key, *path]))
elif isinstance(value, list):
for i, item in enumerate(value):
if isinstance(item, DictWrapper):
for urn, path in list_urns_with_path(item):
urns.append((urn, [key, i, *path]))
elif is_urn:
urns.append((item, [key, i]))
elif is_urn:
urns.append((value, [key]))
return urns
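# Example output (taken from the unit tests below): for a bare Upstream aspect this
# returns
#   [("urn:li:corpuser:unknown", ["auditStamp", "actor"]),
#    ("urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)", ["dataset"])]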
def transform_urns(model: DictWrapper, func: Callable[[str], str]) -> None:
"""
Rewrites all URNs in the given object according to the given function.
"""
for old_urn, path in list_urns_with_path(model):
new_urn = func(old_urn)
if old_urn != new_urn:
_modify_at_path(model, path, new_urn)
def _modify_at_path(
model: Union[DictWrapper, list], path: _Path, new_value: str
) -> None:
assert len(path) > 0
if len(path) == 1:
if isinstance(path[0], int):
assert isinstance(model, list)
model[path[0]] = new_value
else:
assert isinstance(model, DictWrapper)
model._inner_dict[path[0]] = new_value
elif isinstance(path[0], int):
assert isinstance(model, list)
return _modify_at_path(model[path[0]], path[1:], new_value)
else:
assert isinstance(model, DictWrapper)
return _modify_at_path(model._inner_dict[path[0]], path[1:], new_value)
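# For example, given path ["upstreams", 0, "dataset"] (as produced by
# list_urns_with_path above), this recurses into model._inner_dict["upstreams"][0]
# and overwrites that Upstream's "dataset" entry with new_value.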
def _lowercase_dataset_urn(dataset_urn: str) -> str:
cur_urn = DatasetUrn.create_from_string(dataset_urn)
cur_urn._entity_id[1] = cur_urn._entity_id[1].lower()
return str(cur_urn)
def lowercase_dataset_urns(model: DictWrapper) -> None:
def modify_urn(urn: str) -> str:
if guess_entity_type(urn) == "dataset":
return _lowercase_dataset_urn(urn)
elif guess_entity_type(urn) == "schemaField":
cur_urn = Urn.create_from_string(urn)
cur_urn._entity_id[0] = _lowercase_dataset_urn(cur_urn._entity_id[0])
return str(cur_urn)
return urn
transform_urns(model, modify_urn)

View File

@ -15,13 +15,7 @@ from datahub.emitter import mce_builder
from datahub.emitter.serialization_helper import post_json_transform, pre_json_transform
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.file import FileSourceConfig, GenericFileSource
from datahub.metadata.schema_classes import (
ASPECT_CLASSES,
KEY_ASPECTS,
MetadataChangeEventClass,
OwnershipClass,
_Aspect,
)
from datahub.metadata.schema_classes import MetadataChangeEventClass
from datahub.metadata.schemas import getMetadataChangeEventSchema
from tests.test_helpers import mce_helpers
from tests.test_helpers.click_helpers import run_datahub_cmd
@ -30,24 +24,6 @@ from tests.test_helpers.type_helpers import PytestConfig
FROZEN_TIME = "2021-07-22 18:54:06"
def test_codegen_aspect_name():
assert issubclass(OwnershipClass, _Aspect)
assert OwnershipClass.ASPECT_NAME == "ownership"
assert OwnershipClass.get_aspect_name() == "ownership"
def test_codegen_aspects():
# These bounds are extremely loose, and mainly verify that the lists aren't empty.
assert len(ASPECT_CLASSES) > 30
assert len(KEY_ASPECTS) > 10
def test_cannot_instantiated_codegen_aspect():
with pytest.raises(TypeError, match="instantiate"):
_Aspect()
@freeze_time(FROZEN_TIME)
@pytest.mark.parametrize(
"json_filename",

View File

@ -0,0 +1,144 @@
import datahub.emitter.mce_builder as builder
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
DatasetLineageTypeClass,
FineGrainedLineage,
FineGrainedLineageDownstreamType,
FineGrainedLineageUpstreamType,
Upstream,
UpstreamLineage,
)
from datahub.utilities.urns.urn_iter import list_urns_with_path, lowercase_dataset_urns
def _datasetUrn(tbl):
return builder.make_dataset_urn("bigquery", tbl, "PROD")
def _fldUrn(tbl, fld):
return builder.make_schema_field_urn(_datasetUrn(tbl), fld)
def test_list_urns_upstream():
upstream_table = Upstream(
dataset=_datasetUrn("upstream_table_1"),
type=DatasetLineageTypeClass.TRANSFORMED,
)
urns = list_urns_with_path(upstream_table)
assert urns == [
(
"urn:li:corpuser:unknown",
["auditStamp", "actor"],
),
(
"urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)",
["dataset"],
),
]
def test_upstream_lineage_urn_iterator():
upstream_table_1 = Upstream(
dataset=_datasetUrn("upstream_table_1"),
type=DatasetLineageTypeClass.TRANSFORMED,
)
upstream_table_2 = Upstream(
dataset=_datasetUrn("upstream_table_2"),
type=DatasetLineageTypeClass.TRANSFORMED,
)
# Construct a lineage aspect.
upstream_lineage = UpstreamLineage(
upstreams=[upstream_table_1, upstream_table_2],
fineGrainedLineages=[
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
upstreams=[
_fldUrn("upstream_table_1", "c1"),
_fldUrn("upstream_table_2", "c2"),
],
downstreamType=FineGrainedLineageDownstreamType.FIELD_SET,
downstreams=[
_fldUrn("downstream_table", "c3"),
_fldUrn("downstream_table", "c4"),
],
),
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.DATASET,
upstreams=[_datasetUrn("upstream_table_1")],
downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=[_fldUrn("downstream_table", "c5")],
),
],
)
urns = list_urns_with_path(upstream_lineage)
assert urns == [
(
"urn:li:corpuser:unknown",
["upstreams", 0, "auditStamp", "actor"],
),
(
"urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)",
["upstreams", 0, "dataset"],
),
("urn:li:corpuser:unknown", ["upstreams", 1, "auditStamp", "actor"]),
(
"urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)",
["upstreams", 1, "dataset"],
),
(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)",
["fineGrainedLineages", 0, "upstreams", 0],
),
(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c2)",
["fineGrainedLineages", 0, "upstreams", 1],
),
(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,downstream_table,PROD),c3)",
["fineGrainedLineages", 0, "downstreams", 0],
),
(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,downstream_table,PROD),c4)",
["fineGrainedLineages", 0, "downstreams", 1],
),
(
"urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)",
["fineGrainedLineages", 1, "upstreams", 0],
),
(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,downstream_table,PROD),c5)",
["fineGrainedLineages", 1, "downstreams", 0],
),
]
def _make_test_lineage_obj(upstream: str, downstream: str) -> UpstreamLineage:
return UpstreamLineage(
upstreams=[
Upstream(
dataset=_datasetUrn(upstream),
type=DatasetLineageTypeClass.TRANSFORMED,
)
],
fineGrainedLineages=[
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.DATASET,
upstreams=[_datasetUrn(upstream)],
downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=[_fldUrn(downstream, "c5")],
),
],
)
def test_dataset_urn_lowercase_transformer():
original = _make_test_lineage_obj("upstreamTable", "downstreamTable")
expected = _make_test_lineage_obj("upstreamtable", "downstreamtable")
assert original != expected # sanity check
lowercase_dataset_urns(original)
assert original == expected

View File

@ -0,0 +1,69 @@
import pytest
from datahub.metadata.schema_classes import (
ASPECT_CLASSES,
KEY_ASPECTS,
FineGrainedLineageClass,
OwnershipClass,
TelemetryKeyClass,
UpstreamClass,
_Aspect,
)
def test_class_filter() -> None:
# The codegen should only generate classes for aspects and a few extra classes.
# As such, stuff like lineage search results should not appear.
with pytest.raises(ImportError):
from datahub.metadata.schema_classes import ( # type: ignore[attr-defined] # noqa: F401
LineageSearchResultClass,
)
def test_codegen_aspect_name():
assert issubclass(OwnershipClass, _Aspect)
assert OwnershipClass.ASPECT_NAME == "ownership"
assert OwnershipClass.get_aspect_name() == "ownership"
def test_codegen_aspects():
# These bounds are extremely loose, and mainly verify that the lists aren't empty.
assert len(ASPECT_CLASSES) > 30
assert len(KEY_ASPECTS) > 10
def test_key_aspect_info():
expected = {
"keyForEntity": "telemetry",
"entityCategory": "internal",
"entityAspects": ["telemetryClientId"],
}
assert TelemetryKeyClass.ASPECT_INFO == expected
assert TelemetryKeyClass.get_aspect_info() == expected
def test_cannot_instantiate_codegen_aspect():
with pytest.raises(TypeError, match="instantiate"):
_Aspect()
def test_urn_annotation():
# We rely on these annotations elsewhere, so we want to make sure they show up.
assert (
UpstreamClass.RECORD_SCHEMA.fields_dict["dataset"].get_prop("Urn")
== "DatasetUrn"
)
assert not UpstreamClass.RECORD_SCHEMA.fields_dict["dataset"].get_prop(
"urn_is_array"
)
assert (
FineGrainedLineageClass.RECORD_SCHEMA.fields_dict["upstreams"].get_prop("Urn")
== "Urn"
)
assert FineGrainedLineageClass.RECORD_SCHEMA.fields_dict["upstreams"].get_prop(
"urn_is_array"
)

View File

@ -25,7 +25,7 @@ from tests.test_helpers.click_helpers import run_datahub_cmd
"registry,expected",
[
(source_registry, ["file"]),
(sink_registry, ["console", "file"]),
(sink_registry, ["console", "file", "blackhole"]),
(extractor_registry, ["generic"]),
(
transform_registry,

View File

@ -2139,7 +2139,7 @@ def run_pattern_dataset_schema_terms_transformation_semantics(
urn=builder.make_term_urn("pii")
)
],
auditStamp=models.AuditStampClass.construct_with_defaults(),
auditStamp=models.AuditStampClass._construct_with_defaults(),
),
type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()),
nativeDataType="VARCHAR(100)",
@ -2153,7 +2153,7 @@ def run_pattern_dataset_schema_terms_transformation_semantics(
urn=builder.make_term_urn("pii")
)
],
auditStamp=models.AuditStampClass.construct_with_defaults(),
auditStamp=models.AuditStampClass._construct_with_defaults(),
),
type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()),
nativeDataType="VARCHAR(100)",