feat(ingest): add get_entity_as_mcps method to client (#11425)

This commit is contained in:
Harshal Sheth 2024-09-19 08:22:50 -07:00 committed by GitHub
parent 7a6f194933
commit 7713ea56dd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -527,6 +527,50 @@ class DataHubGraph(DatahubRestEmitter):
return result
def get_entity_as_mcps(
    self, entity_urn: str, aspects: Optional[List[str]] = None
) -> List[MetadataChangeProposalWrapper]:
    """Get all non-timeseries aspects for an entity.

    By formatting the entity's aspects as MCPWs, we can also include SystemMetadata.

    Warning: Do not use this method to determine if an entity exists! This method will always return
    something, even if the entity doesn't actually exist in DataHub.

    Args:
        entity_urn: The urn of the entity
        aspects: Optional list of aspect names being requested (e.g. ["schemaMetadata", "datasetProperties"])

    Returns:
        A list of MCPWs, one per returned aspect whose name is present in
        ASPECT_NAME_MAP. Aspects with unknown names are skipped with a warning.
        An MCPW's systemMetadata is None when the response carries no
        "systemMetadata" entry for that aspect.
    """

    response_json = self.get_entity_raw(entity_urn, aspects)

    # Now, we parse the response into proper aspect objects.
    results: List[MetadataChangeProposalWrapper] = []
    for aspect_name, aspect_json in response_json.get("aspects", {}).items():
        aspect_type = ASPECT_NAME_MAP.get(aspect_name)
        if aspect_type is None:
            logger.warning(f"Ignoring unknown aspect type {aspect_name}")
            continue

        post_json_obj = post_json_transform(aspect_json)
        aspect_value = aspect_type.from_obj(post_json_obj["value"])

        # System metadata is optional: tolerate its absence instead of raising
        # KeyError, so that callers which only need the aspect values (such as
        # get_entity_semityped) keep working on such responses.
        system_metadata_raw = post_json_obj.get("systemMetadata")
        system_metadata = (
            SystemMetadataClass.from_obj(system_metadata_raw)
            if system_metadata_raw is not None
            else None
        )

        results.append(
            MetadataChangeProposalWrapper(
                entityUrn=entity_urn,
                aspect=aspect_value,
                systemMetadata=system_metadata,
            )
        )

    return results
def get_entity_semityped(
self, entity_urn: str, aspects: Optional[List[str]] = None
) -> AspectBag:
@ -545,19 +589,12 @@ class DataHubGraph(DatahubRestEmitter):
not be present in the dictionary. The entity's key aspect will always be present.
"""
response_json = self.get_entity_raw(entity_urn, aspects)
mcps = self.get_entity_as_mcps(entity_urn, aspects)
# Now, we parse the response into proper aspect objects.
result: AspectBag = {}
for aspect_name, aspect_json in response_json.get("aspects", {}).items():
aspect_type = ASPECT_NAME_MAP.get(aspect_name)
if aspect_type is None:
logger.warning(f"Ignoring unknown aspect type {aspect_name}")
continue
post_json_obj = post_json_transform(aspect_json)
aspect_value = aspect_type.from_obj(post_json_obj["value"])
result[aspect_name] = aspect_value # type: ignore
for mcp in mcps:
if mcp.aspect:
result[mcp.aspect.get_aspect_name()] = mcp.aspect # type: ignore
return result