# datahub/metadata-ingestion/examples/library/assertion_create_sql_metric.py
#
# 57 lines
# 1.7 KiB
# Python

# metadata-ingestion/examples/library/assertion_create_sql_metric.py
import os
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
AssertionInfoClass,
AssertionStdOperatorClass,
AssertionStdParameterClass,
AssertionStdParametersClass,
AssertionStdParameterTypeClass,
AssertionTypeClass,
SqlAssertionInfoClass,
SqlAssertionTypeClass,
)
# Connection settings come from the environment: the GMS URL falls back to a
# local instance, and the token is None when DATAHUB_GMS_TOKEN is unset.
gms_url = os.environ.get("DATAHUB_GMS_URL", "http://localhost:8080")
gms_token = os.environ.get("DATAHUB_GMS_TOKEN")
emitter = DatahubRestEmitter(gms_server=gms_url, token=gms_token)

# The dataset the assertion is attached to.
dataset_urn = builder.make_dataset_urn(
    platform="postgres",
    name="public.transactions",
)

# Threshold the metric is compared against: the numeric value 0, passed as a
# string.
minimum_total = AssertionStdParameterClass(
    type=AssertionStdParameterTypeClass.NUMBER,
    value="0",
)

# METRIC-type SQL assertion: the statement's result must be >= the threshold.
sql_assertion_info = SqlAssertionInfoClass(
    type=SqlAssertionTypeClass.METRIC,
    entity=dataset_urn,
    statement=(
        "SELECT SUM(amount) FROM public.transactions "
        "WHERE status = 'completed' AND date = CURRENT_DATE"
    ),
    operator=AssertionStdOperatorClass.GREATER_THAN_OR_EQUAL_TO,
    parameters=AssertionStdParametersClass(value=minimum_total),
)

# Wrap the SQL-specific details in the generic assertion-info aspect.
assertion_info = AssertionInfoClass(
    type=AssertionTypeClass.SQL,
    sqlAssertion=sql_assertion_info,
    description="Total completed transaction amount today must be non-negative",
)

# The assertion URN is built from a GUID computed over the dataset URN plus a
# label identifying this particular check.
guid_source = {"entity": dataset_urn, "type": "sql-completed-transactions-sum"}
assertion_guid = builder.datahub_guid(guid_source)
assertion_urn = builder.make_assertion_urn(assertion_guid)

# Package the aspect as a change proposal for the assertion entity and send it.
assertion_info_mcp = MetadataChangeProposalWrapper(
    entityUrn=assertion_urn,
    aspect=assertion_info,
)
emitter.emit_mcp(assertion_info_mcp)

print(f"Created SQL assertion: {assertion_urn}")