mirror of
				https://github.com/datahub-project/datahub.git
				synced 2025-11-03 20:27:50 +00:00 
			
		
		
		
	feat: Adding clarity around qualified unions and removing extra lines for structs (#3091)
Co-authored-by: Ravindra Lanka <rlanka@acryl.io>
This commit is contained in:
		
							parent
							
								
									801e39bc40
								
							
						
					
					
						commit
						b73d0ebb8b
					
				@ -1,2 +1,4 @@
 | 
			
		||||
export const KEY_SCHEMA_PREFIX = '[key=True].';
 | 
			
		||||
export const VERSION_PREFIX = '[version=2.0].';
 | 
			
		||||
export const ARRAY_TOKEN = '[type=array]';
 | 
			
		||||
export const UNION_TOKEN = '[type=union]';
 | 
			
		||||
 | 
			
		||||
@ -1,5 +1,4 @@
 | 
			
		||||
const ARRAY_TOKEN = '[type=array]';
 | 
			
		||||
const UNION_TOKEN = '[type=union]';
 | 
			
		||||
import { ARRAY_TOKEN, UNION_TOKEN } from './constants';
 | 
			
		||||
 | 
			
		||||
export default function translateFieldPathSegment(fieldPathSegment, i, fieldPathParts) {
 | 
			
		||||
    // for each segment, convert its fieldPath representation into a human readable version
 | 
			
		||||
 | 
			
		||||
@ -9,7 +9,7 @@ import {
 | 
			
		||||
} from '../../../../../../types.generated';
 | 
			
		||||
import { convertTagsForUpdate } from '../../../../../shared/tags/utils/convertTagsForUpdate';
 | 
			
		||||
import { SchemaDiffSummary } from '../components/SchemaVersionSummary';
 | 
			
		||||
import { KEY_SCHEMA_PREFIX, VERSION_PREFIX } from './constants';
 | 
			
		||||
import { KEY_SCHEMA_PREFIX, UNION_TOKEN, VERSION_PREFIX } from './constants';
 | 
			
		||||
import { ExtendedSchemaFields } from './types';
 | 
			
		||||
 | 
			
		||||
export function convertEditableSchemaMeta(
 | 
			
		||||
@ -85,9 +85,24 @@ export function groupByFieldPath(
 | 
			
		||||
        const row = { children: undefined, ...rows[rowIndex] };
 | 
			
		||||
 | 
			
		||||
        for (let j = rowIndex - 1; j >= 0; j--) {
 | 
			
		||||
            if (row.fieldPath.indexOf(rows[j].fieldPath) >= 0) {
 | 
			
		||||
                parentRow = outputRowByPath[rows[j].fieldPath];
 | 
			
		||||
                break;
 | 
			
		||||
            const rowTokens = row.fieldPath.split('.');
 | 
			
		||||
            const isQualifyingUnionField = rowTokens[rowTokens.length - 3] === UNION_TOKEN;
 | 
			
		||||
            if (isQualifyingUnionField) {
 | 
			
		||||
                // in the case of unions, parent will not be a subset of the child
 | 
			
		||||
                rowTokens.splice(rowTokens.length - 2, 1);
 | 
			
		||||
                const parentPath = rowTokens.join('.');
 | 
			
		||||
                console.log({ parentPath });
 | 
			
		||||
 | 
			
		||||
                if (rows[j].fieldPath === parentPath) {
 | 
			
		||||
                    parentRow = outputRowByPath[rows[j].fieldPath];
 | 
			
		||||
                    break;
 | 
			
		||||
                }
 | 
			
		||||
            } else if (!isQualifyingUnionField) {
 | 
			
		||||
                // in the case of structs, arrays, etc, parent will be a subset
 | 
			
		||||
                if (row.fieldPath.indexOf(rows[j].fieldPath) >= 0) {
 | 
			
		||||
                    parentRow = outputRowByPath[rows[j].fieldPath];
 | 
			
		||||
                    break;
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -14,7 +14,6 @@ usage stats and data profiles. Therefore, it must satisfy the following requirem
 | 
			
		||||
 | 
			
		||||
* must be unique across all fields within a schema.
 | 
			
		||||
* make schema navigation in the UI more intuitive.
 | 
			
		||||
* allow for capturing and editing documentation of intermediate structs/records leading to a field, and not just fields.
 | 
			
		||||
* allow for identifying the type of schema the field is part of, such as a `key-schema` or a `value-schema`.
 | 
			
		||||
* allow for future-evolution
 | 
			
		||||
 | 
			
		||||
@ -49,7 +48,8 @@ the record type `A` or `B`.
 | 
			
		||||
The syntax for V2 encoding of the `fieldPath` is captured in the following grammar. The `FieldPathSpec` is essentially
 | 
			
		||||
the type annotated path of the member, with each token along the path representing one level of nested member,
 | 
			
		||||
starting from the most-enclosing type, leading up to the member. In the case of `unions` that have `one-of` semantics,
 | 
			
		||||
the corresponding field will be emitted once for each `member` of the union as its `type`.
 | 
			
		||||
the corresponding field will be emitted once for each `member` of the union as its `type`, along with one path
 | 
			
		||||
corresponding to the `union` itself.
 | 
			
		||||
 | 
			
		||||
### Formal Spec:
 | 
			
		||||
 | 
			
		||||
@ -58,9 +58,8 @@ the corresponding field will be emitted once for each `member` of the union as i
 | 
			
		||||
                   | <VersionToken>.<FieldPathSpec> // when part of a value schema
 | 
			
		||||
<VersionToken> := [version=<VersionId>] // [version=2.0] for v2
 | 
			
		||||
<PartOfKeySchemaToken> := [key=True]  // when part of a key schema
 | 
			
		||||
<FieldPathSpec> := <PathTokenPrefix>+  // this is the type-prefixed path up to the intermediate type or a field.
 | 
			
		||||
<PathTokenPrefix> := <TypePrefixToken> | <FieldToken> // intermediate type, or a field
 | 
			
		||||
<FieldToken> := <TypePrefixToken>.<name_of_the_field> // type of field + field name
 | 
			
		||||
<FieldPathSpec> := <FieldToken>+  // this is the type-prefixed path of a field (nested if repeated).
 | 
			
		||||
<FieldToken> := <TypePrefixToken>.<name_of_the_field> // type prefixed path of a field.
 | 
			
		||||
<TypePrefixToken> := <NestedTypePrefixToken>.<SimpleTypeToken> | <SimpleTypeToken>
 | 
			
		||||
<NestedTypePrefixToken> := [type=<NestedType>]
 | 
			
		||||
<SimpleTypeToken> := [type=<SimpleType>]
 | 
			
		||||
@ -68,16 +67,12 @@ the corresponding field will be emitted once for each `member` of the union as i
 | 
			
		||||
<SimpleType> := int | float | double | string | fixed | enum
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
For the [example above](#example-ambiguous-field-path), this encoding would produce the following 5 unique field paths.
 | 
			
		||||
Notice that there are 3 unique paths corresponding to the `union`, record `A`, and record `B` intermediate types, and 2
 | 
			
		||||
unique paths corresponding to the `A.f` and `B.f` fields.
 | 
			
		||||
For the [example above](#example-ambiguous-field-path), this encoding would produce the following 2 unique paths
 | 
			
		||||
corresponding to the `A.f` and `B.f` fields.
 | 
			
		||||
 | 
			
		||||
```python
 | 
			
		||||
unique_v2_field_paths = [
 | 
			
		||||
 "[version=2.0].[type=union]",
 | 
			
		||||
 "[version=2.0].[type=union].[type=A]",
 | 
			
		||||
 "[version=2.0].[type=union].[type=A].[type=string].f",
 | 
			
		||||
 "[version=2.0].[type=union].[type=B]",
 | 
			
		||||
 "[version=2.0].[type=union].[type=B].[type=string].f"
 | 
			
		||||
]
 | 
			
		||||
```
 | 
			
		||||
@ -86,9 +81,8 @@ NOTE:
 | 
			
		||||
 | 
			
		||||
- this encoding always ensures uniqueness within a schema since the full type annotation leading to a field is encoded
 | 
			
		||||
  in the fieldPath itself.
 | 
			
		||||
- field paths that end with an intermediate type allow us to capture details at record/struct level as well, in addition
 | 
			
		||||
  to the fields themselves.
 | 
			
		||||
- processing a fieldPath, such as from the UI, is simplified to walking each token along the path from left-to-right.
 | 
			
		||||
- processing a fieldPath, such as from the UI, is simplified to walking each token along the path from
 | 
			
		||||
  left-to-right.
 | 
			
		||||
- adding PartOfKeySchemaToken allows for identifying if the field is part of key-schema.
 | 
			
		||||
- adding VersionToken allows for future evolvability.
 | 
			
		||||
- to represent `optional` fields, which sometimes are modeled as `unions` in formats like `Avro`, instead of treating it
 | 
			
		||||
@ -133,7 +127,6 @@ avro_schema = """
 | 
			
		||||
"""
 | 
			
		||||
 | 
			
		||||
unique_v2_field_paths = [
 | 
			
		||||
    "[version=2.0].[type=E]", # this is the record E itself that we can maintain documentation for!
 | 
			
		||||
    "[version=2.0].[type=E].[type=string].a",
 | 
			
		||||
    "[version=2.0].[type=E].[type=string].b",
 | 
			
		||||
]
 | 
			
		||||
@ -160,8 +153,6 @@ avro_schema = """
 | 
			
		||||
"""
 | 
			
		||||
 | 
			
		||||
unique_v2_field_paths = [
 | 
			
		||||
  "[version=2.0].[key=True].[type=SimpleNested]",
 | 
			
		||||
  "[version=2.0].[key=True].[type=SimpleNested].[type=InnerRcd]",
 | 
			
		||||
  "[version=2.0].[key=True].[type=SimpleNested].[type=InnerRcd].nestedRcd",
 | 
			
		||||
  "[version=2.0].[key=True].[type=SimpleNested].[type=InnerRcd].nestedRcd.[type=string].aStringField",
 | 
			
		||||
]
 | 
			
		||||
@ -189,11 +180,8 @@ avro_schema = """
 | 
			
		||||
"""
 | 
			
		||||
 | 
			
		||||
unique_v2_field_paths = [
 | 
			
		||||
  "[version=2.0].[type=Recursive]",
 | 
			
		||||
  "[version=2.0].[type=Recursive].[type=R]",
 | 
			
		||||
  "[version=2.0].[type=Recursive].[type=R].r",
 | 
			
		||||
  "[version=2.0].[type=Recursive].[type=R].r.[type=int].anIntegerField",
 | 
			
		||||
  "[version=2.0].[type=Recursive].[type=R].r.[type=R]",
 | 
			
		||||
  "[version=2.0].[type=Recursive].[type=R].r.[type=R].aRecursiveField"
 | 
			
		||||
]
 | 
			
		||||
```
 | 
			
		||||
@ -216,10 +204,7 @@ avro_schema ="""
 | 
			
		||||
}
 | 
			
		||||
"""
 | 
			
		||||
unique_v2_field_paths = [
 | 
			
		||||
 "[version=2.0].[type=TreeNode]",
 | 
			
		||||
 "[version=2.0].[type=TreeNode].[type=long].value",
 | 
			
		||||
 "[version=2.0].[type=TreeNode].[type=array]",
 | 
			
		||||
 "[version=2.0].[type=TreeNode].[type=array].[type=TreeNode]",
 | 
			
		||||
 "[version=2.0].[type=TreeNode].[type=array].[type=TreeNode].children",
 | 
			
		||||
]
 | 
			
		||||
```
 | 
			
		||||
@ -246,14 +231,11 @@ avro_schema = """
 | 
			
		||||
}
 | 
			
		||||
"""
 | 
			
		||||
unique_v2_field_paths: List[str] = [
 | 
			
		||||
    "[version=2.0].[key=True].[type=ABFooUnion]",
 | 
			
		||||
    "[version=2.0].[key=True].[type=ABFooUnion].[type=union]",
 | 
			
		||||
    "[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=A]",
 | 
			
		||||
    "[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=A].a",
 | 
			
		||||
    "[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=A].a.[type=string].f",
 | 
			
		||||
    "[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=B]",
 | 
			
		||||
    "[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=B].a",
 | 
			
		||||
    "[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=B].a.[type=string].f",
 | 
			
		||||
    "[version=2.0].[key=True].[type=ABUnion].[type=union].a",
 | 
			
		||||
    "[version=2.0].[key=True].[type=ABUnion].[type=union].[type=A].a",
 | 
			
		||||
    "[version=2.0].[key=True].[type=ABUnion].[type=union].[type=A].a.[type=string].f",
 | 
			
		||||
    "[version=2.0].[key=True].[type=ABUnion].[type=union].[type=B].a",
 | 
			
		||||
    "[version=2.0].[key=True].[type=ABUnion].[type=union].[type=B].a.[type=string].f",
 | 
			
		||||
]
 | 
			
		||||
```
 | 
			
		||||
### Arrays
 | 
			
		||||
@ -286,10 +268,6 @@ avro_schema = """
 | 
			
		||||
}
 | 
			
		||||
"""
 | 
			
		||||
unique_v2_field_paths: List[str] = [
 | 
			
		||||
  "[version=2.0].[type=NestedArray]",
 | 
			
		||||
  "[version=2.0].[type=NestedArray].[type=array]",
 | 
			
		||||
  "[version=2.0].[type=NestedArray].[type=array].[type=array]",
 | 
			
		||||
  "[version=2.0].[type=NestedArray].[type=array].[type=array].[type=Foo]",
 | 
			
		||||
  "[version=2.0].[type=NestedArray].[type=array].[type=array].[type=Foo].ar",
 | 
			
		||||
  "[version=2.0].[type=NestedArray].[type=array].[type=array].[type=Foo].ar.[type=long].a",
 | 
			
		||||
]
 | 
			
		||||
@ -313,8 +291,6 @@ avro_schema = """
 | 
			
		||||
}
 | 
			
		||||
"""
 | 
			
		||||
unique_v2_field_paths = [
 | 
			
		||||
  "[version=2.0].[type=R]",
 | 
			
		||||
  "[version=2.0].[type=R].[type=map]",
 | 
			
		||||
  "[version=2.0].[type=R].[type=map].[type=long].a_map_of_longs_field",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
@ -357,25 +333,18 @@ avro_schema = """
 | 
			
		||||
"""
 | 
			
		||||
 | 
			
		||||
unique_v2_field_paths: List[str] = [
 | 
			
		||||
  "[version=2.0].[type=ABFooUnion]",
 | 
			
		||||
  "[version=2.0].[type=ABFooUnion].[type=union]",
 | 
			
		||||
  "[version=2.0].[type=ABFooUnion].[type=union].[type=A]",
 | 
			
		||||
  "[version=2.0].[type=ABFooUnion].[type=union].a",
 | 
			
		||||
  "[version=2.0].[type=ABFooUnion].[type=union].[type=A].a",
 | 
			
		||||
  "[version=2.0].[type=ABFooUnion].[type=union].[type=A].a.[type=string].f",
 | 
			
		||||
  "[version=2.0].[type=ABFooUnion].[type=union].[type=B]",
 | 
			
		||||
  "[version=2.0].[type=ABFooUnion].[type=union].[type=B].a",
 | 
			
		||||
  "[version=2.0].[type=ABFooUnion].[type=union].[type=B].a.[type=string].f",
 | 
			
		||||
  "[version=2.0].[type=ABFooUnion].[type=union].[type=array]",
 | 
			
		||||
  "[version=2.0].[type=ABFooUnion].[type=union].[type=array].[type=array]",
 | 
			
		||||
  "[version=2.0].[type=ABFooUnion].[type=union].[type=array].[type=array].[type=Foo]",
 | 
			
		||||
  "[version=2.0].[type=ABFooUnion].[type=union].[type=array].[type=array].[type=Foo].a",
 | 
			
		||||
  "[version=2.0].[type=ABFooUnion].[type=union].[type=array].[type=array].[type=Foo].a.[type=long].f",
 | 
			
		||||
]
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
For more examples, see
 | 
			
		||||
the [unit-tests for AvroToMceSchemaConverter](https://github.com/linkedin/datahub/blob/master/metadata-ingestion/tests/unit/test_schema_util.py)
 | 
			
		||||
.
 | 
			
		||||
the [unit-tests for AvroToMceSchemaConverter](https://github.com/linkedin/datahub/blob/master/metadata-ingestion/tests/unit/test_schema_util.py).
 | 
			
		||||
 | 
			
		||||
### Backward-compatibility
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -129,25 +129,42 @@ class AvroToMceSchemaConverter:
 | 
			
		||||
        return ".".join(self._prefix_name_stack)
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def _get_type_annotation(schema: ExtendedAvroNestedSchemas) -> str:
 | 
			
		||||
        if isinstance(schema, avro.schema.RecordSchema):
 | 
			
		||||
            return f"[type={schema.name}]"
 | 
			
		||||
 | 
			
		||||
        # For fields, just return the plain name.
 | 
			
		||||
        if isinstance(schema, avro.schema.Field):
 | 
			
		||||
            return f"{schema.name}"
 | 
			
		||||
    def _get_simple_native_type(schema: ExtendedAvroNestedSchemas) -> str:
 | 
			
		||||
        if isinstance(schema, (avro.schema.RecordSchema, avro.schema.Field)):
 | 
			
		||||
            # For Records, fields, always return the name.
 | 
			
		||||
            return schema.name
 | 
			
		||||
 | 
			
		||||
        # For optional, use the underlying non-null type
 | 
			
		||||
        if isinstance(schema, avro.schema.UnionSchema) and len(schema.schemas) == 2:
 | 
			
		||||
            # Optional types as unions in AVRO. Return underlying non-null sub-type.
 | 
			
		||||
            (first, second) = schema.schemas
 | 
			
		||||
            if first.type == avro.schema.NULL:
 | 
			
		||||
                return f"[type={second.type}]"
 | 
			
		||||
                return second.type
 | 
			
		||||
            elif second.type == avro.schema.NULL:
 | 
			
		||||
                return f"[type={first.type}]"
 | 
			
		||||
                return first.type
 | 
			
		||||
 | 
			
		||||
        # For everything else, use the schema's type
 | 
			
		||||
        return f"[type={schema.type}]"
 | 
			
		||||
        return schema.type
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def _get_type_annotation(schema: ExtendedAvroNestedSchemas) -> str:
 | 
			
		||||
        simple_native_type = AvroToMceSchemaConverter._get_simple_native_type(schema)
 | 
			
		||||
        if isinstance(schema, avro.schema.Field):
 | 
			
		||||
            return simple_native_type
 | 
			
		||||
        else:
 | 
			
		||||
            return f"[type={simple_native_type}]"
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def _get_underlying_type_if_option_as_union(
 | 
			
		||||
        schema: AvroNestedSchemas, default: Optional[AvroNestedSchemas] = None
 | 
			
		||||
    ) -> AvroNestedSchemas:
 | 
			
		||||
        if isinstance(schema, avro.schema.UnionSchema) and len(schema.schemas) == 2:
 | 
			
		||||
            (first, second) = schema.schemas
 | 
			
		||||
            if first.type == avro.schema.NULL:
 | 
			
		||||
                return second
 | 
			
		||||
            elif second.type == avro.schema.NULL:
 | 
			
		||||
                return first
 | 
			
		||||
        return default
 | 
			
		||||
 | 
			
		||||
    class SchemaFieldEmissionContextManager:
 | 
			
		||||
        """Context Manager for MCE SchemaFiled emission
 | 
			
		||||
@ -178,32 +195,16 @@ class AvroToMceSchemaConverter:
 | 
			
		||||
                        avro.schema.ArraySchema,
 | 
			
		||||
                        avro.schema.Field,
 | 
			
		||||
                        avro.schema.MapSchema,
 | 
			
		||||
                        avro.schema.UnionSchema,
 | 
			
		||||
                        avro.schema.RecordSchema,
 | 
			
		||||
                    ),
 | 
			
		||||
                )
 | 
			
		||||
                and self._converter._fields_stack
 | 
			
		||||
            ):
 | 
			
		||||
                # We are in the context of a non-nested(simple) field.
 | 
			
		||||
                last_field_schema = self._converter._fields_stack[-1]
 | 
			
		||||
 | 
			
		||||
                description = (
 | 
			
		||||
                    last_field_schema.doc
 | 
			
		||||
                    if last_field_schema.doc
 | 
			
		||||
                    else "No description available."
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
                if last_field_schema.has_default:
 | 
			
		||||
                    description = f"{description}\nField default value: {last_field_schema.default}"
 | 
			
		||||
 | 
			
		||||
                with AvroToMceSchemaConverter.SchemaFieldEmissionContextManager(
 | 
			
		||||
                    last_field_schema,
 | 
			
		||||
                    last_field_schema,
 | 
			
		||||
                    self._converter,
 | 
			
		||||
                    description,
 | 
			
		||||
                ) as field_emitter:
 | 
			
		||||
                    yield from field_emitter.emit()
 | 
			
		||||
                # We are in the context of a non-nested(simple) field or the special-cased union.
 | 
			
		||||
                yield from self._converter._gen_from_last_field()
 | 
			
		||||
            else:
 | 
			
		||||
                # Just emit the SchemaField from schema provided in the Ctor.
 | 
			
		||||
 | 
			
		||||
                schema = self._schema
 | 
			
		||||
                actual_schema = self._actual_schema
 | 
			
		||||
                if isinstance(schema, avro.schema.Field):
 | 
			
		||||
@ -214,15 +215,24 @@ class AvroToMceSchemaConverter:
 | 
			
		||||
                            schema, schema
 | 
			
		||||
                        )
 | 
			
		||||
                    )
 | 
			
		||||
                # Emit the schema field provided in the Ctor.
 | 
			
		||||
 | 
			
		||||
                description = self._description
 | 
			
		||||
                if description is None:
 | 
			
		||||
                    description = schema.props.get("doc", None)
 | 
			
		||||
 | 
			
		||||
                native_data_type = self._converter._prefix_name_stack[-1]
 | 
			
		||||
                if isinstance(schema, (avro.schema.Field, avro.schema.UnionSchema)):
 | 
			
		||||
                    native_data_type = self._converter._prefix_name_stack[-2]
 | 
			
		||||
                type_prefix = "[type="
 | 
			
		||||
                if native_data_type.startswith(type_prefix):
 | 
			
		||||
                    native_data_type = native_data_type[
 | 
			
		||||
                        slice(len(type_prefix), len(native_data_type) - 1)
 | 
			
		||||
                    ]
 | 
			
		||||
 | 
			
		||||
                field = SchemaField(
 | 
			
		||||
                    fieldPath=self._converter._get_cur_field_path(),
 | 
			
		||||
                    # Not populating this field for Avro, since this is blowing up the KAFKA message size.
 | 
			
		||||
                    nativeDataType="",
 | 
			
		||||
                    # Populate it with the simple native type for now.
 | 
			
		||||
                    nativeDataType=native_data_type,
 | 
			
		||||
                    type=self._converter._get_column_type(actual_schema.type),
 | 
			
		||||
                    description=description,
 | 
			
		||||
                    recursive=False,
 | 
			
		||||
@ -234,18 +244,6 @@ class AvroToMceSchemaConverter:
 | 
			
		||||
        def __exit__(self, exc_type, exc_val, exc_tb):
 | 
			
		||||
            self._converter._prefix_name_stack.pop()
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def _get_underlying_type_if_option_as_union(
 | 
			
		||||
        schema: AvroNestedSchemas, default: Optional[AvroNestedSchemas] = None
 | 
			
		||||
    ) -> AvroNestedSchemas:
 | 
			
		||||
        if isinstance(schema, avro.schema.UnionSchema) and len(schema.schemas) == 2:
 | 
			
		||||
            (first, second) = schema.schemas
 | 
			
		||||
            if first.type == avro.schema.NULL:
 | 
			
		||||
                return second
 | 
			
		||||
            elif second.type == avro.schema.NULL:
 | 
			
		||||
                return first
 | 
			
		||||
        return default
 | 
			
		||||
 | 
			
		||||
    def _get_sub_schemas(
 | 
			
		||||
        self, schema: ExtendedAvroNestedSchemas
 | 
			
		||||
    ) -> Generator[avro.schema.Schema, None, None]:
 | 
			
		||||
@ -290,13 +288,41 @@ class AvroToMceSchemaConverter:
 | 
			
		||||
    ) -> Generator[SchemaField, None, None]:
 | 
			
		||||
        """Handles generation of MCE SchemaFields for an AVRO Field type."""
 | 
			
		||||
        # NOTE: Here we only manage the field stack and trigger MCE Field generation from this field's type.
 | 
			
		||||
        # The actual emitting of a field happens
 | 
			
		||||
        #  (a) when another nested record is encountered or (b) a non-nested type has been reached.
 | 
			
		||||
        # The actual emitting of a field happens when
 | 
			
		||||
        #  (a) another nested record is encountered or
 | 
			
		||||
        #  (b) a non-nested type has been reached or
 | 
			
		||||
        #  (c) during the special-casing for unions.
 | 
			
		||||
        self._fields_stack.append(field)
 | 
			
		||||
        for sub_schema in self._get_sub_schemas(field):
 | 
			
		||||
            yield from self._to_mce_fields(sub_schema)
 | 
			
		||||
        self._fields_stack.pop()
 | 
			
		||||
 | 
			
		||||
    def _gen_from_last_field(
 | 
			
		||||
        self, schema_to_recurse: Optional[AvroNestedSchemas] = None
 | 
			
		||||
    ) -> Generator[SchemaField, None, None]:
 | 
			
		||||
        """Emits the field most-recent field, optionally triggering sub-schema generation under the field."""
 | 
			
		||||
        last_field_schema = self._fields_stack[-1]
 | 
			
		||||
        # Generate the custom-description for the field.
 | 
			
		||||
        description = (
 | 
			
		||||
            last_field_schema.doc
 | 
			
		||||
            if last_field_schema.doc
 | 
			
		||||
            else "No description available."
 | 
			
		||||
        )
 | 
			
		||||
        if last_field_schema.has_default:
 | 
			
		||||
            description = (
 | 
			
		||||
                f"{description}\nField default value: {last_field_schema.default}"
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        with AvroToMceSchemaConverter.SchemaFieldEmissionContextManager(
 | 
			
		||||
            last_field_schema, last_field_schema, self, description
 | 
			
		||||
        ) as f_emit:
 | 
			
		||||
            yield from f_emit.emit()
 | 
			
		||||
 | 
			
		||||
            if schema_to_recurse is not None:
 | 
			
		||||
                # Generate the nested sub-schemas under the most-recent field.
 | 
			
		||||
                for sub_schema in self._get_sub_schemas(schema_to_recurse):
 | 
			
		||||
                    yield from self._to_mce_fields(sub_schema)
 | 
			
		||||
 | 
			
		||||
    def _gen_from_non_field_nested_schemas(
 | 
			
		||||
        self, schema: AvroNestedSchemas
 | 
			
		||||
    ) -> Generator[SchemaField, None, None]:
 | 
			
		||||
@ -315,30 +341,24 @@ class AvroToMceSchemaConverter:
 | 
			
		||||
        with AvroToMceSchemaConverter.SchemaFieldEmissionContextManager(
 | 
			
		||||
            schema, actual_schema, self
 | 
			
		||||
        ) as fe_schema:
 | 
			
		||||
            # Emit non-AVRO field complex schemas(even optional unions that become primitives).
 | 
			
		||||
            yield from fe_schema.emit()
 | 
			
		||||
            if isinstance(
 | 
			
		||||
                actual_schema,
 | 
			
		||||
                (
 | 
			
		||||
                    avro.schema.UnionSchema,
 | 
			
		||||
                    avro.schema.PrimitiveSchema,
 | 
			
		||||
                    avro.schema.FixedSchema,
 | 
			
		||||
                    avro.schema.EnumSchema,
 | 
			
		||||
                ),
 | 
			
		||||
            ):
 | 
			
		||||
                # Emit non-AVRO field complex schemas(even optional unions that become primitives) and special-casing for extra union emission.
 | 
			
		||||
                yield from fe_schema.emit()
 | 
			
		||||
 | 
			
		||||
            if (
 | 
			
		||||
                isinstance(actual_schema, avro.schema.RecordSchema)
 | 
			
		||||
                and self._fields_stack
 | 
			
		||||
            ):
 | 
			
		||||
                # We have encountered a nested record, emit the most-recently seen field.
 | 
			
		||||
                last_field_schema = self._fields_stack[-1]
 | 
			
		||||
                description = (
 | 
			
		||||
                    last_field_schema.doc
 | 
			
		||||
                    if last_field_schema.doc
 | 
			
		||||
                    else "No description available."
 | 
			
		||||
                )
 | 
			
		||||
                if last_field_schema.has_default:
 | 
			
		||||
                    description = f"{description}\nField default value: {last_field_schema.default}"
 | 
			
		||||
                with AvroToMceSchemaConverter.SchemaFieldEmissionContextManager(
 | 
			
		||||
                    last_field_schema, last_field_schema, self, description
 | 
			
		||||
                ) as f_emit:
 | 
			
		||||
                    yield from f_emit.emit()
 | 
			
		||||
                    # Generate the nested sub-schemas under the most-recent field.
 | 
			
		||||
                    if recurse:
 | 
			
		||||
                        for sub_schema in self._get_sub_schemas(actual_schema):
 | 
			
		||||
                            yield from self._to_mce_fields(sub_schema)
 | 
			
		||||
                yield from self._gen_from_last_field(actual_schema if recurse else None)
 | 
			
		||||
            else:
 | 
			
		||||
                # We are not yet in the context of any field. Generate all nested sub-schemas under the complex type.
 | 
			
		||||
                if recurse:
 | 
			
		||||
 | 
			
		||||
@ -98,8 +98,8 @@ def assret_field_paths_match(
 | 
			
		||||
)
 | 
			
		||||
def test_avro_schema_to_mce_fields_events_with_nullable_fields(schema):
 | 
			
		||||
    fields = avro_schema_to_mce_fields(schema)
 | 
			
		||||
    assert 2 == len(fields)
 | 
			
		||||
    assert fields[1].nullable
 | 
			
		||||
    assert 1 == len(fields)
 | 
			
		||||
    assert fields[0].nullable
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_avro_schema_to_mce_fields_sample_events_with_different_field_types():
 | 
			
		||||
@ -121,8 +121,6 @@ def test_avro_schema_to_mce_fields_sample_events_with_different_field_types():
 | 
			
		||||
"""
 | 
			
		||||
    fields = avro_schema_to_mce_fields(schema)
 | 
			
		||||
    expected_field_paths = [
 | 
			
		||||
        "[version=2.0].[type=R]",
 | 
			
		||||
        "[version=2.0].[type=R].[type=map]",
 | 
			
		||||
        "[version=2.0].[type=R].[type=map].[type=long].a_map_of_longs_field",
 | 
			
		||||
    ]
 | 
			
		||||
    assret_field_paths_match(fields, expected_field_paths)
 | 
			
		||||
@ -150,7 +148,6 @@ def test_avro_schema_to_mce_fields_record_with_two_fields():
 | 
			
		||||
"""
 | 
			
		||||
    fields = avro_schema_to_mce_fields(schema)
 | 
			
		||||
    expected_field_paths = [
 | 
			
		||||
        "[version=2.0].[type=name]",
 | 
			
		||||
        "[version=2.0].[type=name].[type=string].a",
 | 
			
		||||
        "[version=2.0].[type=name].[type=string].b",
 | 
			
		||||
    ]
 | 
			
		||||
@ -186,11 +183,10 @@ def test_avro_schema_to_mce_fields_with_default():
 | 
			
		||||
"""
 | 
			
		||||
    fields = avro_schema_to_mce_fields(schema)
 | 
			
		||||
    expected_field_paths = [
 | 
			
		||||
        "[version=2.0].[type=name]",
 | 
			
		||||
        "[version=2.0].[type=name].[type=string].aStringField",
 | 
			
		||||
    ]
 | 
			
		||||
    assret_field_paths_match(fields, expected_field_paths)
 | 
			
		||||
    description = fields[1].description
 | 
			
		||||
    description = fields[0].description
 | 
			
		||||
    assert description and "custom, default value" in description
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -213,10 +209,7 @@ def test_avro_schema_with_recursion():
 | 
			
		||||
"""
 | 
			
		||||
    fields = avro_schema_to_mce_fields(schema)
 | 
			
		||||
    expected_field_paths = [
 | 
			
		||||
        "[version=2.0].[type=TreeNode]",
 | 
			
		||||
        "[version=2.0].[type=TreeNode].[type=long].value",
 | 
			
		||||
        "[version=2.0].[type=TreeNode].[type=array]",
 | 
			
		||||
        "[version=2.0].[type=TreeNode].[type=array].[type=TreeNode]",
 | 
			
		||||
        "[version=2.0].[type=TreeNode].[type=array].[type=TreeNode].children",
 | 
			
		||||
    ]
 | 
			
		||||
    assret_field_paths_match(fields, expected_field_paths)
 | 
			
		||||
@ -263,11 +256,9 @@ def test_avro_sample_payment_schema_to_mce_fields_with_nesting():
 | 
			
		||||
"""
 | 
			
		||||
    fields = avro_schema_to_mce_fields(schema)
 | 
			
		||||
    expected_field_paths = [
 | 
			
		||||
        "[version=2.0].[type=Payment]",
 | 
			
		||||
        "[version=2.0].[type=Payment].[type=string].id",
 | 
			
		||||
        "[version=2.0].[type=Payment].[type=double].amount",
 | 
			
		||||
        "[version=2.0].[type=Payment].[type=string].name",
 | 
			
		||||
        "[version=2.0].[type=Payment].[type=PhoneNumber]",
 | 
			
		||||
        "[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber",
 | 
			
		||||
        "[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].areaCode",
 | 
			
		||||
        "[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].countryCode",
 | 
			
		||||
@ -302,13 +293,10 @@ def test_avro_schema_to_mce_fields_with_nesting_across_records():
 | 
			
		||||
    fields = avro_schema_to_mce_fields(schema)
 | 
			
		||||
    expected_field_paths = [
 | 
			
		||||
        "[version=2.0].[type=union]",
 | 
			
		||||
        "[version=2.0].[type=union].[type=Address]",
 | 
			
		||||
        "[version=2.0].[type=union].[type=Address].[type=string].streetAddress",
 | 
			
		||||
        "[version=2.0].[type=union].[type=Address].[type=string].city",
 | 
			
		||||
        "[version=2.0].[type=union].[type=Person]",
 | 
			
		||||
        "[version=2.0].[type=union].[type=Person].[type=string].firstname",
 | 
			
		||||
        "[version=2.0].[type=union].[type=Person].[type=string].lastname",
 | 
			
		||||
        "[version=2.0].[type=union].[type=Person].[type=Address]",
 | 
			
		||||
        "[version=2.0].[type=union].[type=Person].[type=Address].address",
 | 
			
		||||
    ]
 | 
			
		||||
    assret_field_paths_match(fields, expected_field_paths)
 | 
			
		||||
@ -328,7 +316,7 @@ def test_simple_record_with_primitive_types():
 | 
			
		||||
                "name": "enumField",
 | 
			
		||||
                "type": {
 | 
			
		||||
                    "type": "enum",
 | 
			
		||||
                    "name": "EnumField",
 | 
			
		||||
                    "name": "MyTestEnumField",
 | 
			
		||||
                    "symbols": [ "TEST", "TEST1" ],
 | 
			
		||||
                    "symbolDoc": {
 | 
			
		||||
                        "TEST": "test enum",
 | 
			
		||||
@ -341,7 +329,6 @@ def test_simple_record_with_primitive_types():
 | 
			
		||||
"""
 | 
			
		||||
    fields = avro_schema_to_mce_fields(schema)
 | 
			
		||||
    expected_field_paths = [
 | 
			
		||||
        "[version=2.0].[type=Simple]",
 | 
			
		||||
        "[version=2.0].[type=Simple].[type=string].stringField",
 | 
			
		||||
        "[version=2.0].[type=Simple].[type=boolean].booleanField",
 | 
			
		||||
        "[version=2.0].[type=Simple].[type=int].intField",
 | 
			
		||||
@ -371,8 +358,6 @@ def test_simple_nested_record_with_a_string_field_for_key_schema():
 | 
			
		||||
"""
 | 
			
		||||
    fields = avro_schema_to_mce_fields(schema, True)
 | 
			
		||||
    expected_field_paths: List[str] = [
 | 
			
		||||
        "[version=2.0].[key=True].[type=SimpleNested]",
 | 
			
		||||
        "[version=2.0].[key=True].[type=SimpleNested].[type=InnerRcd]",
 | 
			
		||||
        "[version=2.0].[key=True].[type=SimpleNested].[type=InnerRcd].nestedRcd",
 | 
			
		||||
        "[version=2.0].[key=True].[type=SimpleNested].[type=InnerRcd].nestedRcd.[type=string].aStringField",
 | 
			
		||||
    ]
 | 
			
		||||
@ -407,15 +392,17 @@ def test_union_with_nested_record_of_union():
 | 
			
		||||
"""
 | 
			
		||||
    fields = avro_schema_to_mce_fields(schema)
 | 
			
		||||
    expected_field_paths = [
 | 
			
		||||
        "[version=2.0].[type=UnionSample]",
 | 
			
		||||
        "[version=2.0].[type=UnionSample].[type=union]",
 | 
			
		||||
        "[version=2.0].[type=UnionSample].[type=union].aUnion",
 | 
			
		||||
        "[version=2.0].[type=UnionSample].[type=union].[type=boolean].aUnion",
 | 
			
		||||
        "[version=2.0].[type=UnionSample].[type=union].[type=Rcd]",
 | 
			
		||||
        "[version=2.0].[type=UnionSample].[type=union].[type=Rcd].aUnion",
 | 
			
		||||
        "[version=2.0].[type=UnionSample].[type=union].[type=Rcd].aUnion.[type=string].aNullableStringField",
 | 
			
		||||
    ]
 | 
			
		||||
    assret_field_paths_match(fields, expected_field_paths)
 | 
			
		||||
    assert isinstance(fields[5].type.type, StringTypeClass)
 | 
			
		||||
    assert isinstance(fields[3].type.type, StringTypeClass)
 | 
			
		||||
    assert fields[0].nativeDataType == "union"
 | 
			
		||||
    assert fields[1].nativeDataType == "boolean"
 | 
			
		||||
    assert fields[2].nativeDataType == "Rcd"
 | 
			
		||||
    assert fields[3].nativeDataType == "string"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_nested_arrays():
 | 
			
		||||
@ -448,10 +435,6 @@ def test_nested_arrays():
 | 
			
		||||
"""
 | 
			
		||||
    fields = avro_schema_to_mce_fields(schema)
 | 
			
		||||
    expected_field_paths: List[str] = [
 | 
			
		||||
        "[version=2.0].[type=NestedArray]",
 | 
			
		||||
        "[version=2.0].[type=NestedArray].[type=array]",
 | 
			
		||||
        "[version=2.0].[type=NestedArray].[type=array].[type=array]",
 | 
			
		||||
        "[version=2.0].[type=NestedArray].[type=array].[type=array].[type=Foo]",
 | 
			
		||||
        "[version=2.0].[type=NestedArray].[type=array].[type=array].[type=Foo].ar",
 | 
			
		||||
        "[version=2.0].[type=NestedArray].[type=array].[type=array].[type=Foo].ar.[type=long].a",
 | 
			
		||||
    ]
 | 
			
		||||
@ -485,13 +468,10 @@ def test_map_of_union_of_int_and_record_of_union():
 | 
			
		||||
"""
 | 
			
		||||
    fields = avro_schema_to_mce_fields(schema)
 | 
			
		||||
    expected_field_paths = [
 | 
			
		||||
        "[version=2.0].[type=MapSample]",
 | 
			
		||||
        "[version=2.0].[type=MapSample].[type=map]",
 | 
			
		||||
        "[version=2.0].[type=MapSample].[type=map].[type=union]",
 | 
			
		||||
        "[version=2.0].[type=MapSample].[type=map].[type=union].aMap",
 | 
			
		||||
        "[version=2.0].[type=MapSample].[type=map].[type=union].[type=int].aMap",
 | 
			
		||||
        "[version=2.0].[type=MapSample].[type=map].[type=union].[type=Rcd]",
 | 
			
		||||
        "[version=2.0].[type=MapSample].[type=map].[type=union].[type=Rcd].aMap",
 | 
			
		||||
        "[version=2.0].[type=MapSample].[type=map].[type=union].[type=Rcd].aMap.[type=union]",
 | 
			
		||||
        "[version=2.0].[type=MapSample].[type=map].[type=union].[type=Rcd].aMap.[type=union].aUnion",
 | 
			
		||||
        "[version=2.0].[type=MapSample].[type=map].[type=union].[type=Rcd].aMap.[type=union].[type=string].aUnion",
 | 
			
		||||
        "[version=2.0].[type=MapSample].[type=map].[type=union].[type=Rcd].aMap.[type=union].[type=int].aUnion",
 | 
			
		||||
    ]
 | 
			
		||||
@ -519,11 +499,8 @@ def test_recursive_avro():
 | 
			
		||||
"""
 | 
			
		||||
    fields = avro_schema_to_mce_fields(schema)
 | 
			
		||||
    expected_field_paths = [
 | 
			
		||||
        "[version=2.0].[type=Recursive]",
 | 
			
		||||
        "[version=2.0].[type=Recursive].[type=R]",
 | 
			
		||||
        "[version=2.0].[type=Recursive].[type=R].r",
 | 
			
		||||
        "[version=2.0].[type=Recursive].[type=R].r.[type=int].anIntegerField",
 | 
			
		||||
        "[version=2.0].[type=Recursive].[type=R].r.[type=R]",
 | 
			
		||||
        "[version=2.0].[type=Recursive].[type=R].r.[type=R].aRecursiveField",
 | 
			
		||||
    ]
 | 
			
		||||
    assret_field_paths_match(fields, expected_field_paths)
 | 
			
		||||
@ -564,17 +541,11 @@ def test_needs_disambiguation_nested_union_of_records_with_same_field_name():
 | 
			
		||||
"""
 | 
			
		||||
    fields = avro_schema_to_mce_fields(schema)
 | 
			
		||||
    expected_field_paths: List[str] = [
 | 
			
		||||
        "[version=2.0].[type=ABFooUnion]",
 | 
			
		||||
        "[version=2.0].[type=ABFooUnion].[type=union]",
 | 
			
		||||
        "[version=2.0].[type=ABFooUnion].[type=union].[type=A]",
 | 
			
		||||
        "[version=2.0].[type=ABFooUnion].[type=union].a",
 | 
			
		||||
        "[version=2.0].[type=ABFooUnion].[type=union].[type=A].a",
 | 
			
		||||
        "[version=2.0].[type=ABFooUnion].[type=union].[type=A].a.[type=string].f",
 | 
			
		||||
        "[version=2.0].[type=ABFooUnion].[type=union].[type=B]",
 | 
			
		||||
        "[version=2.0].[type=ABFooUnion].[type=union].[type=B].a",
 | 
			
		||||
        "[version=2.0].[type=ABFooUnion].[type=union].[type=B].a.[type=string].f",
 | 
			
		||||
        "[version=2.0].[type=ABFooUnion].[type=union].[type=array]",
 | 
			
		||||
        "[version=2.0].[type=ABFooUnion].[type=union].[type=array].[type=array]",
 | 
			
		||||
        "[version=2.0].[type=ABFooUnion].[type=union].[type=array].[type=array].[type=Foo]",
 | 
			
		||||
        "[version=2.0].[type=ABFooUnion].[type=union].[type=array].[type=array].[type=Foo].a",
 | 
			
		||||
        "[version=2.0].[type=ABFooUnion].[type=union].[type=array].[type=array].[type=Foo].a.[type=long].f",
 | 
			
		||||
    ]
 | 
			
		||||
@ -637,17 +608,11 @@ def test_key_schema_handling():
 | 
			
		||||
"""
 | 
			
		||||
    fields: List[SchemaField] = avro_schema_to_mce_fields(schema, is_key_schema=True)
 | 
			
		||||
    expected_field_paths: List[str] = [
 | 
			
		||||
        "[version=2.0].[key=True].[type=ABFooUnion]",
 | 
			
		||||
        "[version=2.0].[key=True].[type=ABFooUnion].[type=union]",
 | 
			
		||||
        "[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=A]",
 | 
			
		||||
        "[version=2.0].[key=True].[type=ABFooUnion].[type=union].a",
 | 
			
		||||
        "[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=A].a",
 | 
			
		||||
        "[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=A].a.[type=string].f",
 | 
			
		||||
        "[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=B]",
 | 
			
		||||
        "[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=B].a",
 | 
			
		||||
        "[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=B].a.[type=string].f",
 | 
			
		||||
        "[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=array]",
 | 
			
		||||
        "[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=array].[type=array]",
 | 
			
		||||
        "[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=array].[type=array].[type=Foo]",
 | 
			
		||||
        "[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=array].[type=array].[type=Foo].a",
 | 
			
		||||
        "[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=array].[type=array].[type=Foo].a.[type=long].f",
 | 
			
		||||
    ]
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user