import json
from datetime import timedelta
from unittest.mock import ANY, Mock, patch

import pytest
from requests import Response

from datahub.configuration.common import TraceTimeoutError, TraceValidationError
from datahub.emitter import rest_emitter
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.response_helper import TraceData
from datahub.emitter.rest_emitter import (
    BATCH_INGEST_MAX_PAYLOAD_LENGTH,
    INGEST_MAX_PAYLOAD_BYTES,
    DataHubRestEmitter,
    DatahubRestEmitter,
    logger,
)
from datahub.errors import APITracingWarning
from datahub.metadata.com.linkedin.pegasus2avro.common import Status
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    DatasetProfile,
    DatasetProperties,
)
from datahub.metadata.schema_classes import ChangeTypeClass
from datahub.specific.dataset import DatasetPatchBuilder

MOCK_GMS_ENDPOINT = "http://fakegmshost:8080"


@pytest.fixture
def openapi_emitter() -> DataHubRestEmitter:
    return DataHubRestEmitter(MOCK_GMS_ENDPOINT, openapi_ingestion=True)


def test_datahub_rest_emitter_construction() -> None:
    emitter = DatahubRestEmitter(MOCK_GMS_ENDPOINT)
    assert emitter._session_config.timeout == rest_emitter._DEFAULT_TIMEOUT_SEC
    assert (
        emitter._session_config.retry_status_codes
        == rest_emitter._DEFAULT_RETRY_STATUS_CODES
    )
    assert (
        emitter._session_config.retry_max_times == rest_emitter._DEFAULT_RETRY_MAX_TIMES
    )


def test_datahub_rest_emitter_timeout_construction() -> None:
    emitter = DatahubRestEmitter(
        MOCK_GMS_ENDPOINT, connect_timeout_sec=2, read_timeout_sec=4
    )
    assert emitter._session_config.timeout == (2, 4)


def test_datahub_rest_emitter_general_timeout_construction() -> None:
    emitter = DatahubRestEmitter(MOCK_GMS_ENDPOINT, timeout_sec=2, read_timeout_sec=4)
    assert emitter._session_config.timeout == (2, 4)


def test_datahub_rest_emitter_retry_construction() -> None:
    emitter = DatahubRestEmitter(
        MOCK_GMS_ENDPOINT,
        retry_status_codes=[418],
        retry_max_times=42,
    )
    assert emitter._session_config.retry_status_codes == [418]
    assert emitter._session_config.retry_max_times == 42


def test_datahub_rest_emitter_extra_params() -> None:
    emitter = DatahubRestEmitter(
        MOCK_GMS_ENDPOINT, extra_headers={"key1": "value1", "key2": "value2"}
    )
    assert emitter._session.headers.get("key1") == "value1"
    assert emitter._session.headers.get("key2") == "value2"


def test_openapi_emitter_emit(openapi_emitter):
    item = MetadataChangeProposalWrapper(
        entityUrn="urn:li:dataset:(urn:li:dataPlatform:mysql,User.UserAccount,PROD)",
        aspect=DatasetProfile(
            rowCount=2000,
            columnCount=15,
            timestampMillis=1626995099686,
        ),
    )

    with patch.object(openapi_emitter, "_emit_generic") as mock_method:
        openapi_emitter.emit_mcp(item)

        mock_method.assert_called_once_with(
            f"{MOCK_GMS_ENDPOINT}/openapi/v3/entity/dataset?async=false",
            payload=[
                {
                    "urn": "urn:li:dataset:(urn:li:dataPlatform:mysql,User.UserAccount,PROD)",
                    "datasetProfile": {
                        "value": {
                            "rowCount": 2000,
                            "columnCount": 15,
                            "timestampMillis": 1626995099686,
                            "partitionSpec": {
                                "partition": "FULL_TABLE_SNAPSHOT",
                                "type": "FULL_TABLE",
                            },
                        },
                        "systemMetadata": {
                            "lastObserved": ANY,
                            "runId": "no-run-id-provided",
                            "lastRunId": "no-run-id-provided",
                            "properties": {
                                "clientId": "acryl-datahub",
                                "clientVersion": "1!0.0.0.dev0",
                            },
                        },
                    },
                }
            ],
            method="post",
        )
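

# For reference, the OpenAPI v3 batch payload asserted above is a list of
# {"urn": ..., "<aspectName>": {"value": ..., "systemMetadata": ...}} objects,
# posted to /openapi/v3/entity/<entityType> (async=false for synchronous emits).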


def test_openapi_emitter_emit_mcps(openapi_emitter):
    with patch(
        "datahub.emitter.rest_emitter.DataHubRestEmitter._emit_generic"
    ) as mock_emit:
        items = [
            MetadataChangeProposalWrapper(
                entityUrn=f"urn:li:dataset:(urn:li:dataPlatform:mysql,User.UserAccount{i},PROD)",
                aspect=DatasetProfile(
                    rowCount=2000 + i,
                    columnCount=15,
                    timestampMillis=1626995099686,
                ),
            )
            for i in range(3)
        ]

        result = openapi_emitter.emit_mcps(items)

        assert result == 1

        # Single chunk test - all items should be in one batch
        mock_emit.assert_called_once()
        call_args = mock_emit.call_args
        assert (
            call_args[0][0]
            == f"{MOCK_GMS_ENDPOINT}/openapi/v3/entity/dataset?async=true"
        )
        assert isinstance(call_args[1]["payload"], str)  # Should be JSON string


def test_openapi_emitter_emit_mcps_max_bytes(openapi_emitter):
    with patch(
        "datahub.emitter.rest_emitter.DataHubRestEmitter._emit_generic"
    ) as mock_emit:
        # Create a large payload that will force chunking
        large_payload = "x" * (
            INGEST_MAX_PAYLOAD_BYTES // 2
        )  # Each item will be about half max size
        items = [
            MetadataChangeProposalWrapper(
                entityUrn=f"urn:li:dataset:(urn:li:dataPlatform:mysql,LargePayload{i},PROD)",
                aspect=DatasetProperties(name=large_payload),
            )
            for i in range(
                3
            )  # This should create at least 2 chunks given the payload size
        ]

        openapi_emitter.emit_mcps(items)

        # Verify multiple chunks were created
        assert mock_emit.call_count > 1

        # Verify each chunk's payload is within size limits
        for call in mock_emit.call_args_list:
            args = call[1]
            assert "payload" in args
            payload = args["payload"]
            assert isinstance(payload, str)  # Should be JSON string
            assert len(payload.encode()) <= INGEST_MAX_PAYLOAD_BYTES

            # Verify the payload structure
            payload_data = json.loads(payload)
            assert payload_data[0]["urn"].startswith("urn:li:dataset")
            assert "datasetProperties" in payload_data[0]


def test_openapi_emitter_emit_mcps_max_items(openapi_emitter):
    with patch(
        "datahub.emitter.rest_emitter.DataHubRestEmitter._emit_generic"
    ) as mock_emit:
        # Create more items than BATCH_INGEST_MAX_PAYLOAD_LENGTH
        items = [
            MetadataChangeProposalWrapper(
                entityUrn=f"urn:li:dataset:(urn:li:dataPlatform:mysql,Item{i},PROD)",
                aspect=DatasetProfile(
                    rowCount=i,
                    columnCount=15,
                    timestampMillis=1626995099686,
                ),
            )
            for i in range(
                BATCH_INGEST_MAX_PAYLOAD_LENGTH + 2
            )  # Create 2 more than max
        ]

        openapi_emitter.emit_mcps(items)

        # Verify multiple chunks were created
        assert mock_emit.call_count == 2

        # Check first chunk
        first_call = mock_emit.call_args_list[0]
        first_payload = json.loads(first_call[1]["payload"])
        assert len(first_payload) == BATCH_INGEST_MAX_PAYLOAD_LENGTH

        # Check second chunk
        second_call = mock_emit.call_args_list[1]
        second_payload = json.loads(second_call[1]["payload"])
        assert len(second_payload) == 2  # Should have the remaining 2 items


def test_openapi_emitter_emit_mcps_multiple_entity_types(openapi_emitter):
    with patch(
        "datahub.emitter.rest_emitter.DataHubRestEmitter._emit_generic"
    ) as mock_emit:
        # Create items for two different entity types
        dataset_items = [
            MetadataChangeProposalWrapper(
                entityUrn=f"urn:li:dataset:(urn:li:dataPlatform:mysql,Dataset{i},PROD)",
                aspect=DatasetProfile(
                    rowCount=i,
                    columnCount=15,
                    timestampMillis=1626995099686,
                ),
            )
            for i in range(2)
        ]

        dashboard_items = [
            MetadataChangeProposalWrapper(
                entityUrn=f"urn:li:dashboard:(looker,dashboards.{i})",
                aspect=Status(removed=False),
            )
            for i in range(2)
        ]

        items = dataset_items + dashboard_items
        result = openapi_emitter.emit_mcps(items)

        # Should return number of unique entity URLs
        assert result == 2
        assert mock_emit.call_count == 2

        # Check that calls were made with different URLs but correct payloads
        calls = {
            call[0][0]: json.loads(call[1]["payload"])
            for call in mock_emit.call_args_list
        }

        # Verify each URL got the right aspects
        for url, payload in calls.items():
            if "datasetProfile" in payload[0]:
                assert url.endswith("dataset?async=true")
                assert len(payload) == 2
                assert all("datasetProfile" in item for item in payload)
            else:
                assert url.endswith("dashboard?async=true")
                assert len(payload) == 2
                assert all("status" in item for item in payload)


def test_openapi_emitter_emit_mcp_with_tracing(openapi_emitter):
    """Test emitting a single MCP with tracing enabled"""
    with patch(
        "datahub.emitter.rest_emitter.DataHubRestEmitter._emit_generic"
    ) as mock_emit:
        # Mock the response for the initial emit
        mock_response = Mock(spec=Response)
        mock_response.status_code = 200
        mock_response.headers = {"traceparent": "test-trace-123"}
        mock_response.json.return_value = [
            {
                "urn": "urn:li:dataset:(urn:li:dataPlatform:mysql,User.UserAccount,PROD)",
                "datasetProfile": {},
            }
        ]
        mock_emit.return_value = mock_response

        # Create test item
        item = MetadataChangeProposalWrapper(
            entityUrn="urn:li:dataset:(urn:li:dataPlatform:mysql,User.UserAccount,PROD)",
            aspect=DatasetProfile(
                rowCount=2000,
                columnCount=15,
                timestampMillis=1626995099686,
            ),
        )

        # Set up mock for trace verification responses
        trace_responses = [
            # First check - pending
            {
                "urn:li:dataset:(urn:li:dataPlatform:mysql,User.UserAccount,PROD)": {
                    "datasetProfile": {
                        "success": True,
                        "primaryStorage": {"writeStatus": "PENDING"},
                        "searchStorage": {"writeStatus": "PENDING"},
                    }
                }
            },
            # Second check - completed
            {
                "urn:li:dataset:(urn:li:dataPlatform:mysql,User.UserAccount,PROD)": {
                    "datasetProfile": {
                        "success": True,
                        "primaryStorage": {"writeStatus": "ACTIVE_STATE"},
                        "searchStorage": {"writeStatus": "ACTIVE_STATE"},
                    }
                }
            },
        ]

        def side_effect(*args, **kwargs):
            if "trace/write" in args[0]:
                mock_trace_response = Mock(spec=Response)
                mock_trace_response.json.return_value = trace_responses.pop(0)
                return mock_trace_response
            return mock_response

        mock_emit.side_effect = side_effect

        # Emit with tracing enabled
        openapi_emitter.emit_mcp(
            item, async_flag=True, trace_flag=True, trace_timeout=timedelta(seconds=10)
        )

        # Verify initial emit call
        assert (
            mock_emit.call_args_list[0][0][0]
            == f"{MOCK_GMS_ENDPOINT}/openapi/v3/entity/dataset?async=true"
        )

        # Verify trace verification calls
        trace_calls = [
            call for call in mock_emit.call_args_list if "trace/write" in call[0][0]
        ]
        assert len(trace_calls) == 2
        assert "test-trace-123" in trace_calls[0][0][0]


def test_openapi_emitter_emit_mcps_with_tracing(openapi_emitter):
    """Test emitting multiple MCPs with tracing enabled"""
    with patch(
        "datahub.emitter.rest_emitter.DataHubRestEmitter._emit_generic"
    ) as mock_emit:
        # Create test items
        items = [
            MetadataChangeProposalWrapper(
                entityUrn=f"urn:li:dataset:(urn:li:dataPlatform:mysql,User.UserAccount{i},PROD)",
                aspect=Status(removed=False),
            )
            for i in range(2)
        ]

        # Mock responses for initial emit
        emit_responses = []
        mock_resp = Mock(spec=Response)
        mock_resp.status_code = 200
        mock_resp.headers = {"traceparent": "test-trace"}
        mock_resp.json.return_value = [
            {
                "urn": f"urn:li:dataset:(urn:li:dataPlatform:mysql,User.UserAccount{i},PROD)",
                "status": {"removed": False},
            }
            for i in range(2)
        ]
        emit_responses.append(mock_resp)

        # Mock trace verification responses
        trace_responses = [
            # First check - all pending
            {
                f"urn:li:dataset:(urn:li:dataPlatform:mysql,User.UserAccount{i},PROD)": {
                    "status": {
                        "success": True,
                        "primaryStorage": {"writeStatus": "PENDING"},
                        "searchStorage": {"writeStatus": "PENDING"},
                    }
                }
                for i in range(2)
            },
            # Second check - all completed
            {
                f"urn:li:dataset:(urn:li:dataPlatform:mysql,User.UserAccount{i},PROD)": {
                    "status": {
                        "success": True,
                        "primaryStorage": {"writeStatus": "ACTIVE_STATE"},
                        "searchStorage": {"writeStatus": "ACTIVE_STATE"},
                    }
                }
                for i in range(2)
            },
        ]

        response_iter = iter(emit_responses)
        trace_response_iter = iter(trace_responses)

        def side_effect(*args, **kwargs):
            if "trace/write" in args[0]:
                mock_trace_response = Mock(spec=Response)
                mock_trace_response.json.return_value = next(trace_response_iter)
                return mock_trace_response
            return next(response_iter)

        mock_emit.side_effect = side_effect

        # Emit with tracing enabled
        result = openapi_emitter.emit_mcps(
            items, async_flag=True, trace_flag=True, trace_timeout=timedelta(seconds=10)
        )

        assert result == 1  # Should return number of unique entity URLs

        # Verify initial emit calls
        emit_calls = [
            call for call in mock_emit.call_args_list if "entity/dataset" in call[0][0]
        ]
        assert len(emit_calls) == 1
        assert (
            emit_calls[0][0][0]
            == f"{MOCK_GMS_ENDPOINT}/openapi/v3/entity/dataset?async=true"
        )

        # Verify trace verification calls
        trace_calls = [
            call for call in mock_emit.call_args_list if "trace/write" in call[0][0]
        ]
        assert len(trace_calls) == 2


def test_openapi_emitter_trace_timeout(openapi_emitter):
    """Test that tracing properly handles timeouts"""
    with patch(
        "datahub.emitter.rest_emitter.DataHubRestEmitter._emit_generic"
    ) as mock_emit:
        # Mock initial emit response
        mock_response = Mock(spec=Response)
        mock_response.status_code = 200
        mock_response.headers = {"traceparent": "test-trace-123"}
        mock_response.json.return_value = [
            {
                "urn": "urn:li:dataset:(urn:li:dataPlatform:mysql,User.UserAccount,PROD)",
                "datasetProfile": {},
            }
        ]

        # Mock trace verification to always return pending
        mock_trace_response = Mock(spec=Response)
        mock_trace_response.json.return_value = {
            "urn:li:dataset:(urn:li:dataPlatform:mysql,User.UserAccount,PROD)": {
                "datasetProfile": {
                    "success": True,
                    "primaryStorage": {"writeStatus": "PENDING"},
                    "searchStorage": {"writeStatus": "PENDING"},
                }
            }
        }

        def side_effect(*args, **kwargs):
            return mock_trace_response if "trace/write" in args[0] else mock_response

        mock_emit.side_effect = side_effect

        item = MetadataChangeProposalWrapper(
            entityUrn="urn:li:dataset:(urn:li:dataPlatform:mysql,User.UserAccount,PROD)",
            aspect=DatasetProfile(
                rowCount=2000,
                columnCount=15,
                timestampMillis=1626995099686,
            ),
        )

        # Emit with very short timeout
        with pytest.raises(TraceTimeoutError) as exc_info:
            openapi_emitter.emit_mcp(
                item,
                async_flag=True,
                trace_flag=True,
                trace_timeout=timedelta(milliseconds=1),
            )

        assert "Timeout waiting for async write completion" in str(exc_info.value)


def test_openapi_emitter_missing_trace_header(openapi_emitter):
    """Test behavior when trace header is missing"""
    with patch(
        "datahub.emitter.rest_emitter.DataHubRestEmitter._emit_generic"
    ) as mock_emit:
        # Mock response without trace header
        mock_response = Mock(spec=Response)
        mock_response.status_code = 200
        mock_response.headers = {}  # No traceparent header
        mock_response.json.return_value = [
            {
                "urn": "urn:li:dataset:(urn:li:dataPlatform:mysql,MissingTraceHeader,PROD)",
                "status": {"removed": False},
            }
        ]
        mock_emit.return_value = mock_response

        item = MetadataChangeProposalWrapper(
            entityUrn="urn:li:dataset:(urn:li:dataPlatform:mysql,MissingTraceHeader,PROD)",
            aspect=Status(removed=False),
        )

        # Should not raise an exception, but should emit an APITracingWarning.
        with pytest.warns(APITracingWarning):
            openapi_emitter.emit_mcp(
                item,
                async_flag=True,
                trace_flag=True,
                trace_timeout=timedelta(seconds=10),
            )


def test_openapi_emitter_invalid_status_code(openapi_emitter):
    """Test behavior when response has non-200 status code"""
    with patch(
        "datahub.emitter.rest_emitter.DataHubRestEmitter._emit_generic"
    ) as mock_emit:
        # Mock response with error status code
        mock_response = Mock(spec=Response)
        mock_response.status_code = 500
        mock_response.headers = {"traceparent": "test-trace-123"}
        mock_response.json.return_value = [
            {
                "urn": "urn:li:dataset:(urn:li:dataPlatform:mysql,InvalidStatusCode,PROD)",
                "datasetProfile": {},
            }
        ]
        mock_emit.return_value = mock_response

        item = MetadataChangeProposalWrapper(
            entityUrn="urn:li:dataset:(urn:li:dataPlatform:mysql,InvalidStatusCode,PROD)",
            aspect=DatasetProfile(
                rowCount=2000,
                columnCount=15,
                timestampMillis=1626995099686,
            ),
        )

        # Should not raise exception but log warning
        openapi_emitter.emit_mcp(
            item, async_flag=True, trace_flag=True, trace_timeout=timedelta(seconds=10)
        )


def test_openapi_emitter_trace_failure(openapi_emitter):
    """Test handling of trace verification failures"""
    with patch(
        "datahub.emitter.rest_emitter.DataHubRestEmitter._emit_generic"
    ) as mock_emit:
        test_urn = "urn:li:dataset:(urn:li:dataPlatform:mysql,TraceFailure,PROD)"

        # Create initial emit response
        emit_response = Mock(spec=Response)
        emit_response.status_code = 200
        emit_response.headers = {"traceparent": "test-trace-123"}
        emit_response.json.return_value = [{"urn": test_urn, "datasetProfile": {}}]

        # Create trace verification response
        trace_response = Mock(spec=Response)
        trace_response.status_code = 200
        trace_response.headers = {}
        trace_response.json.return_value = {
            test_urn: {
                "datasetProfile": {
                    "success": False,
                    "error": "Failed to write to storage",
                    "primaryStorage": {"writeStatus": "ERROR"},
                    "searchStorage": {"writeStatus": "ERROR"},
                }
            }
        }

        def side_effect(*args, **kwargs):
            if "trace/write" in args[0]:
                return trace_response
            return emit_response

        mock_emit.side_effect = side_effect

        item = MetadataChangeProposalWrapper(
            entityUrn=test_urn,
            aspect=DatasetProfile(
                rowCount=2000,
                columnCount=15,
                timestampMillis=1626995099686,
            ),
        )

        with pytest.raises(TraceValidationError) as exc_info:
            openapi_emitter.emit_mcp(
                item,
                async_flag=True,
                trace_flag=True,
                trace_timeout=timedelta(seconds=10),
            )

        assert "Unable to validate async write to DataHub GMS" in str(exc_info.value)

        # Verify the error details are included
        assert "Failed to write to storage" in str(exc_info.value)

        # Verify trace was actually called
        trace_calls = [
            call for call in mock_emit.call_args_list if "trace/write" in call[0][0]
        ]
        assert len(trace_calls) > 0
        assert "test-trace-123" in trace_calls[0][0][0]


def test_await_status_empty_trace_data(openapi_emitter):
    with patch(
        "datahub.emitter.rest_emitter.DataHubRestEmitter._emit_generic"
    ) as mock_emit:
        openapi_emitter._await_status([], timedelta(seconds=10))
        # mock_emit *is* the patched _emit_generic, so check the mock itself.
        assert not mock_emit.called


def test_await_status_successful_completion(openapi_emitter):
    with patch(
        "datahub.emitter.rest_emitter.DataHubRestEmitter._emit_generic"
    ) as mock_emit:
        trace = TraceData(
            trace_id="test-trace-id", data={"urn:li:dataset:test": ["status"]}
        )

        mock_response = Mock(
            json=lambda: {
                "urn:li:dataset:test": {
                    "status": {
                        "success": True,
                        "primaryStorage": {"writeStatus": "ACTIVE_STATE"},
                        "searchStorage": {"writeStatus": "TRACE_NOT_IMPLEMENTED"},
                    }
                }
            }
        )
        mock_emit.return_value = mock_response

        openapi_emitter._await_status([trace], timedelta(seconds=10))
        assert not trace.data  # Should be empty after successful completion


def test_await_status_timeout(openapi_emitter):
    with patch(
        "datahub.emitter.rest_emitter.DataHubRestEmitter._emit_generic"
    ) as mock_emit:
        trace = TraceData(
            trace_id="test-trace-id", data={"urn:li:dataset:test": ["status"]}
        )

        mock_response = Mock()
        mock_response.json.return_value = {
            "urn:li:dataset:test": {
                "status": {
                    "success": True,
                    "primaryStorage": {"writeStatus": "PENDING"},
                    "searchStorage": {"writeStatus": "PENDING"},
                }
            }
        }
        mock_emit.return_value = mock_response

        with pytest.raises(TraceTimeoutError) as exc_info:
            openapi_emitter._await_status([trace], timedelta(seconds=0.1))
        assert "Timeout waiting for async write completion" in str(exc_info.value)


def test_await_status_persistence_failure(openapi_emitter):
    with patch(
        "datahub.emitter.rest_emitter.DataHubRestEmitter._emit_generic"
    ) as mock_emit:
        trace = TraceData(
            trace_id="test-trace-id", data={"urn:li:dataset:test": ["status"]}
        )

        mock_response = Mock()
        mock_response.json.return_value = {
            "urn:li:dataset:test": {
                "status": {
                    "success": False,
                    "primaryStorage": {"writeStatus": "ERROR"},
                    "searchStorage": {"writeStatus": "PENDING"},
                }
            }
        }
        mock_emit.return_value = mock_response

        with pytest.raises(TraceValidationError) as exc_info:
            openapi_emitter._await_status([trace], timedelta(seconds=10))
        assert "Persistence failure" in str(exc_info.value)


def test_await_status_multiple_aspects(openapi_emitter):
    with patch(
        "datahub.emitter.rest_emitter.DataHubRestEmitter._emit_generic"
    ) as mock_emit:
        trace = TraceData(
            trace_id="test-trace-id", data={"urn:li:dataset:test": ["status", "schema"]}
        )

        mock_response = Mock()
        mock_response.json.return_value = {
            "urn:li:dataset:test": {
                "status": {
                    "success": True,
                    "primaryStorage": {"writeStatus": "ACTIVE_STATE"},
                    "searchStorage": {"writeStatus": "ACTIVE_STATE"},
                },
                "schema": {
                    "success": True,
                    "primaryStorage": {"writeStatus": "HISTORIC_STATE"},
                    "searchStorage": {"writeStatus": "NO_OP"},
                },
            }
        }
        mock_emit.return_value = mock_response

        openapi_emitter._await_status([trace], timedelta(seconds=10))
        assert not trace.data


def test_await_status_logging(openapi_emitter):
    with patch.object(logger, "debug") as mock_debug, patch.object(
        logger, "error"
    ) as mock_error, patch(
        "datahub.emitter.rest_emitter.DataHubRestEmitter._emit_generic"
    ) as mock_emit:
        # Test empty trace data logging
        openapi_emitter._await_status([], timedelta(seconds=10))
        mock_debug.assert_called_once_with("No trace data to verify")

        # Test error logging
        trace = TraceData(trace_id="test-id", data={"urn:test": ["status"]})
        mock_emit.side_effect = TraceValidationError("Test error")
        with pytest.raises(TraceValidationError):
            openapi_emitter._await_status([trace], timedelta(seconds=10))
        mock_error.assert_called_once()
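

# _await_status is the internal polling helper behind trace_flag=True; the
# tests above drive it directly with hand-built TraceData instances.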


def test_openapi_emitter_same_url_different_methods(openapi_emitter):
    """Test handling of requests with same URL but different HTTP methods"""
    with patch(
        "datahub.emitter.rest_emitter.DataHubRestEmitter._emit_generic"
    ) as mock_emit:
        items = [
            # POST requests for upserting
            MetadataChangeProposalWrapper(
                entityUrn=f"urn:li:dataset:(urn:li:dataPlatform:mysql,UpdateMe{i},PROD)",
                entityType="dataset",
                aspectName="datasetProperties",
                changeType=ChangeTypeClass.UPSERT,
                aspect=DatasetProperties(name=f"Updated Dataset {i}"),
            )
            for i in range(2)
        ] + [
            # PATCH requests for partial updates
            next(
                iter(
                    DatasetPatchBuilder(
                        f"urn:li:dataset:(urn:li:dataPlatform:mysql,PatchMe{i},PROD)"
                    )
                    .set_qualified_name(f"PatchMe{i}")
                    .build()
                )
            )
            for i in range(2)
        ]

        # Run the test
        result = openapi_emitter.emit_mcps(items)

        # Verify that we made 2 calls (one for each HTTP method)
        assert result == 2
        assert mock_emit.call_count == 2

        # Check that calls were made with different methods but the same URL
        calls = {}
        for call in mock_emit.call_args_list:
            method = call[1]["method"]
            url = call[0][0]
            calls[(method, url)] = call

        assert (
            "post",
            f"{MOCK_GMS_ENDPOINT}/openapi/v3/entity/dataset?async=true",
        ) in calls
        assert (
            "patch",
            f"{MOCK_GMS_ENDPOINT}/openapi/v3/entity/dataset?async=true",
        ) in calls


def test_openapi_emitter_mixed_method_chunking(openapi_emitter):
    """Test that chunking works correctly across different HTTP methods"""
    with patch(
        "datahub.emitter.rest_emitter.DataHubRestEmitter._emit_generic"
    ) as mock_emit, patch(
        "datahub.emitter.rest_emitter.BATCH_INGEST_MAX_PAYLOAD_LENGTH", 2
    ):
        # Create more items than the chunk size for each method
        items = [
            # POST items (4 items, should create 2 chunks)
            MetadataChangeProposalWrapper(
                entityUrn=f"urn:li:dataset:(urn:li:dataPlatform:mysql,Dataset{i},PROD)",
                entityType="dataset",
                aspectName="datasetProfile",
                changeType=ChangeTypeClass.UPSERT,
                aspect=DatasetProfile(rowCount=i, columnCount=15, timestampMillis=0),
            )
            for i in range(4)
        ] + [
            # PATCH items (3 items, should create 2 chunks)
            next(
                iter(
                    DatasetPatchBuilder(
                        f"urn:li:dataset:(urn:li:dataPlatform:mysql,PatchMe{i},PROD)"
                    )
                    .set_qualified_name(f"PatchMe{i}")
                    .build()
                )
            )
            for i in range(3)
        ]

        # Run the test with a smaller chunk size to force multiple chunks
        result = openapi_emitter.emit_mcps(items)

        # Should have 4 chunks total:
        # - 2 chunks for POST (4 items with max 2 per chunk)
        # - 2 chunks for PATCH (3 items with max 2 per chunk)
        assert result == 4
        assert mock_emit.call_count == 4

        # Count the calls by method and verify chunking
        post_calls = [
            call for call in mock_emit.call_args_list if call[1]["method"] == "post"
        ]
        patch_calls = [
            call for call in mock_emit.call_args_list if call[1]["method"] == "patch"
        ]

        assert len(post_calls) == 2  # 2 chunks for POST
        assert len(patch_calls) == 2  # 2 chunks for PATCH

        # Verify first chunks have max size and last chunks have remainders
        post_payloads = [json.loads(call[1]["payload"]) for call in post_calls]
        patch_payloads = [json.loads(call[1]["payload"]) for call in patch_calls]

        assert len(post_payloads[0]) == 2
        assert len(post_payloads[1]) == 2
        assert len(patch_payloads[0]) == 2
        assert len(patch_payloads[1]) == 1

        # Verify all post calls are to the dataset endpoint
        for call in post_calls:
            assert (
                call[0][0]
                == f"{MOCK_GMS_ENDPOINT}/openapi/v3/entity/dataset?async=true"
            )

        # Verify all patch calls are to the dataset endpoint
        for call in patch_calls:
            assert (
                call[0][0]
                == f"{MOCK_GMS_ENDPOINT}/openapi/v3/entity/dataset?async=true"
            )