feat(presto-on-hive): allow v1 fieldpaths in the presto-on-hive source (#8474)

This commit is contained in:
Gabe Lyons 2023-08-01 14:05:50 -07:00 committed by GitHub
parent a4a8182001
commit 843f82b943
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 1754 additions and 9 deletions

View File

@@ -438,7 +438,7 @@ class CSVEnricherSource(Source):
field_match = False
for field_info in current_editable_schema_metadata.editableSchemaFieldInfo:
if (
DatasetUrn._get_simple_field_path_from_v2_field_path(
DatasetUrn.get_simple_field_path_from_v2_field_path(
field_info.fieldPath
)
== field_path

View File

@@ -134,6 +134,11 @@ class PrestoOnHiveConfig(BasicSQLAlchemyConfig):
description="By default, the connector overwrites properties every time. Set this to True to enable merging of properties with what exists on the server.",
)
simplify_nested_field_paths: bool = Field(
default=False,
description="Simplify v2 field paths to v1 by default. If the schema has Union or Array types, still falls back to v2",
)
def get_sql_alchemy_url(
self, uri_opts: Optional[Dict[str, Any]] = None, database: Optional[str] = None
) -> str:
@@ -527,6 +532,7 @@ class PrestoOnHiveSource(SQLAlchemySource):
None,
None,
schema_fields,
self.config.simplify_nested_field_paths,
)
dataset_snapshot.aspects.append(schema_metadata)
@@ -756,6 +762,7 @@ class PrestoOnHiveSource(SQLAlchemySource):
self.platform,
dataset.columns,
canonical_schema=schema_fields,
simplify_nested_field_paths=self.config.simplify_nested_field_paths,
)
dataset_snapshot.aspects.append(schema_metadata)

View File

@@ -41,11 +41,13 @@ from datahub.ingestion.source.common.subtypes import (
from datahub.ingestion.source.sql.sql_config import SQLAlchemyConfig
from datahub.ingestion.source.sql.sql_utils import (
add_table_to_schema_container,
downgrade_schema_from_v2,
gen_database_container,
gen_database_key,
gen_schema_container,
gen_schema_key,
get_domain_wu,
schema_requires_v2,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
@@ -287,7 +289,15 @@ def get_schema_metadata(
pk_constraints: Optional[dict] = None,
foreign_keys: Optional[List[ForeignKeyConstraint]] = None,
canonical_schema: Optional[List[SchemaField]] = None,
simplify_nested_field_paths: bool = False,
) -> SchemaMetadata:
if (
simplify_nested_field_paths
and canonical_schema is not None
and not schema_requires_v2(canonical_schema)
):
canonical_schema = downgrade_schema_from_v2(canonical_schema)
schema_metadata = SchemaMetadata(
schemaName=dataset_name,
platform=make_data_platform_urn(platform),

View File

@@ -17,9 +17,16 @@ from datahub.emitter.mcp_builder import (
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.dataset import UpstreamLineage
from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaField
from datahub.metadata.schema_classes import DataPlatformInstanceClass
from datahub.specific.dataset import DatasetPatchBuilder
from datahub.utilities.registries.domain_registry import DomainRegistry
from datahub.utilities.urns.dataset_urn import DatasetUrn
ARRAY_TOKEN = "[type=array]"
UNION_TOKEN = "[type=union]"
KEY_SCHEMA_PREFIX = "[key=True]."
VERSION_PREFIX = "[version=2.0]."
def gen_schema_key(
@@ -223,3 +230,27 @@ def gen_lineage(
for wu in lineage_workunits:
yield wu
# downgrade a schema field
def downgrade_schema_field_from_v2(field: SchemaField) -> SchemaField:
    """Rewrite a field's v2 fieldPath into the simple v1 dotted notation.

    NOTE: mutates the passed-in field in place and returns the same object.
    """
    simple_path = DatasetUrn.get_simple_field_path_from_v2_field_path(
        field.fieldPath
    )
    field.fieldPath = simple_path
    return field
# downgrade a list of schema fields
def downgrade_schema_from_v2(
    canonical_schema: List[SchemaField],
) -> List[SchemaField]:
    """Downgrade every field path in the schema from v2 to v1 notation.

    NOTE: each field is mutated in place; the returned list holds the same
    (modified) SchemaField objects, in the original order.
    """
    return list(map(downgrade_schema_field_from_v2, canonical_schema))
# v2 is only required when UNION or ARRAY types are present -- all other
# types can be represented in v1 paths
def schema_requires_v2(canonical_schema: List[SchemaField]) -> bool:
    """Return True if any field path contains an array or union token,
    i.e. the schema cannot be losslessly downgraded to v1 field paths."""
    return any(
        ARRAY_TOKEN in field.fieldPath or UNION_TOKEN in field.fieldPath
        for field in canonical_schema
    )

View File

@@ -328,7 +328,7 @@ class SchemaResolver(Closeable):
cls, schema_metadata: SchemaMetadataClass
) -> SchemaInfo:
return {
DatasetUrn._get_simple_field_path_from_v2_field_path(col.fieldPath): (
DatasetUrn.get_simple_field_path_from_v2_field_path(col.fieldPath): (
# The actual types are more of a "nice to have".
col.nativeDataType
or "str"
@@ -336,7 +336,7 @@ class SchemaResolver(Closeable):
for col in schema_metadata.fields
# TODO: We can't generate lineage to columns nested within structs yet.
if "."
not in DatasetUrn._get_simple_field_path_from_v2_field_path(col.fieldPath)
not in DatasetUrn.get_simple_field_path_from_v2_field_path(col.fieldPath)
}
# TODO add a method to load all from graphql

View File

@@ -97,7 +97,7 @@ class DatasetUrn(Urn):
"""A helper function to extract simple . path notation from the v2 field path"""
@staticmethod
def _get_simple_field_path_from_v2_field_path(field_path: str) -> str:
def get_simple_field_path_from_v2_field_path(field_path: str) -> str:
if field_path.startswith("[version=2.0]"):
# this is a v2 field path
tokens = [

View File

@@ -53,12 +53,14 @@ def loaded_presto_on_hive(presto_on_hive_runner):
@freeze_time(FROZEN_TIME)
@pytest.mark.integration_batch_1
@pytest.mark.parametrize(
"mode,use_catalog_subtype,use_dataset_pascalcase_subtype,include_catalog_name_in_ids,test_suffix",
"mode,use_catalog_subtype,use_dataset_pascalcase_subtype,include_catalog_name_in_ids,simplify_nested_field_paths,"
"test_suffix",
[
("hive", False, False, False, "_1"),
("presto-on-hive", True, True, False, "_2"),
("hive", False, False, True, "_3"),
("presto-on-hive", True, True, True, "_4"),
("hive", False, False, False, False, "_1"),
("presto-on-hive", True, True, False, False, "_2"),
("hive", False, False, True, False, "_3"),
("presto-on-hive", True, True, True, False, "_4"),
("hive", False, False, False, True, "_5"),
],
)
def test_presto_on_hive_ingest(
@@ -71,6 +73,7 @@ def test_presto_on_hive_ingest(
use_catalog_subtype,
use_dataset_pascalcase_subtype,
include_catalog_name_in_ids,
simplify_nested_field_paths,
test_suffix,
):
# Run the metadata ingestion pipeline.
@@ -97,6 +100,7 @@ def test_presto_on_hive_ingest(
"mode": mode,
"use_catalog_subtype": use_catalog_subtype,
"use_dataset_pascalcase_subtype": use_dataset_pascalcase_subtype,
"simplify_nested_field_paths": simplify_nested_field_paths,
},
},
"sink": {