feat(sdk): python - add get_aspects_for_entity (#5255)

Co-authored-by: Shirshanka Das <shirshanka@apache.org>
This commit is contained in:
Aezo 2022-07-13 01:10:07 +08:00 committed by GitHub
parent 9fca5277dc
commit ff0aa3f24b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 86 additions and 0 deletions

View File

@ -0,0 +1,33 @@
import logging
from datahub.emitter.mce_builder import make_dataset_urn
# read-modify-write requires access to the DataHubGraph (RestEmitter is not enough)
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
# Imports for metadata model classes
from datahub.metadata.schema_classes import (
DataPlatformInstanceClass,
DatasetKeyClass,
StatusClass,
)
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
dataset_urn = make_dataset_urn(platform="hive", name="realestate_db.sales", env="PROD")
gms_endpoint = "http://localhost:8080"
graph = DataHubGraph(DatahubClientConfig(server=gms_endpoint))
# Query multiple aspects from entity
result = graph.get_aspects_for_entity(
entity_urn=dataset_urn,
aspects=["status", "dataPlatformInstance", "datasetKey"],
aspect_types=[StatusClass, DataPlatformInstanceClass, DatasetKeyClass],
)
# result are typed according to their class if exist
if result is not None:
if result["datasetKey"]:
log.info(result["datasetKey"].name)

View File

@ -288,6 +288,59 @@ class DataHubGraph(DatahubRestEmitter):
)
return None
def get_aspects_for_entity(
self,
entity_urn: str,
aspects: List[str],
aspect_types: List[Type[Aspect]],
) -> Optional[Dict[str, Optional[Aspect]]]:
"""
Get multiple aspects for an entity. To get a single aspect for an entity, use the `get_aspect_v2` method.
Warning: Do not use this method to determine if an entity exists!
This method will always return an entity, even if it doesn't exist. This is an issue with how DataHub server
responds to these calls, and will be fixed automatically when the server-side issue is fixed.
:param str entity_urn: The urn of the entity
:param List[Type[Aspect]] aspect_type_list: List of aspect type classes being requested (e.g. [datahub.metadata.schema_classes.DatasetProperties])
:param List[str] aspects_list: List of aspect names being requested (e.g. [schemaMetadata, datasetProperties])
:return: Optionally, a map of aspect_name to aspect_value as a dictionary if present, aspect_value will be set to None if that aspect was not found. Returns None on HTTP status 404.
:rtype: Optional[Dict[str, Optional[Aspect]]]
:raises HttpError: if the HTTP response is not a 200 or a 404
"""
assert len(aspects) == len(
aspect_types
), f"number of aspects requested ({len(aspects)}) should be the same as number of aspect types provided ({len(aspect_types)})"
aspects_list = ",".join(aspects)
url: str = f"{self._gms_server}/entitiesV2/{Urn.url_encode(entity_urn)}?aspects=List({aspects_list})"
response = self._session.get(url)
if response.status_code == 404:
# not found
return None
response.raise_for_status()
response_json = response.json()
result: Dict[str, Optional[Aspect]] = {}
for aspect_type in aspect_types:
record_schema: RecordSchema = aspect_type.__getattribute__(
aspect_type, "RECORD_SCHEMA"
)
if not record_schema:
logger.warning(
f"Failed to infer type name of the aspect from the aspect type class {aspect_type}. Continuing, but this will fail."
)
else:
aspect_type_name = record_schema.props["Aspect"]["name"]
aspect_json = response_json.get("aspects", {}).get(aspect_type_name)
if aspect_json:
# need to apply a transform to the response to match rest.li and avro serialization
post_json_obj = post_json_transform(aspect_json)
result[aspect_type_name] = aspect_type.from_obj(post_json_obj["value"])
else:
result[aspect_type_name] = None
return result
def _get_search_endpoint(self):
return f"{self.config.server}/entities?action=search"