2022-02-03 12:26:08 +05:30
|
|
|
import json
|
2022-02-22 16:21:40 -08:00
|
|
|
import time
|
2022-03-04 11:51:31 -08:00
|
|
|
import urllib
|
|
|
|
|
2022-02-03 12:26:08 +05:30
|
|
|
import pytest
|
|
|
|
import requests
|
2022-03-04 11:51:31 -08:00
|
|
|
|
2022-02-03 12:26:08 +05:30
|
|
|
from datahub.cli.docker import check_local_docker_containers
|
2022-03-04 11:51:31 -08:00
|
|
|
from datahub.emitter.mce_builder import make_dataset_urn, make_schema_field_urn
|
2022-02-06 14:30:40 -08:00
|
|
|
from datahub.emitter.mcp import MetadataChangeProposalWrapper
|
2022-03-04 11:51:31 -08:00
|
|
|
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
|
|
|
|
from datahub.ingestion.api.sink import NoopWriteCallback
|
2022-02-06 14:30:40 -08:00
|
|
|
from datahub.ingestion.sink.file import FileSink, FileSinkConfig
|
2022-03-04 11:51:31 -08:00
|
|
|
from datahub.metadata.com.linkedin.pegasus2avro.assertion import AssertionStdAggregation
|
|
|
|
from datahub.metadata.schema_classes import (
|
|
|
|
AssertionInfoClass,
|
|
|
|
AssertionResultClass,
|
|
|
|
AssertionResultTypeClass,
|
|
|
|
AssertionRunEventClass,
|
|
|
|
AssertionRunStatusClass,
|
|
|
|
AssertionStdOperatorClass,
|
|
|
|
AssertionTypeClass,
|
|
|
|
DatasetAssertionInfoClass,
|
|
|
|
DatasetAssertionScopeClass,
|
|
|
|
PartitionSpecClass,
|
|
|
|
PartitionTypeClass,
|
|
|
|
)
|
|
|
|
from tests.utils import ingest_file_via_rest
|
|
|
|
from tests.utils import delete_urns_from_file
|
|
|
|
|
|
|
|
|
2022-02-03 12:26:08 +05:30
|
|
|
# Base URL that the HTTP requests in this module are issued against.
GMS_ENDPOINT = "http://localhost:8080"

# Headers passed along with the Rest.li-style requests made below.
restli_default_headers = {"X-RestLi-Protocol-Version": "2.0.0"}
|
|
|
|
|
|
|
|
|
2022-02-06 14:30:40 -08:00
|
|
|
def _build_run_event_mcp(
    assertion_urn,
    dataset_urn,
    timestamp_millis,
    partition,
    partition_type,
    result_type,
    actual_agg_value,
):
    """Build one ``assertionRunEvent`` proposal for ``assertion_urn``.

    Args:
        assertion_urn: URN used both as the entity URN and as ``assertionUrn``.
        dataset_urn: URN recorded as ``asserteeUrn``.
        timestamp_millis: Event timestamp; its string form is the ``messageId``.
        partition: Value for ``PartitionSpecClass.partition``.
        partition_type: Value for ``PartitionSpecClass.type``.
        result_type: Value for ``AssertionResultClass.type``.
        actual_agg_value: Value for ``AssertionResultClass.actualAggValue``.

    Returns:
        A ``MetadataChangeProposalWrapper`` carrying the run event. The run id,
        external URL and COMPLETE status are the same fixed values for every event.
    """
    return MetadataChangeProposalWrapper(
        entityType="assertion",
        entityUrn=assertion_urn,
        changeType="UPSERT",
        aspectName="assertionRunEvent",
        aspect=AssertionRunEventClass(
            timestampMillis=timestamp_millis,
            partitionSpec=PartitionSpecClass(
                partition=partition,
                type=partition_type,
            ),
            messageId=str(timestamp_millis),
            assertionUrn=assertion_urn,
            asserteeUrn=dataset_urn,
            result=AssertionResultClass(
                type=result_type,
                actualAggValue=actual_agg_value,
                externalUrl="http://example.com/uuid1",
            ),
            runId="uuid1",
            status=AssertionRunStatusClass.COMPLETE,
        ),
    )


def create_test_data(test_file):
    """Write the assertion definition and six run events to ``test_file``.

    One ``assertionInfo`` proposal is written first, followed by six
    ``assertionRunEvent`` proposals in timestamp order, all through a
    ``FileSink`` configured with ``test_file`` as its filename.

    Args:
        test_file: Filename handed to ``FileSinkConfig``.
    """
    assertion_urn = "urn:li:assertion:2d3b06a6e77e1f24adc9860a05ea089b"
    dataset_urn = make_dataset_urn(platform="postgres", name="foo")

    assertion_info = AssertionInfoClass(
        type=AssertionTypeClass.DATASET,
        customProperties={"suite_name": "demo_suite"},
        datasetAssertion=DatasetAssertionInfoClass(
            fields=[make_schema_field_urn(dataset_urn, "col1")],
            dataset=dataset_urn,
            scope=DatasetAssertionScopeClass.DATASET_COLUMN,
            operator=AssertionStdOperatorClass.LESS_THAN,
            nativeType="column_value_is_less_than",
            aggregation=AssertionStdAggregation.IDENTITY,
            nativeParameters={"max_value": "99"},
        ),
    )

    # The assertion definition
    definition_mcp = MetadataChangeProposalWrapper(
        entityType="assertion",
        changeType="UPSERT",
        entityUrn=assertion_urn,
        aspectName="assertionInfo",
        aspect=assertion_info,
    )

    # The assertion run events attached to the dataset, one row per event:
    # (timestampMillis, partition, partition type, result type, actualAggValue).
    # The FULL_TABLE rows are the result of evaluating the assertion on the
    # whole dataset.
    run_event_rows = [
        (
            1643794280350,
            "[{'country': 'IN'}]",
            PartitionTypeClass.PARTITION,
            AssertionResultTypeClass.SUCCESS,
            90,
        ),
        (
            1643794280352,
            "[{'country': 'US'}]",
            PartitionTypeClass.PARTITION,
            AssertionResultTypeClass.FAILURE,
            101,
        ),
        (
            1643794280354,
            "FULL_TABLE_SNAPSHOT",
            PartitionTypeClass.FULL_TABLE,
            AssertionResultTypeClass.SUCCESS,
            93,
        ),
        (
            1643880726872,
            "[{'country': 'IN'}]",
            PartitionTypeClass.PARTITION,
            AssertionResultTypeClass.SUCCESS,
            90,
        ),
        (
            1643880726874,
            "[{'country': 'US'}]",
            PartitionTypeClass.PARTITION,
            AssertionResultTypeClass.FAILURE,
            101,
        ),
        (
            1643880726875,
            "FULL_TABLE_SNAPSHOT",
            PartitionTypeClass.FULL_TABLE,
            AssertionResultTypeClass.SUCCESS,
            93,
        ),
    ]
    run_event_mcps = [
        _build_run_event_mcp(assertion_urn, dataset_urn, *row)
        for row in run_event_rows
    ]

    fileSink: FileSink = FileSink.create(
        FileSinkConfig(filename=test_file), ctx=PipelineContext(run_id="test-file")
    )
    try:
        for mcp in [definition_mcp, *run_event_mcps]:
            fileSink.write_record_async(
                RecordEnvelope(record=mcp, metadata={}),
                write_callback=NoopWriteCallback(),
            )
    finally:
        # Close the sink even when a write raises, so it is never left open.
        fileSink.close()
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope="session")
def generate_test_data(tmp_path_factory):
    """Create the events file once per session and yield its path as a string.

    After the session's tests have run, the same path is handed to
    ``delete_urns_from_file``.
    """
    print("generating assertions test data")
    events_path = str(
        tmp_path_factory.mktemp("test_dq_events") / "test_dq_events.json"
    )
    create_test_data(test_file=events_path)

    yield events_path

    # Teardown: runs once the session-scoped fixture is finalized.
    print("removing assertions test data")
    delete_urns_from_file(events_path)
|
|
|
|
|
2022-02-06 14:30:40 -08:00
|
|
|
|
2022-02-03 12:26:08 +05:30
|
|
|
@pytest.fixture(scope="session")
def wait_for_healthchecks(generate_test_data):
    """Fail fast unless ``check_local_docker_containers()`` returns a falsy value.

    Requesting ``generate_test_data`` makes that fixture run before this check.
    """
    # Simply assert that everything is healthy, but don't wait.
    check_result = check_local_docker_containers()
    assert not check_result
    yield
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.dependency()
def test_healthchecks(wait_for_healthchecks):
    """Anchor test for the dependency chain.

    The body is intentionally empty: requesting the ``wait_for_healthchecks``
    fixture does the actual work.
    """
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.dependency(depends=["test_healthchecks"])
def test_run_ingestion(generate_test_data):
    """Ingest the generated events file through ``ingest_file_via_rest``."""
    events_file = generate_test_data
    ingest_file_via_rest(events_file)
|
2022-02-03 12:26:08 +05:30
|
|
|
|
|
|
|
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
def test_gms_get_latest_assertions_results_by_partition():
    """Query the timeseries stats endpoint for the latest run status per bucket.

    Expects six result rows and checks that the first row's ``asserteeUrn``
    column holds the dataset URN.
    """
    urn = make_dataset_urn("postgres", "foo")

    # sleep for elasticsearch indices to be updated
    time.sleep(5)

    # Query
    # Given the dataset
    # show me latest assertion run events grouped-by date, partition, assertionId
    assertee_criterion = {
        "field": "asserteeUrn",
        "value": urn,
        "condition": "EQUAL",
    }
    grouping_buckets = [
        {"key": "asserteeUrn", "type": "STRING_GROUPING_BUCKET"},
        {"key": "partitionSpec.partition", "type": "STRING_GROUPING_BUCKET"},
        {
            "key": "timestampMillis",
            "type": "DATE_GROUPING_BUCKET",
            "timeWindowSize": {"multiple": 1, "unit": "DAY"},
        },
        {"key": "assertionUrn", "type": "STRING_GROUPING_BUCKET"},
    ]
    # Key order is kept as-is so the serialized payload stays the same.
    payload = {
        "entityName": "assertion",
        "aspectName": "assertionRunEvent",
        "filter": {"or": [{"and": [assertee_criterion]}]},
        "metrics": [{"fieldPath": "status", "aggregationType": "LATEST"}],
        "buckets": grouping_buckets,
    }
    query = json.dumps(payload)

    response = requests.post(
        f"{GMS_ENDPOINT}/analytics?action=getTimeseriesStats",
        data=query,
        headers=restli_default_headers,
    )
    response.raise_for_status()
    data = response.json()

    assert data["value"]
    table = data["value"]["table"]
    assert table

    expected_columns = [
        "asserteeUrn",
        "assertionUrn",
        "latest_status",
        "partitionSpec.partition",
        "timestampMillis",
    ]
    assert sorted(table["columnNames"]) == expected_columns

    rows = table["rows"]
    assert len(rows) == 6

    first_row = rows[0]
    assertee_column = table["columnNames"].index("asserteeUrn")
    assert first_row[assertee_column] == urn
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
def test_gms_get_assertions_on_dataset():
    """lists all assertion urns including those which may not have executed

    Queries incoming ``Asserts`` relationships of the dataset URN and expects
    exactly one.
    """
    # Import the submodule explicitly: a bare ``import urllib`` does not
    # guarantee that ``urllib.parse`` has been loaded.
    from urllib.parse import quote

    urn = make_dataset_urn("postgres", "foo")
    response = requests.get(
        f"{GMS_ENDPOINT}/relationships?direction=INCOMING&urn={quote(urn)}&types=Asserts"
    )

    response.raise_for_status()
    data = response.json()
    assert len(data["relationships"]) == 1
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
def test_gms_get_assertions_on_dataset_field():
    """lists all assertion urns on a dataset field, including those which may not have executed

    Queries incoming ``Asserts`` relationships of the ``col1`` schema-field URN
    and expects exactly one.
    """
    # Import the submodule explicitly: a bare ``import urllib`` does not
    # guarantee that ``urllib.parse`` has been loaded.
    from urllib.parse import quote

    dataset_urn = make_dataset_urn("postgres", "foo")
    field_urn = make_schema_field_urn(dataset_urn, "col1")
    response = requests.get(
        f"{GMS_ENDPOINT}/relationships?direction=INCOMING&urn={quote(field_urn)}&types=Asserts"
    )

    response.raise_for_status()
    data = response.json()
    assert len(data["relationships"]) == 1
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
def test_gms_get_assertion_info():
    """Fetch version 0 of the ``assertionInfo`` aspect and check its contents.

    Asserts that the aspect is present, that its ``type`` is ``"DATASET"`` and
    that ``datasetAssertion.scope`` is set to a truthy value.
    """
    # Import the submodule explicitly: a bare ``import urllib`` does not
    # guarantee that ``urllib.parse`` has been loaded.
    from urllib.parse import quote

    assertion_urn = "urn:li:assertion:2d3b06a6e77e1f24adc9860a05ea089b"
    # Built as one literal rather than with a backslash-newline inside the
    # string, which would embed the next line's indentation in the URL.
    url = (
        f"{GMS_ENDPOINT}/aspects/{quote(assertion_urn)}"
        "?aspect=assertionInfo&version=0"
    )
    response = requests.get(url, headers=restli_default_headers)

    response.raise_for_status()
    data = response.json()

    assert data["aspect"]
    assert data["aspect"]["com.linkedin.assertion.AssertionInfo"]
    assert data["aspect"]["com.linkedin.assertion.AssertionInfo"]["type"] == "DATASET"
    assert data["aspect"]["com.linkedin.assertion.AssertionInfo"]["datasetAssertion"][
        "scope"
    ]
|