mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-28 18:38:17 +00:00
feat(rest_emitter): support delete emit mcp (#14033)
This commit is contained in:
parent
84ad365b37
commit
c694168991
@ -37,3 +37,5 @@ clean {
|
||||
delete 'src/mainGeneratedAvroSchema/avro'
|
||||
delete 'src/renamed/avro'
|
||||
}
|
||||
|
||||
generateTestDataTemplate.dependsOn spotlessJava
|
||||
|
||||
@ -4,7 +4,7 @@ import json
|
||||
import re
|
||||
import textwrap
|
||||
from pathlib import Path
|
||||
from typing import Dict, Iterable, List, Optional, Tuple, Union
|
||||
from typing import Dict, Iterable, List, Optional, Tuple, Union, Set
|
||||
|
||||
import avro.schema
|
||||
import click
|
||||
@ -323,7 +323,7 @@ ASPECT_NAME_MAP: Dict[str, Type[_Aspect]] = {{
|
||||
for aspect in ASPECT_CLASSES
|
||||
}}
|
||||
|
||||
from typing import Literal
|
||||
from typing import Literal, Set
|
||||
from typing_extensions import TypedDict
|
||||
|
||||
class AspectBag(TypedDict, total=False):
|
||||
@ -334,6 +334,8 @@ KEY_ASPECTS: Dict[str, Type[_Aspect]] = {{
|
||||
{f",{newline} ".join(f"'{aspect['Aspect']['keyForEntity']}': {aspect['name']}Class" for aspect in aspects if aspect["Aspect"].get("keyForEntity"))}
|
||||
}}
|
||||
|
||||
KEY_ASPECT_NAMES: Set[str] = {{cls.ASPECT_NAME for cls in KEY_ASPECTS.values()}}
|
||||
|
||||
ENTITY_TYPE_NAMES: List[str] = [
|
||||
{f",{newline} ".join(f"'{aspect['Aspect']['keyForEntity']}'" for aspect in aspects if aspect["Aspect"].get("keyForEntity"))}
|
||||
]
|
||||
|
||||
@ -61,6 +61,10 @@ from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
|
||||
MetadataChangeProposal,
|
||||
)
|
||||
from datahub.metadata.com.linkedin.pegasus2avro.usage import UsageAggregation
|
||||
from datahub.metadata.schema_classes import (
|
||||
KEY_ASPECT_NAMES,
|
||||
ChangeTypeClass,
|
||||
)
|
||||
from datahub.utilities.server_config_util import RestServiceConfig, ServiceFeature
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@ -626,15 +630,27 @@ class DataHubRestEmitter(Closeable, Emitter):
|
||||
trace_data = extract_trace_data(response) if response else None
|
||||
|
||||
else:
|
||||
url = f"{self._gms_server}/aspects?action=ingestProposal"
|
||||
if mcp.changeType == ChangeTypeClass.DELETE:
|
||||
if mcp.aspectName not in KEY_ASPECT_NAMES:
|
||||
raise OperationalError(
|
||||
f"Delete not supported for non key aspect: {mcp.aspectName} for urn: "
|
||||
f"{mcp.entityUrn}"
|
||||
)
|
||||
|
||||
mcp_obj = preserve_unicode_escapes(pre_json_transform(mcp.to_obj()))
|
||||
payload_dict = {
|
||||
"proposal": mcp_obj,
|
||||
"async": "true"
|
||||
if emit_mode in (EmitMode.ASYNC, EmitMode.ASYNC_WAIT)
|
||||
else "false",
|
||||
}
|
||||
url = f"{self._gms_server}/entities?action=delete"
|
||||
payload_dict = {
|
||||
"urn": mcp.entityUrn,
|
||||
}
|
||||
else:
|
||||
url = f"{self._gms_server}/aspects?action=ingestProposal"
|
||||
|
||||
mcp_obj = preserve_unicode_escapes(pre_json_transform(mcp.to_obj()))
|
||||
payload_dict = {
|
||||
"proposal": mcp_obj,
|
||||
"async": "true"
|
||||
if emit_mode in (EmitMode.ASYNC, EmitMode.ASYNC_WAIT)
|
||||
else "false",
|
||||
}
|
||||
|
||||
payload = json.dumps(payload_dict)
|
||||
|
||||
|
||||
@ -10,6 +10,7 @@ from requests import Response, Session
|
||||
|
||||
from datahub.configuration.common import (
|
||||
ConfigurationError,
|
||||
OperationalError,
|
||||
TraceTimeoutError,
|
||||
TraceValidationError,
|
||||
)
|
||||
@ -36,6 +37,8 @@ from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
|
||||
DatasetProperties,
|
||||
)
|
||||
from datahub.metadata.schema_classes import (
|
||||
KEY_ASPECT_NAMES,
|
||||
KEY_ASPECTS,
|
||||
ChangeTypeClass,
|
||||
)
|
||||
from datahub.specific.dataset import DatasetPatchBuilder
|
||||
@ -1260,6 +1263,208 @@ class TestDataHubRestEmitter:
|
||||
# ASCII should remain unchanged
|
||||
assert result["nested"]["list"][0] == "item1" # Pure ASCII
|
||||
|
||||
def test_emit_mcp_delete_functionality(self):
|
||||
"""Test that MCP delete operations use correct URL and payload format"""
|
||||
emitter = DataHubRestEmitter(MOCK_GMS_ENDPOINT, openapi_ingestion=False)
|
||||
|
||||
# Test Case 1: Delete with key aspect (should succeed)
|
||||
delete_mcp_key_aspect = MetadataChangeProposalWrapper(
|
||||
entityUrn="urn:li:dataset:(urn:li:dataPlatform:mysql,DeleteTest,PROD)",
|
||||
entityType="dataset",
|
||||
aspectName="datasetKey", # This is a key aspect
|
||||
changeType=ChangeTypeClass.DELETE,
|
||||
aspect=None, # No aspect data needed for delete
|
||||
)
|
||||
|
||||
with patch.object(emitter, "_emit_generic") as mock_emit:
|
||||
emitter.emit_mcp(delete_mcp_key_aspect)
|
||||
|
||||
# Verify _emit_generic was called once
|
||||
mock_emit.assert_called_once()
|
||||
|
||||
# Check that delete URL format was used
|
||||
url = mock_emit.call_args[0][0]
|
||||
assert url == f"{MOCK_GMS_ENDPOINT}/entities?action=delete"
|
||||
|
||||
# Check that delete payload format was used (just URN)
|
||||
payload = mock_emit.call_args[0][1]
|
||||
payload_dict = json.loads(payload)
|
||||
expected_payload = {
|
||||
"urn": "urn:li:dataset:(urn:li:dataPlatform:mysql,DeleteTest,PROD)"
|
||||
}
|
||||
assert payload_dict == expected_payload
|
||||
|
||||
# Test Case 2: Delete with non-key aspect (should log error and return)
|
||||
delete_mcp_non_key_aspect = MetadataChangeProposalWrapper(
|
||||
entityUrn="urn:li:dataset:(urn:li:dataPlatform:mysql,DeleteTest,PROD)",
|
||||
entityType="dataset",
|
||||
aspectName="datasetProperties", # This is NOT a key aspect
|
||||
changeType=ChangeTypeClass.DELETE,
|
||||
aspect=None,
|
||||
)
|
||||
|
||||
with (
|
||||
patch.object(emitter, "_emit_generic") as mock_emit,
|
||||
):
|
||||
try:
|
||||
emitter.emit_mcp(delete_mcp_non_key_aspect)
|
||||
except OperationalError as e:
|
||||
assert e.message == (
|
||||
"Delete not supported for non key aspect: datasetProperties for urn: "
|
||||
"urn:li:dataset:(urn:li:dataPlatform:mysql,DeleteTest,PROD)"
|
||||
)
|
||||
|
||||
# Verify _emit_generic was NOT called
|
||||
mock_emit.assert_not_called()
|
||||
|
||||
def test_emit_mcp_delete_vs_upsert_different_urls(self):
|
||||
"""Test that delete and upsert operations use different URLs"""
|
||||
emitter = DataHubRestEmitter(MOCK_GMS_ENDPOINT, openapi_ingestion=False)
|
||||
|
||||
# Create delete MCP
|
||||
delete_mcp = MetadataChangeProposalWrapper(
|
||||
entityUrn="urn:li:dataset:(urn:li:dataPlatform:mysql,DeleteTest,PROD)",
|
||||
entityType="dataset",
|
||||
aspectName="datasetKey",
|
||||
changeType=ChangeTypeClass.DELETE,
|
||||
aspect=None,
|
||||
)
|
||||
|
||||
# Create upsert MCP
|
||||
upsert_mcp = MetadataChangeProposalWrapper(
|
||||
entityUrn="urn:li:dataset:(urn:li:dataPlatform:mysql,UpsertTest,PROD)",
|
||||
entityType="dataset",
|
||||
aspectName="datasetProperties",
|
||||
changeType=ChangeTypeClass.UPSERT,
|
||||
aspect=DatasetProperties(name="Test Dataset"),
|
||||
)
|
||||
|
||||
with patch.object(emitter, "_emit_generic") as mock_emit:
|
||||
# Test delete
|
||||
emitter.emit_mcp(delete_mcp)
|
||||
delete_url = mock_emit.call_args[0][0]
|
||||
|
||||
# Reset mock
|
||||
mock_emit.reset_mock()
|
||||
|
||||
# Test upsert
|
||||
emitter.emit_mcp(upsert_mcp)
|
||||
upsert_url = mock_emit.call_args[0][0]
|
||||
|
||||
# Verify different URLs were used
|
||||
assert delete_url == f"{MOCK_GMS_ENDPOINT}/entities?action=delete"
|
||||
assert upsert_url == f"{MOCK_GMS_ENDPOINT}/aspects?action=ingestProposal"
|
||||
assert delete_url != upsert_url
|
||||
|
||||
def test_emit_mcp_delete_payload_structure(self):
|
||||
"""Test that delete payload has correct structure compared to upsert"""
|
||||
emitter = DataHubRestEmitter(MOCK_GMS_ENDPOINT, openapi_ingestion=False)
|
||||
|
||||
# Create delete MCP
|
||||
delete_mcp = MetadataChangeProposalWrapper(
|
||||
entityUrn="urn:li:dataset:(urn:li:dataPlatform:mysql,DeleteTest,PROD)",
|
||||
entityType="dataset",
|
||||
aspectName="datasetKey",
|
||||
changeType=ChangeTypeClass.DELETE,
|
||||
aspect=None,
|
||||
)
|
||||
|
||||
# Create upsert MCP for comparison
|
||||
upsert_mcp = MetadataChangeProposalWrapper(
|
||||
entityUrn="urn:li:dataset:(urn:li:dataPlatform:mysql,UpsertTest,PROD)",
|
||||
entityType="dataset",
|
||||
aspectName="datasetProperties",
|
||||
changeType=ChangeTypeClass.UPSERT,
|
||||
aspect=DatasetProperties(name="Test Dataset"),
|
||||
)
|
||||
|
||||
with patch.object(emitter, "_emit_generic") as mock_emit:
|
||||
# Test delete payload
|
||||
emitter.emit_mcp(delete_mcp)
|
||||
delete_payload = json.loads(mock_emit.call_args[0][1])
|
||||
|
||||
# Reset mock
|
||||
mock_emit.reset_mock()
|
||||
|
||||
# Test upsert payload
|
||||
emitter.emit_mcp(upsert_mcp)
|
||||
upsert_payload = json.loads(mock_emit.call_args[0][1])
|
||||
|
||||
# Verify delete payload structure
|
||||
assert "urn" in delete_payload
|
||||
assert (
|
||||
delete_payload["urn"]
|
||||
== "urn:li:dataset:(urn:li:dataPlatform:mysql,DeleteTest,PROD)"
|
||||
)
|
||||
assert (
|
||||
"proposal" not in delete_payload
|
||||
) # Should not have proposal for delete
|
||||
assert "async" not in delete_payload # Should not have async for delete
|
||||
|
||||
# Verify upsert payload structure (for comparison)
|
||||
assert "proposal" in upsert_payload
|
||||
assert "async" in upsert_payload
|
||||
assert "urn" not in upsert_payload # URN is inside proposal for upsert
|
||||
|
||||
def test_emit_mcp_delete_with_key_aspects_only(self):
|
||||
"""Test that delete only works with key aspects from KEY_ASPECTS"""
|
||||
emitter = DataHubRestEmitter(MOCK_GMS_ENDPOINT, openapi_ingestion=False)
|
||||
|
||||
# Test with a known key aspect (should work)
|
||||
key_aspect = next(iter(KEY_ASPECT_NAMES)) # Get first key aspect
|
||||
delete_mcp_key = MetadataChangeProposalWrapper(
|
||||
entityUrn="urn:li:dataset:(urn:li:dataPlatform:mysql,KeyAspectTest,PROD)",
|
||||
entityType="dataset",
|
||||
aspectName=key_aspect,
|
||||
changeType=ChangeTypeClass.DELETE,
|
||||
aspect=None,
|
||||
)
|
||||
|
||||
with patch.object(emitter, "_emit_generic") as mock_emit:
|
||||
emitter.emit_mcp(delete_mcp_key)
|
||||
|
||||
# Should have called _emit_generic
|
||||
mock_emit.assert_called_once()
|
||||
|
||||
# Verify URL and payload
|
||||
url = mock_emit.call_args[0][0]
|
||||
payload = json.loads(mock_emit.call_args[0][1])
|
||||
|
||||
assert url == f"{MOCK_GMS_ENDPOINT}/entities?action=delete"
|
||||
assert payload == {
|
||||
"urn": "urn:li:dataset:(urn:li:dataPlatform:mysql,KeyAspectTest,PROD)"
|
||||
}
|
||||
|
||||
# Test with a non-key aspect (should fail)
|
||||
non_key_aspects = ["datasetProperties", "datasetProfile", "status"]
|
||||
for aspect_name in non_key_aspects:
|
||||
if aspect_name not in KEY_ASPECTS: # Ensure it's actually not a key aspect
|
||||
delete_mcp_non_key = MetadataChangeProposalWrapper(
|
||||
entityUrn="urn:li:dataset:(urn:li:dataPlatform:mysql,NonKeyAspectTest,PROD)",
|
||||
entityType="dataset",
|
||||
aspectName=aspect_name,
|
||||
changeType=ChangeTypeClass.DELETE,
|
||||
aspect=None,
|
||||
)
|
||||
|
||||
with (
|
||||
patch.object(emitter, "_emit_generic") as mock_emit,
|
||||
):
|
||||
try:
|
||||
emitter.emit_mcp(delete_mcp_non_key)
|
||||
except OperationalError as e:
|
||||
assert (
|
||||
f"Delete not supported for non key aspect: {aspect_name} for urn: "
|
||||
"urn:li:dataset:(urn:li:dataPlatform:mysql,NonKeyAspectTest,PROD)"
|
||||
) == e.message
|
||||
|
||||
# Should NOT have called _emit_generic
|
||||
mock_emit.assert_not_called()
|
||||
|
||||
# Reset for next iteration
|
||||
mock_emit.reset_mock()
|
||||
break # Only need to test one non-key aspect
|
||||
|
||||
|
||||
class TestOpenApiModeSelection:
|
||||
def test_sdk_client_mode_no_env_var(self, mock_session, mock_response):
|
||||
|
||||
@ -13,6 +13,7 @@ from datahub.emitter.serialization_helper import pre_json_transform
|
||||
from datahub.metadata.schema_classes import (
|
||||
AuditStampClass,
|
||||
ChangeAuditStampsClass,
|
||||
ChangeTypeClass,
|
||||
DashboardInfoClass,
|
||||
GenericAspectClass,
|
||||
MetadataChangeProposalClass,
|
||||
@ -97,3 +98,59 @@ def test_gms_ignore_unknown_dashboard_info(graph_client):
|
||||
assert dashboard_info
|
||||
assert dashboard_info.title == invalid_dashboard_info["title"]
|
||||
assert dashboard_info.description == invalid_dashboard_info["description"]
|
||||
|
||||
|
||||
def test_gms_delete_mcp(graph_client):
|
||||
dashboard_urn = make_dashboard_urn(platform="looker", name="test-delete-mcp")
|
||||
generated_urns.extend([dashboard_urn])
|
||||
|
||||
audit_stamp = pre_json_transform(
|
||||
ChangeAuditStampsClass(
|
||||
created=AuditStampClass(
|
||||
time=int(time.time() * 1000),
|
||||
actor="urn:li:corpuser:datahub",
|
||||
)
|
||||
).to_obj()
|
||||
)
|
||||
|
||||
invalid_dashboard_info = {
|
||||
"title": "Ignore Unknown Title",
|
||||
"description": "Ignore Unknown Description",
|
||||
"lastModified": audit_stamp,
|
||||
"notAValidField": "invalid field value",
|
||||
}
|
||||
mcpw = MetadataChangeProposalInvalidWrapper(
|
||||
entityUrn=dashboard_urn,
|
||||
aspectName="dashboardInfo",
|
||||
aspect=invalid_dashboard_info,
|
||||
)
|
||||
|
||||
mcp = mcpw.make_mcp()
|
||||
assert "notAValidField" in str(mcp)
|
||||
assert "invalid field value" in str(mcp)
|
||||
|
||||
graph_client.emit_mcp(mcpw, emit_mode=EmitMode.SYNC_PRIMARY)
|
||||
|
||||
dashboard_info = graph_client.get_aspect(
|
||||
entity_urn=dashboard_urn,
|
||||
aspect_type=DashboardInfoClass,
|
||||
)
|
||||
|
||||
assert dashboard_info
|
||||
assert dashboard_info.title == invalid_dashboard_info["title"]
|
||||
assert dashboard_info.description == invalid_dashboard_info["description"]
|
||||
|
||||
new_mcpw = MetadataChangeProposalWrapper(
|
||||
entityUrn=dashboard_urn,
|
||||
aspectName="dashboardKey",
|
||||
changeType=ChangeTypeClass.DELETE,
|
||||
)
|
||||
|
||||
graph_client.emit_mcp(new_mcpw, emit_mode=EmitMode.SYNC_PRIMARY)
|
||||
|
||||
dashboard_info = graph_client.get_aspect(
|
||||
entity_urn=dashboard_urn,
|
||||
aspect_type=DashboardInfoClass,
|
||||
)
|
||||
|
||||
assert not dashboard_info
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user