datahub/metadata-ingestion/examples/library/data_quality_mcpw_rest.py

165 lines
5.2 KiB
Python

import json
import time
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.assertion import (
AssertionInfo,
AssertionResult,
AssertionResultType,
AssertionRunEvent,
AssertionRunStatus,
AssertionStdAggregation,
AssertionStdOperator,
AssertionStdParameter,
AssertionStdParameters,
AssertionStdParameterType,
AssertionType,
DatasetAssertionInfo,
DatasetAssertionScope,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import DataPlatformInstance
from datahub.metadata.com.linkedin.pegasus2avro.dataset import DatasetProperties
from datahub.metadata.com.linkedin.pegasus2avro.timeseries import PartitionSpec
def datasetUrn(tbl: str) -> str:
return builder.make_dataset_urn("postgres", tbl)
def fldUrn(tbl: str, fld: str) -> str:
return f"urn:li:schemaField:({datasetUrn(tbl)}, {fld})"
def assertionUrn(info: AssertionInfo) -> str:
return "urn:li:assertion:432475190cc846f2894b5b3aa4d55af2"
def emitAssertionResult(assertionResult: AssertionRunEvent) -> None:
dataset_assertionRunEvent_mcp = MetadataChangeProposalWrapper(
entityUrn=assertionResult.assertionUrn,
aspect=assertionResult,
)
# Emit BatchAssertion Result! (timeseries aspect)
emitter.emit_mcp(dataset_assertionRunEvent_mcp)
# Create an emitter to the GMS REST API.
emitter = DatahubRestEmitter("http://localhost:8080")
datasetProperties = DatasetProperties(
name="bazTable",
)
# Construct a MetadataChangeProposalWrapper object for dataset
dataset_mcp = MetadataChangeProposalWrapper(
entityUrn=datasetUrn("bazTable"),
aspect=datasetProperties,
)
# Emit Dataset entity properties aspect! (Skip if dataset is already present)
emitter.emit_mcp(dataset_mcp)
# Construct an assertion object.
assertion_maxVal = AssertionInfo(
type=AssertionType.DATASET,
datasetAssertion=DatasetAssertionInfo(
scope=DatasetAssertionScope.DATASET_COLUMN,
operator=AssertionStdOperator.BETWEEN,
nativeType="expect_column_max_to_be_between",
aggregation=AssertionStdAggregation.MAX,
fields=[fldUrn("bazTable", "col1")],
dataset=datasetUrn("bazTable"),
nativeParameters={"max_value": "99", "min_value": "89"},
parameters=AssertionStdParameters(
minValue=AssertionStdParameter(
type=AssertionStdParameterType.NUMBER, value="89"
),
maxValue=AssertionStdParameter(
type=AssertionStdParameterType.NUMBER, value="99"
),
),
),
customProperties={"suite_name": "demo_suite"},
)
# Construct a MetadataChangeProposalWrapper object.
assertion_maxVal_mcp = MetadataChangeProposalWrapper(
entityUrn=assertionUrn(assertion_maxVal),
aspect=assertion_maxVal,
)
# Emit Assertion entity info aspect!
emitter.emit_mcp(assertion_maxVal_mcp)
# Construct an assertion platform object.
assertion_dataPlatformInstance = DataPlatformInstance(
platform=builder.make_data_platform_urn("great-expectations")
)
# Construct a MetadataChangeProposalWrapper object for assertion platform
assertion_dataPlatformInstance_mcp = MetadataChangeProposalWrapper(
entityUrn=assertionUrn(assertion_maxVal),
aspect=assertion_dataPlatformInstance,
)
# Emit Assertion entity platform aspect!
emitter.emit(assertion_dataPlatformInstance_mcp)
# Construct batch assertion result object for partition 1 batch
assertionResult_maxVal_batch_partition1 = AssertionRunEvent(
timestampMillis=int(time.time() * 1000),
assertionUrn=assertionUrn(assertion_maxVal),
asserteeUrn=datasetUrn("bazTable"),
partitionSpec=PartitionSpec(partition=json.dumps([{"country": "IN"}])),
runId="uuid1",
status=AssertionRunStatus.COMPLETE,
result=AssertionResult(
type=AssertionResultType.SUCCESS,
externalUrl="http://example.com/uuid1",
actualAggValue=90,
),
)
emitAssertionResult(
assertionResult_maxVal_batch_partition1,
)
# Construct batch assertion result object for partition 2 batch
assertionResult_maxVal_batch_partition2 = AssertionRunEvent(
timestampMillis=int(time.time() * 1000),
assertionUrn=assertionUrn(assertion_maxVal),
asserteeUrn=datasetUrn("bazTable"),
partitionSpec=PartitionSpec(partition=json.dumps([{"country": "US"}])),
runId="uuid1",
status=AssertionRunStatus.COMPLETE,
result=AssertionResult(
type=AssertionResultType.FAILURE,
externalUrl="http://example.com/uuid1",
actualAggValue=101,
),
)
emitAssertionResult(
assertionResult_maxVal_batch_partition2,
)
# Construct batch assertion result object for full table batch.
assertionResult_maxVal_batch_fulltable = AssertionRunEvent(
timestampMillis=int(time.time() * 1000),
assertionUrn=assertionUrn(assertion_maxVal),
asserteeUrn=datasetUrn("bazTable"),
runId="uuid1",
status=AssertionRunStatus.COMPLETE,
result=AssertionResult(
type=AssertionResultType.SUCCESS,
externalUrl="http://example.com/uuid1",
actualAggValue=93,
),
)
emitAssertionResult(
assertionResult_maxVal_batch_fulltable,
)