mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-02 11:49:23 +00:00
Adding smoke test for batch ingestion throwing exception (#12453)
This commit is contained in:
parent
cb47577ac5
commit
7ac0dc65e1
@ -51,7 +51,7 @@ class MetadataChangeProposalInvalidWrapper(MetadataChangeProposalWrapper):
|
||||
return mcp
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
@pytest.fixture(scope="module", autouse=True)
|
||||
def ingest_cleanup_data(auth_session, graph_client, request):
|
||||
yield
|
||||
delete_urns(graph_client, generated_urns)
|
||||
|
||||
@ -3,6 +3,7 @@ from typing import List
|
||||
|
||||
import pytest
|
||||
|
||||
import datahub.metadata.schema_classes as models
|
||||
from datahub.emitter.mce_builder import make_dashboard_urn
|
||||
from datahub.emitter.mcp import MetadataChangeProposalWrapper
|
||||
from datahub.emitter.serialization_helper import pre_json_transform
|
||||
@ -12,6 +13,7 @@ from datahub.metadata.schema_classes import (
|
||||
ChangeAuditStampsClass,
|
||||
DashboardInfoClass,
|
||||
)
|
||||
from datahub.metadata.urns import MlModelUrn
|
||||
from tests.consistency_utils import wait_for_writes_to_sync
|
||||
from tests.restli.restli_test import MetadataChangeProposalInvalidWrapper
|
||||
from tests.utils import delete_urns
|
||||
@ -19,7 +21,7 @@ from tests.utils import delete_urns
|
||||
generated_urns: List[str] = []
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
@pytest.fixture(scope="module", autouse=True)
|
||||
def ingest_cleanup_data(auth_session, graph_client, request):
|
||||
yield
|
||||
delete_urns(graph_client, generated_urns)
|
||||
@ -84,6 +86,29 @@ def _create_invalid_dashboard_mcp() -> MetadataChangeProposalClass:
|
||||
return mcp_invalid.make_mcp()
|
||||
|
||||
|
||||
def _create_invalid_dataset_mcps() -> List[MetadataChangeProposalWrapper]:
|
||||
dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:kafka,my_dataset,PROD)"
|
||||
model_urn = MlModelUrn("mlflow", "my_model", "PROD").urn()
|
||||
bad_mcps = [
|
||||
MetadataChangeProposalWrapper(
|
||||
entityUrn=dataset_urn,
|
||||
aspect=models.StatusClass(removed=False),
|
||||
),
|
||||
MetadataChangeProposalWrapper(
|
||||
entityUrn=dataset_urn,
|
||||
aspect=models.UpstreamLineageClass(
|
||||
upstreams=[
|
||||
models.UpstreamClass(
|
||||
dataset=model_urn,
|
||||
type=models.DatasetLineageTypeClass.TRANSFORMED,
|
||||
)
|
||||
]
|
||||
),
|
||||
),
|
||||
]
|
||||
return bad_mcps
|
||||
|
||||
|
||||
def test_restli_batch_ingestion_sync(graph_client):
|
||||
# Positive Test (all valid MetadataChangeProposal)
|
||||
mcps = _create_valid_dashboard_mcps()
|
||||
@ -133,3 +158,30 @@ def test_restli_batch_ingestion_async(graph_client):
|
||||
assert aspect.title == "Dummy Title For Testing"
|
||||
assert aspect.description == "Dummy Description For Testing"
|
||||
assert aspect.lastModified is not None
|
||||
|
||||
|
||||
def test_restli_batch_ingestion_exception_sync(graph_client):
|
||||
"""
|
||||
Test Batch ingestion when an exception occurs in sync mode
|
||||
"""
|
||||
bad_mcps = _create_invalid_dataset_mcps()
|
||||
generated_urns.extend([mcp.entityUrn for mcp in bad_mcps if mcp.entityUrn])
|
||||
|
||||
try:
|
||||
graph_client.emit_mcps(bad_mcps, async_flag=False)
|
||||
raise AssertionError("should have thrown an exception")
|
||||
except Exception as e:
|
||||
if isinstance(e, AssertionError):
|
||||
raise e
|
||||
print(f"Error emitting MCPs due to {e}")
|
||||
|
||||
|
||||
def test_restli_batch_ingestion_exception_async(graph_client):
|
||||
"""
|
||||
Test Batch ingestion when an exception occurs in async mode
|
||||
"""
|
||||
bad_mcps = _create_invalid_dataset_mcps()
|
||||
generated_urns.extend([mcp.entityUrn for mcp in bad_mcps if mcp.entityUrn])
|
||||
# TODO expectation is that it throws exception, but it doesn't currently.this test case need to change after fix.
|
||||
ret = graph_client.emit_mcps(bad_mcps, async_flag=True)
|
||||
assert ret >= 0
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user