diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/presto_on_hive.py b/metadata-ingestion/src/datahub/ingestion/source/sql/presto_on_hive.py
index 47ac5814d6..b0cffc52d7 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/presto_on_hive.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/presto_on_hive.py
@@ -129,6 +129,16 @@ class PrestoOnHiveConfig(BasicSQLAlchemyConfig):
         description="Add the Presto catalog name (e.g. hive) to the generated dataset urns. `urn:li:dataset:(urn:li:dataPlatform:hive,hive.user.logging_events,PROD)` versus `urn:li:dataset:(urn:li:dataPlatform:hive,user.logging_events,PROD)`",
     )
 
+    extra_properties: List[str] = Field(
+        default=[],
+        description="By default, the connector extracts a fixed set of properties from the metastore tables with a SQL query. Use this list of keys to extract additional properties. The column name returned by the SQL query must match the key you provide here.",
+    )
+
+    enable_properties_merge: bool = Field(
+        default=False,
+        description="By default, the connector overwrites properties every time. Set this to True to enable merging of properties with what exists on the server.",
+    )
+
     def get_sql_alchemy_url(
         self, uri_opts: Optional[Dict[str, Any]] = None, database: Optional[str] = None
     ) -> str:
@@ -478,13 +488,11 @@ class PrestoOnHiveSource(SQLAlchemySource):
             dataset_snapshot.aspects.append(schema_metadata)
 
             # add table properties
-            properties: Dict[str, str] = {
-                "create_date": columns[-1]["create_date"],
-                "table_type": columns[-1]["table_type"],
-                "table_location": ""
-                if columns[-1]["table_location"] is None
-                else columns[-1]["table_location"],
-            }
+            default_properties = ["create_date", "table_type", "table_location"]
+            properties: Dict[str, str] = {}
+            for prop in default_properties + self.config.extra_properties:
+                if prop in columns[-1]:
+                    properties[prop] = "" if columns[-1][prop] is None else str(columns[-1][prop])
 
             par_columns: str = ", ".join(
                 [c["col_name"] for c in columns if c["is_partition_col"]]
@@ -492,17 +500,42 @@ class PrestoOnHiveSource(SQLAlchemySource):
             if par_columns != "":
                 properties["partitioned_columns"] = par_columns
 
-            dataset_properties = DatasetPropertiesClass(
-                name=key.table,
-                description=columns[-1]["description"],
-                customProperties=properties,
+            table_description = (
+                columns[-1]["description"] if "description" in columns[-1] else ""
             )
-            dataset_snapshot.aspects.append(dataset_properties)
 
             yield from self.add_hive_dataset_to_container(
                 dataset_urn=dataset_urn, inspector=inspector, schema=key.schema
             )
 
+            if self.config.enable_properties_merge:
+                from datahub.specific.dataset import DatasetPatchBuilder
+
+                patch_builder: DatasetPatchBuilder = DatasetPatchBuilder(
+                    urn=dataset_snapshot.urn
+                )
+                patch_builder.set_display_name(key.table).set_description(
+                    description=table_description
+                )
+                for prop, value in properties.items():
+                    patch_builder.add_custom_property(key=prop, value=value)
+                yield from [
+                    MetadataWorkUnit(
+                        id=f"{mcp_raw.entityUrn}-{DatasetPropertiesClass.ASPECT_NAME}",
+                        mcp_raw=mcp_raw,
+                    )
+                    for mcp_raw in patch_builder.build()
+                ]
+            else:
+                # we add to the MCE to keep compatibility with previous output
+                # if merging is disabled
+                dataset_properties = DatasetPropertiesClass(
+                    name=key.table,
+                    description=table_description,
+                    customProperties=properties,
+                )
+                dataset_snapshot.aspects.append(dataset_properties)
+
             # construct mce
             mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
             yield SqlWorkUnit(id=dataset_name, mce=mce)
diff --git a/metadata-ingestion/src/datahub/specific/dataset.py b/metadata-ingestion/src/datahub/specific/dataset.py
index 19d3d75520..fcfe049fb1 100644
--- a/metadata-ingestion/src/datahub/specific/dataset.py
+++ b/metadata-ingestion/src/datahub/specific/dataset.py
@@ -216,3 +216,13 @@ class DatasetPatchBuilder(MetadataPatchProposal):
     def remove_custom_property(self, key: str) -> "DatasetPatchBuilder":
         self.custom_properties_patch_helper.remove_property(key)
         return self
+
+    def set_display_name(self, display_name: str) -> "DatasetPatchBuilder":
+        if display_name is not None:
+            self._add_patch(
+                DatasetProperties.ASPECT_NAME,
+                "replace",
+                path="/name",
+                value=display_name,
+            )
+        return self
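The two new config options from the first hunk would surface in an ingestion recipe like the sketch below. This is illustrative only: the host, credentials, and the extra property key (`last_modified_by`) are hypothetical placeholders, and driving the recipe from Python via `Pipeline.create` is just one way to run it.

```python
# Sketch of a presto-on-hive recipe exercising the new options; connection
# details and the "last_modified_by" key are invented for illustration.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "presto-on-hive",
            "config": {
                "host_port": "localhost:3306",  # hypothetical metastore DB
                "username": "metastore_ro",
                "password": "example",
                # new in this diff: pull extra columns from the metastore query
                "extra_properties": ["last_modified_by"],
                # new in this diff: merge properties instead of overwriting
                "enable_properties_merge": True,
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```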
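The `set_display_name` addition in the second file slots into the existing fluent patch API, which is what the merge branch of the source relies on. A minimal standalone sketch of that flow follows; the URN coordinates, property values, and the REST endpoint are placeholders, and emitting the built MCPs through `DatahubRestEmitter` is an assumption about the caller, not part of this diff.

```python
# Sketch only: builds the same kind of patch MCP stream as the merge branch.
# Dataset coordinates and the GMS endpoint are hypothetical placeholders.
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.specific.dataset import DatasetPatchBuilder

dataset_urn = make_dataset_urn(platform="hive", name="user.logging_events", env="PROD")

patch_builder = DatasetPatchBuilder(urn=dataset_urn)
patch_builder.set_display_name("logging_events").set_description(
    description="Events emitted by the logging service"  # placeholder text
)
patch_builder.add_custom_property(key="table_type", value="EXTERNAL_TABLE")

# build() yields raw MetadataChangeProposals carrying JSON-patch payloads, so
# properties already on the server are merged with rather than replaced by
# the ones added here.
emitter = DatahubRestEmitter("http://localhost:8080")
for mcp in patch_builder.build():
    emitter.emit(mcp)
```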