# datahub/metadata-ingestion/examples/library/ermodelrelationship_create_basic.py

# 148 lines
# 4.3 KiB
# Python

# metadata-ingestion/examples/library/ermodelrelationship_create_basic.py
import os
import time
from datahub.emitter.mce_builder import make_data_platform_urn, make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
AuditStampClass,
ERModelRelationshipCardinalityClass,
ERModelRelationshipKeyClass,
ERModelRelationshipPropertiesClass,
NumberTypeClass,
OtherSchemaClass,
RelationshipFieldMappingClass,
SchemaFieldClass,
SchemaFieldDataTypeClass,
SchemaMetadataClass,
StringTypeClass,
)
# Connection settings come from the environment. The GMS URL defaults to
# http://localhost:8080; the token is None when DATAHUB_GMS_TOKEN is unset.
GMS_ENDPOINT = os.environ.get("DATAHUB_GMS_URL", "http://localhost:8080")
GMS_TOKEN = os.environ.get("DATAHUB_GMS_TOKEN")

# Platform and environment used for every dataset URN built in this example.
PLATFORM = "mysql"
ENV = "PROD"

# Single REST emitter shared by every emit call in this script.
emitter = DatahubRestEmitter(gms_server=GMS_ENDPOINT, token=GMS_TOKEN)
def create_dataset_with_schema(
    dataset_name: str, fields: list[SchemaFieldClass]
) -> str:
    """Emit a schema aspect for a dataset and return the dataset's URN.

    The URN is built from the module-level ``PLATFORM`` and ``ENV`` together
    with ``dataset_name``. A ``SchemaMetadataClass`` carrying ``fields`` is
    wrapped in a ``MetadataChangeProposalWrapper`` and sent through the
    module-level ``emitter``.

    Args:
        dataset_name: Name of the dataset; also used as the schema name.
        fields: Schema fields to attach to the dataset.

    Returns:
        The URN string produced by ``make_dataset_urn``.
    """
    urn = make_dataset_urn(PLATFORM, dataset_name, ENV)
    platform_urn = make_data_platform_urn(PLATFORM)

    # version, hash and the raw platform schema are filled with empty
    # placeholder values; only the field list carries real content here.
    proposal = MetadataChangeProposalWrapper(
        entityUrn=urn,
        aspect=SchemaMetadataClass(
            schemaName=dataset_name,
            platform=platform_urn,
            fields=fields,
            version=0,
            hash="",
            platformSchema=OtherSchemaClass(rawSchema=""),
        ),
    )
    emitter.emit_mcp(proposal)
    return urn
def create_schema_field(
    field_path: str, native_type: str, data_type: SchemaFieldDataTypeClass
) -> SchemaFieldClass:
    """Build a ``SchemaFieldClass`` describing a single column.

    Args:
        field_path: Path (name) of the field; also used in the description.
        native_type: Native type string for the field, e.g. ``"int"``.
        data_type: Type wrapper stored in the field's ``type`` attribute.

    Returns:
        A schema field whose ``lastModified`` stamp holds the current time in
        milliseconds, attributed to ``urn:li:corpuser:datahub``.
    """
    # time.time() returns seconds; the stamp is stored in milliseconds.
    now_ms = int(time.time() * 1000)
    modified_stamp = AuditStampClass(
        time=now_ms,
        actor="urn:li:corpuser:datahub",
    )
    return SchemaFieldClass(
        fieldPath=field_path,
        type=data_type,
        nativeDataType=native_type,
        description=f"Field: {field_path}",
        lastModified=modified_stamp,
    )
# Employee table: each spec is (field path, native type, type class). A fresh
# type instance is created per field when the list is built.
employee_fields = [
    create_schema_field(path, native, SchemaFieldDataTypeClass(type=type_cls()))
    for path, native, type_cls in (
        ("id", "int", NumberTypeClass),
        ("name", "varchar(100)", StringTypeClass),
        ("email", "varchar(255)", StringTypeClass),
        ("company_id", "int", NumberTypeClass),
    )
]
employee_urn = create_dataset_with_schema("Employee", employee_fields)
print(f"Created Employee dataset: {employee_urn}")

# Company table, built the same way from its own field specs.
company_fields = [
    create_schema_field(path, native, SchemaFieldDataTypeClass(type=type_cls()))
    for path, native, type_cls in (
        ("id", "int", NumberTypeClass),
        ("name", "varchar(200)", StringTypeClass),
        ("industry", "varchar(100)", StringTypeClass),
    )
]
company_urn = create_dataset_with_schema("Company", company_fields)
print(f"Created Company dataset: {company_urn}")
# Identify the ER model relationship between the two datasets.
relationship_id = "employee_to_company"
relationship_urn = f"urn:li:erModelRelationship:{relationship_id}"

# First emit: the key aspect carrying the relationship id.
relationship_key = ERModelRelationshipKeyClass(id=relationship_id)
key_proposal = MetadataChangeProposalWrapper(
    entityUrn=relationship_urn,
    aspect=relationship_key,
)
emitter.emit_mcp(key_proposal)

# Second emit: the properties aspect. The pieces are assembled up front, in
# the order they are passed to the constructor below.
field_mappings = [
    RelationshipFieldMappingClass(
        sourceField="company_id",
        destinationField="id",
    )
]
custom_properties = {
    "constraint_type": "FOREIGN_KEY",
    "on_delete": "CASCADE",
    "on_update": "CASCADE",
}
created_stamp = AuditStampClass(
    time=int(time.time() * 1000),  # seconds -> milliseconds
    actor="urn:li:corpuser:datahub",
)
relationship_properties = ERModelRelationshipPropertiesClass(
    name="Employee to Company Relationship",
    source=employee_urn,
    destination=company_urn,
    relationshipFieldMappings=field_mappings,
    cardinality=ERModelRelationshipCardinalityClass.N_ONE,
    customProperties=custom_properties,
    created=created_stamp,
)
properties_proposal = MetadataChangeProposalWrapper(
    entityUrn=relationship_urn,
    aspect=relationship_properties,
)
emitter.emit_mcp(properties_proposal)

print(f"Created ER Model Relationship: {relationship_urn}")
print(
    "This N:1 relationship connects Employee.company_id to Company.id, "
    "representing that many employees belong to one company."
)