mirror of
https://github.com/datahub-project/datahub.git
synced 2025-09-16 12:42:27 +00:00
feat(ingestion/datahub): Improve system metadata handling in datahub source (#14643)
This commit is contained in:
parent
510b2a4082
commit
f6f939d40f
@ -129,6 +129,10 @@ class DataHubSourceConfig(StatefulIngestionConfigBase):
|
|||||||
description="Timeout for each query in seconds. ",
|
description="Timeout for each query in seconds. ",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
preserve_system_metadata: bool = Field(
|
||||||
|
default=True, description="Copy system metadata from the source system"
|
||||||
|
)
|
||||||
|
|
||||||
@root_validator(skip_on_failure=True)
|
@root_validator(skip_on_failure=True)
|
||||||
def check_ingesting_data(cls, values):
|
def check_ingesting_data(cls, values):
|
||||||
if (
|
if (
|
||||||
|
@ -380,7 +380,12 @@ class DataHubDatabaseReader:
|
|||||||
json_metadata = post_json_transform(
|
json_metadata = post_json_transform(
|
||||||
json.loads(row["systemmetadata"] or "{}")
|
json.loads(row["systemmetadata"] or "{}")
|
||||||
)
|
)
|
||||||
|
system_metadata = None
|
||||||
|
if self.config.preserve_system_metadata:
|
||||||
system_metadata = SystemMetadataClass.from_obj(json_metadata)
|
system_metadata = SystemMetadataClass.from_obj(json_metadata)
|
||||||
|
if system_metadata.properties:
|
||||||
|
is_no_op = system_metadata.properties.pop("isNoOp", None)
|
||||||
|
logger.debug(f"Removed potential value for is_no_op={is_no_op}")
|
||||||
return MetadataChangeProposalWrapper(
|
return MetadataChangeProposalWrapper(
|
||||||
entityUrn=row["urn"],
|
entityUrn=row["urn"],
|
||||||
aspect=ASPECT_MAP[row["aspect"]].from_obj(json_aspect),
|
aspect=ASPECT_MAP[row["aspect"]].from_obj(json_aspect),
|
||||||
|
Loading…
x
Reference in New Issue
Block a user