mirror of
https://github.com/datahub-project/datahub.git
synced 2025-06-27 05:03:31 +00:00
135 lines
4.0 KiB
Python
135 lines
4.0 KiB
Python
import time
|
|
|
|
from datahub.emitter.mce_builder import make_data_platform_urn, make_dataset_urn
|
|
from datahub.emitter.mcp import MetadataChangeProposalWrapper
|
|
from datahub.emitter.rest_emitter import DatahubRestEmitter
|
|
from datahub.metadata.schema_classes import (
|
|
AuditStampClass,
|
|
ERModelRelationshipCardinalityClass,
|
|
ERModelRelationshipKeyClass,
|
|
ERModelRelationshipPropertiesClass,
|
|
NumberTypeClass,
|
|
OtherSchemaClass,
|
|
RelationshipFieldMappingClass,
|
|
SchemaFieldClass,
|
|
SchemaFieldDataTypeClass,
|
|
SchemaMetadataClass,
|
|
StringTypeClass,
|
|
)
|
|
|
|
# Configuration
|
|
GMS_ENDPOINT = "http://localhost:8080"
|
|
PLATFORM = "mysql"
|
|
ENV = "PROD"
|
|
|
|
e = DatahubRestEmitter(gms_server=GMS_ENDPOINT, extra_headers={})
|
|
|
|
|
|
def get_schema_field(
|
|
name: str, dtype: str, type: SchemaFieldDataTypeClass
|
|
) -> SchemaFieldClass:
|
|
"""Creates a schema field for MySQL columns."""
|
|
|
|
field = SchemaFieldClass(
|
|
fieldPath=name,
|
|
type=type,
|
|
nativeDataType=dtype,
|
|
description=name,
|
|
lastModified=AuditStampClass(
|
|
time=int(time.time() * 1000),
|
|
actor="urn:li:corpuser:ingestion",
|
|
),
|
|
)
|
|
if name == "id":
|
|
field.isPartitioningKey = True
|
|
return field
|
|
|
|
|
|
# Define Employee Table
|
|
dataset_employee = make_dataset_urn(PLATFORM, "Employee", ENV)
|
|
employee_fields = [
|
|
get_schema_field("id", "int", SchemaFieldDataTypeClass(type=NumberTypeClass())),
|
|
get_schema_field(
|
|
"name", "varchar", SchemaFieldDataTypeClass(type=StringTypeClass())
|
|
),
|
|
get_schema_field("age", "int", SchemaFieldDataTypeClass(type=NumberTypeClass())),
|
|
get_schema_field(
|
|
"company_id", "int", SchemaFieldDataTypeClass(type=NumberTypeClass())
|
|
),
|
|
]
|
|
|
|
e.emit_mcp(
|
|
MetadataChangeProposalWrapper(
|
|
entityUrn=dataset_employee,
|
|
aspect=SchemaMetadataClass(
|
|
schemaName="Employee",
|
|
platform=make_data_platform_urn(PLATFORM),
|
|
fields=employee_fields,
|
|
version=0,
|
|
hash="",
|
|
platformSchema=OtherSchemaClass(rawSchema=""),
|
|
),
|
|
)
|
|
)
|
|
|
|
# Define Company Table
|
|
dataset_company = make_dataset_urn(PLATFORM, "Company", ENV)
|
|
company_fields = [
|
|
get_schema_field("id", "int", SchemaFieldDataTypeClass(type=NumberTypeClass())),
|
|
get_schema_field(
|
|
"name", "varchar", SchemaFieldDataTypeClass(type=StringTypeClass())
|
|
),
|
|
]
|
|
|
|
e.emit_mcp(
|
|
MetadataChangeProposalWrapper(
|
|
entityUrn=dataset_company,
|
|
aspect=SchemaMetadataClass(
|
|
schemaName="Company",
|
|
platform=make_data_platform_urn(PLATFORM),
|
|
fields=company_fields,
|
|
version=0,
|
|
hash="",
|
|
platformSchema=OtherSchemaClass(rawSchema=""),
|
|
),
|
|
)
|
|
)
|
|
|
|
# Establish Relationship (Foreign Key: Employee.company_id → Company.id)
|
|
relationship_key = ERModelRelationshipKeyClass(id="employee_to_company")
|
|
relationship_properties = ERModelRelationshipPropertiesClass(
|
|
name="Employee to Company Relationship",
|
|
source=dataset_employee,
|
|
destination=dataset_company,
|
|
relationshipFieldMappings=[
|
|
RelationshipFieldMappingClass(sourceField="company_id", destinationField="id")
|
|
],
|
|
cardinality=ERModelRelationshipCardinalityClass.N_ONE,
|
|
customProperties={"constraint": "Foreign Key", "index": "company_id"},
|
|
)
|
|
|
|
relationship_urn = f"urn:li:erModelRelationship:{relationship_key.id}"
|
|
|
|
e.emit_mcp(
|
|
MetadataChangeProposalWrapper(
|
|
entityType="erModelRelationship",
|
|
changeType="UPSERT",
|
|
entityKeyAspect=relationship_key,
|
|
aspectName=relationship_key.ASPECT_NAME,
|
|
aspect=relationship_key,
|
|
)
|
|
)
|
|
|
|
e.emit_mcp(
|
|
MetadataChangeProposalWrapper(
|
|
entityUrn=relationship_urn,
|
|
entityType="erModelRelationship",
|
|
changeType="UPSERT",
|
|
aspectName=relationship_properties.ASPECT_NAME,
|
|
aspect=relationship_properties,
|
|
)
|
|
)
|
|
|
|
print("relationship_urn", relationship_urn)
|
|
print("Employee and Company tables created with ERModelRelationship linking them.")
|