feat(ingest/presto-on-hive): add support for extra properties and merge property capabilities (#8147)

Co-authored-by: Shirshanka Das <shirshanka@apache.org>
Tamas Nemeth 2023-06-07 13:12:20 +02:00 committed by GitHub
parent 9fa8489cb8
commit ea79a4b6af
2 changed files with 55 additions and 12 deletions

metadata-ingestion/src/datahub/ingestion/source/sql/presto_on_hive.py

@@ -129,6 +129,16 @@ class PrestoOnHiveConfig(BasicSQLAlchemyConfig):
         description="Add the Presto catalog name (e.g. hive) to the generated dataset urns. `urn:li:dataset:(urn:li:dataPlatform:hive,hive.user.logging_events,PROD)` versus `urn:li:dataset:(urn:li:dataPlatform:hive,user.logging_events,PROD)`",
     )
 
+    extra_properties: List[str] = Field(
+        default=[],
+        description="By default, the connector extracts a fixed set of properties from the metastore tables with a SQL query. Use this list of keys to extract additional properties; each key must match a column name returned by that query.",
+    )
+
+    enable_properties_merge: bool = Field(
+        default=False,
+        description="By default, the connector overwrites properties on every run. Set this to True to merge properties with what already exists on the server.",
+    )
+
     def get_sql_alchemy_url(
         self, uri_opts: Optional[Dict[str, Any]] = None, database: Optional[str] = None
     ) -> str:
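
For orientation, this is how the two new options might be set when the config is built from a recipe. A minimal sketch, assuming pydantic v1-style parse_obj and the module path shown in the file header above; every connection value and extra_properties entry below is a placeholder:

    from datahub.ingestion.source.sql.presto_on_hive import PrestoOnHiveConfig

    config = PrestoOnHiveConfig.parse_obj(
        {
            "scheme": "mysql+pymysql",      # metastore backend, placeholder
            "host_port": "localhost:3306",  # placeholder
            "database": "metastore",        # placeholder
            # the two fields added in this commit:
            "extra_properties": ["owner", "numRows"],
            "enable_properties_merge": True,
        }
    )
    assert config.enable_properties_merge is True
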
@@ -478,13 +488,11 @@ class PrestoOnHiveSource(SQLAlchemySource):
             dataset_snapshot.aspects.append(schema_metadata)
 
             # add table properties
-            properties: Dict[str, str] = {
-                "create_date": columns[-1]["create_date"],
-                "table_type": columns[-1]["table_type"],
-                "table_location": ""
-                if columns[-1]["table_location"] is None
-                else columns[-1]["table_location"],
-            }
+            default_properties = ["create_date", "table_type", "table_location"]
+            properties: Dict[str, str] = {}
+            for prop in default_properties + self.config.extra_properties:
+                if prop in columns[-1]:
+                    properties[prop] = "" if columns[-1][prop] is None else str(columns[-1][prop])
 
             par_columns: str = ", ".join(
                 [c["col_name"] for c in columns if c["is_partition_col"]]
@@ -492,17 +500,42 @@ class PrestoOnHiveSource(SQLAlchemySource):
             if par_columns != "":
                 properties["partitioned_columns"] = par_columns
 
-            dataset_properties = DatasetPropertiesClass(
-                name=key.table,
-                description=columns[-1]["description"],
-                customProperties=properties,
-            )
-            dataset_snapshot.aspects.append(dataset_properties)
+            table_description = (
+                columns[-1]["description"] if "description" in columns[-1] else ""
+            )
 
             yield from self.add_hive_dataset_to_container(
                 dataset_urn=dataset_urn, inspector=inspector, schema=key.schema
             )
 
+            if self.config.enable_properties_merge:
+                from datahub.specific.dataset import DatasetPatchBuilder
+
+                patch_builder: DatasetPatchBuilder = DatasetPatchBuilder(
+                    urn=dataset_snapshot.urn
+                )
+                patch_builder.set_display_name(key.table).set_description(
+                    description=table_description
+                )
+                for prop, value in properties.items():
+                    patch_builder.add_custom_property(key=prop, value=value)
+                yield from [
+                    MetadataWorkUnit(
+                        id=f"{mcp_raw.entityUrn}-{DatasetPropertiesClass.ASPECT_NAME}",
+                        mcp_raw=mcp_raw,
+                    )
+                    for mcp_raw in patch_builder.build()
+                ]
+            else:
+                # we add to the MCE to keep compatibility with previous output
+                # if merging is disabled
+                dataset_properties = DatasetPropertiesClass(
+                    name=key.table,
+                    description=table_description,
+                    customProperties=properties,
+                )
+                dataset_snapshot.aspects.append(dataset_properties)
+
             # construct mce
             mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
             yield SqlWorkUnit(id=dataset_name, mce=mce)
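
The practical difference between the two branches: the else path emits the full DatasetProperties aspect inside the MCE snapshot, replacing whatever is on the server wholesale, while the patch path emits targeted proposals that only touch the named fields and leave other server-side properties intact. A standalone sketch of the patch path, with a made-up URN and values (the builder calls mirror the diff above):

    from datahub.specific.dataset import DatasetPatchBuilder

    # Hypothetical URN and values, mirroring the merge branch above.
    patch_builder = DatasetPatchBuilder(
        urn="urn:li:dataset:(urn:li:dataPlatform:hive,db.logging_events,PROD)"
    )
    patch_builder.set_display_name("logging_events").set_description(
        description="Events emitted by the logging pipeline"
    )
    patch_builder.add_custom_property(key="table_type", value="EXTERNAL_TABLE")

    # build() yields the accumulated proposals; properties set by other
    # sources (or by hand in the UI) that are not named here are untouched.
    for mcp in patch_builder.build():
        print(mcp.entityUrn)
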

metadata-ingestion/src/datahub/specific/dataset.py

@@ -216,3 +216,13 @@ class DatasetPatchBuilder(MetadataPatchProposal):
     def remove_custom_property(self, key: str) -> "DatasetPatchBuilder":
         self.custom_properties_patch_helper.remove_property(key)
         return self
 
+    def set_display_name(self, display_name: str) -> "DatasetPatchBuilder":
+        if display_name is not None:
+            self._add_patch(
+                DatasetProperties.ASPECT_NAME,
+                "replace",
+                path="/name",
+                value=display_name,
+            )
+        return self
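
Like the other setters, set_display_name returns self, so it chains with existing builder methods such as remove_custom_property. A small usage sketch with a hypothetical URN and property key:

    from datahub.specific.dataset import DatasetPatchBuilder

    # Each setter returns the builder, so calls chain; build() then yields
    # the accumulated patch proposals.
    mcps = list(
        DatasetPatchBuilder(urn="urn:li:dataset:(urn:li:dataPlatform:hive,db.t,PROD)")
        .set_display_name("t")
        .remove_custom_property("stale_key")  # hypothetical key
        .build()
    )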