# Example: emitting dataset data-quality assertion metadata to DataHub
# through the GMS REST emitter.
# Source path: datahub/metadata-ingestion/examples/library/data_quality_mcpw_rest.py
import time
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.assertion import (
AssertionInfo,
AssertionResult,
AssertionResultType,
AssertionRunEvent,
AssertionRunStatus,
AssertionStdAggregation,
AssertionStdOperator,
AssertionType,
DatasetAssertionInfo,
DatasetAssertionScope,
)
from datahub.metadata.com.linkedin.pegasus2avro.events.metadata import ChangeType
from datahub.metadata.schema_classes import AssertionRunEventClass, PartitionSpecClass
def datasetUrn(tbl: str) -> str:
    """Return the DataHub dataset URN for the Postgres table *tbl*."""
    platform = "postgres"
    return builder.make_dataset_urn(platform, tbl)
def fldUrn(tbl: str, fld: str) -> str:
    """Return the schemaField URN for column *fld* of table *tbl*."""
    dataset = datasetUrn(tbl)
    return f"urn:li:schemaField:({dataset}, {fld})"
def assertionUrn(info: AssertionInfo) -> str:
    """Return the URN identifying this example's single assertion entity.

    NOTE(review): *info* is currently ignored — the example reuses one
    fixed GUID for every aspect it emits, so both the assertionInfo and
    all run events attach to the same assertion entity.
    """
    return "urn:li:assertion:432475190cc846f2894b5b3aa4d55af2"
def emitAssertionResult(assertionResult: AssertionRunEvent) -> None:
    """Wrap a run event in an MCP and publish it to the REST emitter.

    Fix: the parameter was annotated ``AssertionResult``, but every call
    site passes an ``AssertionRunEvent`` (the body reads
    ``assertionResult.assertionUrn``, which is a run-event field, not a
    field of the nested ``AssertionResult`` object).

    :param assertionResult: the assertionRunEvent aspect to emit; its
        ``assertionUrn`` selects the target assertion entity.
    """
    dataset_assertionRunEvent_mcp = MetadataChangeProposalWrapper(
        entityType="assertion",
        changeType=ChangeType.UPSERT,
        entityUrn=assertionResult.assertionUrn,
        aspectName="assertionRunEvent",
        aspect=assertionResult,
    )
    # Emit the batch assertion result (timeseries aspect: each emit appends
    # a new run record; it does not overwrite earlier runs).
    # NOTE(review): relies on the module-level `emitter` created further down;
    # this function must only be called after that assignment runs.
    emitter.emit_mcp(dataset_assertionRunEvent_mcp)
# Define the assertion object: a column-scoped check that each value of
# bazTable.col1 stays below the native parameter max_value (99).
assertion_maxVal = AssertionInfo(
    type=AssertionType.DATASET,
    customProperties={"suite_name": "demo_suite"},
    datasetAssertion=DatasetAssertionInfo(
        dataset=datasetUrn("bazTable"),
        scope=DatasetAssertionScope.DATASET_COLUMN,
        fields=[fldUrn("bazTable", "col1")],
        operator=AssertionStdOperator.LESS_THAN,
        aggregation=AssertionStdAggregation.IDENTITY,
        nativeType="column_value_is_less_than",
        nativeParameters={"max_value": "99"},
    ),
)
# Wrap the assertion definition in a MetadataChangeProposalWrapper that
# upserts the assertionInfo aspect of the assertion entity.
assertion_maxVal_mcp = MetadataChangeProposalWrapper(
    entityType="assertion",
    entityUrn=assertionUrn(assertion_maxVal),
    changeType=ChangeType.UPSERT,
    aspectName="assertionInfo",
    aspect=assertion_maxVal,
)
# Point a REST emitter at the local DataHub GMS instance and publish the
# assertion entity's info aspect.
emitter = DatahubRestEmitter("http://localhost:8080")
emitter.emit_mcp(assertion_maxVal_mcp)
# Report a passing run of the assertion over the country=IN partition:
# the observed aggregate value (90) is below the 99 threshold.
assertionResult_maxVal_batch_partition1 = AssertionRunEvent(
    timestampMillis=int(time.time() * 1000),
    runId="uuid1",
    assertionUrn=assertionUrn(assertion_maxVal),
    asserteeUrn=datasetUrn("bazTable"),
    partitionSpec=PartitionSpecClass(partition=str([{"country": "IN"}])),
    status=AssertionRunStatus.COMPLETE,
    result=AssertionResult(
        type=AssertionResultType.SUCCESS,
        externalUrl="http://example.com/uuid1",
        actualAggValue=90,
    ),
)
emitAssertionResult(assertionResult_maxVal_batch_partition1)
# Report a failing run of the assertion over the country=US partition:
# the observed aggregate value (101) violates the < 99 check.
# Fix: use the imported AssertionRunEvent here for consistency — the
# partition-1 event above is built with AssertionRunEvent, while this one
# used AssertionRunEventClass from a different import path.
assertionResult_maxVal_batch_partition2 = AssertionRunEvent(
    timestampMillis=int(time.time() * 1000),
    assertionUrn=assertionUrn(assertion_maxVal),
    asserteeUrn=datasetUrn("bazTable"),
    partitionSpec=PartitionSpecClass(partition=str([{"country": "US"}])),
    # NOTE(review): runId "uuid1" is shared by all three example runs —
    # presumably fine for a demo, but real runs should get distinct ids.
    runId="uuid1",
    status=AssertionRunStatus.COMPLETE,
    result=AssertionResult(
        type=AssertionResultType.FAILURE,
        externalUrl="http://example.com/uuid1",
        actualAggValue=101,
    ),
)
emitAssertionResult(assertionResult_maxVal_batch_partition2)
# Report a passing run of the assertion over the whole table (no
# partitionSpec): the observed aggregate value (93) is below 99.
# Fix: use the imported AssertionRunEvent for consistency — the
# partition-1 event above is built with AssertionRunEvent, while this one
# used AssertionRunEventClass from a different import path.
assertionResult_maxVal_batch_fulltable = AssertionRunEvent(
    timestampMillis=int(time.time() * 1000),
    assertionUrn=assertionUrn(assertion_maxVal),
    asserteeUrn=datasetUrn("bazTable"),
    runId="uuid1",
    status=AssertionRunStatus.COMPLETE,
    result=AssertionResult(
        type=AssertionResultType.SUCCESS,
        externalUrl="http://example.com/uuid1",
        actualAggValue=93,
    ),
)
emitAssertionResult(assertionResult_maxVal_batch_fulltable)