import glob
import json
import logging
import os
import re
import shutil
import unittest.mock
from dataclasses import Field, dataclass, field
from datetime import datetime, timezone
from enum import auto
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple

import avro.schema
import click
from utils import should_write_json_file

from datahub.configuration.common import ConfigEnum, PermissiveConfigModel
from datahub.emitter.mce_builder import (
    make_data_platform_urn,
    make_dataset_urn,
    make_schema_field_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
from datahub.ingestion.api.sink import NoopWriteCallback
from datahub.ingestion.extractor.schema_util import avro_schema_to_mce_fields
from datahub.ingestion.sink.file import FileSink, FileSinkConfig
from datahub.metadata.schema_classes import (
    BrowsePathEntryClass,
    BrowsePathsClass,
    BrowsePathsV2Class,
    DatasetPropertiesClass,
    ForeignKeyConstraintClass,
    GlobalTagsClass,
    OtherSchemaClass,
    SchemaFieldClass as SchemaField,
    SchemaFieldDataTypeClass,
    SchemaMetadataClass,
    StringTypeClass,
    StructuredPropertiesClass,
    StructuredPropertyDefinitionClass,
    StructuredPropertySettingsClass,
    StructuredPropertyValueAssignmentClass,
    SubTypesClass,
    TagAssociationClass,
)
from datahub.metadata.urns import StructuredPropertyUrn, Urn

logger = logging.getLogger(__name__)

# TODO: Support generating docs for each event type in entity registry.


def capitalize_first(something: str) -> str:
    return something[0:1].upper() + something[1:]


class EntityCategory(ConfigEnum):
    CORE = auto()
    INTERNAL = auto()


@dataclass
class EntityDefinition:
    name: str
    keyAspect: str
    aspects: List[str] = field(default_factory=list)
    aspect_map: Optional[Dict[str, Any]] = None
    relationship_map: Optional[Dict[str, str]] = None
    doc: Optional[str] = None
    doc_file_contents: Optional[str] = None
    # entities are by default in the CORE category unless specified otherwise
    category: EntityCategory = EntityCategory.CORE
    priority: Optional[int] = None
    # schema: Optional[avro.schema.Schema] = None
    # logical_schema: Optional[avro.schema.Schema] = None

    # @validator("name")
    # def lower_everything(cls, v: str) -> str:
    #     return v.lower()

    @property
    def display_name(self):
        return capitalize_first(self.name)


@dataclass
class AspectDefinition:
    name: str
    EntityUrns: Optional[List[str]] = None
    schema: Optional[avro.schema.Schema] = None
    type: Optional[str] = None


@dataclass
class EventDefinition:
    name: str


# New dataclasses for lineage representation
@dataclass
class LineageRelationship:
    name: str
    entityTypes: List[str]
    isLineage: bool = True


@dataclass
class LineageField:
    name: str
    path: str
    isLineage: bool = True
    relationship: Optional[LineageRelationship] = None


@dataclass
class LineageAspect:
    aspect: str
    fields: List[LineageField]


@dataclass
class LineageEntity:
    aspects: Dict[str, LineageAspect]


@dataclass
class LineageData:
    entities: Dict[str, LineageEntity]


entity_registry: Dict[str, EntityDefinition] = {}


def get_aspects_from_snapshot(
    snapshot_schema: avro.schema.RecordSchema,
) -> Dict[str, AspectDefinition]:
    union_schema: avro.schema.UnionSchema = snapshot_schema.fields[1].type.items
    aspect_map = {}
    for aspect_schema in union_schema.schemas:
        if "Aspect" in aspect_schema.props:
            aspectDef = AspectDefinition(
                schema=aspect_schema,
                name=aspect_schema.props["Aspect"].get("name"),
            )
            aspect_map[aspectDef.name] = aspectDef

    return aspect_map


aspect_registry: Dict[str, AspectDefinition] = {}

# A holder for generated docs
generated_documentation: Dict[str, str] = {}

# Patch add_name method to NOT complain about duplicate names
def add_name(self, name_attr, space_attr, new_schema):
    to_add = avro.schema.Name(name_attr, space_attr, self.default_namespace)

    if self.names:
        self.names[to_add.fullname] = new_schema
    return to_add
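
# Note: avro's stock Names.add_name raises SchemaParseException when a fullname is
# registered twice; this override re-registers it silently. It is applied via
# unittest.mock.patch in generate_stitched_record, where aspect schemas that share
# nested record types (e.g. AuditStamp) are stitched into one synthetic entity schema.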


def load_schema_file(schema_file: str) -> None:
    logger.debug(f"Loading schema file: {schema_file}")
    with open(schema_file) as f:
        raw_schema_text = f.read()

    avro_schema = avro.schema.parse(raw_schema_text)

    if (
        isinstance(avro_schema, avro.schema.RecordSchema)
        and "Aspect" in avro_schema.other_props
    ):
        # probably an aspect schema
        record_schema: avro.schema.RecordSchema = avro_schema
        aspect_def = record_schema.get_prop("Aspect")
        aspect_definition = AspectDefinition(**aspect_def)
        aspect_definition.schema = record_schema
        aspect_registry[aspect_definition.name] = aspect_definition
        logger.debug(f"Loaded aspect schema: {aspect_definition.name}")
    elif avro_schema.name == "MetadataChangeEvent":
        # probably an MCE schema
        field: Field = avro_schema.fields[1]
        assert isinstance(field.type, avro.schema.UnionSchema)
        for member_schema in field.type.schemas:
            if "Entity" in member_schema.props:
                entity_def = member_schema.props["Entity"]
                entity_name = entity_def["name"]
                entity_definition = entity_registry.get(
                    entity_name, EntityDefinition(**entity_def)
                )
                entity_definition.aspect_map = get_aspects_from_snapshot(member_schema)
                all_aspects = [a for a in entity_definition.aspect_map]
                # in terms of order, we prefer the aspects from the snapshot over the
                # aspects from the config registry, so we flip the aspect list here
                for aspect_name in entity_definition.aspects:
                    if aspect_name not in all_aspects:
                        all_aspects.append(aspect_name)
                entity_definition.aspects = all_aspects
                entity_registry[entity_name] = entity_definition
                logger.debug(
                    f"Loaded entity schema: {entity_name} with aspects: {all_aspects}"
                )
    else:
        logger.debug(f"Ignoring schema {schema_file}")


def extract_lineage_fields_from_schema(
    schema: avro.schema.Schema, current_path: str = ""
) -> List[LineageField]:
    """
    Recursively extract lineage fields from an Avro schema.

    Args:
        schema: The Avro schema to analyze
        current_path: The current field path (for nested fields)

    Returns:
        List of LineageField objects found in the schema
    """
    lineage_fields = []

    if isinstance(schema, avro.schema.RecordSchema):
        logger.debug(f"Analyzing record schema at path: {current_path}")
        for field in schema.fields:
            field_path = f"{current_path}.{field.name}" if current_path else field.name
            logger.debug(f"Analyzing field: {field.name} at path: {field_path}")

            # Check if this field has lineage properties
            is_lineage = False
            relationship_info = None

            # Check for the isLineage property
            if hasattr(field, "other_props") and field.other_props:
                is_lineage = field.other_props.get("isLineage", False)
                if is_lineage:
                    logger.debug(f"Found isLineage=true for field: {field_path}")

                # Check for the Relationship property
                if "Relationship" in field.other_props:
                    relationship_data = field.other_props["Relationship"]
                    logger.debug(
                        f"Found Relationship property for field: {field_path}: {relationship_data}"
                    )

                    # Handle both direct and path-based relationships
                    if "entityTypes" in relationship_data:
                        # Direct relationship
                        relationship_is_lineage = relationship_data.get(
                            "isLineage", False
                        )
                        relationship_info = LineageRelationship(
                            name=relationship_data.get("name", ""),
                            entityTypes=relationship_data.get("entityTypes", []),
                            isLineage=relationship_is_lineage,
                        )
                        is_lineage = is_lineage or relationship_is_lineage
                    else:
                        # Path-based relationship - find the actual relationship data
                        for _, value in relationship_data.items():
                            if isinstance(value, dict) and "entityTypes" in value:
                                relationship_is_lineage = value.get("isLineage", False)
                                relationship_info = LineageRelationship(
                                    name=value.get("name", ""),
                                    entityTypes=value.get("entityTypes", []),
                                    isLineage=relationship_is_lineage,
                                )
                                is_lineage = is_lineage or relationship_is_lineage
                                break

            # If this field is lineage, add it to the results
            if is_lineage:
                lineage_field = LineageField(
                    name=field.name,
                    path=field_path,
                    isLineage=True,
                    relationship=relationship_info,
                )
                lineage_fields.append(lineage_field)
                logger.debug(f"Added lineage field: {field_path}")

            # Recursively check nested fields
            nested_fields = extract_lineage_fields_from_schema(field.type, field_path)
            lineage_fields.extend(nested_fields)

    elif isinstance(schema, avro.schema.ArraySchema):
        logger.debug(f"Analyzing array schema at path: {current_path}")
        # For arrays, check the items schema
        nested_fields = extract_lineage_fields_from_schema(schema.items, current_path)
        lineage_fields.extend(nested_fields)

    elif isinstance(schema, avro.schema.UnionSchema):
        logger.debug(f"Analyzing union schema at path: {current_path}")
        # For unions, check all possible schemas
        for union_schema in schema.schemas:
            nested_fields = extract_lineage_fields_from_schema(
                union_schema, current_path
            )
            lineage_fields.extend(nested_fields)

    return lineage_fields
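
# Illustrative (assumed) shape of the field-level annotation this walker detects,
# as it might appear in an aspect's .avsc definition:
#
#   "Relationship": {
#     "/dataset": {
#       "name": "DownstreamOf",
#       "entityTypes": ["dataset"],
#       "isLineage": true
#     }
#   }
#
# The outer key is a path spec; the nested dict is what becomes a LineageRelationship.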


def extract_lineage_fields() -> LineageData:
    """
    Extract lineage fields from all aspects in the aspect registry.

    Returns:
        LineageData containing all lineage information organized by entity and aspect
    """
    logger.info("Starting lineage field extraction")

    lineage_data = LineageData(entities={})

    # Group aspects by entity
    entity_aspects: Dict[str, List[str]] = {}
    for entity_name, entity_def in entity_registry.items():
        entity_aspects[entity_name] = entity_def.aspects
        logger.debug(f"Entity {entity_name} has aspects: {entity_def.aspects}")

    # Process each aspect
    for aspect_name, aspect_def in aspect_registry.items():
        logger.debug(f"Processing aspect: {aspect_name}")

        if not aspect_def.schema:
            logger.warning(f"Aspect {aspect_name} has no schema, skipping")
            continue

        # Extract lineage fields from this aspect
        lineage_fields = extract_lineage_fields_from_schema(aspect_def.schema)

        if lineage_fields:
            logger.info(
                f"Found {len(lineage_fields)} lineage fields in aspect {aspect_name}"
            )
            # Find which entities use this aspect
            for entity_name, entity_aspect_list in entity_aspects.items():
                if aspect_name in entity_aspect_list:
                    logger.debug(
                        f"Aspect {aspect_name} is used by entity {entity_name}"
                    )
                    # Initialize the entity if it does not exist yet
                    if entity_name not in lineage_data.entities:
                        lineage_data.entities[entity_name] = LineageEntity(aspects={})

                    # Add the aspect with its lineage fields
                    lineage_aspect = LineageAspect(
                        aspect=aspect_name, fields=lineage_fields
                    )
                    lineage_data.entities[entity_name].aspects[aspect_name] = (
                        lineage_aspect
                    )
        else:
            logger.debug(f"No lineage fields found in aspect {aspect_name}")

    # Log a summary
    total_entities_with_lineage = len(lineage_data.entities)
    total_aspects_with_lineage = sum(
        len(entity.aspects) for entity in lineage_data.entities.values()
    )
    total_lineage_fields = sum(
        len(aspect.fields)
        for entity in lineage_data.entities.values()
        for aspect in entity.aspects.values()
    )

    logger.info("Lineage extraction complete:")
    logger.info(f"  - Entities with lineage: {total_entities_with_lineage}")
    logger.info(f"  - Aspects with lineage: {total_aspects_with_lineage}")
    logger.info(f"  - Total lineage fields: {total_lineage_fields}")

    return lineage_data


def generate_lineage_json(lineage_data: LineageData) -> str:
    """
    Generate a JSON representation of the lineage data.

    Args:
        lineage_data: The lineage data to convert to JSON

    Returns:
        JSON string representation
    """
    logger.info("Generating lineage JSON")

    # Convert the dataclasses to dictionaries
    def lineage_field_to_dict(field: LineageField) -> Dict[str, Any]:
        result = {"name": field.name, "path": field.path, "isLineage": field.isLineage}
        if field.relationship:
            result["relationship"] = {
                "name": field.relationship.name,
                "entityTypes": field.relationship.entityTypes,
                "isLineage": field.relationship.isLineage,
            }
        return result

    def lineage_aspect_to_dict(aspect: LineageAspect) -> Dict[str, Any]:
        return {
            "aspect": aspect.aspect,
            "fields": [lineage_field_to_dict(field) for field in aspect.fields],
        }

    def lineage_entity_to_dict(entity: LineageEntity) -> Dict[str, Any]:
        return {
            aspect_name: lineage_aspect_to_dict(aspect)
            for aspect_name, aspect in entity.aspects.items()
        }

    # Build the final JSON structure
    json_data = {
        "entities": {
            entity_name: lineage_entity_to_dict(entity)
            for entity_name, entity in lineage_data.entities.items()
        }
    }
    json_data["generated_by"] = "metadata-ingestion/scripts/modeldocgen.py"
    json_data["generated_at"] = datetime.now(timezone.utc).isoformat()

    json_string = json.dumps(json_data, indent=2)
    logger.info(f"Generated lineage JSON with {len(json_string)} characters")

    return json_string
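
# Illustrative output shape (entity/aspect/field names are examples only):
#
#   {
#     "entities": {
#       "dataset": {
#         "upstreamLineage": {
#           "aspect": "upstreamLineage",
#           "fields": [
#             {"name": "dataset", "path": "upstreams.dataset", "isLineage": true,
#              "relationship": {"name": "DownstreamOf", "entityTypes": ["dataset"], "isLineage": true}}
#           ]
#         }
#       }
#     },
#     "generated_by": "metadata-ingestion/scripts/modeldocgen.py",
#     "generated_at": "<ISO-8601 UTC timestamp>"
#   }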


@dataclass
class Relationship:
    name: str
    src: str
    dst: str
    doc: Optional[str] = None
    id: Optional[str] = None


@dataclass
class RelationshipAdjacency:
    self_loop: List[Relationship] = field(default_factory=list)
    incoming: List[Relationship] = field(default_factory=list)
    outgoing: List[Relationship] = field(default_factory=list)


@dataclass
class RelationshipGraph:
    map: Dict[str, RelationshipAdjacency] = field(default_factory=dict)

    def add_edge(
        self, src: str, dst: str, label: str, reason: str, edge_id: Optional[str] = None
    ) -> None:
        relnship = Relationship(
            label, src, dst, reason, id=edge_id or f"{src}:{label}:{dst}:{reason}"
        )
        if src == dst:
            adjacency = self.map.get(src, RelationshipAdjacency())
            for reln in adjacency.self_loop:
                if relnship.id == reln.id:
                    logger.info(f"Skipping adding edge since ids match {reln.id}")
                    return
            adjacency.self_loop.append(relnship)
            self.map[src] = adjacency
        else:
            adjacency = self.map.get(src, RelationshipAdjacency())
            for reln in adjacency.outgoing:
                if relnship.id == reln.id:
                    logger.info(f"Skipping adding edge since ids match {reln.id}")
                    return
            adjacency.outgoing.append(relnship)
            self.map[src] = adjacency

            adjacency = self.map.get(dst, RelationshipAdjacency())
            for reln in adjacency.incoming:
                if relnship.id == reln.id:
                    logger.info(f"Skipping adding edge since ids match {reln.id}")
                    return
            adjacency.incoming.append(relnship)
            self.map[dst] = adjacency

    def get_adjacency(self, node: str) -> RelationshipAdjacency:
        return self.map.get(node, RelationshipAdjacency())


def make_relnship_docs(relationships: List[Relationship], direction: str) -> str:
    doc = ""
    map: Dict[str, List[Relationship]] = {}
    for relnship in relationships:
        map[relnship.name] = map.get(relnship.name, [])
        map[relnship.name].append(relnship)
    for rel_name, relnships in map.items():
        doc += f"\n- {rel_name}\n"
        for relnship in relnships:
            doc += f"\n   - {relnship.dst if direction == 'outgoing' else relnship.src} {relnship.doc or ''}"
    return doc
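
# Illustrative output of make_relnship_docs for direction="outgoing" (names assumed):
#
#   - DownstreamOf
#
#      - Dataset via `upstreams.dataset`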


@dataclass
class FieldInfo:
    name: str
    type_name: str
    is_array: bool
    is_optional: bool
    description: str
    annotations: List[str]


def simplify_type_name(field_type: Any) -> Tuple[str, bool, bool]:
    """
    Extract a simplified type name, whether it's an array, and whether it's optional.
    Returns: (type_name, is_array, is_optional)
    """
    is_optional = False
    is_array = False
    current_type = field_type

    # Handle union types (optional fields)
    if isinstance(current_type, avro.schema.UnionSchema):
        non_null_types = [t for t in current_type.schemas if t.type != "null"]
        if len(non_null_types) == 1:
            is_optional = True
            current_type = non_null_types[0]
        else:
            # Multiple non-null types - just show as a union
            return "union", False, True

    # Handle array types
    if isinstance(current_type, avro.schema.ArraySchema):
        is_array = True
        current_type = current_type.items

    # Get the actual type name
    if isinstance(current_type, avro.schema.RecordSchema):
        type_name = current_type.name
    elif isinstance(current_type, avro.schema.PrimitiveSchema):
        type_name = current_type.type
    elif isinstance(current_type, avro.schema.MapSchema):
        type_name = "map"
    elif isinstance(current_type, avro.schema.EnumSchema):
        type_name = current_type.name
    else:
        type_name = (
            str(current_type.type) if hasattr(current_type, "type") else "unknown"
        )

    return type_name, is_array, is_optional
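
# Examples (illustrative): a union ["null", "string"] yields ("string", False, True);
# an array of Edge records yields ("Edge", True, False); a plain "long" yields
# ("long", False, False).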


def extract_fields_from_schema(schema: avro.schema.RecordSchema) -> List[FieldInfo]:
    """Extract top-level fields from an Avro schema."""
    fields = []

    for avro_field in schema.fields:
        type_name, is_array, is_optional = simplify_type_name(avro_field.type)

        # Extract annotations
        annotations = []

        # Check for deprecated fields
        if hasattr(avro_field, "other_props") and avro_field.other_props:
            if avro_field.other_props.get("deprecated"):
                annotations.append("⚠️ Deprecated")

        if hasattr(avro_field, "other_props") and avro_field.other_props:
            if "Searchable" in avro_field.other_props:
                searchable_config = avro_field.other_props["Searchable"]
                # Check if there's a custom search field name
                if (
                    isinstance(searchable_config, dict)
                    and "fieldName" in searchable_config
                ):
                    search_field_name = searchable_config["fieldName"]
                    # Only show the override if it differs from the actual field name
                    if search_field_name != avro_field.name:
                        annotations.append(f"Searchable ({search_field_name})")
                    else:
                        annotations.append("Searchable")
                else:
                    annotations.append("Searchable")

            if "Relationship" in avro_field.other_props:
                relationship_info = avro_field.other_props["Relationship"]
                # Extract the relationship name if available
                rel_name = None
                if isinstance(relationship_info, dict):
                    if "name" in relationship_info:
                        rel_name = relationship_info.get("name")
                    else:
                        # Path-based relationship
                        for _, v in relationship_info.items():
                            if isinstance(v, dict) and "name" in v:
                                rel_name = v.get("name")
                                break
                if rel_name:
                    annotations.append(f"→ {rel_name}")

        # Get the field description
        description = (
            avro_field.doc if hasattr(avro_field, "doc") and avro_field.doc else ""
        )

        fields.append(
            FieldInfo(
                name=avro_field.name,
                type_name=type_name,
                is_array=is_array,
                is_optional=is_optional,
                description=description,
                annotations=annotations,
            )
        )

    return fields


def generate_field_table(fields: List[FieldInfo], common_types: set) -> str:
    """Generate a markdown table for fields, with hyperlinks to common types."""
    table = "\n| Field | Type | Required | Description | Annotations |\n"
    table += "|-------|------|----------|-------------|-------------|\n"

    for field_info in fields:
        # Format the type, with an optional hyperlink
        type_display = field_info.type_name
        if field_info.type_name in common_types:
            # Create an anchor link (lowercase, spaces replaced with hyphens)
            anchor = field_info.type_name.lower().replace(" ", "-")
            type_display = f"[{field_info.type_name}](#{anchor})"
        if field_info.is_array:
            type_display += "[]"

        # Required/optional
        required = "✓" if not field_info.is_optional else ""

        # Annotations
        notes = ", ".join(field_info.annotations) if field_info.annotations else ""

        # Clean the description (remove newlines, truncate if too long)
        desc = field_info.description.replace("\n", " ").strip()
        if len(desc) > 100:
            desc = desc[:97] + "..."

        table += (
            f"| {field_info.name} | {type_display} | {required} | {desc} | {notes} |\n"
        )

    return table
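
# Illustrative rendered row for a hypothetical required `upstreams` field of type Edge[]:
# | upstreams | [Edge](#edge)[] | ✓ | The upstream edges... | → DownstreamOf |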


def identify_common_types(
    entity_def: EntityDefinition,
) -> Dict[str, avro.schema.RecordSchema]:
    """
    Identify types that appear in multiple aspects and should be documented separately.
    Returns a dict of type_name -> schema for common types.
    """
    type_usage: Dict[str, Tuple[avro.schema.RecordSchema, int]] = {}

    # Track which types appear and how often
    for aspect_name in entity_def.aspects or []:
        if aspect_name not in aspect_registry:
            continue
        aspect_def = aspect_registry[aspect_name]
        if not aspect_def.schema:
            continue

        # Walk through the fields and collect record types
        for avro_field in aspect_def.schema.fields:
            _collect_record_types(avro_field.type, type_usage)

    # Common types are those that appear more than once OR are well-known types
    well_known_types = {
        "AuditStamp",
        "Edge",
        "ChangeAuditStamps",
        "TimeStamp",
        "Urn",
        "DataPlatformInstance",
    }

    common_types = {}
    for type_name, (schema, count) in type_usage.items():
        if count > 1 or type_name in well_known_types:
            common_types[type_name] = schema

    return common_types


def _collect_record_types(
    field_type: Any, type_usage: Dict[str, Tuple[avro.schema.RecordSchema, int]]
) -> None:
    """Recursively collect record types from a field type."""
    if isinstance(field_type, avro.schema.UnionSchema):
        for union_type in field_type.schemas:
            _collect_record_types(union_type, type_usage)
    elif isinstance(field_type, avro.schema.ArraySchema):
        _collect_record_types(field_type.items, type_usage)
    elif isinstance(field_type, avro.schema.RecordSchema):
        type_name = field_type.name
        if type_name in type_usage:
            schema, count = type_usage[type_name]
            type_usage[type_name] = (schema, count + 1)
        else:
            type_usage[type_name] = (field_type, 1)


def generate_common_types_section(
    common_types: Dict[str, avro.schema.RecordSchema],
) -> str:
    """Generate the Common Types reference section."""
    if not common_types:
        return ""

    section = "\n### Common Types\n\n"
    section += "These types are used across multiple aspects in this entity.\n\n"

    for type_name in sorted(common_types.keys()):
        schema = common_types[type_name]
        section += f"#### {type_name}\n\n"

        doc = schema.get_prop("doc") if hasattr(schema, "get_prop") else None
        if doc:
            section += f"{doc}\n\n"

        # Show the fields of this common type
        section += "**Fields:**\n\n"
        for avro_field in schema.fields:
            type_name_inner, is_array, is_optional = simplify_type_name(avro_field.type)
            type_display = type_name_inner
            if is_array:
                type_display += "[]"
            optional_marker = "?" if is_optional else ""
            field_doc = (
                avro_field.doc if hasattr(avro_field, "doc") and avro_field.doc else ""
            )
            field_doc = field_doc.replace("\n", " ").strip()
            if len(field_doc) > 80:
                field_doc = field_doc[:77] + "..."
            section += f"- `{avro_field.name}` ({type_display}{optional_marker}): {field_doc}\n"

        section += "\n"

    return section


def make_entity_docs(entity_display_name: str, graph: RelationshipGraph) -> str:
    entity_name = entity_display_name[0:1].lower() + entity_display_name[1:]
    entity_def: Optional[EntityDefinition] = entity_registry.get(entity_name)
    if entity_def:
        doc = entity_def.doc_file_contents or (
            f"# {entity_def.display_name}\n{entity_def.doc}"
            if entity_def.doc
            else f"# {entity_def.display_name}"
        )

        # Identify common types for this entity
        common_types_map = identify_common_types(entity_def)
        common_type_names = set(common_types_map.keys())

        # Add the Tabs import at the start
        tabs_import = (
            "\nimport Tabs from '@theme/Tabs';\nimport TabItem from '@theme/TabItem';\n"
        )

        # create the aspects section
        aspects_section = "\n### Aspects\n" if entity_def.aspects else ""
        deprecated_aspects_section = ""
        timeseries_aspects_section = ""
        for aspect in entity_def.aspects or []:
            aspect_definition: AspectDefinition = aspect_registry[aspect]
            assert aspect_definition
            assert aspect_definition.schema
            deprecated_message = (
                " (Deprecated)"
                if aspect_definition.schema.get_prop("Deprecated")
                else ""
            )
            timeseries_qualifier = (
                " (Timeseries)" if aspect_definition.type == "timeseries" else ""
            )

            # Generate the aspect documentation with tabs
            this_aspect_doc = ""
            this_aspect_doc += (
                f"\n#### {aspect}{deprecated_message}{timeseries_qualifier}\n"
            )
            this_aspect_doc += f"{aspect_definition.schema.get_prop('doc')}\n\n"

            # Extract the fields for the table view
            fields = extract_fields_from_schema(aspect_definition.schema)

            # Generate the tabbed interface
            this_aspect_doc += "<Tabs>\n"
            this_aspect_doc += '<TabItem value="fields" label="Fields" default>\n\n'
            this_aspect_doc += generate_field_table(fields, common_type_names)
            this_aspect_doc += "\n</TabItem>\n"
            this_aspect_doc += '<TabItem value="raw" label="Raw Schema">\n\n'
            this_aspect_doc += f"```javascript\n{json.dumps(aspect_definition.schema.to_json(), indent=2)}\n```\n"
            this_aspect_doc += "\n</TabItem>\n"
            this_aspect_doc += "</Tabs>\n\n"

            if deprecated_message:
                deprecated_aspects_section += this_aspect_doc
            elif timeseries_qualifier:
                timeseries_aspects_section += this_aspect_doc
            else:
                aspects_section += this_aspect_doc

        aspects_section += timeseries_aspects_section + deprecated_aspects_section

        # Generate the common types section
        common_types_section = generate_common_types_section(common_types_map)

        # create the relationships section
        relationships_section = "\n### Relationships\n"
        adjacency = graph.get_adjacency(entity_def.display_name)
        if adjacency.self_loop:
            relationships_section += "\n#### Self\nThese are the relationships to itself, stored in this entity's aspects"
            for relnship in adjacency.self_loop:
                relationships_section += (
                    f"\n- {relnship.name} ({relnship.doc[1:] if relnship.doc else ''})"
                )

        if adjacency.outgoing:
            relationships_section += "\n#### Outgoing\nThese are the relationships stored in this entity's aspects"
            relationships_section += make_relnship_docs(
                adjacency.outgoing, direction="outgoing"
            )
        if adjacency.incoming:
            relationships_section += "\n#### Incoming\nThese are the relationships stored in other entities' aspects"
            relationships_section += make_relnship_docs(
                adjacency.incoming, direction="incoming"
            )
        # create the global metadata graph
        global_graph_url = "https://github.com/datahub-project/static-assets/raw/main/imgs/datahub-metadata-model.png"
        global_graph_section = (
            f"\n### [Global Metadata Model]({global_graph_url})"
            + f"\n![Metadata Model Graph]({global_graph_url})"
        )

        # create the technical reference guide section
        technical_reference_section = "\n## Technical Reference Guide\n\n"
        technical_reference_section += (
            "The sections above provide an overview of how to use this entity. "
            "The following sections provide detailed technical information about how metadata is stored and represented in DataHub.\n\n"
        )
        technical_reference_section += (
            "**Aspects** are the individual pieces of metadata that can be attached to an entity. "
            "Each aspect contains specific information (like ownership, tags, or properties) and is stored as a separate record, "
            "allowing for flexible and incremental metadata updates.\n\n"
        )
        technical_reference_section += (
            "**Relationships** show how this entity connects to other entities in the metadata graph. "
            "These connections are derived from the fields within each aspect and form the foundation of DataHub's knowledge graph.\n\n"
        )
        technical_reference_section += (
            "### Reading the Field Tables\n\n"
            "Each aspect's field table includes an **Annotations** column that provides additional metadata about how fields are used:\n\n"
            "- **⚠️ Deprecated**: This field is deprecated and may be removed in a future version. Check the description for the recommended alternative\n"
            "- **Searchable**: This field is indexed and can be searched in DataHub's search interface\n"
            "- **Searchable (fieldname)**: A field name shown in parentheses indicates that the field is indexed under a different name in the search index. For example, `dashboardTool` is indexed as `tool`\n"
            "- **→ RelationshipName**: This field creates a relationship to another entity. The arrow indicates that the field contains a reference (URN) to another entity, and the name indicates the type of relationship (e.g., `→ Contains`, `→ OwnedBy`)\n\n"
            "Fields with complex types (like `Edge`, `AuditStamp`) link to their definitions in the [Common Types](#common-types) section below.\n"
        )

        final_doc = (
            doc
            + tabs_import
            + technical_reference_section
            + aspects_section
            + common_types_section
            + relationships_section
            + global_graph_section
        )
        generated_documentation[entity_name] = final_doc
        return final_doc
    else:
        raise Exception(f"Failed to find information for entity: {entity_name}")


def create_search_field_name_property() -> List[MetadataChangeProposalWrapper]:
    """
    Create the structured property for documenting search field names.

    This property is used to capture the actual field name used in the search index
    when it differs from the field name in the schema (e.g., the 'instance' field is
    indexed as 'platformInstance').

    Returns:
        List of MCPs for the property definition and settings
    """
    property_id = "com.datahub.metadata.searchFieldName"
    property_urn = str(
        StructuredPropertyUrn.from_string(f"urn:li:structuredProperty:{property_id}")
    )

    # Create the property definition
    definition_mcp = MetadataChangeProposalWrapper(
        entityUrn=property_urn,
        aspect=StructuredPropertyDefinitionClass(
            qualifiedName=property_id,
            displayName="Search Field Name",
            valueType=Urn.make_data_type_urn("string"),
            description=(
                "The field name used in the search index when it differs from the schema field name. "
                "Use this field name when constructing search queries for this field."
            ),
            entityTypes=[Urn.make_entity_type_urn("schemaField")],
            cardinality="SINGLE",
            immutable=False,
        ),
    )

    # Create the property settings for display
    settings_mcp = MetadataChangeProposalWrapper(
        entityUrn=property_urn,
        aspect=StructuredPropertySettingsClass(
            isHidden=False,
            showInSearchFilters=False,
            showInAssetSummary=True,
            showAsAssetBadge=False,
            showInColumnsTable=True,  # Show as a column in schema tables
        ),
    )

    return [definition_mcp, settings_mcp]
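
# The definition and settings MCPs above are emitted once per run; the per-field
# value assignments (StructuredPropertyValueAssignmentClass) are emitted by
# generate_stitched_record below, for each schema field whose search index name
# differs from its schema field name.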


def generate_stitched_record(
    relnships_graph: RelationshipGraph,
) -> Iterable[MetadataChangeProposalWrapper]:
    def strip_types(field_path: str) -> str:
        final_path = field_path
        final_path = re.sub(r"(\[type=[a-zA-Z]+\]\.)", "", final_path)
        final_path = re.sub(r"^\[version=2.0\]\.", "", final_path)
        return final_path
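
    # Example (illustrative):
    # strip_types("[version=2.0].[type=DatasetProperties].[type=string].description")
    # returns "description".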

    # Track schema fields that need structured properties
    schema_field_properties: Dict[
        str, str
    ] = {}  # schema_field_urn -> search_field_name

    for entity_name, entity_def in entity_registry.items():
        entity_display_name = entity_def.display_name
        entity_fields = []
        for aspect_name in entity_def.aspects:
            if aspect_name not in aspect_registry:
                logger.warning(
                    f"Did not find aspect name: {aspect_name} in aspect_registry"
                )
                continue
            # all aspects should have a schema
            aspect_schema = aspect_registry[aspect_name].schema
            assert aspect_schema
            entity_fields.append(
                {
                    "type": aspect_schema.to_json(),
                    "name": aspect_name,
                }
            )

        if entity_fields:
            names = avro.schema.Names()
            field_objects = []
            for f in entity_fields:
                field = avro.schema.Field(
                    f["type"],
                    f["name"],
                    has_default=False,
                )
                field_objects.append(field)
            with unittest.mock.patch("avro.schema.Names.add_name", add_name):
                entity_avro_schema = avro.schema.RecordSchema(
                    name=entity_name,
                    namespace="datahub.metadata.model",
                    names=names,
                    fields=[],
                )
                entity_avro_schema.set_prop("fields", field_objects)
            rawSchema = json.dumps(entity_avro_schema.to_json())
            # always add the URN which is the primary key
            urn_field = SchemaField(
                fieldPath="urn",
                type=SchemaFieldDataTypeClass(type=StringTypeClass()),
                nativeDataType="string",
                nullable=False,
                isPartOfKey=True,
                description=f"The primary identifier for the {entity_name} entity. See the {entity_def.keyAspect} field to understand the structure of this urn.",
            )
            schema_fields: List[SchemaField] = [urn_field] + avro_schema_to_mce_fields(
                rawSchema
            )
            foreign_keys: List[ForeignKeyConstraintClass] = []
            source_dataset_urn = make_dataset_urn(
                platform="datahub",
                name=f"{entity_display_name}",
            )

            for f_field in schema_fields:
                if f_field.jsonProps:
                    json_dict = json.loads(f_field.jsonProps)
                    if "Aspect" in json_dict:
                        aspect_info = json_dict["Aspect"]
                        f_field.globalTags = f_field.globalTags or GlobalTagsClass(
                            tags=[]
                        )
                        f_field.globalTags.tags.append(
                            TagAssociationClass(tag="urn:li:tag:Aspect")
                        )
                        # if this is the key aspect, also add primary-key
                        if entity_def.keyAspect == aspect_info.get("name"):
                            f_field.isPartOfKey = True
                        if aspect_info.get("type", "") == "timeseries":
                            # f_field.globalTags = f_field.globalTags or GlobalTagsClass(
                            #     tags=[]
                            # )
                            f_field.globalTags.tags.append(
                                TagAssociationClass(tag="urn:li:tag:Temporal")
                            )
                    if "Searchable" in json_dict:
                        f_field.globalTags = f_field.globalTags or GlobalTagsClass(
                            tags=[]
                        )
                        f_field.globalTags.tags.append(
                            TagAssociationClass(tag="urn:li:tag:Searchable")
                        )

                        # Check whether the search field name differs from the actual field name
                        searchable_config = json_dict["Searchable"]
                        if (
                            isinstance(searchable_config, dict)
                            and "fieldName" in searchable_config
                        ):
                            search_field_name = searchable_config["fieldName"]
                            # Extract the actual field name from the field path.
                            # Field path format: "[version=2.0].[type=...].<fieldName>"
                            actual_field_name = strip_types(f_field.fieldPath).split(
                                "."
                            )[-1]
                            if search_field_name != actual_field_name:
                                # Track this for later - we'll emit a separate MCP for the schema field entity
                                schema_field_urn = make_schema_field_urn(
                                    source_dataset_urn, f_field.fieldPath
                                )
                                schema_field_properties[schema_field_urn] = (
                                    search_field_name
                                )
                    if "Relationship" in json_dict:
                        relationship_info = json_dict["Relationship"]
                        # detect if we have relationship specified at leaf level or thru path specs
                        if "entityTypes" not in relationship_info:
                            # path spec
                            assert len(relationship_info.keys()) == 1, (
                                "We should never have more than one path spec assigned to a relationship annotation"
                            )
                            final_info = None
                            for _, v in relationship_info.items():
                                final_info = v
                            relationship_info = final_info
                        assert "entityTypes" in relationship_info

                        entity_types: List[str] = relationship_info.get(
                            "entityTypes", []
                        )
                        relnship_name = relationship_info.get("name", None)
                        for entity_type in entity_types:
                            destination_entity_name = capitalize_first(entity_type)
                            foreign_dataset_urn = make_dataset_urn(
                                platform="datahub",
                                name=destination_entity_name,
                            )
                            fkey = ForeignKeyConstraintClass(
                                name=relnship_name,
                                foreignDataset=foreign_dataset_urn,
                                foreignFields=[
                                    make_schema_field_urn(foreign_dataset_urn, "urn")
                                ],
                                sourceFields=[
                                    make_schema_field_urn(
                                        source_dataset_urn, f_field.fieldPath
                                    )
                                ],
                            )
                            foreign_keys.append(fkey)
                            relnships_graph.add_edge(
                                entity_display_name,
                                destination_entity_name,
                                fkey.name,
                                f"via `{strip_types(f_field.fieldPath)}`",
                                edge_id=f"{entity_display_name}:{fkey.name}:{destination_entity_name}:{strip_types(f_field.fieldPath)}",
                            )

            dataset_urn = make_dataset_urn(
                platform="datahub",
                name=entity_display_name,
            )

            yield from MetadataChangeProposalWrapper.construct_many(
                entityUrn=dataset_urn,
                aspects=[
                    SchemaMetadataClass(
                        schemaName=str(entity_name),
                        platform=make_data_platform_urn("datahub"),
                        platformSchema=OtherSchemaClass(rawSchema=rawSchema),
                        fields=schema_fields,
                        version=0,
                        hash="",
                        foreignKeys=foreign_keys if foreign_keys else None,
                    ),
                    GlobalTagsClass(
                        tags=[TagAssociationClass(tag="urn:li:tag:Entity")]
                    ),
                    BrowsePathsClass([f"/prod/datahub/entities/{entity_display_name}"]),
                    BrowsePathsV2Class(
                        [
                            BrowsePathEntryClass(id="entities"),
                            BrowsePathEntryClass(id=entity_display_name),
                        ]
                    ),
                    DatasetPropertiesClass(
                        description=make_entity_docs(
                            dataset_urn.split(":")[-1].split(",")[1], relnships_graph
                        )
                    ),
                    SubTypesClass(typeNames=["entity"]),
                ],
            )

    # Emit the structured properties for schema fields
    property_urn = "urn:li:structuredProperty:com.datahub.metadata.searchFieldName"
    for schema_field_urn, search_field_name in schema_field_properties.items():
        yield MetadataChangeProposalWrapper(
            entityUrn=schema_field_urn,
            aspect=StructuredPropertiesClass(
                properties=[
                    StructuredPropertyValueAssignmentClass(
                        propertyUrn=property_urn,
                        values=[search_field_name],
                    )
                ]
            ),
        )


@dataclass
class EntityAspectName:
    entityName: str
    aspectName: str


class AspectPluginConfig(PermissiveConfigModel):
    className: str
    enabled: bool
    supportedEntityAspectNames: List[EntityAspectName] = []
    packageScan: Optional[List[str]] = None
    supportedOperations: Optional[List[str]] = None


class PluginConfiguration(PermissiveConfigModel):
    aspectPayloadValidators: Optional[List[AspectPluginConfig]] = None
    mutationHooks: Optional[List[AspectPluginConfig]] = None
    mclSideEffects: Optional[List[AspectPluginConfig]] = None
    mcpSideEffects: Optional[List[AspectPluginConfig]] = None


class EntityRegistry(PermissiveConfigModel):
    entities: List[EntityDefinition]
    events: Optional[List[EventDefinition]]
    plugins: Optional[PluginConfiguration] = None


def load_registry_file(registry_file: str) -> Dict[str, EntityDefinition]:
    import yaml

    with open(registry_file, "r") as f:
        registry = EntityRegistry.parse_obj(yaml.safe_load(f))
    for index, entity_def in enumerate(registry.entities):
        entity_def.priority = index
        entity_registry[entity_def.name] = entity_def
    return entity_registry


def get_sorted_entity_names(
    entity_names: List[Tuple[str, EntityDefinition]],
) -> List[Tuple[str, List[str]]]:
    """
    Sort entity names by category and priority for documentation generation.

    This function organizes entities into a structured order for generating
    documentation. Entities are grouped by category (CORE vs INTERNAL) and,
    within each category, sorted by priority and then alphabetically.

    Business logic:
    - CORE entities are displayed first, followed by INTERNAL entities
    - Within each category, entities with priority values are sorted first
    - Priority entities are sorted by their priority value (lower numbers = higher priority)
    - Non-priority entities are sorted alphabetically after priority entities
    - Negative priorities are valid; note that a priority of 0 is falsy, so this
      implementation sorts it with the non-priority entities

    Args:
        entity_names: List of tuples containing (entity_name, EntityDefinition)

    Returns:
        List of tuples containing (EntityCategory, List[str]) where:
        - First tuple: (EntityCategory.CORE, sorted_core_entity_names)
        - Second tuple: (EntityCategory.INTERNAL, sorted_internal_entity_names)

    Example:
        Input: [
            ("dataset", EntityDefinition(priority=2, category=CORE)),
            ("table", EntityDefinition(priority=None, category=CORE)),
            ("internal", EntityDefinition(priority=1, category=INTERNAL))
        ]
        Output: [
            (EntityCategory.CORE, ["dataset", "table"]),
            (EntityCategory.INTERNAL, ["internal"])
        ]
    """
    core_entities = [
        (x, y) for (x, y) in entity_names if y.category == EntityCategory.CORE
    ]
    priority_bearing_core_entities = [(x, y) for (x, y) in core_entities if y.priority]
    priority_bearing_core_entities.sort(key=lambda t: t[1].priority)
    priority_bearing_core_entities = [x for (x, y) in priority_bearing_core_entities]

    non_priority_core_entities = [x for (x, y) in core_entities if not y.priority]
    non_priority_core_entities.sort()

    internal_entities = [
        (x, y) for (x, y) in entity_names if y.category == EntityCategory.INTERNAL
    ]
    priority_bearing_internal_entities = [
        x for (x, y) in internal_entities if y.priority
    ]
    non_priority_internal_entities = [
        x for (x, y) in internal_entities if not y.priority
    ]

    sorted_entities = [
        (
            EntityCategory.CORE,
            priority_bearing_core_entities + non_priority_core_entities,
        ),
        (
            EntityCategory.INTERNAL,
            priority_bearing_internal_entities + non_priority_internal_entities,
        ),
    ]
    return sorted_entities


@click.command()
@click.argument("schemas_root", type=click.Path(exists=True), required=True)
@click.option("--registry", type=click.Path(exists=True), required=True)
@click.option("--generated-docs-dir", type=click.Path(exists=True), required=True)
@click.option("--server", type=str, required=False)
@click.option("--file", type=str, required=False)
@click.option(
    "--dot", type=str, required=False, help="generate a dot file representing the graph"
)
@click.option("--png", type=str, required=False)
@click.option("--extra-docs", type=str, required=False)
@click.option(
    "--lineage-output", type=str, required=False, help="generate lineage JSON file"
)
def generate(  # noqa: C901
    schemas_root: str,
    registry: str,
    generated_docs_dir: str,
    server: Optional[str],
    file: Optional[str],
    dot: Optional[str],
    png: Optional[str],
    extra_docs: Optional[str],
    lineage_output: Optional[str],
) -> None:
    logger.info(f"server = {server}")
    logger.info(f"file = {file}")
    logger.info(f"dot = {dot}")
    logger.info(f"png = {png}")
    logger.info(f"lineage_output = {lineage_output}")

    entity_extra_docs = {}
    if extra_docs:
        for path in glob.glob(f"{extra_docs}/**/*.md", recursive=True):
            m = re.search("/docs/entities/(.*)/*.md", path)
            if m:
                entity_name = m.group(1)
                with open(path, "r") as doc_file:
                    file_contents = doc_file.read()
                entity_extra_docs[entity_name] = file_contents

    # registry file
    load_registry_file(registry)

    # schema files
    for schema_file in Path(schemas_root).glob("**/*.avsc"):
        if (
            schema_file.name in {"MetadataChangeEvent.avsc"}
            or json.loads(schema_file.read_text()).get("Aspect") is not None
        ):
            load_schema_file(str(schema_file))

    if entity_extra_docs:
        for entity_name in entity_extra_docs:
            entity_registry[entity_name].doc_file_contents = entity_extra_docs[
                entity_name
            ]

    if lineage_output:
        logger.info(f"Generating lineage JSON to {lineage_output}")
        try:
            lineage_data = extract_lineage_fields()
            lineage_json = generate_lineage_json(lineage_data)

            output_path = Path(lineage_output)
            output_path.parent.mkdir(parents=True, exist_ok=True)

            new_json_data = json.loads(lineage_json)
            write_file = should_write_json_file(
                output_path, new_json_data, "lineage file"
            )
            if write_file:
                with open(output_path, "w") as f:
                    f.write(lineage_json)
                logger.info(f"Successfully wrote lineage JSON to {lineage_output}")

        except Exception as e:
            logger.error(f"Failed to generate lineage JSON: {e}")
            raise

    # Create the structured property for search field names first
    logger.info("Creating structured property for search field names")
    structured_property_mcps = create_search_field_name_property()

    relationship_graph = RelationshipGraph()
    entity_mcps = list(generate_stitched_record(relationship_graph))

    # Combine the MCPs, with the structured property definition first
    mcps = structured_property_mcps + entity_mcps

    shutil.rmtree(f"{generated_docs_dir}/entities", ignore_errors=True)
    entity_names = [(x, entity_registry[x]) for x in generated_documentation]
    sorted_entity_names = get_sorted_entity_names(entity_names)

    index = 0
    for _, sorted_entities in sorted_entity_names:
        for entity_name in sorted_entities:
            entity_dir = f"{generated_docs_dir}/entities/"
            os.makedirs(entity_dir, exist_ok=True)
            with open(f"{entity_dir}/{entity_name}.md", "w") as fp:
                fp.write("---\n")
                fp.write(f"sidebar_position: {index}\n")
                fp.write("---\n")
                fp.write(generated_documentation[entity_name])
            index += 1

    if file:
        logger.info(f"Will write events to {file}")
        Path(file).parent.mkdir(parents=True, exist_ok=True)
        fileSink = FileSink(
            PipelineContext(run_id="generated-metaModel"),
            FileSinkConfig(filename=file),
        )
        for e in mcps:
            fileSink.write_record_async(
                RecordEnvelope(e, metadata={}), write_callback=NoopWriteCallback()
            )
        fileSink.close()

        pipeline_config = {
            "source": {
                "type": "file",
                "config": {"filename": file},
            },
            "sink": {
                "type": "datahub-rest",
                "config": {
                    "server": "${DATAHUB_SERVER:-http://localhost:8080}",
                    "token": "${DATAHUB_TOKEN:-}",
                },
            },
            "run_id": "modeldoc-generated",
        }
        pipeline_file = Path(file).parent.absolute() / "pipeline.yml"
        with open(pipeline_file, "w") as f:
            json.dump(pipeline_config, f, indent=2)
        logger.info(f"Wrote pipeline to {pipeline_file}")

    if server:
        logger.info(f"Will send events to {server}")
        assert server.startswith("http://"), "server address must start with http://"
        emitter = DatahubRestEmitter(gms_server=server)
        emitter.test_connection()
        for e in mcps:
            emitter.emit(e)

    if dot:
        logger.info(f"Will write dot file to {dot}")
        import pydot

        graph = pydot.Dot("my_graph", graph_type="graph")
        for node, adjacency in relationship_graph.map.items():
            my_node = pydot.Node(
                node,
                label=node,
                shape="box",
            )
            graph.add_node(my_node)
            if adjacency.self_loop:
                for relnship in adjacency.self_loop:
                    graph.add_edge(
                        pydot.Edge(
                            src=relnship.src, dst=relnship.dst, label=relnship.name
                        )
                    )
            if adjacency.outgoing:
                for relnship in adjacency.outgoing:
                    graph.add_edge(
                        pydot.Edge(
                            src=relnship.src, dst=relnship.dst, label=relnship.name
                        )
                    )
        Path(dot).parent.mkdir(parents=True, exist_ok=True)
        graph.write_raw(dot)

        if png:
            try:
                graph.write_png(png)
            except Exception as e:
                logger.error(
                    "Failed to create png file. Do you have graphviz installed?"
                )
                raise e


if __name__ == "__main__":
    logger.setLevel("INFO")
    generate()