mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-04 14:44:02 +00:00
refactor(model): refactor new Assertion models (#4064)
This commit is contained in:
parent
3c52f6c62b
commit
d19241740e
@ -75,6 +75,11 @@ def make_dataset_urn_with_platform_instance(
|
||||
return make_dataset_urn(platform=platform, name=name, env=env)
|
||||
|
||||
|
||||
def make_schema_field_urn(parent_urn: str, field_path: str) -> str:
|
||||
assert parent_urn.startswith("urn:li:"), "Schema field's parent must be an urn"
|
||||
return f"urn:li:schemaField:({parent_urn},{field_path})"
|
||||
|
||||
|
||||
def dataset_urn_to_key(dataset_urn: str) -> Optional[DatasetKeyClass]:
|
||||
pattern = r"urn:li:dataset:\(urn:li:dataPlatform:(.*),(.*),(.*)\)"
|
||||
results = re.search(pattern, dataset_urn)
|
||||
|
||||
@ -2,7 +2,6 @@ namespace com.linkedin.assertion
|
||||
|
||||
import com.linkedin.common.CustomProperties
|
||||
import com.linkedin.common.ExternalReference
|
||||
import com.linkedin.common.Urn
|
||||
|
||||
/**
|
||||
* Information about an assertion
|
||||
@ -12,40 +11,25 @@ import com.linkedin.common.Urn
|
||||
}
|
||||
record AssertionInfo includes CustomProperties, ExternalReference {
|
||||
/**
|
||||
* One or more dataset schema fields that are targeted by this assertion
|
||||
* Type of assertion. Assertion types can evolve to span Datasets, Flows (Pipelines), Models, Features etc.
|
||||
*/
|
||||
@Relationship = {
|
||||
"/*": {
|
||||
"name": "Asserts",
|
||||
"entityTypes": [ "schemaField" ]
|
||||
type: enum AssertionType {
|
||||
// When present, then DatasetAssertionInfo elements are filled out
|
||||
DATASET
|
||||
}
|
||||
}
|
||||
datasetFields: optional array[Urn]
|
||||
|
||||
/**
|
||||
* One or more datasets that are targeted by this assertion
|
||||
* Dataset Assertion information when type is DATASET
|
||||
*/
|
||||
@Relationship = {
|
||||
"/*": {
|
||||
"name": "Asserts",
|
||||
"entityTypes": [ "dataset" ]
|
||||
}
|
||||
}
|
||||
datasets: optional array[Urn]
|
||||
|
||||
/**
|
||||
* Type of assertion
|
||||
*/
|
||||
assertionType: AssertionType
|
||||
datasetAssertion: optional DatasetAssertionInfo
|
||||
|
||||
/*
|
||||
* Logic for assertion such as implementation of custom nativeOperator
|
||||
* Logic for the assertion as expressed in the native assertion language. Code fragments, query strings, etc.
|
||||
*/
|
||||
assertionLogic: optional string
|
||||
|
||||
/**
|
||||
* Parameters required for the assertion. e.g. min_value, max_value, value, columns
|
||||
*/
|
||||
assertionParameters: map[string, string] = { }
|
||||
|
||||
parameters: optional map[string, string]
|
||||
}
|
||||
@ -1,51 +1,53 @@
|
||||
namespace com.linkedin.assertion
|
||||
|
||||
import com.linkedin.timeseries.TimeseriesAspectBase
|
||||
import com.linkedin.common.ExternalReference
|
||||
import com.linkedin.common.Urn
|
||||
record AssertionResult {
|
||||
|
||||
/**
|
||||
* The results of evaluating the assertion on the batch
|
||||
/**
|
||||
* The final result, e.g. either SUCCESS or FAILURE.
|
||||
*/
|
||||
@Aspect = {
|
||||
"name": "assertionResult",
|
||||
"type": "timeseries",
|
||||
}
|
||||
|
||||
record AssertionResult includes TimeseriesAspectBase {
|
||||
|
||||
/*
|
||||
* Urn of assertion which is evaluated
|
||||
*/
|
||||
@TimeseriesField = {}
|
||||
assertionUrn: Urn
|
||||
|
||||
/*
|
||||
* Urn of entity being asserted
|
||||
*/
|
||||
//example - dataset urn, if dataset is being asserted
|
||||
@TimeseriesField = {}
|
||||
asserteeUrn: Urn
|
||||
|
||||
/**
|
||||
* Specification of the batch whose data quality is evaluated
|
||||
*/
|
||||
batchSpec: optional BatchSpec
|
||||
|
||||
/**
|
||||
* Results of assertion
|
||||
*/
|
||||
@TimeseriesField = {}
|
||||
batchAssertionResult: BatchAssertionResult
|
||||
|
||||
/**
|
||||
* Native Run identifier of platform evaluating the assertions
|
||||
*/
|
||||
//Multiple assertions could occur in same evaluator run
|
||||
nativeEvaluatorRunId: optional string
|
||||
|
||||
/**
|
||||
* Runtime parameters of evaluation
|
||||
*/
|
||||
runtimeContext: map[string, string] = { }
|
||||
type: enum AssertionResultType {
|
||||
/**
|
||||
* The Assertion Succeeded
|
||||
*/
|
||||
SUCCESS
|
||||
/**
|
||||
* The Assertion Failed
|
||||
*/
|
||||
FAILURE
|
||||
}
|
||||
|
||||
/**
|
||||
* Number of rows for evaluated batch
|
||||
*/
|
||||
rowCount: optional long
|
||||
|
||||
/**
|
||||
* Number of rows with missing value for evaluated batch
|
||||
*/
|
||||
missingCount: optional long
|
||||
|
||||
/**
|
||||
* Number of rows with unexpected value for evaluated batch
|
||||
*/
|
||||
unexpectedCount: optional long
|
||||
|
||||
/**
|
||||
* Observed aggregate value for evaluated batch
|
||||
*/
|
||||
actualAggValue: optional float
|
||||
|
||||
/**
|
||||
* Other results of evaluation
|
||||
*/
|
||||
nativeResults: optional map[string, string]
|
||||
|
||||
/**
|
||||
* URL where full results are available
|
||||
*/
|
||||
externalUrl: optional string
|
||||
|
||||
/**
|
||||
* Runtime context for the evaluation
|
||||
*/
|
||||
runtimeContext: optional map[string, string]
|
||||
}
|
||||
@ -0,0 +1,63 @@
|
||||
namespace com.linkedin.assertion
|
||||
|
||||
import com.linkedin.timeseries.TimeseriesAspectBase
|
||||
import com.linkedin.common.ExternalReference
|
||||
import com.linkedin.common.Urn
|
||||
|
||||
/**
|
||||
* An event representing the current status of evaluating an assertion on a batch.
|
||||
* AssertionRunEvent should be used for reporting the status of a run as an assertion evaluation progresses.
|
||||
*/
|
||||
@Aspect = {
|
||||
"name": "assertionRunEvent",
|
||||
"type": "timeseries",
|
||||
}
|
||||
record AssertionRunEvent includes TimeseriesAspectBase {
|
||||
|
||||
/**
|
||||
* Native (platform-specific) identifier for this run
|
||||
*/
|
||||
//Multiple assertions could occur in same evaluator run
|
||||
runId: string
|
||||
|
||||
/*
|
||||
* Urn of assertion which is evaluated
|
||||
*/
|
||||
@TimeseriesField = {}
|
||||
assertionUrn: Urn
|
||||
|
||||
/*
|
||||
* Urn of entity on which the assertion is applicable
|
||||
*/
|
||||
//example - dataset urn, if dataset is being asserted
|
||||
@TimeseriesField = {}
|
||||
asserteeUrn: Urn
|
||||
|
||||
/**
|
||||
* Specification of the batch which this run is evaluating
|
||||
*/
|
||||
batchSpec: optional BatchSpec
|
||||
|
||||
/**
|
||||
* The status of the assertion run as per this timeseries event.
|
||||
*/
|
||||
// Currently just supports COMPLETE, but should evolve to support other statuses like STARTED, RUNNING, etc.
|
||||
@TimeseriesField = {}
|
||||
status: enum AssertionRunStatus {
|
||||
/**
|
||||
* The Assertion Run has completed
|
||||
*/
|
||||
COMPLETE
|
||||
}
|
||||
|
||||
/**
|
||||
* Results of assertion, present if the status is COMPLETE
|
||||
*/
|
||||
@TimeseriesField = {}
|
||||
result: optional AssertionResult
|
||||
|
||||
/**
|
||||
* Runtime parameters of evaluation
|
||||
*/
|
||||
runtimeContext: optional map[string, string]
|
||||
}
|
||||
@ -1,32 +0,0 @@
|
||||
namespace com.linkedin.assertion
|
||||
|
||||
/**
|
||||
* Type of Assertion
|
||||
*/
|
||||
record AssertionType {
|
||||
/**
|
||||
* Scope of Assertion
|
||||
*/
|
||||
scope: enum AssertionScope {
|
||||
DATASET_COLUMN
|
||||
DATASET_ROWS
|
||||
DATASET_SCHEMA
|
||||
CROSS_DATASET
|
||||
}
|
||||
|
||||
/**
|
||||
* Assertion details for scope DATASET_COLUMN
|
||||
*/
|
||||
datasetColumnAssertion: optional DatasetColumnAssertion
|
||||
|
||||
/**
|
||||
* Assertion details for scope DATASET_ROWS
|
||||
*/
|
||||
datasetRowsAssertion: optional DatasetRowsAssertion
|
||||
|
||||
/**
|
||||
* Assertion details for scope DATASET_SCHEMA
|
||||
*/
|
||||
datasetSchemaAssertion: optional DatasetSchemaAssertion
|
||||
|
||||
}
|
||||
@ -1,40 +0,0 @@
|
||||
namespace com.linkedin.assertion
|
||||
|
||||
record BatchAssertionResult {
|
||||
|
||||
/**
|
||||
* Indicator of whether the constraint is fully satisfied for the batch
|
||||
*/
|
||||
success: boolean
|
||||
|
||||
/**
|
||||
* Number of rows for evaluated batch
|
||||
*/
|
||||
rowCount: optional long
|
||||
|
||||
/**
|
||||
* Number of rows with missing value for evaluated batch
|
||||
*/
|
||||
missingCount: optional long
|
||||
|
||||
/**
|
||||
* Number of rows with unexpected value for evaluated batch
|
||||
*/
|
||||
unexpectedCount: optional long
|
||||
|
||||
/**
|
||||
* Observed aggregate value for evaluated batch
|
||||
*/
|
||||
actualAggValue: optional float
|
||||
|
||||
/**
|
||||
* Other results of evaluation
|
||||
*/
|
||||
nativeResults: map[string, string] = { }
|
||||
|
||||
/**
|
||||
* URL where the reference exist
|
||||
*/
|
||||
//TODO - Change type to optional Url, not working
|
||||
externalUrl: optional string
|
||||
}
|
||||
@ -0,0 +1,66 @@
|
||||
namespace com.linkedin.assertion
|
||||
|
||||
import com.linkedin.common.Urn
|
||||
|
||||
/**
|
||||
* Assertion attributes that are applicable to Dataset Assertions
|
||||
**/
|
||||
record DatasetAssertionInfo {
|
||||
/**
|
||||
* Scope of the Assertion. What part of the dataset does this assertion apply to?
|
||||
* Declared optional to make it convenient to inline into other aspects like AssertionInfo without requiring
|
||||
* additional nesting. Semantically required, if this is a Dataset Assertion.
|
||||
**/
|
||||
scope: optional enum DatasetAssertionScope {
|
||||
/**
|
||||
* This assertion applies to dataset columns
|
||||
*/
|
||||
DATASET_COLUMN
|
||||
/**
|
||||
* This assertion applies to entire rows of the dataset
|
||||
*/
|
||||
DATASET_ROWS
|
||||
/**
|
||||
* This assertion applies to the schema of the dataset
|
||||
*/
|
||||
DATASET_SCHEMA
|
||||
// Future evolution can include things like CROSS_DATASET assertions
|
||||
}
|
||||
|
||||
/**
|
||||
* Assertion details when scope is DATASET_COLUMN
|
||||
*/
|
||||
columnAssertion: optional DatasetColumnAssertion
|
||||
|
||||
/**
|
||||
* Assertion details when scope is DATASET_ROWS
|
||||
*/
|
||||
rowsAssertion: optional DatasetRowsAssertion
|
||||
|
||||
/**
|
||||
* Assertion details when scope is DATASET_SCHEMA
|
||||
*/
|
||||
schemaAssertion: optional DatasetSchemaAssertion
|
||||
|
||||
/**
|
||||
* One or more dataset schema fields that are targeted by this assertion
|
||||
*/
|
||||
@Relationship = {
|
||||
"/*": {
|
||||
"name": "Asserts",
|
||||
"entityTypes": [ "schemaField" ]
|
||||
}
|
||||
}
|
||||
fields: optional array[Urn]
|
||||
|
||||
/**
|
||||
* One or more datasets that are targeted by this assertion
|
||||
*/
|
||||
@Relationship = {
|
||||
"/*": {
|
||||
"name": "Asserts",
|
||||
"entityTypes": [ "dataset" ]
|
||||
}
|
||||
}
|
||||
datasets: optional array[Urn]
|
||||
}
|
||||
@ -1,14 +0,0 @@
|
||||
namespace com.linkedin.metadata.aspect
|
||||
|
||||
import com.linkedin.metadata.key.AssertionKey
|
||||
import com.linkedin.common.DataPlatformInstance
|
||||
import com.linkedin.assertion.AssertionInfo
|
||||
|
||||
/**
|
||||
* A union of all supported metadata aspects for a Assertion
|
||||
*/
|
||||
typeref AssertionAspect = union[
|
||||
AssertionKey,
|
||||
DataPlatformInstance,
|
||||
AssertionInfo
|
||||
]
|
||||
@ -1,24 +0,0 @@
|
||||
namespace com.linkedin.metadata.snapshot
|
||||
|
||||
import com.linkedin.common.Urn
|
||||
import com.linkedin.metadata.aspect.AssertionAspect
|
||||
|
||||
/**
|
||||
* A metadata snapshot for a specific Assertion entity.
|
||||
*/
|
||||
@Entity = {
|
||||
"name": "assertion",
|
||||
"keyAspect": "assertionKey"
|
||||
}
|
||||
record AssertionSnapshot {
|
||||
|
||||
/**
|
||||
* URN for the entity the metadata snapshot is associated with.
|
||||
*/
|
||||
urn: Urn
|
||||
|
||||
/**
|
||||
* The list of metadata aspects associated with the assertion.
|
||||
*/
|
||||
aspects: array[AssertionAspect]
|
||||
}
|
||||
@ -12,7 +12,7 @@ entities:
|
||||
- schemaMetadata
|
||||
- status
|
||||
- container
|
||||
- assertionResult
|
||||
- assertionRunEvent
|
||||
- name: dataHubPolicy
|
||||
doc: DataHub Policies represent access policies granted to users or groups on metadata operations like edit, view etc.
|
||||
keyAspect: dataHubPolicyKey
|
||||
|
||||
@ -5,8 +5,13 @@ import pytest
|
||||
import requests
|
||||
from datahub.cli.docker import check_local_docker_containers
|
||||
from tests.utils import ingest_file_via_rest
|
||||
|
||||
bootstrap_sample_data = "test_resources/bootstrap_data_quality.json"
|
||||
from datahub.metadata.schema_classes import AssertionResultTypeClass, DatasetAssertionInfoClass, PartitionTypeClass, AssertionInfoClass, AssertionTypeClass, DatasetAssertionScopeClass, DatasetColumnAssertionClass, AssertionStdOperatorClass, DatasetColumnStdAggFuncClass, AssertionRunEventClass, PartitionSpecClass, AssertionResultClass, AssertionRunStatusClass
|
||||
from datahub.emitter.mce_builder import make_schema_field_urn, make_dataset_urn
|
||||
from datahub.emitter.mcp import MetadataChangeProposalWrapper
|
||||
from datahub.ingestion.sink.file import FileSink, FileSinkConfig
|
||||
from datahub.ingestion.api.sink import NoopWriteCallback, WriteCallback
|
||||
from datahub.ingestion.api.common import RecordEnvelope
|
||||
from datahub.ingestion.api.common import PipelineContext
|
||||
GMS_ENDPOINT = "http://localhost:8080"
|
||||
|
||||
restli_default_headers = {
|
||||
@ -14,8 +19,202 @@ restli_default_headers = {
|
||||
}
|
||||
|
||||
|
||||
def create_test_data(test_file):
|
||||
assertion_urn = "urn:li:assertion:2d3b06a6e77e1f24adc9860a05ea089b"
|
||||
dataset_urn=make_dataset_urn(platform="postgres", name="fooTable")
|
||||
assertion_info = AssertionInfoClass(
|
||||
type=AssertionTypeClass.DATASET,
|
||||
customProperties={
|
||||
"suite_name": "demo_suite"
|
||||
},
|
||||
datasetAssertion=DatasetAssertionInfoClass(
|
||||
fields=[make_schema_field_urn(dataset_urn,"col1")],
|
||||
datasets=[dataset_urn],
|
||||
scope=DatasetAssertionScopeClass.DATASET_COLUMN,
|
||||
columnAssertion=DatasetColumnAssertionClass(
|
||||
stdOperator=AssertionStdOperatorClass.LESS_THAN,
|
||||
nativeOperator="column_value_is_less_than",
|
||||
stdAggFunc=DatasetColumnStdAggFuncClass.IDENTITY,
|
||||
),
|
||||
),
|
||||
parameters={
|
||||
"max_value": "99"
|
||||
},
|
||||
)
|
||||
# The assertion definition
|
||||
mcp1 = MetadataChangeProposalWrapper(
|
||||
entityType="assertion",
|
||||
changeType="UPSERT",
|
||||
entityUrn=assertion_urn,
|
||||
aspectName="assertionInfo",
|
||||
aspect=assertion_info
|
||||
)
|
||||
timestamps = [1643794280350, 1643794280352, 1643794280354, 1643880726872, 1643880726874, 1643880726875]
|
||||
msg_ids = []
|
||||
# The assertion run event attached to the dataset
|
||||
mcp2 = MetadataChangeProposalWrapper(
|
||||
entityType="dataset",
|
||||
entityUrn=dataset_urn,
|
||||
changeType="UPSERT",
|
||||
aspectName="assertionRunEvent",
|
||||
aspect=AssertionRunEventClass(
|
||||
timestampMillis=timestamps[0],
|
||||
partitionSpec=PartitionSpecClass(
|
||||
partition="[{'country': 'IN'}]",
|
||||
type=PartitionTypeClass.PARTITION,
|
||||
),
|
||||
messageId=str(timestamps[0]),
|
||||
assertionUrn=assertion_urn,
|
||||
asserteeUrn=dataset_urn,
|
||||
result=AssertionResultClass(
|
||||
type=AssertionResultTypeClass.SUCCESS,
|
||||
actualAggValue=90,
|
||||
externalUrl="http://example.com/uuid1",
|
||||
),
|
||||
runId="uuid1",
|
||||
status=AssertionRunStatusClass.COMPLETE
|
||||
)
|
||||
)
|
||||
|
||||
mcp3 = MetadataChangeProposalWrapper(
|
||||
entityType="dataset",
|
||||
entityUrn=dataset_urn,
|
||||
changeType="UPSERT",
|
||||
aspectName="assertionRunEvent",
|
||||
aspect=AssertionRunEventClass(
|
||||
timestampMillis=timestamps[1],
|
||||
partitionSpec=PartitionSpecClass(
|
||||
partition="[{'country': 'US'}]",
|
||||
type=PartitionTypeClass.PARTITION,
|
||||
),
|
||||
messageId=str(timestamps[1]),
|
||||
assertionUrn=assertion_urn,
|
||||
asserteeUrn=dataset_urn,
|
||||
result=AssertionResultClass(
|
||||
type=AssertionResultTypeClass.FAILURE,
|
||||
actualAggValue=101,
|
||||
externalUrl="http://example.com/uuid1",
|
||||
),
|
||||
runId="uuid1",
|
||||
status=AssertionRunStatusClass.COMPLETE
|
||||
)
|
||||
)
|
||||
# Result of evaluating this assertion on the whole dataset
|
||||
mcp4 = MetadataChangeProposalWrapper(
|
||||
entityType="dataset",
|
||||
entityUrn=dataset_urn,
|
||||
changeType="UPSERT",
|
||||
aspectName="assertionRunEvent",
|
||||
aspect=AssertionRunEventClass(
|
||||
timestampMillis=timestamps[2],
|
||||
partitionSpec=PartitionSpecClass(
|
||||
partition="FULL_TABLE_SNAPSHOT",
|
||||
type=PartitionTypeClass.FULL_TABLE,
|
||||
),
|
||||
messageId=str(timestamps[2]),
|
||||
assertionUrn=assertion_urn,
|
||||
asserteeUrn=dataset_urn,
|
||||
result=AssertionResultClass(
|
||||
type=AssertionResultTypeClass.SUCCESS,
|
||||
actualAggValue=93,
|
||||
externalUrl="http://example.com/uuid1",
|
||||
),
|
||||
runId="uuid1",
|
||||
status=AssertionRunStatusClass.COMPLETE
|
||||
)
|
||||
)
|
||||
|
||||
mcp5 = MetadataChangeProposalWrapper(
|
||||
entityType="dataset",
|
||||
entityUrn=dataset_urn,
|
||||
changeType="UPSERT",
|
||||
aspectName="assertionRunEvent",
|
||||
aspect=AssertionRunEventClass(
|
||||
timestampMillis=timestamps[3],
|
||||
partitionSpec=PartitionSpecClass(
|
||||
partition="[{'country': 'IN'}]",
|
||||
type=PartitionTypeClass.PARTITION,
|
||||
),
|
||||
messageId=str(timestamps[3]),
|
||||
assertionUrn=assertion_urn,
|
||||
asserteeUrn=dataset_urn,
|
||||
result=AssertionResultClass(
|
||||
type=AssertionResultTypeClass.SUCCESS,
|
||||
actualAggValue=90,
|
||||
externalUrl="http://example.com/uuid1",
|
||||
),
|
||||
runId="uuid1",
|
||||
status=AssertionRunStatusClass.COMPLETE
|
||||
)
|
||||
)
|
||||
mcp6 = MetadataChangeProposalWrapper(
|
||||
entityType="dataset",
|
||||
entityUrn=dataset_urn,
|
||||
changeType="UPSERT",
|
||||
aspectName="assertionRunEvent",
|
||||
aspect=AssertionRunEventClass(
|
||||
timestampMillis=timestamps[4],
|
||||
partitionSpec=PartitionSpecClass(
|
||||
partition="[{'country': 'US'}]",
|
||||
type=PartitionTypeClass.PARTITION,
|
||||
),
|
||||
messageId=str(timestamps[4]),
|
||||
assertionUrn=assertion_urn,
|
||||
asserteeUrn=dataset_urn,
|
||||
result=AssertionResultClass(
|
||||
type=AssertionResultTypeClass.FAILURE,
|
||||
actualAggValue=101,
|
||||
externalUrl="http://example.com/uuid1",
|
||||
),
|
||||
runId="uuid1",
|
||||
status=AssertionRunStatusClass.COMPLETE
|
||||
)
|
||||
)
|
||||
|
||||
# Result of evaluating this assertion on the whole dataset
|
||||
mcp7 = MetadataChangeProposalWrapper(
|
||||
entityType="dataset",
|
||||
entityUrn=dataset_urn,
|
||||
changeType="UPSERT",
|
||||
aspectName="assertionRunEvent",
|
||||
aspect=AssertionRunEventClass(
|
||||
timestampMillis=timestamps[5],
|
||||
partitionSpec=PartitionSpecClass(
|
||||
partition="FULL_TABLE_SNAPSHOT",
|
||||
type=PartitionTypeClass.FULL_TABLE,
|
||||
),
|
||||
messageId=str(timestamps[5]),
|
||||
assertionUrn=assertion_urn,
|
||||
asserteeUrn=dataset_urn,
|
||||
result=AssertionResultClass(
|
||||
type=AssertionResultTypeClass.SUCCESS,
|
||||
actualAggValue=93,
|
||||
externalUrl="http://example.com/uuid1",
|
||||
),
|
||||
runId="uuid1",
|
||||
status=AssertionRunStatusClass.COMPLETE
|
||||
)
|
||||
)
|
||||
|
||||
fileSink: FileSink = FileSink.create(FileSinkConfig(filename=test_file), ctx=PipelineContext(run_id="test-file"))
|
||||
for mcp in [mcp1, mcp2, mcp3, mcp4, mcp5, mcp6, mcp7]:
|
||||
fileSink.write_record_async(RecordEnvelope(record=mcp, metadata={}), write_callback=NoopWriteCallback())
|
||||
fileSink.close()
|
||||
|
||||
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def wait_for_healthchecks():
|
||||
def generate_test_data(tmp_path_factory):
|
||||
"""Generates metadata events data and stores into a test file"""
|
||||
dir_name = tmp_path_factory.mktemp("test_dq_events")
|
||||
file_name = dir_name / "test_dq_events.json"
|
||||
create_test_data(test_file=str(file_name))
|
||||
yield str(file_name)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def wait_for_healthchecks(generate_test_data):
|
||||
# Simply assert that everything is healthy, but don't wait.
|
||||
assert not check_local_docker_containers()
|
||||
yield
|
||||
@ -28,8 +227,8 @@ def test_healthchecks(wait_for_healthchecks):
|
||||
|
||||
|
||||
@pytest.mark.dependency(depends=["test_healthchecks"])
|
||||
def test_run_ingestion(wait_for_healthchecks):
|
||||
ingest_file_via_rest(bootstrap_sample_data)
|
||||
def test_run_ingestion(wait_for_healthchecks, generate_test_data):
|
||||
ingest_file_via_rest(generate_test_data)
|
||||
|
||||
|
||||
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
|
||||
@ -41,11 +240,11 @@ def test_gms_get_latest_assertions_results_by_partition():
|
||||
|
||||
# Query
|
||||
# Given the dataset
|
||||
# show me latest assertion results grouped-by date, partition, assertionId
|
||||
# show me latest assertion run events grouped-by date, partition, assertionId
|
||||
query = json.dumps(
|
||||
{
|
||||
"entityName": "dataset",
|
||||
"aspectName": "assertionResult",
|
||||
"aspectName": "assertionRunEvent",
|
||||
"filter": {
|
||||
"or": [
|
||||
{
|
||||
@ -60,7 +259,7 @@ def test_gms_get_latest_assertions_results_by_partition():
|
||||
]
|
||||
},
|
||||
"metrics": [
|
||||
{"fieldPath": "batchAssertionResult", "aggregationType": "LATEST"}
|
||||
{"fieldPath": "result", "aggregationType": "LATEST"}
|
||||
],
|
||||
"buckets": [
|
||||
{"key": "asserteeUrn", "type": "STRING_GROUPING_BUCKET"},
|
||||
@ -88,7 +287,7 @@ def test_gms_get_latest_assertions_results_by_partition():
|
||||
assert sorted(data["value"]["table"]["columnNames"]) == [
|
||||
"asserteeUrn",
|
||||
"assertionUrn",
|
||||
"latest_batchAssertionResult",
|
||||
"latest_result",
|
||||
"partitionSpec.partition",
|
||||
"timestampMillis",
|
||||
]
|
||||
@ -117,9 +316,10 @@ def test_gms_get_assertions_on_dataset():
|
||||
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
|
||||
def test_gms_get_assertions_on_dataset_field():
|
||||
"""lists all assertion urns including those which may not have executed"""
|
||||
urn = "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD), col1)"
|
||||
dataset_urn=make_dataset_urn("postgres","fooTable")
|
||||
field_urn = make_schema_field_urn(dataset_urn, "col1")
|
||||
response = requests.get(
|
||||
f"{GMS_ENDPOINT}/relationships?direction=INCOMING&urn={urllib.parse.quote(urn)}&types=Asserts"
|
||||
f"{GMS_ENDPOINT}/relationships?direction=INCOMING&urn={urllib.parse.quote(field_urn)}&types=Asserts"
|
||||
)
|
||||
|
||||
response.raise_for_status()
|
||||
@ -141,4 +341,6 @@ def test_gms_get_assertion_info():
|
||||
|
||||
assert data["aspect"]
|
||||
assert data["aspect"]["com.linkedin.assertion.AssertionInfo"]
|
||||
assert data["aspect"]["com.linkedin.assertion.AssertionInfo"]["assertionType"]
|
||||
assert data["aspect"]["com.linkedin.assertion.AssertionInfo"]["type"] == "DATASET"
|
||||
assert data["aspect"]["com.linkedin.assertion.AssertionInfo"]["datasetAssertion"]["scope"]
|
||||
|
||||
|
||||
@ -1,72 +0,0 @@
|
||||
[
|
||||
{
|
||||
"entityType": "assertion",
|
||||
"entityUrn": "urn:li:assertion:2d3b06a6e77e1f24adc9860a05ea089b",
|
||||
"changeType": "UPSERT",
|
||||
"aspectName": "assertionInfo",
|
||||
"aspect": {
|
||||
"value": "{\"customProperties\": {\"suite_name\": \"demo_suite\"}, \"datasetFields\": [\"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD), col1)\"], \"datasets\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)\"], \"assertionType\": {\"scope\": \"DATASET_COLUMN\", \"datasetColumnAssertion\": {\"stdOperator\": \"LESS_THAN\", \"nativeOperator\": \"column_value_is_less_than\", \"stdAggFunc\": \"IDENTITY\"}}, \"assertionParameters\": {\"max_value\": \"99\"}}",
|
||||
"contentType": "application/json"
|
||||
}
|
||||
},
|
||||
{
|
||||
"entityType": "dataset",
|
||||
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)",
|
||||
"changeType": "UPSERT",
|
||||
"aspectName": "assertionResult",
|
||||
"aspect": {
|
||||
"value": "{\"timestampMillis\": 1643794280350, \"partitionSpec\": {\"type\": \"PARTITION\", \"partition\": \"[{'country': 'IN'}]\"}, \"messageId\": \"1643794280350\", \"assertionUrn\": \"urn:li:assertion:2d3b06a6e77e1f24adc9860a05ea089b\", \"asserteeUrn\": \"urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)\", \"batchAssertionResult\": {\"success\": true, \"actualAggValue\": 90, \"nativeResults\": {}, \"externalUrl\": \"http://example.com/uuid1\"}, \"nativeEvaluatorRunId\": \"uuid1\", \"runtimeContext\": {}}",
|
||||
"contentType": "application/json"
|
||||
}
|
||||
},
|
||||
{
|
||||
"entityType": "dataset",
|
||||
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)",
|
||||
"changeType": "UPSERT",
|
||||
"aspectName": "assertionResult",
|
||||
"aspect": {
|
||||
"value": "{\"timestampMillis\": 1643794280352, \"partitionSpec\": {\"type\": \"PARTITION\", \"partition\": \"[{'country': 'US'}]\"}, \"messageId\": \"1643794280352\", \"assertionUrn\": \"urn:li:assertion:2d3b06a6e77e1f24adc9860a05ea089b\", \"asserteeUrn\": \"urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)\", \"batchAssertionResult\": {\"success\": false, \"actualAggValue\": 101, \"nativeResults\": {}, \"externalUrl\": \"http://example.com/uuid1\"}, \"nativeEvaluatorRunId\": \"uuid1\", \"runtimeContext\": {}}",
|
||||
"contentType": "application/json"
|
||||
}
|
||||
},
|
||||
{
|
||||
"entityType": "dataset",
|
||||
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)",
|
||||
"changeType": "UPSERT",
|
||||
"aspectName": "assertionResult",
|
||||
"aspect": {
|
||||
"value": "{\"timestampMillis\": 1643794280354, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"messageId\": \"1643794280354\", \"assertionUrn\": \"urn:li:assertion:2d3b06a6e77e1f24adc9860a05ea089b\", \"asserteeUrn\": \"urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)\", \"batchAssertionResult\": {\"success\": true, \"actualAggValue\": 93, \"nativeResults\": {}, \"externalUrl\": \"http://example.com/uuid1\"}, \"nativeEvaluatorRunId\": \"uuid1\", \"runtimeContext\": {}}",
|
||||
"contentType": "application/json"
|
||||
}
|
||||
},
|
||||
{
|
||||
"entityType": "dataset",
|
||||
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)",
|
||||
"changeType": "UPSERT",
|
||||
"aspectName": "assertionResult",
|
||||
"aspect": {
|
||||
"value": "{\"timestampMillis\": 1643880726872, \"partitionSpec\": {\"type\": \"PARTITION\", \"partition\": \"[{'country': 'IN'}]\"}, \"messageId\": \"1643880726872\", \"assertionUrn\": \"urn:li:assertion:2d3b06a6e77e1f24adc9860a05ea089b\", \"asserteeUrn\": \"urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)\", \"batchAssertionResult\": {\"success\": true, \"actualAggValue\": 90, \"nativeResults\": {}, \"externalUrl\": \"http://example.com/uuid1\"}, \"nativeEvaluatorRunId\": \"uuid1\", \"runtimeContext\": {}}",
|
||||
"contentType": "application/json"
|
||||
}
|
||||
},
|
||||
{
|
||||
"entityType": "dataset",
|
||||
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)",
|
||||
"changeType": "UPSERT",
|
||||
"aspectName": "assertionResult",
|
||||
"aspect": {
|
||||
"value": "{\"timestampMillis\": 1643880726874, \"partitionSpec\": {\"type\": \"PARTITION\", \"partition\": \"[{'country': 'US'}]\"}, \"messageId\": \"1643880726874\", \"assertionUrn\": \"urn:li:assertion:2d3b06a6e77e1f24adc9860a05ea089b\", \"asserteeUrn\": \"urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)\", \"batchAssertionResult\": {\"success\": false, \"actualAggValue\": 101, \"nativeResults\": {}, \"externalUrl\": \"http://example.com/uuid1\"}, \"nativeEvaluatorRunId\": \"uuid1\", \"runtimeContext\": {}}",
|
||||
"contentType": "application/json"
|
||||
}
|
||||
},
|
||||
{
|
||||
"entityType": "dataset",
|
||||
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)",
|
||||
"changeType": "UPSERT",
|
||||
"aspectName": "assertionResult",
|
||||
"aspect": {
|
||||
"value": "{\"timestampMillis\": 1643880726875, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"messageId\": \"1643880726875\", \"assertionUrn\": \"urn:li:assertion:2d3b06a6e77e1f24adc9860a05ea089b\", \"asserteeUrn\": \"urn:li:dataset:(urn:li:dataPlatform:postgres,fooTable,PROD)\", \"batchAssertionResult\": {\"success\": true, \"actualAggValue\": 93, \"nativeResults\": {}, \"externalUrl\": \"http://example.com/uuid1\"}, \"nativeEvaluatorRunId\": \"uuid1\", \"runtimeContext\": {}}",
|
||||
"contentType": "application/json"
|
||||
}
|
||||
}
|
||||
]
|
||||
Loading…
x
Reference in New Issue
Block a user