import collections
import copy
import json
import re
import textwrap
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple, Union

import avro.schema
import click
import pydantic
import yaml
from avrogen import write_schema_files

ENTITY_CATEGORY_UNSET = "_unset_"


class EntityType(pydantic.BaseModel):
    name: str
    doc: Optional[str] = None
    category: str = ENTITY_CATEGORY_UNSET

    keyAspect: str
    aspects: List[str]


def load_entity_registry(entity_registry_file: Path) -> List[EntityType]:
    with entity_registry_file.open() as f:
        raw_entity_registry = yaml.safe_load(f)

    entities = pydantic.parse_obj_as(List[EntityType], raw_entity_registry["entities"])
    return entities


def load_schema_file(schema_file: Union[str, Path]) -> dict:
    raw_schema_text = Path(schema_file).read_text()
    return json.loads(raw_schema_text)


def load_schemas(schemas_path: str) -> Dict[str, dict]:
    required_schema_files = {
        "mxe/MetadataChangeEvent.avsc",
        "mxe/MetadataChangeProposal.avsc",
        "usage/UsageAggregation.avsc",
        "mxe/MetadataChangeLog.avsc",
        "mxe/PlatformEvent.avsc",
        "platform/event/v1/EntityChangeEvent.avsc",
        "metadata/query/filter/Filter.avsc",  # temporarily added to test reserved keywords support
    }

    # Find all the aspect schemas / other important schemas.
    schema_files: List[Path] = []
    for schema_file in Path(schemas_path).glob("**/*.avsc"):
        relative_path = schema_file.relative_to(schemas_path).as_posix()
        if relative_path in required_schema_files:
            schema_files.append(schema_file)
            required_schema_files.remove(relative_path)
        elif load_schema_file(schema_file).get("Aspect"):
            schema_files.append(schema_file)

    assert not required_schema_files, f"Schema files not found: {required_schema_files}"

    schemas: Dict[str, dict] = {}
    for schema_file in schema_files:
        schema = load_schema_file(schema_file)
        schemas[Path(schema_file).stem] = schema

    return schemas


def patch_schemas(schemas: Dict[str, dict], pdl_path: Path) -> Dict[str, dict]:
    # We can easily find normal urn types using the generated avro schema,
    # but for arrays of urns there's nothing in the avro schema and hence
    # we have to look in the PDL files instead.
    urn_arrays: Dict[
        str, List[Tuple[str, str]]
    ] = {}  # schema name -> list of (field name, type)

    # First, we need to load the PDL files and find all urn arrays.
    for pdl_file in Path(pdl_path).glob("**/*.pdl"):
        pdl_text = pdl_file.read_text()

        # TRICKY: We assume that all urn types end with "Urn".
        arrays = re.findall(
            r"^\s*(\w+)\s*:\s*(?:optional\s+)?array\[(\w*Urn)\]",
            pdl_text,
            re.MULTILINE,
        )
        if arrays:
            schema_name = pdl_file.stem
            urn_arrays[schema_name] = [(item[0], item[1]) for item in arrays]
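            # Illustrative example (hypothetical PDL field): a declaration like
            #   owners: array[CorpuserUrn]
            # in a file named Ownership.pdl would be captured here as
            #   urn_arrays["Ownership"] = [("owners", "CorpuserUrn")]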

    # Then, we can patch each schema.
    patched_schemas = {}
    for name, schema in schemas.items():
        patched_schemas[name] = patch_schema(schema, urn_arrays)

    return patched_schemas


def patch_schema(schema: dict, urn_arrays: Dict[str, List[Tuple[str, str]]]) -> dict:
    """
    This method patches the schema to add an "Urn" property to all urn fields.
    Because the inner type in an array is not a named Avro schema, for urn arrays
    we annotate the array field and add an "urn_is_array" property.
    """

    # We're using Names() to generate a full list of embedded schemas.
    all_schemas = avro.schema.Names()
    patched = avro.schema.make_avsc_object(schema, names=all_schemas)

    for nested in all_schemas.names.values():
        if isinstance(nested, (avro.schema.EnumSchema, avro.schema.FixedSchema)):
            continue
        assert isinstance(nested, avro.schema.RecordSchema)

        # Patch normal urn types.
        field: avro.schema.Field
        for field in nested.fields:
            field_props: dict = field.props  # type: ignore
            java_props: dict = field_props.get("java", {})
            java_class: Optional[str] = java_props.get("class")
            if java_class and java_class.startswith(
                "com.linkedin.pegasus2avro.common.urn."
            ):
                type = java_class.split(".")[-1]
                entity_types = field_props.get("Relationship", {}).get(
                    "entityTypes", []
                )

                field.set_prop("Urn", type)
                if entity_types:
                    field.set_prop("entityTypes", entity_types)

        # Patch array urn types.
        if nested.name in urn_arrays:
            mapping = urn_arrays[nested.name]

            for field_name, type in mapping:
                field = nested.fields_dict[field_name]
                field.set_prop("Urn", type)
                field.set_prop("urn_is_array", True)

    return patched.to_json()  # type: ignore


def merge_schemas(schemas_obj: List[dict]) -> str:
    # Combine schemas as a "union" of all of the types.
    merged = ["null"] + schemas_obj

    # Check that we don't have the same class name in multiple namespaces.
    names_to_spaces: Dict[str, str] = {}

    # Patch add_name method to NOT complain about duplicate names.
    class NamesWithDups(avro.schema.Names):
        def add_name(self, name_attr, space_attr, new_schema):
            to_add = avro.schema.Name(name_attr, space_attr, self.default_namespace)
            assert to_add.name
            assert to_add.space
            assert to_add.fullname

            if to_add.name in names_to_spaces:
                if names_to_spaces[to_add.name] != to_add.space:
                    raise ValueError(
                        f"Duplicate name {to_add.name} in namespaces {names_to_spaces[to_add.name]} and {to_add.space}. "
                        "This will cause conflicts in the generated code."
                    )
            else:
                names_to_spaces[to_add.name] = to_add.space

            self.names[to_add.fullname] = new_schema
            return to_add

    cleaned_schema = avro.schema.make_avsc_object(merged, names=NamesWithDups())

    # Convert back to an Avro schema JSON representation.
    out_schema = cleaned_schema.to_json()
    encoded = json.dumps(out_schema, indent=2)
    return encoded


autogen_header = """# mypy: ignore-errors
# flake8: noqa

# This file is autogenerated by /metadata-ingestion/scripts/avro_codegen.py
# Do not modify manually!

# pylint: skip-file
# fmt: off
# isort: skip_file
"""
autogen_footer = """
# fmt: on
"""


def suppress_checks_in_file(filepath: Union[str, Path]) -> None:
    """
    Adds a couple lines to the top of an autogenerated file:

    - Comments to suppress flake8 and black.
    - A note stating that the file was autogenerated.
    """

    with open(filepath, "r+") as f:
        contents = f.read()

        f.seek(0, 0)
        f.write(autogen_header)
        f.write(contents)
        f.write(autogen_footer)


def add_avro_python3_warning(filepath: Path) -> None:
    contents = filepath.read_text()

    contents = f"""
# The SchemaFromJSONData method only exists in avro-python3, but is called make_avsc_object in avro.
# We can use this fact to detect conflicts between the two packages. Pip won't detect those conflicts
# because both are namespace packages, and hence are allowed to overwrite files from each other.
# This means that installation order matters, which is a pretty unintuitive outcome.
# See https://github.com/pypa/pip/issues/4625 for details.
try:
    from avro.schema import SchemaFromJSONData  # type: ignore

    import warnings

    warnings.warn("It seems like 'avro-python3' is installed, which conflicts with the 'avro' package used by datahub. "
                  + "Try running `pip uninstall avro-python3 && pip install --upgrade --force-reinstall avro` to fix this issue.")
except ImportError:
    pass

{contents}
"""

    filepath.write_text(contents)


load_schema_method = """
import functools
import pathlib


@functools.lru_cache(maxsize=None)
def _load_schema(schema_name: str) -> str:
    return (pathlib.Path(__file__).parent / f"{schema_name}.avsc").read_text()
"""
individual_schema_method = """
def get{schema_name}Schema() -> str:
    return _load_schema("{schema_name}")
"""


def make_load_schema_methods(schemas: Iterable[str]) -> str:
    return load_schema_method + "".join(
        individual_schema_method.format(schema_name=schema) for schema in schemas
    )


def save_raw_schemas(schema_save_dir: Path, schemas: Dict[str, dict]) -> None:
    # Save raw avsc files.
    for name, schema in schemas.items():
        (schema_save_dir / f"{name}.avsc").write_text(json.dumps(schema, indent=2))

    # Add getXSchema methods.
    with open(schema_save_dir / "__init__.py", "w") as schema_dir_init:
        schema_dir_init.write(make_load_schema_methods(schemas.keys()))


def annotate_aspects(aspects: List[dict], schema_class_file: Path) -> None:
    schema_classes_lines = schema_class_file.read_text().splitlines()
    line_lookup_table = {line: i for i, line in enumerate(schema_classes_lines)}

    # Import the _Aspect class.
    schema_classes_lines[
        line_lookup_table["__SCHEMAS: Dict[str, RecordSchema] = {}"]
    ] += """

from datahub._codegen.aspect import _Aspect
"""

    for aspect in aspects:
        className = f"{aspect['name']}Class"
        aspectName = aspect["Aspect"]["name"]
        class_def_original = f"class {className}(DictWrapper):"

        # Make the aspects inherit from the Aspect class.
        class_def_line = line_lookup_table[class_def_original]
        schema_classes_lines[class_def_line] = f"class {className}(_Aspect):"

        # Define the ASPECT_NAME class attribute.
        # There's always an empty line between the docstring and the RECORD_SCHEMA class attribute.
        # We need to find it and insert our line of code there.
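        # Illustrative outcome (assuming a hypothetical aspect named "status"): the
        # generated class body would end up with lines like
        #     ASPECT_NAME = 'status'
        #     ASPECT_INFO = {...}
        # inserted just above its RECORD_SCHEMA attribute.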
        empty_line = class_def_line + 1
        while not (
            schema_classes_lines[empty_line].strip() == ""
            and schema_classes_lines[empty_line + 1]
            .strip()
            .startswith("RECORD_SCHEMA = ")
        ):
            empty_line += 1

        schema_classes_lines[empty_line] = "\n"
        schema_classes_lines[empty_line] += f"\n    ASPECT_NAME = '{aspectName}'"
        if "type" in aspect["Aspect"]:
            schema_classes_lines[empty_line] += (
                f"\n    ASPECT_TYPE = '{aspect['Aspect']['type']}'"
            )

        aspect_info = {
            k: v for k, v in aspect["Aspect"].items() if k not in {"name", "type"}
        }
        schema_classes_lines[empty_line] += f"\n    ASPECT_INFO = {aspect_info}"

        schema_classes_lines[empty_line + 1] += "\n"

    # Finally, generate a big list of all available aspects.
    newline = "\n"
    schema_classes_lines.append(
        f"""
ASPECT_CLASSES: List[Type[_Aspect]] = [
    {f",{newline}    ".join(f"{aspect['name']}Class" for aspect in aspects)}
]

ASPECT_NAME_MAP: Dict[str, Type[_Aspect]] = {{
    aspect.get_aspect_name(): aspect
    for aspect in ASPECT_CLASSES
}}

from typing import Literal
from typing_extensions import TypedDict


class AspectBag(TypedDict, total=False):
    {f"{newline}    ".join(f"{aspect['Aspect']['name']}: {aspect['name']}Class" for aspect in aspects)}


KEY_ASPECTS: Dict[str, Type[_Aspect]] = {{
    {f",{newline}    ".join(f"'{aspect['Aspect']['keyForEntity']}': {aspect['name']}Class" for aspect in aspects if aspect["Aspect"].get("keyForEntity"))}
}}

ENTITY_TYPE_NAMES: List[str] = [
    {f",{newline}    ".join(f"'{aspect['Aspect']['keyForEntity']}'" for aspect in aspects if aspect["Aspect"].get("keyForEntity"))}
]
EntityTypeName = Literal[
    {f",{newline}    ".join(f"'{aspect['Aspect']['keyForEntity']}'" for aspect in aspects if aspect["Aspect"].get("keyForEntity"))}
]
"""
    )

    schema_class_file.write_text("\n".join(schema_classes_lines))


def write_urn_classes(key_aspects: List[dict], urn_dir: Path) -> None:
    urn_dir.mkdir()

    (urn_dir / "__init__.py").write_text("\n# This file is intentionally left empty.")

    code = """
# This file contains classes corresponding to entity URNs.

from typing import ClassVar, List, Optional, Type, TYPE_CHECKING, Union, Literal

import functools

from deprecated.sphinx import deprecated as _sphinx_deprecated

from datahub.utilities.urn_encoder import UrnEncoder
from datahub.utilities.urns._urn_base import _SpecificUrn, Urn
from datahub.utilities.urns.error import InvalidUrnError

deprecated = functools.partial(_sphinx_deprecated, version="0.12.0.2")
"""

    for aspect in key_aspects:
        entity_type = aspect["Aspect"]["keyForEntity"]
        code += generate_urn_class(entity_type, aspect)

    (urn_dir / "urn_defs.py").write_text(code)


def capitalize_entity_name(entity_name: str) -> str:
    # Examples:
    # corpuser -> CorpUser
    # corpGroup -> CorpGroup
    # mlModelDeployment -> MlModelDeployment

    if entity_name == "corpuser":
        return "CorpUser"

    return f"{entity_name[0].upper()}{entity_name[1:]}"


def python_type(avro_type: str) -> str:
    if avro_type == "string":
        return "str"
    elif (
        isinstance(avro_type, dict)
        and avro_type.get("type") == "enum"
        and avro_type.get("name") == "FabricType"
    ):
        # TODO: make this stricter using an enum
        return "str"
    raise ValueError(f"unknown type {avro_type}")


def field_type(field: dict) -> str:
    return python_type(field["type"])


def field_name(field: dict) -> str:
    manual_mapping = {
        "origin": "env",
        "platformName": "platform_name",
    }

    name: str = field["name"]
    if name in manual_mapping:
        return manual_mapping[name]

    # If the name is mixed case, convert to snake case.
    if name.lower() != name:
        # Inject an underscore before each capital letter, and then convert to lowercase.
        return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()

    return name


_create_from_id = """
@classmethod
@deprecated(reason="Use the constructor instead")
def create_from_id(cls, id: str) -> "{class_name}":
    return cls(id)
"""

_extra_urn_methods: Dict[str, List[str]] = {
    "corpGroup": [_create_from_id.format(class_name="CorpGroupUrn")],
    "corpuser": [_create_from_id.format(class_name="CorpUserUrn")],
    "dataFlow": [
        """
@classmethod
def create_from_ids(
    cls,
    orchestrator: str,
    flow_id: str,
    env: str,
    platform_instance: Optional[str] = None,
) -> "DataFlowUrn":
    return cls(
        orchestrator=orchestrator,
        flow_id=f"{platform_instance}.{flow_id}" if platform_instance else flow_id,
        cluster=env,
    )

@deprecated(reason="Use .orchestrator instead")
def get_orchestrator_name(self) -> str:
    return self.orchestrator

@deprecated(reason="Use .flow_id instead")
def get_flow_id(self) -> str:
    return self.flow_id

@deprecated(reason="Use .cluster instead")
def get_env(self) -> str:
    return self.cluster
""",
    ],
    "dataJob": [
        """
@classmethod
def create_from_ids(cls, data_flow_urn: str, job_id: str) -> "DataJobUrn":
    return cls(data_flow_urn, job_id)

def get_data_flow_urn(self) -> "DataFlowUrn":
    return DataFlowUrn.from_string(self.flow)

@deprecated(reason="Use .job_id instead")
def get_job_id(self) -> str:
    return self.job_id
"""
    ],
    "dataPlatform": [_create_from_id.format(class_name="DataPlatformUrn")],
    "dataProcessInstance": [
        _create_from_id.format(class_name="DataProcessInstanceUrn"),
        """
@deprecated(reason="Use .id instead")
def get_dataprocessinstance_id(self) -> str:
    return self.id
""",
    ],
    "dataset": [
        """
@classmethod
def create_from_ids(
    cls,
    platform_id: str,
    table_name: str,
    env: str,
    platform_instance: Optional[str] = None,
) -> "DatasetUrn":
    return DatasetUrn(
        platform=platform_id,
        name=f"{platform_instance}.{table_name}" if platform_instance else table_name,
        env=env,
    )

from datahub.utilities.urns.field_paths import get_simple_field_path_from_v2_field_path as _get_simple_field_path_from_v2_field_path
get_simple_field_path_from_v2_field_path = staticmethod(deprecated(reason='Use the function from the field_paths module instead')(_get_simple_field_path_from_v2_field_path))

def get_data_platform_urn(self) -> "DataPlatformUrn":
    return DataPlatformUrn.from_string(self.platform)

@deprecated(reason="Use .name instead")
def get_dataset_name(self) -> str:
    return self.name

@deprecated(reason="Use .env instead")
def get_env(self) -> str:
    return self.env
"""
    ],
    "domain": [_create_from_id.format(class_name="DomainUrn")],
    "notebook": [
        """
@deprecated(reason="Use .notebook_tool instead")
def get_platform_id(self) -> str:
    return self.notebook_tool

@deprecated(reason="Use .notebook_id instead")
def get_notebook_id(self) -> str:
    return self.notebook_id
"""
    ],
    "tag": [_create_from_id.format(class_name="TagUrn")],
    "chart": [
        """
@classmethod
def create_from_ids(
    cls,
    platform: str,
    name: str,
    platform_instance: Optional[str] = None,
) -> "ChartUrn":
    return ChartUrn(
        dashboard_tool=platform,
        chart_id=f"{platform_instance}.{name}" if platform_instance else name,
    )
"""
    ],
    "dashboard": [
        """
@classmethod
def create_from_ids(
    cls,
    platform: str,
    name: str,
    platform_instance: Optional[str] = None,
) -> "DashboardUrn":
    return DashboardUrn(
        dashboard_tool=platform,
        dashboard_id=f"{platform_instance}.{name}" if platform_instance else name,
    )
"""
    ],
}


def generate_urn_class(entity_type: str, key_aspect: dict) -> str:
    """Generate a class definition for this entity.

    The class definition has the following structure:
    - A class attribute ENTITY_TYPE, which is the entity type string.
    - A class attribute URN_PARTS, which is the number of parts in the URN.
    - A constructor that takes the URN parts as arguments. The field names
      will match the key aspect's field names. It will also have a _allow_coercion
      flag, which will allow for some normalization (e.g. upper case env).
      Then, each part will be validated (including nested calls for urn subparts).
    - Utilities for converting to/from the key aspect.
    - Any additional methods that are required for this entity type, defined above.
      These are primarily for backwards compatibility.
    - Getter methods for each field.
    """

    class_name = f"{capitalize_entity_name(entity_type)}Urn"

    fields = copy.deepcopy(key_aspect["fields"])
    if entity_type == "container":
        # The annotations say guid is optional, but it is required.
        # This is a quick fix of the annotations.
        assert field_name(fields[0]) == "guid"
        assert fields[0]["type"] == ["null", "string"]
        fields[0]["type"] = "string"
    arg_count = len(fields)

    field_urn_type_classes = {}
    for field in fields:
        # Figure out if urn types are valid for each field.
        field_urn_type_class = None
        if field_name(field) == "platform":
            field_urn_type_class = "DataPlatformUrn"
        elif field.get("Urn"):
            if len(field.get("entityTypes", [])) == 1:
                field_entity_type = field["entityTypes"][0]
                field_urn_type_class = f"{capitalize_entity_name(field_entity_type)}Urn"
            else:
                field_urn_type_class = "Urn"

        field_urn_type_classes[field_name(field)] = field_urn_type_class
    if arg_count == 1:
        field = fields[0]
        if field_urn_type_classes[field_name(field)] is None:
            # All single-arg urn types should accept themselves.
            field_urn_type_classes[field_name(field)] = class_name
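    # e.g. (illustrative): a "platform" field resolves to DataPlatformUrn, a field whose
    # "entityTypes" annotation lists exactly one entity resolves to that entity's urn
    # class, and a plain single-field key like TagKey falls back to the urn class itself.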
2025-01-10 09:45:31 -08:00
2023-11-30 18:11:36 -05:00
_init_arg_parts : List [ str ] = [ ]
for field in fields :
2025-01-10 09:45:31 -08:00
field_urn_type_class = field_urn_type_classes [ field_name ( field ) ]
2023-11-30 18:11:36 -05:00
default = ' " PROD " ' if field_name ( field ) == " env " else None
2025-01-10 09:45:31 -08:00
type_hint = field_type ( field )
if field_urn_type_class :
type_hint = f ' Union[ " { field_urn_type_class } " , str] '
_arg_part = f " { field_name ( field ) } : { type_hint } "
2023-11-30 18:11:36 -05:00
if default :
_arg_part + = f " = { default } "
_init_arg_parts . append ( _arg_part )
init_args = " , " . join ( _init_arg_parts )
super_init_args = " , " . join ( field_name ( field ) for field in fields )
parse_ids_mapping = " , " . join (
f " { field_name ( field ) } =entity_ids[ { i } ] " for i , field in enumerate ( fields )
)
key_aspect_class = f " { key_aspect [ ' name ' ] } Class "
to_key_aspect_args = " , " . join (
# The LHS bypasses any field name aliases.
f " { field [ ' name ' ] } =self. { field_name ( field ) } "
for field in fields
)
from_key_aspect_args = " , " . join (
f " { field_name ( field ) } =key_aspect. { field [ ' name ' ] } " for field in fields
)
init_coercion = " "
init_validation = " "
for field in fields :
2024-07-12 22:32:54 -07:00
init_validation + = f ' if not { field_name ( field ) } : \n raise InvalidUrnError( " { class_name } { field_name ( field ) } cannot be empty " ) \n '
2023-11-30 18:11:36 -05:00
# Generalized mechanism for validating embedded urns.
2025-01-10 09:45:31 -08:00
field_urn_type_class = field_urn_type_classes [ field_name ( field ) ]
2025-01-22 13:46:24 -08:00
if field_urn_type_class and field_urn_type_class == class_name :
# First, we need to extract the main piece from the urn type.
init_validation + = (
f " if isinstance( { field_name ( field ) } , { field_urn_type_class } ): \n "
f " { field_name ( field ) } = { field_name ( field ) } . { field_name ( field ) } \n "
)
# If it's still an urn type, that's a problem.
init_validation + = (
f " elif isinstance( { field_name ( field ) } , Urn): \n "
f " raise InvalidUrnError(f ' Expecting a { field_urn_type_class } but got {{ { field_name ( field ) } }} ' ) \n "
)
# Then, we do character validation as normal.
init_validation + = (
f " if UrnEncoder.contains_reserved_char( { field_name ( field ) } ): \n "
f " raise InvalidUrnError(f ' { class_name } { field_name ( field ) } contains reserved characters ' ) \n "
)
elif field_urn_type_class :
init_validation + = f " { field_name ( field ) } = str( { field_name ( field ) } ) # convert urn type to str \n "
2023-11-30 18:11:36 -05:00
init_validation + = (
f " assert { field_urn_type_class } .from_string( { field_name ( field ) } ) \n "
)
else :
init_validation + = (
2024-07-12 22:32:54 -07:00
f " if UrnEncoder.contains_reserved_char( { field_name ( field ) } ): \n "
f " raise InvalidUrnError(f ' { class_name } { field_name ( field ) } contains reserved characters ' ) \n "
2023-11-30 18:11:36 -05:00
)
2025-01-22 13:46:24 -08:00
# TODO add ALL_ENV_TYPES validation
2023-11-30 18:11:36 -05:00
2025-01-22 13:46:24 -08:00
# Field coercion logic.
2023-11-30 18:11:36 -05:00
if field_name ( field ) == " env " :
init_coercion + = " env = env.upper() \n "
2025-01-22 13:46:24 -08:00
elif field_name ( field ) == " platform " :
# For platform names in particular, we also qualify them when they don't have the prefix.
# We can rely on the DataPlatformUrn constructor to do this prefixing.
init_coercion + = " platform = DataPlatformUrn(platform).urn() \n "
elif field_urn_type_class is not None :
# For urn types, we need to parse them into urn types where appropriate.
# Otherwise, we just need to encode special characters.
init_coercion + = (
f " if isinstance( { field_name ( field ) } , str): \n "
f " if { field_name ( field ) } .startswith( ' urn:li: ' ): \n "
f " try: \n "
f " { field_name ( field ) } = { field_urn_type_class } .from_string( { field_name ( field ) } ) \n "
f " except InvalidUrnError: \n "
f " raise InvalidUrnError(f ' Expecting a { field_urn_type_class } but got {{ { field_name ( field ) } }} ' ) \n "
f " else: \n "
f " { field_name ( field ) } = UrnEncoder.encode_string( { field_name ( field ) } ) \n "
)
else :
2023-11-30 18:11:36 -05:00
# For all non-urns, run the value through the UrnEncoder.
init_coercion + = (
f " { field_name ( field ) } = UrnEncoder.encode_string( { field_name ( field ) } ) \n "
)
if not init_coercion :
init_coercion = " pass "
# TODO include the docs for each field

    code = f"""
if TYPE_CHECKING:
    from datahub.metadata.schema_classes import {key_aspect_class}

class {class_name}(_SpecificUrn):
    ENTITY_TYPE: ClassVar[Literal["{entity_type}"]] = "{entity_type}"
    _URN_PARTS: ClassVar[int] = {arg_count}

    def __init__(self, {init_args}, *, _allow_coercion: bool = True) -> None:
        if _allow_coercion:
            # Field coercion logic (if any is required).
{textwrap.indent(init_coercion.strip(), prefix=" " * 4 * 3)}

        # Validation logic.
{textwrap.indent(init_validation.strip(), prefix=" " * 4 * 2)}

        super().__init__(self.ENTITY_TYPE, [{super_init_args}])

    @classmethod
    def _parse_ids(cls, entity_ids: List[str]) -> "{class_name}":
        if len(entity_ids) != cls._URN_PARTS:
            raise InvalidUrnError(f"{class_name} should have {{cls._URN_PARTS}} parts, got {{len(entity_ids)}}: {{entity_ids}}")
        return cls({parse_ids_mapping}, _allow_coercion=False)

    @classmethod
    def underlying_key_aspect_type(cls) -> Type["{key_aspect_class}"]:
        from datahub.metadata.schema_classes import {key_aspect_class}

        return {key_aspect_class}

    def to_key_aspect(self) -> "{key_aspect_class}":
        from datahub.metadata.schema_classes import {key_aspect_class}

        return {key_aspect_class}({to_key_aspect_args})

    @classmethod
    def from_key_aspect(cls, key_aspect: "{key_aspect_class}") -> "{class_name}":
        return cls({from_key_aspect_args})
"""

    for extra_method in _extra_urn_methods.get(entity_type, []):
        code += textwrap.indent(extra_method, prefix=" " * 4)

    for i, field in enumerate(fields):
        code += f"""
    @property
    def {field_name(field)}(self) -> {field_type(field)}:
        return self._entity_ids[{i}]
"""

    return code
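

# Example invocation (illustrative paths; the real build wires in the actual
# entity-registry.yml, PDL, and avsc locations):
#   python scripts/avro_codegen.py entity-registry.yml <pdl-dir> <avsc-dir> <outdir>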


@click.command()
@click.argument(
    "entity_registry", type=click.Path(exists=True, dir_okay=False), required=True
)
@click.argument(
    "pdl_path", type=click.Path(exists=True, file_okay=False), required=True
)
@click.argument(
    "schemas_path", type=click.Path(exists=True, file_okay=False), required=True
)
@click.argument("outdir", type=click.Path(), required=True)
@click.option("--check-unused-aspects", is_flag=True, default=False)
@click.option("--enable-custom-loader", is_flag=True, default=True)
def generate(
    entity_registry: str,
    pdl_path: str,
    schemas_path: str,
    outdir: str,
    check_unused_aspects: bool,
    enable_custom_loader: bool,
) -> None:
    entities = load_entity_registry(Path(entity_registry))
    schemas = load_schemas(schemas_path)

    # Patch the avsc files.
    schemas = patch_schemas(schemas, Path(pdl_path))

    # Special handling for aspects.
    aspects = {
        schema["Aspect"]["name"]: schema
        for schema in schemas.values()
        if schema.get("Aspect")
    }
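    # e.g. (illustrative) a schema carrying {"Aspect": {"name": "datasetProperties"}}
    # would be keyed here as "datasetProperties".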

    # Copy entity registry info into the corresponding key aspect.
    for entity in entities:
        # This implicitly requires that all keyAspects are resolvable.
        aspect = aspects[entity.keyAspect]

        # This requires that entities cannot share a keyAspect.
        if (
            "keyForEntity" in aspect["Aspect"]
            and aspect["Aspect"]["keyForEntity"] != entity.name
        ):
            raise ValueError(
                f"Entity key {entity.keyAspect} is used by {aspect['Aspect']['keyForEntity']} and {entity.name}"
            )

        # Also require that the aspect list is deduplicated.
        duplicate_aspects = collections.Counter(entity.aspects) - collections.Counter(
            set(entity.aspects)
        )
        if duplicate_aspects:
            raise ValueError(
                f"Entity {entity.name} has duplicate aspects: {duplicate_aspects}"
            )

        aspect["Aspect"]["keyForEntity"] = entity.name
        aspect["Aspect"]["entityCategory"] = entity.category
        aspect["Aspect"]["entityAspects"] = entity.aspects
        if entity.doc is not None:
            aspect["Aspect"]["entityDoc"] = entity.doc

    # Check for unused aspects. We currently have quite a few.
    if check_unused_aspects:
        unused_aspects = set(aspects.keys()) - set().union(
            {entity.keyAspect for entity in entities},
            *(set(entity.aspects) for entity in entities),
        )
        if unused_aspects:
            raise ValueError(f"Unused aspects: {unused_aspects}")

    merged_schema = merge_schemas(list(schemas.values()))
    write_schema_files(merged_schema, outdir)

    # Schema files post-processing.
    (Path(outdir) / "__init__.py").write_text("# This file is intentionally empty.\n")
    add_avro_python3_warning(Path(outdir) / "schema_classes.py")
    annotate_aspects(
        list(aspects.values()),
        Path(outdir) / "schema_classes.py",
    )

    if enable_custom_loader:
        # Move schema_classes.py -> _schema_classes.py
        # and add a custom loader.
        (Path(outdir) / "_schema_classes.py").write_text(
            (Path(outdir) / "schema_classes.py").read_text()
        )
        (Path(outdir) / "schema_classes.py").write_text(
            """
# This is a specialized shim layer that allows us to dynamically load custom models from elsewhere.

import importlib
from typing import TYPE_CHECKING

from datahub._codegen.aspect import _Aspect as _Aspect
from datahub.utilities.docs_build import IS_SPHINX_BUILD
from datahub.utilities._custom_package_loader import get_custom_models_package

_custom_package_path = get_custom_models_package()

if TYPE_CHECKING or not _custom_package_path:
    from ._schema_classes import *

    # Required explicitly because __all__ doesn't include _ prefixed names.
    from ._schema_classes import __SCHEMA_TYPES

    if IS_SPHINX_BUILD:
        # Set __module__ to the current module so that Sphinx will document the
        # classes as belonging to this module instead of the custom package.
        for _cls in list(globals().values()):
            if hasattr(_cls, "__module__") and "datahub.metadata._schema_classes" in _cls.__module__:
                _cls.__module__ = __name__
else:
    _custom_package = importlib.import_module(_custom_package_path)
    globals().update(_custom_package.__dict__)
"""
        )

        (Path(outdir) / "urns.py").write_text(
            """
# This is a specialized shim layer that allows us to dynamically load custom URN types from elsewhere.

import importlib
from typing import TYPE_CHECKING

from datahub.utilities.docs_build import IS_SPHINX_BUILD
from datahub.utilities._custom_package_loader import get_custom_urns_package
from datahub.utilities.urns._urn_base import Urn as Urn  # noqa: F401

_custom_package_path = get_custom_urns_package()

if TYPE_CHECKING or not _custom_package_path:
    from ._urns.urn_defs import *  # noqa: F401

    if IS_SPHINX_BUILD:
        # Set __module__ to the current module so that Sphinx will document the
        # classes as belonging to this module instead of the custom package.
        for _cls in list(globals().values()):
            if hasattr(_cls, "__module__") and ("datahub.metadata._urns.urn_defs" in _cls.__module__ or _cls is Urn):
                _cls.__module__ = __name__
else:
    _custom_package = importlib.import_module(_custom_package_path)
    globals().update(_custom_package.__dict__)
"""
        )

    # Generate URN classes.
    urn_dir = Path(outdir) / "_urns"
    write_urn_classes(
        [aspect for aspect in aspects.values() if aspect["Aspect"].get("keyForEntity")],
        urn_dir,
    )

    # Save raw schema files in codegen as well.
    schema_save_dir = Path(outdir) / "schemas"
    schema_save_dir.mkdir()
    for schema_out_file, schema in schemas.items():
        (schema_save_dir / f"{schema_out_file}.avsc").write_text(
            json.dumps(schema, indent=2)
        )

    # Keep a copy of a few raw avsc files.
    required_avsc_schemas = {"MetadataChangeEvent", "MetadataChangeProposal"}
    save_raw_schemas(
        schema_save_dir,
        {
            name: schema
            for name, schema in schemas.items()
            if name in required_avsc_schemas
        },
    )

    # Add headers for all generated files
    generated_files = Path(outdir).glob("**/*.py")
    for file in generated_files:
        suppress_checks_in_file(file)


if __name__ == "__main__":
    generate()