Handle Cyclic References in AVRO Schema Conversion (#13608)

Co-authored-by: Mihai Ciocirdel <mihai.ciocirdel@swisscom.com>
This commit is contained in:
mihai103 2025-06-13 18:21:23 +02:00 committed by GitHub
parent 5404ee9b39
commit c39e150e29
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 127 additions and 27 deletions

View File

@ -131,12 +131,15 @@ public class AvroSchemaConverter implements SchemaConverter<Schema> {
FieldPath basePath = new FieldPath(); FieldPath basePath = new FieldPath();
basePath.setKeySchema(isKeySchema); basePath.setKeySchema(isKeySchema);
Set<String> visitedRecords = new HashSet<>();
// Add the record type to the base path // Add the record type to the base path
if (schema.getType() == Schema.Type.RECORD) { if (schema.getType() == Schema.Type.RECORD) {
basePath = basePath.expandType(schema.getName(), schema.toString()); basePath = basePath.expandType(schema.getName(), schema.toString());
visitedRecords.add(schema.getFullName());
} }
processSchema(schema, basePath, defaultNullable, fields); processSchema(schema, basePath, defaultNullable, fields, visitedRecords);
return new SchemaMetadata() return new SchemaMetadata()
.setSchemaName(schema.getName()) .setSchemaName(schema.getName())
@ -155,26 +158,25 @@ public class AvroSchemaConverter implements SchemaConverter<Schema> {
} }
private void processSchema( private void processSchema(
Schema schema, FieldPath fieldPath, boolean defaultNullable, List<SchemaField> fields) { Schema schema,
FieldPath fieldPath,
boolean defaultNullable,
List<SchemaField> fields,
Set<String> visitedRecords) {
if (schema.getType() == Schema.Type.RECORD) { if (schema.getType() == Schema.Type.RECORD) {
for (Schema.Field field : schema.getFields()) { for (Schema.Field field : schema.getFields()) {
processField(field, fieldPath, defaultNullable, fields); processField(field, fieldPath, defaultNullable, fields, visitedRecords);
} }
} }
} }
private void processField(
Schema.Field field, FieldPath fieldPath, boolean defaultNullable, List<SchemaField> fields) {
processField(field, fieldPath, defaultNullable, fields, false, null);
}
private void processField( private void processField(
Schema.Field field, Schema.Field field,
FieldPath fieldPath, FieldPath fieldPath,
boolean defaultNullable, boolean defaultNullable,
List<SchemaField> fields, List<SchemaField> fields,
boolean nullableOverride) { Set<String> visitedRecords) {
processField(field, fieldPath, defaultNullable, fields, nullableOverride, null); processField(field, fieldPath, defaultNullable, fields, false, null, visitedRecords);
} }
private void processField( private void processField(
@ -183,7 +185,18 @@ public class AvroSchemaConverter implements SchemaConverter<Schema> {
boolean defaultNullable, boolean defaultNullable,
List<SchemaField> fields, List<SchemaField> fields,
boolean nullableOverride, boolean nullableOverride,
DataHubType typeOverride) { Set<String> visitedRecords) {
processField(field, fieldPath, defaultNullable, fields, nullableOverride, null, visitedRecords);
}
private void processField(
Schema.Field field,
FieldPath fieldPath,
boolean defaultNullable,
List<SchemaField> fields,
boolean nullableOverride,
DataHubType typeOverride,
Set<String> visitedRecords) {
log.debug( log.debug(
"Processing field: {}, Field path : {}, Field schema: {}", "Processing field: {}, Field path : {}, Field schema: {}",
field.name(), field.name(),
@ -215,17 +228,33 @@ public class AvroSchemaConverter implements SchemaConverter<Schema> {
switch (fieldSchema.getType()) { switch (fieldSchema.getType()) {
case RECORD: case RECORD:
processRecordField( processRecordField(
field, newPath, discriminatedType, defaultNullable, fields, isNullable, typeOverride); field,
newPath,
discriminatedType,
defaultNullable,
fields,
isNullable,
typeOverride,
visitedRecords);
break; break;
case ARRAY: case ARRAY:
processArrayField(field, newPath, discriminatedType, defaultNullable, fields, isNullable); processArrayField(
field, newPath, discriminatedType, defaultNullable, fields, isNullable, visitedRecords);
break; break;
case MAP: case MAP:
processMapField(field, newPath, discriminatedType, defaultNullable, fields, isNullable); processMapField(
field, newPath, discriminatedType, defaultNullable, fields, isNullable, visitedRecords);
break; break;
case UNION: case UNION:
processUnionField( processUnionField(
field, newPath, discriminatedType, defaultNullable, fields, isNullable, typeOverride); field,
newPath,
discriminatedType,
defaultNullable,
fields,
isNullable,
typeOverride,
visitedRecords);
break; break;
case ENUM: case ENUM:
processEnumField(field, newPath, discriminatedType, defaultNullable, fields, isNullable); processEnumField(field, newPath, discriminatedType, defaultNullable, fields, isNullable);
@ -244,7 +273,8 @@ public class AvroSchemaConverter implements SchemaConverter<Schema> {
boolean defaultNullable, boolean defaultNullable,
List<SchemaField> fields, List<SchemaField> fields,
boolean isNullable, boolean isNullable,
DataHubType typeOverride) { DataHubType typeOverride,
Set<String> visitedRecords) {
log.debug("Record Field Path before expand: {}", fieldPath.asString()); log.debug("Record Field Path before expand: {}", fieldPath.asString());
FieldPath recordPath = fieldPath.expandType(discriminatedType, field.schema().toString()); FieldPath recordPath = fieldPath.expandType(discriminatedType, field.schema().toString());
@ -255,7 +285,6 @@ public class AvroSchemaConverter implements SchemaConverter<Schema> {
? typeOverride.asSchemaFieldType() ? typeOverride.asSchemaFieldType()
: new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new RecordType())); : new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new RecordType()));
// Add the record field itself
SchemaField recordField = SchemaField recordField =
new SchemaField() new SchemaField()
.setFieldPath(recordPath.asString()) .setFieldPath(recordPath.asString())
@ -263,15 +292,24 @@ public class AvroSchemaConverter implements SchemaConverter<Schema> {
.setNativeDataType(discriminatedType) .setNativeDataType(discriminatedType)
.setNullable(isNullable || defaultNullable) .setNullable(isNullable || defaultNullable)
.setIsPartOfKey(fieldPath.isKeySchema()); .setIsPartOfKey(fieldPath.isKeySchema());
populateCommonProperties(field, recordField); populateCommonProperties(field, recordField);
fields.add(recordField); fields.add(recordField);
if (isCyclicReference(field, visitedRecords)) {
return;
}
visitedRecords.add(field.schema().getFullName());
// Process nested fields // Process nested fields
for (Schema.Field nestedField : field.schema().getFields()) { for (Schema.Field nestedField : field.schema().getFields()) {
processField(nestedField, recordPath, defaultNullable, fields); processField(nestedField, recordPath, defaultNullable, fields, visitedRecords);
} }
visitedRecords.remove(field.schema().getFullName());
}
private boolean isCyclicReference(Schema.Field field, Set<String> visitedRecords) {
return visitedRecords.contains(field.schema().getFullName());
} }
@SneakyThrows @SneakyThrows
@ -327,7 +365,8 @@ public class AvroSchemaConverter implements SchemaConverter<Schema> {
String discriminatedType, String discriminatedType,
boolean defaultNullable, boolean defaultNullable,
List<SchemaField> fields, List<SchemaField> fields,
boolean isNullable) { boolean isNullable,
Set<String> visitedRecords) {
Schema arraySchema = field.schema(); Schema arraySchema = field.schema();
Schema elementSchema = arraySchema.getElementType(); Schema elementSchema = arraySchema.getElementType();
@ -354,7 +393,14 @@ public class AvroSchemaConverter implements SchemaConverter<Schema> {
elementSchema.getDoc() != null ? elementSchema.getDoc() : field.doc(), elementSchema.getDoc() != null ? elementSchema.getDoc() : field.doc(),
null // TODO: What is the default value for an array element? null // TODO: What is the default value for an array element?
); );
processField(elementField, fieldPath, defaultNullable, fields, isNullable, arrayDataHubType); processField(
elementField,
fieldPath,
defaultNullable,
fields,
isNullable,
arrayDataHubType,
visitedRecords);
} else { } else {
SchemaField arrayField = SchemaField arrayField =
@ -377,7 +423,8 @@ public class AvroSchemaConverter implements SchemaConverter<Schema> {
String discriminatedType, String discriminatedType,
boolean defaultNullable, boolean defaultNullable,
List<SchemaField> fields, List<SchemaField> fields,
boolean isNullable) { boolean isNullable,
Set<String> visitedRecords) {
Schema mapSchema = field.schema(); Schema mapSchema = field.schema();
Schema valueSchema = mapSchema.getValueType(); Schema valueSchema = mapSchema.getValueType();
@ -404,7 +451,14 @@ public class AvroSchemaConverter implements SchemaConverter<Schema> {
.clonePlus( .clonePlus(
new FieldElement( new FieldElement(
Collections.singletonList("map"), new ArrayList<>(), null, null)); Collections.singletonList("map"), new ArrayList<>(), null, null));
processField(valueField, valueFieldPath, defaultNullable, fields, isNullable, mapDataHubType); processField(
valueField,
valueFieldPath,
defaultNullable,
fields,
isNullable,
mapDataHubType,
visitedRecords);
} else { } else {
SchemaField mapField = SchemaField mapField =
new SchemaField() new SchemaField()
@ -426,7 +480,8 @@ public class AvroSchemaConverter implements SchemaConverter<Schema> {
boolean defaultNullable, boolean defaultNullable,
List<SchemaField> fields, List<SchemaField> fields,
boolean isNullable, boolean isNullable,
DataHubType typeOverride) { DataHubType typeOverride,
Set<String> visitedRecords) {
List<Schema> unionTypes = field.schema().getTypes(); List<Schema> unionTypes = field.schema().getTypes();
@ -444,7 +499,8 @@ public class AvroSchemaConverter implements SchemaConverter<Schema> {
fieldPath.popLast(), fieldPath.popLast(),
defaultNullable, defaultNullable,
fields, fields,
true); true,
visitedRecords);
return; return;
} }
@ -498,7 +554,7 @@ public class AvroSchemaConverter implements SchemaConverter<Schema> {
typeIndex, typeIndex,
unionFieldPath.asString(), unionFieldPath.asString(),
unionFieldInner.doc()); unionFieldInner.doc());
processField(unionFieldInner, indexedFieldPath, defaultNullable, fields); processField(unionFieldInner, indexedFieldPath, defaultNullable, fields, visitedRecords);
} }
typeIndex++; typeIndex++;
} }

View File

@ -899,6 +899,32 @@ class AvroSchemaConverterTest {
new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new StringType()))); new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new StringType())));
} }
/**
 * Verifies conversion of a self-referencing Avro record: {@code cyclic_references.avsc} declares
 * a {@code Parent} record with a {@code name} string and a nullable {@code parent} field of type
 * {@code Parent}. The converter is expected to emit the {@code parent} field once and not expand
 * it again, yielding exactly two schema fields.
 *
 * @throws IOException if the schema fixture cannot be read
 */
@Test(groups = "basic")
void testCyclicReferences() throws IOException {
  SchemaMetadata schema =
      avroSchemaConverter.toDataHubSchema(
          readAvroSchema("cyclic_references.avsc"), false, false, dataPlatformUrn, null);

  // Exactly two fields: the recursive reference itself is listed, but its nested fields are not
  // expanded a second time.
  assertEquals(schema.getFields().size(), 2);

  assertSchemaField(
      schema.getFields().get(0),
      "[version=2.0].[type=Parent].[type=string].name",
      "string",
      false,
      false,
      new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new StringType())));

  // The self-reference is nullable (union with null) and typed as a record.
  assertSchemaField(
      schema.getFields().get(1),
      "[version=2.0].[type=Parent].[type=Parent].parent",
      "Parent",
      true,
      false,
      new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new RecordType())));
}
private void assertSchemaField( private void assertSchemaField(
SchemaField field, SchemaField field,
String expectedPath, String expectedPath,

View File

@ -0,0 +1,18 @@
{
"type": "record",
"name": "Parent",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "parent",
"type": [
"null",
"Parent"
],
"default": null
}
]
}