mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-12 09:23:52 +00:00
60 lines
2.0 KiB
Python
60 lines
2.0 KiB
Python
# metadata-ingestion/examples/library/datacontract_create_basic.py
|
|
import logging
|
|
import os
|
|
|
|
from datahub.emitter.mce_builder import make_assertion_urn, make_dataset_urn
|
|
from datahub.emitter.mcp import MetadataChangeProposalWrapper
|
|
from datahub.emitter.rest_emitter import DatahubRestEmitter
|
|
from datahub.metadata.schema_classes import (
|
|
DataContractPropertiesClass,
|
|
DataContractStateClass,
|
|
DataContractStatusClass,
|
|
DataQualityContractClass,
|
|
FreshnessContractClass,
|
|
SchemaContractClass,
|
|
StatusClass,
|
|
)
|
|
|
|
log = logging.getLogger(__name__)
|
|
logging.basicConfig(level=logging.INFO)
|
|
|
|
dataset_urn = make_dataset_urn(platform="snowflake", name="purchases", env="PROD")
|
|
|
|
schema_assertion_urn = make_assertion_urn("schema-assertion-for-purchases")
|
|
freshness_assertion_urn = make_assertion_urn("freshness-assertion-for-purchases")
|
|
quality_assertion_urn_1 = make_assertion_urn("quality-assertion-for-purchases-1")
|
|
quality_assertion_urn_2 = make_assertion_urn("quality-assertion-for-purchases-2")
|
|
|
|
contract_urn = "urn:li:dataContract:purchases-contract"
|
|
|
|
properties_aspect = DataContractPropertiesClass(
|
|
entity=dataset_urn,
|
|
schema=[SchemaContractClass(assertion=schema_assertion_urn)],
|
|
freshness=[FreshnessContractClass(assertion=freshness_assertion_urn)],
|
|
dataQuality=[
|
|
DataQualityContractClass(assertion=quality_assertion_urn_1),
|
|
DataQualityContractClass(assertion=quality_assertion_urn_2),
|
|
],
|
|
)
|
|
|
|
status_aspect = StatusClass(removed=False)
|
|
|
|
contract_status_aspect = DataContractStatusClass(state=DataContractStateClass.ACTIVE)
|
|
|
|
rest_emitter = DatahubRestEmitter(
|
|
gms_server=os.getenv("DATAHUB_GMS_URL", "http://localhost:8080"),
|
|
token=os.getenv("DATAHUB_GMS_TOKEN"),
|
|
)
|
|
|
|
for event in MetadataChangeProposalWrapper.construct_many(
|
|
entityUrn=contract_urn,
|
|
aspects=[
|
|
properties_aspect,
|
|
status_aspect,
|
|
contract_status_aspect,
|
|
],
|
|
):
|
|
rest_emitter.emit(event)
|
|
|
|
log.info(f"Created data contract {contract_urn}")
|