datahub/metadata-ingestion/examples/library/datacontract_create_basic.py

60 lines
2.0 KiB
Python

# metadata-ingestion/examples/library/datacontract_create_basic.py
import logging
import os
from datahub.emitter.mce_builder import make_assertion_urn, make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
DataContractPropertiesClass,
DataContractStateClass,
DataContractStatusClass,
DataQualityContractClass,
FreshnessContractClass,
SchemaContractClass,
StatusClass,
)
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
dataset_urn = make_dataset_urn(platform="snowflake", name="purchases", env="PROD")
schema_assertion_urn = make_assertion_urn("schema-assertion-for-purchases")
freshness_assertion_urn = make_assertion_urn("freshness-assertion-for-purchases")
quality_assertion_urn_1 = make_assertion_urn("quality-assertion-for-purchases-1")
quality_assertion_urn_2 = make_assertion_urn("quality-assertion-for-purchases-2")
contract_urn = "urn:li:dataContract:purchases-contract"
properties_aspect = DataContractPropertiesClass(
entity=dataset_urn,
schema=[SchemaContractClass(assertion=schema_assertion_urn)],
freshness=[FreshnessContractClass(assertion=freshness_assertion_urn)],
dataQuality=[
DataQualityContractClass(assertion=quality_assertion_urn_1),
DataQualityContractClass(assertion=quality_assertion_urn_2),
],
)
status_aspect = StatusClass(removed=False)
contract_status_aspect = DataContractStatusClass(state=DataContractStateClass.ACTIVE)
rest_emitter = DatahubRestEmitter(
gms_server=os.getenv("DATAHUB_GMS_URL", "http://localhost:8080"),
token=os.getenv("DATAHUB_GMS_TOKEN"),
)
for event in MetadataChangeProposalWrapper.construct_many(
entityUrn=contract_urn,
aspects=[
properties_aspect,
status_aspect,
contract_status_aspect,
],
):
rest_emitter.emit(event)
log.info(f"Created data contract {contract_urn}")