2022-03-30 01:55:51 +05:30
|
|
|
import json
|
2022-02-03 12:26:08 +05:30
|
|
|
import time
|
|
|
|
|
|
|
|
import datahub.emitter.mce_builder as builder
|
|
|
|
from datahub.emitter.mcp import MetadataChangeProposalWrapper
|
|
|
|
from datahub.emitter.rest_emitter import DatahubRestEmitter
|
|
|
|
from datahub.metadata.com.linkedin.pegasus2avro.assertion import (
|
|
|
|
AssertionInfo,
|
|
|
|
AssertionResult,
|
2022-03-04 11:51:31 -08:00
|
|
|
AssertionResultType,
|
|
|
|
AssertionRunEvent,
|
|
|
|
AssertionRunStatus,
|
|
|
|
AssertionStdAggregation,
|
2022-02-03 12:26:08 +05:30
|
|
|
AssertionStdOperator,
|
2022-03-30 01:55:51 +05:30
|
|
|
AssertionStdParameter,
|
|
|
|
AssertionStdParameters,
|
|
|
|
AssertionStdParameterType,
|
2022-02-03 12:26:08 +05:30
|
|
|
AssertionType,
|
2022-03-04 11:51:31 -08:00
|
|
|
DatasetAssertionInfo,
|
|
|
|
DatasetAssertionScope,
|
2022-02-03 12:26:08 +05:30
|
|
|
)
|
2022-03-30 01:55:51 +05:30
|
|
|
from datahub.metadata.com.linkedin.pegasus2avro.common import DataPlatformInstance
|
|
|
|
from datahub.metadata.com.linkedin.pegasus2avro.dataset import DatasetProperties
|
|
|
|
from datahub.metadata.com.linkedin.pegasus2avro.timeseries import PartitionSpec
|
2022-02-03 12:26:08 +05:30
|
|
|
|
|
|
|
|
|
|
|
def datasetUrn(tbl: str) -> str:
    """Return the dataset URN for table ``tbl`` on the "postgres" platform.

    Delegates to ``builder.make_dataset_urn``; any remaining arguments of
    that helper are left at their defaults.
    """
    platform = "postgres"
    return builder.make_dataset_urn(platform, tbl)
|
|
|
|
|
|
|
|
|
|
|
|
def fldUrn(tbl: str, fld: str) -> str:
    """Return a schema-field URN string for field ``fld`` of table ``tbl``.

    The result has the form ``urn:li:schemaField:(<dataset urn>, <fld>)``,
    where the dataset URN comes from ``datasetUrn(tbl)``.
    """
    # NOTE(review): the separator is ", " (comma + space); confirm that
    # downstream consumers accept the space before the field name.
    parent_urn = datasetUrn(tbl)
    return f"urn:li:schemaField:({parent_urn}, {fld})"
|
|
|
|
|
|
|
|
|
|
|
|
def assertionUrn(info: AssertionInfo) -> str:
    """Return a fixed assertion URN.

    ``info`` is accepted but never inspected: the same hard-coded URN is
    returned on every call, regardless of the argument.
    """
    fixed_urn = "urn:li:assertion:432475190cc846f2894b5b3aa4d55af2"
    return fixed_urn
|
2022-02-03 12:26:08 +05:30
|
|
|
|
|
|
|
|
2022-03-30 01:55:51 +05:30
|
|
|
def emitAssertionResult(assertionResult: AssertionRunEvent) -> None:
    """Emit ``assertionResult`` as an aspect of the assertion it refers to.

    The run event is wrapped in a ``MetadataChangeProposalWrapper`` whose
    entity URN is ``assertionResult.assertionUrn`` and is then sent through
    the module-level ``emitter``.
    """
    # Emit BatchAssertion Result! (timeseries aspect)
    emitter.emit_mcp(
        MetadataChangeProposalWrapper(
            entityUrn=assertionResult.assertionUrn,
            aspect=assertionResult,
        )
    )
|
2022-02-03 12:26:08 +05:30
|
|
|
|
|
|
|
|
2022-03-30 01:55:51 +05:30
|
|
|
# Create an emitter to the GMS REST API.
emitter = DatahubRestEmitter("http://localhost:8080")

# Describe the dataset and wrap the properties aspect in a
# MetadataChangeProposalWrapper keyed by the dataset's URN.
datasetProperties = DatasetProperties(name="bazTable")
dataset_mcp = MetadataChangeProposalWrapper(
    entityUrn=datasetUrn("bazTable"),
    aspect=datasetProperties,
)

# Emit Dataset entity properties aspect! (Skip if dataset is already present)
emitter.emit_mcp(dataset_mcp)
|
|
|
|
|
2022-02-03 12:26:08 +05:30
|
|
|
# Construct an assertion object: MAX of column "col1" in "bazTable" is
# expected to be BETWEEN 89 and 99.
_maxVal_parameters = AssertionStdParameters(
    minValue=AssertionStdParameter(
        type=AssertionStdParameterType.NUMBER,
        value="89",
    ),
    maxValue=AssertionStdParameter(
        type=AssertionStdParameterType.NUMBER,
        value="99",
    ),
)
_maxVal_datasetAssertion = DatasetAssertionInfo(
    scope=DatasetAssertionScope.DATASET_COLUMN,
    operator=AssertionStdOperator.BETWEEN,
    nativeType="expect_column_max_to_be_between",
    aggregation=AssertionStdAggregation.MAX,
    fields=[fldUrn("bazTable", "col1")],
    dataset=datasetUrn("bazTable"),
    nativeParameters={"max_value": "99", "min_value": "89"},
    parameters=_maxVal_parameters,
)
assertion_maxVal = AssertionInfo(
    type=AssertionType.DATASET,
    datasetAssertion=_maxVal_datasetAssertion,
    customProperties={"suite_name": "demo_suite"},
)

# Construct a MetadataChangeProposalWrapper object.
assertion_maxVal_mcp = MetadataChangeProposalWrapper(
    entityUrn=assertionUrn(assertion_maxVal),
    aspect=assertion_maxVal,
)

# Emit Assertion entity info aspect!
emitter.emit_mcp(assertion_maxVal_mcp)
|
|
|
|
|
2022-03-30 01:55:51 +05:30
|
|
|
# Construct an assertion platform object: the assertion is attributed to the
# "great-expectations" data platform.
assertion_dataPlatformInstance = DataPlatformInstance(
    platform=builder.make_data_platform_urn("great-expectations")
)

# Construct a MetadataChangeProposalWrapper object for assertion platform
assertion_dataPlatformInstance_mcp = MetadataChangeProposalWrapper(
    entityUrn=assertionUrn(assertion_maxVal),
    aspect=assertion_dataPlatformInstance,
)

# Emit Assertion entity platform aspect!
# Uses emit_mcp, like every other emission in this script, because the
# payload is a MetadataChangeProposalWrapper.
emitter.emit_mcp(assertion_dataPlatformInstance_mcp)
|
|
|
|
|
|
|
|
|
2022-02-03 12:26:08 +05:30
|
|
|
# Construct batch assertion result object for partition 1 batch
# (partition spec: country == "IN"); the observed aggregate is 90 -> SUCCESS.
_partition1_result = AssertionResult(
    type=AssertionResultType.SUCCESS,
    externalUrl="http://example.com/uuid1",
    actualAggValue=90,
)
assertionResult_maxVal_batch_partition1 = AssertionRunEvent(
    timestampMillis=int(time.time() * 1000),
    assertionUrn=assertionUrn(assertion_maxVal),
    asserteeUrn=datasetUrn("bazTable"),
    partitionSpec=PartitionSpec(partition=json.dumps([{"country": "IN"}])),
    runId="uuid1",
    status=AssertionRunStatus.COMPLETE,
    result=_partition1_result,
)

emitAssertionResult(assertionResult_maxVal_batch_partition1)
|
|
|
|
|
|
|
|
# Construct batch assertion result object for partition 2 batch
# (partition spec: country == "US"); the observed aggregate is 101 -> FAILURE.
_partition2_result = AssertionResult(
    type=AssertionResultType.FAILURE,
    externalUrl="http://example.com/uuid1",
    actualAggValue=101,
)
assertionResult_maxVal_batch_partition2 = AssertionRunEvent(
    timestampMillis=int(time.time() * 1000),
    assertionUrn=assertionUrn(assertion_maxVal),
    asserteeUrn=datasetUrn("bazTable"),
    partitionSpec=PartitionSpec(partition=json.dumps([{"country": "US"}])),
    runId="uuid1",
    status=AssertionRunStatus.COMPLETE,
    result=_partition2_result,
)

emitAssertionResult(assertionResult_maxVal_batch_partition2)
|
|
|
|
|
|
|
|
# Construct batch assertion result object for full table batch.
# No partitionSpec is passed here; the observed aggregate is 93 -> SUCCESS.
_fulltable_result = AssertionResult(
    type=AssertionResultType.SUCCESS,
    externalUrl="http://example.com/uuid1",
    actualAggValue=93,
)
assertionResult_maxVal_batch_fulltable = AssertionRunEvent(
    timestampMillis=int(time.time() * 1000),
    assertionUrn=assertionUrn(assertion_maxVal),
    asserteeUrn=datasetUrn("bazTable"),
    runId="uuid1",
    status=AssertionRunStatus.COMPLETE,
    result=_fulltable_result,
)

emitAssertionResult(assertionResult_maxVal_batch_fulltable)
|