feat(ingest/audit): add client id and version in system metadata props (#10829)

This commit is contained in:
Aseem Bansal 2024-07-08 22:08:12 +05:30 committed by GitHub
parent 43bac365bc
commit 41b9e15235
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 70 additions and 12 deletions

View File

@ -13,11 +13,17 @@ from deprecated import deprecated
from requests.models import Response
from requests.sessions import Session
import datahub
from datahub.cli import config_utils
from datahub.emitter.aspect import ASPECT_MAP, TIMESERIES_ASPECT_MAP
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.request_helper import make_curl_command
from datahub.emitter.serialization_helper import post_json_transform
from datahub.metadata.schema_classes import _Aspect
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
MetadataChangeEvent,
MetadataChangeProposal,
)
from datahub.metadata.schema_classes import SystemMetadataClass, _Aspect
from datahub.utilities.urns.urn import Urn, guess_entity_type
log = logging.getLogger(__name__)
@ -689,3 +695,18 @@ def generate_access_token(
return token_name, response.json().get("data", {}).get("createAccessToken", {}).get(
"accessToken", None
)
def ensure_has_system_metadata(
    event: Union[
        MetadataChangeProposal, MetadataChangeProposalWrapper, MetadataChangeEvent
    ]
) -> None:
    """Stamp the emitting client's identity onto ``event``'s system metadata.

    The event is modified in place. A ``SystemMetadataClass`` instance and its
    ``properties`` mapping are created when either is missing; otherwise the
    existing objects are reused. The ``clientId`` and ``clientVersion``
    properties are always (over)written with this package's name and version.

    Args:
        event: The MCE / MCP / MCP wrapper that is about to be emitted.
    """
    # Create the container objects lazily so that anything the caller already
    # attached to the event (or to its properties) is preserved.
    if event.systemMetadata is None:
        event.systemMetadata = SystemMetadataClass()
    system_metadata = event.systemMetadata

    if system_metadata.properties is None:
        system_metadata.properties = {}

    system_metadata.properties.update(
        clientId=datahub.__package_name__,
        clientVersion=datahub.__version__,
    )

View File

@ -10,7 +10,11 @@ from deprecated import deprecated
from requests.adapters import HTTPAdapter, Retry
from requests.exceptions import HTTPError, RequestException
from datahub.cli.cli_utils import fixup_gms_url, get_system_auth
from datahub.cli.cli_utils import (
ensure_has_system_metadata,
fixup_gms_url,
get_system_auth,
)
from datahub.configuration.common import ConfigurationError, OperationalError
from datahub.emitter.generic_emitter import Emitter
from datahub.emitter.mcp import MetadataChangeProposalWrapper
@ -228,12 +232,10 @@ class DataHubRestEmitter(Closeable, Emitter):
snapshot_fqn = (
f"com.linkedin.metadata.snapshot.{mce.proposedSnapshot.RECORD_SCHEMA.name}"
)
system_metadata_obj = {}
if mce.systemMetadata is not None:
system_metadata_obj = {
"lastObserved": mce.systemMetadata.lastObserved,
"runId": mce.systemMetadata.runId,
}
ensure_has_system_metadata(mce)
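# ensure_has_system_metadata() has just populated systemMetadata; the assert only narrows the Optional type for static checkers.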
assert mce.systemMetadata is not None
system_metadata_obj = mce.systemMetadata.to_obj()
snapshot = {
"entity": {"value": {snapshot_fqn: mce_obj}},
"systemMetadata": system_metadata_obj,
@ -246,7 +248,7 @@ class DataHubRestEmitter(Closeable, Emitter):
self, mcp: Union[MetadataChangeProposal, MetadataChangeProposalWrapper]
) -> None:
url = f"{self._gms_server}/aspects?action=ingestProposal"
ensure_has_system_metadata(mcp)
mcp_obj = pre_json_transform(mcp.to_obj())
payload = json.dumps({"proposal": mcp_obj})
@ -256,6 +258,8 @@ class DataHubRestEmitter(Closeable, Emitter):
self, mcps: List[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]]
) -> None:
url = f"{self._gms_server}/aspects?action=ingestProposalBatch"
for mcp in mcps:
ensure_has_system_metadata(mcp)
mcp_objs = [pre_json_transform(mcp.to_obj()) for mcp in mcps]
payload = json.dumps({"proposals": mcp_objs})

View File

@ -75,7 +75,15 @@ basicAuditStamp = models.AuditStampClass(
}
}
},
"systemMetadata": {},
"systemMetadata": {
"lastObserved": 0,
"lastRunId": "no-run-id-provided",
"properties": {
"clientId": "acryl-datahub",
"clientVersion": "1!0.0.0.dev0",
},
"runId": "no-run-id-provided",
},
},
),
(
@ -125,7 +133,15 @@ basicAuditStamp = models.AuditStampClass(
}
}
},
"systemMetadata": {},
"systemMetadata": {
"lastObserved": 0,
"lastRunId": "no-run-id-provided",
"properties": {
"clientId": "acryl-datahub",
"clientVersion": "1!0.0.0.dev0",
},
"runId": "no-run-id-provided",
},
},
),
(
@ -161,7 +177,15 @@ basicAuditStamp = models.AuditStampClass(
}
}
},
"systemMetadata": {},
"systemMetadata": {
"lastObserved": 0,
"lastRunId": "no-run-id-provided",
"properties": {
"clientId": "acryl-datahub",
"clientVersion": "1!0.0.0.dev0",
},
"runId": "no-run-id-provided",
},
},
),
(
@ -238,6 +262,15 @@ basicAuditStamp = models.AuditStampClass(
"value": '{"owners": [{"owner": "urn:li:corpuser:fbar", "type": "DATAOWNER"}], "ownerTypes": {}, "lastModified": {"time": 0, "actor": "urn:li:corpuser:fbar"}}',
"contentType": "application/json",
},
"systemMetadata": {
"lastObserved": 0,
"lastRunId": "no-run-id-provided",
"properties": {
"clientId": "acryl-datahub",
"clientVersion": "1!0.0.0.dev0",
},
"runId": "no-run-id-provided",
},
}
},
),