Clean up logic for dataset.py yaml loader (#10089)

Ellie O'Neil 2024-03-20 14:41:49 -07:00 committed by GitHub
parent 5a3df32bb5
commit 87169baf96
2 changed files with 103 additions and 168 deletions

metadata-ingestion/src/datahub/api/entities/dataset/dataset.py

@@ -7,10 +7,7 @@ from typing import Dict, Iterable, List, Optional, Tuple, Union
 from pydantic import BaseModel, Field, validator
 from ruamel.yaml import YAML
 
-from datahub.api.entities.structuredproperties.structuredproperties import (
-    AllowedTypes,
-    StructuredProperties,
-)
+from datahub.api.entities.structuredproperties.structuredproperties import AllowedTypes
 from datahub.configuration.common import ConfigModel
 from datahub.emitter.mce_builder import (
     make_data_platform_urn,
@@ -23,7 +20,7 @@ from datahub.emitter.mce_builder import (
 )
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.extractor.schema_util import avro_schema_to_mce_fields
-from datahub.ingestion.graph.client import DataHubGraph, get_default_graph
+from datahub.ingestion.graph.client import DataHubGraph
 from datahub.metadata.schema_classes import (
     AuditStampClass,
     DatasetPropertiesClass,
@@ -45,7 +42,6 @@ from datahub.metadata.schema_classes import (
 )
 from datahub.specific.dataset import DatasetPatchBuilder
 from datahub.utilities.urns.dataset_urn import DatasetUrn
-from datahub.utilities.urns.urn import Urn
 
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
@@ -66,8 +62,8 @@ class SchemaFieldSpecification(BaseModel):
     created: Optional[dict] = None
     lastModified: Optional[dict] = None
     recursive: Optional[bool] = None
-    globalTags: Optional[dict] = None
-    glossaryTerms: Optional[dict] = None
+    globalTags: Optional[List[str]] = None
+    glossaryTerms: Optional[List[str]] = None
     isPartOfKey: Optional[bool] = None
     isPartitioningKey: Optional[bool] = None
     jsonProps: Optional[dict] = None
@@ -161,7 +157,7 @@ class Dataset(BaseModel):
     subtype: Optional[str]
     subtypes: Optional[List[str]]
     tags: Optional[List[str]] = None
-    glossaryTerms: Optional[List[str]] = None
+    glossary_terms: Optional[List[str]] = None
     owners: Optional[List[Union[str, Ownership]]] = None
     structured_properties: Optional[
         Dict[str, Union[str, float, List[Union[str, float]]]]
@@ -267,6 +263,34 @@ class Dataset(BaseModel):
                 self.urn, field.id  # type: ignore[arg-type]
             )
             assert field_urn.startswith("urn:li:schemaField:")
+
+            if field.globalTags:
+                mcp = MetadataChangeProposalWrapper(
+                    entityUrn=field_urn,
+                    aspect=GlobalTagsClass(
+                        tags=[
+                            TagAssociationClass(tag=make_tag_urn(tag))
+                            for tag in field.globalTags
+                        ]
+                    ),
+                )
+                yield mcp
+
+            if field.glossaryTerms:
+                mcp = MetadataChangeProposalWrapper(
+                    entityUrn=field_urn,
+                    aspect=GlossaryTermsClass(
+                        terms=[
+                            GlossaryTermAssociationClass(
+                                urn=make_term_urn(term)
+                            )
+                            for term in field.glossaryTerms
+                        ],
+                        auditStamp=self._mint_auditstamp("yaml"),
+                    ),
+                )
+                yield mcp
+
             if field.structured_properties:
                 mcp = MetadataChangeProposalWrapper(
                     entityUrn=field_urn,
@@ -284,138 +308,79 @@
                 )
                 yield mcp
 
-            if self.subtype or self.subtypes:
-                mcp = MetadataChangeProposalWrapper(
-                    entityUrn=self.urn,
-                    aspect=SubTypesClass(
-                        typeNames=[
-                            s
-                            for s in [self.subtype] + (self.subtypes or [])
-                            if s
-                        ]
-                    ),
-                )
-                yield mcp
-
-            if self.tags:
-                mcp = MetadataChangeProposalWrapper(
-                    entityUrn=self.urn,
-                    aspect=GlobalTagsClass(
-                        tags=[
-                            TagAssociationClass(tag=make_tag_urn(tag))
-                            for tag in self.tags
-                        ]
-                    ),
-                )
-                yield mcp
-
-            if self.glossaryTerms:
-                mcp = MetadataChangeProposalWrapper(
-                    entityUrn=self.urn,
-                    aspect=GlossaryTermsClass(
-                        terms=[
-                            GlossaryTermAssociationClass(
-                                urn=make_term_urn(term)
-                            )
-                            for term in self.glossaryTerms
-                        ],
-                        auditStamp=self._mint_auditstamp("yaml"),
-                    ),
-                )
-                yield mcp
-
-            if self.owners:
-                mcp = MetadataChangeProposalWrapper(
-                    entityUrn=self.urn,
-                    aspect=OwnershipClass(
-                        owners=[self._mint_owner(o) for o in self.owners]
-                    ),
-                )
-                yield mcp
-
-            if self.structured_properties:
-                mcp = MetadataChangeProposalWrapper(
-                    entityUrn=self.urn,
-                    aspect=StructuredPropertiesClass(
-                        properties=[
-                            StructuredPropertyValueAssignmentClass(
-                                propertyUrn=f"urn:li:structuredProperty:{prop_key}",
-                                values=prop_value
-                                if isinstance(prop_value, list)
-                                else [prop_value],
-                            )
-                            for prop_key, prop_value in self.structured_properties.items()
-                        ]
-                    ),
-                )
-                yield mcp
-
-            if self.downstreams:
-                for downstream in self.downstreams:
-                    patch_builder = DatasetPatchBuilder(downstream)
-                    assert (
-                        self.urn is not None
-                    )  # validator should have filled this in
-                    patch_builder.add_upstream_lineage(
-                        UpstreamClass(
-                            dataset=self.urn,
-                            type="COPY",
-                        )
-                    )
-                    for patch_event in patch_builder.build():
-                        yield patch_event
-
-        logger.info(f"Created dataset {self.urn}")
-
-    @staticmethod
-    def extract_structured_properties(
-        structured_properties: Dict[str, Union[str, float, List[str], List[float]]]
-    ) -> List[Tuple[str, Union[str, float]]]:
-        structured_properties_flattened: List[Tuple[str, Union[str, float]]] = []
-        for key, value in structured_properties.items():
-            validated_structured_property = Dataset.validate_structured_property(
-                key, value
-            )
-            if validated_structured_property:
-                structured_properties_flattened.append(validated_structured_property)
-        structured_properties_flattened = sorted(
-            structured_properties_flattened, key=lambda x: x[0]
-        )
-        return structured_properties_flattened
-
-    @staticmethod
-    def validate_structured_property(
-        sp_name: str, sp_value: Union[str, float, List[str], List[float]]
-    ) -> Union[Tuple[str, Union[str, float]], None]:
-        """
-        Validate based on:
-        1. Structured property exists/has been created
-        2. Structured property value is of the expected type
-        """
-        urn = Urn.make_structured_property_urn(sp_name)
-        with get_default_graph() as graph:
-            if graph.exists(urn):
-                validated_structured_property = StructuredProperties.from_datahub(
-                    graph, urn
-                )
-                allowed_type = Urn.get_data_type_from_urn(
-                    validated_structured_property.type
-                )
-                try:
-                    if not isinstance(sp_value, list):
-                        return Dataset.validate_type(sp_name, sp_value, allowed_type)
-                    else:
-                        for v in sp_value:
-                            return Dataset.validate_type(sp_name, v, allowed_type)
-                except ValueError:
-                    logger.warning(
-                        f"Property: {sp_name}, value: {sp_value} should be a {allowed_type}."
-                    )
-            else:
-                logger.error(
-                    f"Property {sp_name} does not exist and therefore will not be added to dataset. Please create property before trying again."
-                )
-        return None
+        if self.subtype or self.subtypes:
+            mcp = MetadataChangeProposalWrapper(
+                entityUrn=self.urn,
+                aspect=SubTypesClass(
+                    typeNames=[s for s in [self.subtype] + (self.subtypes or []) if s]
+                ),
+            )
+            yield mcp
+
+        if self.tags:
+            mcp = MetadataChangeProposalWrapper(
+                entityUrn=self.urn,
+                aspect=GlobalTagsClass(
+                    tags=[
+                        TagAssociationClass(tag=make_tag_urn(tag)) for tag in self.tags
+                    ]
+                ),
+            )
+            yield mcp
+
+        if self.glossary_terms:
+            mcp = MetadataChangeProposalWrapper(
+                entityUrn=self.urn,
+                aspect=GlossaryTermsClass(
+                    terms=[
+                        GlossaryTermAssociationClass(urn=make_term_urn(term))
+                        for term in self.glossary_terms
+                    ],
+                    auditStamp=self._mint_auditstamp("yaml"),
+                ),
+            )
+            yield mcp
+
+        if self.owners:
+            mcp = MetadataChangeProposalWrapper(
+                entityUrn=self.urn,
+                aspect=OwnershipClass(
+                    owners=[self._mint_owner(o) for o in self.owners]
+                ),
+            )
+            yield mcp
+
+        if self.structured_properties:
+            mcp = MetadataChangeProposalWrapper(
+                entityUrn=self.urn,
+                aspect=StructuredPropertiesClass(
+                    properties=[
+                        StructuredPropertyValueAssignmentClass(
+                            propertyUrn=f"urn:li:structuredProperty:{prop_key}",
+                            values=prop_value
+                            if isinstance(prop_value, list)
+                            else [prop_value],
+                        )
+                        for prop_key, prop_value in self.structured_properties.items()
+                    ]
+                ),
+            )
+            yield mcp
+
+        if self.downstreams:
+            for downstream in self.downstreams:
+                patch_builder = DatasetPatchBuilder(downstream)
+                assert self.urn is not None  # validator should have filled this in
+                patch_builder.add_upstream_lineage(
+                    UpstreamClass(
+                        dataset=self.urn,
+                        type="COPY",
+                    )
+                )
+                for patch_event in patch_builder.build():
+                    yield patch_event
+
+        logger.info(f"Created dataset {self.urn}")
 
     @staticmethod
     def validate_type(
@@ -543,7 +508,7 @@ class Dataset(BaseModel):
             else None,
             schema=Dataset._schema_from_schema_metadata(graph, urn),
             tags=[tag.tag for tag in tags.tags] if tags else None,
-            glossaryTerms=[term.urn for term in glossary_terms.terms]
+            glossary_terms=[term.urn for term in glossary_terms.terms]
             if glossary_terms
             else None,
             owners=yaml_owners,
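
With this change, field-level tags and glossary terms in the dataset YAML are plain lists of strings (previously free-form dicts), and the dataset-level key becomes the snake_case glossary_terms. A minimal sketch of a spec these models would accept; the platform, dataset id, field names, tag, term, and property values below are illustrative, not taken from this commit:

# datasets.yaml (hypothetical example)
- id: user_clicks
  platform: hive
  subtype: Table
  tags:
    - tier1
  glossary_terms:
    - Classification.Confidential
  schema:
    fields:
      - id: ip_address
        type: string
        globalTags:
          - PII
        glossaryTerms:
          - Classification.Sensitive
  structured_properties:
    io.acryl.privacy.retentionTime: 30

Each list entry is passed through make_tag_urn / make_term_urn, so a bare name like PII resolves to urn:li:tag:PII before the aspect is emitted.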

smoke-test/tests/structured_properties/test_structured_properties.py

@@ -405,36 +405,6 @@ def test_dataset_yaml_loader(ingest_cleanup_data, graph):
     ] == ["2023-01-01"]
 
 
-def test_dataset_structured_property_validation(ingest_cleanup_data, graph, caplog):
-    from datahub.api.entities.dataset.dataset import Dataset
-
-    property_name = f"replicationSLA{randint(10, 10000)}"
-    property_value = 30
-    value_type = "number"
-
-    create_property_definition(
-        property_name=property_name, graph=graph, value_type=value_type
-    )
-
-    attach_property_to_entity(
-        dataset_urns[0], property_name, [property_value], graph=graph
-    )
-
-    assert Dataset.validate_structured_property(
-        f"{default_namespace}.{property_name}", property_value
-    ) == (
-        f"{default_namespace}.{property_name}",
-        float(property_value),
-    )
-
-    assert Dataset.validate_structured_property("testName", "testValue") is None
-
-    bad_property_value = "2023-09-20"
-    assert (
-        Dataset.validate_structured_property(property_name, bad_property_value) is None
-    )
-
-
 def test_structured_property_search(ingest_cleanup_data, graph: DataHubGraph, caplog):
     def to_es_name(property_name, namespace=default_namespace):
         namespace_field = namespace.replace(".", "_")
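
The deleted test exercised Dataset.validate_structured_property, which this commit removes from dataset.py together with extract_structured_properties: the loader no longer pre-validates structured property names and value types against the server (hence the dropped get_default_graph import) and instead emits StructuredPropertyValueAssignmentClass aspects directly. A sketch of driving the loader end to end; the from_yaml entry point, file name, and server URL here are assumptions, not shown in this diff:

from datahub.api.entities.dataset.dataset import Dataset
from datahub.emitter.rest_emitter import DatahubRestEmitter

# Emit every MCP the generator yields: schema fields with tags/terms,
# dataset-level tags, terms, owners, structured properties, and lineage patches.
emitter = DatahubRestEmitter(gms_server="http://localhost:8080")  # assumed local GMS
for dataset in Dataset.from_yaml("datasets.yaml"):  # assumed loader entry point
    for mcp in dataset.generate_mcp():
        emitter.emit(mcp)

With the client-side check gone, an assignment that references a property which was never created is now left for the server to handle rather than being skipped with a log message.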