diff --git a/metadata-integration/java/datahub-schematron/lib/src/main/java/io/datahubproject/schematron/converters/avro/AvroSchemaConverter.java b/metadata-integration/java/datahub-schematron/lib/src/main/java/io/datahubproject/schematron/converters/avro/AvroSchemaConverter.java index 0ddb357db7..3c9bb3a2cd 100644 --- a/metadata-integration/java/datahub-schematron/lib/src/main/java/io/datahubproject/schematron/converters/avro/AvroSchemaConverter.java +++ b/metadata-integration/java/datahub-schematron/lib/src/main/java/io/datahubproject/schematron/converters/avro/AvroSchemaConverter.java @@ -131,12 +131,15 @@ public class AvroSchemaConverter implements SchemaConverter { FieldPath basePath = new FieldPath(); basePath.setKeySchema(isKeySchema); + Set visitedRecords = new HashSet<>(); + // Add the record type to the base path if (schema.getType() == Schema.Type.RECORD) { basePath = basePath.expandType(schema.getName(), schema.toString()); + visitedRecords.add(schema.getFullName()); } - processSchema(schema, basePath, defaultNullable, fields); + processSchema(schema, basePath, defaultNullable, fields, visitedRecords); return new SchemaMetadata() .setSchemaName(schema.getName()) @@ -155,26 +158,25 @@ public class AvroSchemaConverter implements SchemaConverter { } private void processSchema( - Schema schema, FieldPath fieldPath, boolean defaultNullable, List fields) { + Schema schema, + FieldPath fieldPath, + boolean defaultNullable, + List fields, + Set visitedRecords) { if (schema.getType() == Schema.Type.RECORD) { for (Schema.Field field : schema.getFields()) { - processField(field, fieldPath, defaultNullable, fields); + processField(field, fieldPath, defaultNullable, fields, visitedRecords); } } } - private void processField( - Schema.Field field, FieldPath fieldPath, boolean defaultNullable, List fields) { - processField(field, fieldPath, defaultNullable, fields, false, null); - } - private void processField( Schema.Field field, FieldPath fieldPath, boolean defaultNullable, List fields, - boolean nullableOverride) { - processField(field, fieldPath, defaultNullable, fields, nullableOverride, null); + Set visitedRecords) { + processField(field, fieldPath, defaultNullable, fields, false, null, visitedRecords); } private void processField( @@ -183,7 +185,18 @@ public class AvroSchemaConverter implements SchemaConverter { boolean defaultNullable, List fields, boolean nullableOverride, - DataHubType typeOverride) { + Set visitedRecords) { + processField(field, fieldPath, defaultNullable, fields, nullableOverride, null, visitedRecords); + } + + private void processField( + Schema.Field field, + FieldPath fieldPath, + boolean defaultNullable, + List fields, + boolean nullableOverride, + DataHubType typeOverride, + Set visitedRecords) { log.debug( "Processing field: {}, Field path : {}, Field schema: {}", field.name(), @@ -215,17 +228,33 @@ public class AvroSchemaConverter implements SchemaConverter { switch (fieldSchema.getType()) { case RECORD: processRecordField( - field, newPath, discriminatedType, defaultNullable, fields, isNullable, typeOverride); + field, + newPath, + discriminatedType, + defaultNullable, + fields, + isNullable, + typeOverride, + visitedRecords); break; case ARRAY: - processArrayField(field, newPath, discriminatedType, defaultNullable, fields, isNullable); + processArrayField( + field, newPath, discriminatedType, defaultNullable, fields, isNullable, visitedRecords); break; case MAP: - processMapField(field, newPath, discriminatedType, defaultNullable, fields, isNullable); + processMapField( + field, newPath, discriminatedType, defaultNullable, fields, isNullable, visitedRecords); break; case UNION: processUnionField( - field, newPath, discriminatedType, defaultNullable, fields, isNullable, typeOverride); + field, + newPath, + discriminatedType, + defaultNullable, + fields, + isNullable, + typeOverride, + visitedRecords); break; case ENUM: processEnumField(field, newPath, discriminatedType, defaultNullable, fields, isNullable); @@ -244,7 +273,8 @@ public class AvroSchemaConverter implements SchemaConverter { boolean defaultNullable, List fields, boolean isNullable, - DataHubType typeOverride) { + DataHubType typeOverride, + Set visitedRecords) { log.debug("Record Field Path before expand: {}", fieldPath.asString()); FieldPath recordPath = fieldPath.expandType(discriminatedType, field.schema().toString()); @@ -255,7 +285,6 @@ public class AvroSchemaConverter implements SchemaConverter { ? typeOverride.asSchemaFieldType() : new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new RecordType())); - // Add the record field itself SchemaField recordField = new SchemaField() .setFieldPath(recordPath.asString()) @@ -263,15 +292,24 @@ public class AvroSchemaConverter implements SchemaConverter { .setNativeDataType(discriminatedType) .setNullable(isNullable || defaultNullable) .setIsPartOfKey(fieldPath.isKeySchema()); - populateCommonProperties(field, recordField); - fields.add(recordField); + if (isCyclicReference(field, visitedRecords)) { + return; + } + visitedRecords.add(field.schema().getFullName()); + // Process nested fields for (Schema.Field nestedField : field.schema().getFields()) { - processField(nestedField, recordPath, defaultNullable, fields); + processField(nestedField, recordPath, defaultNullable, fields, visitedRecords); } + + visitedRecords.remove(field.schema().getFullName()); + } + + private boolean isCyclicReference(Schema.Field field, Set visitedRecords) { + return visitedRecords.contains(field.schema().getFullName()); } @SneakyThrows @@ -327,7 +365,8 @@ public class AvroSchemaConverter implements SchemaConverter { String discriminatedType, boolean defaultNullable, List fields, - boolean isNullable) { + boolean isNullable, + Set visitedRecords) { Schema arraySchema = field.schema(); Schema elementSchema = arraySchema.getElementType(); @@ -354,7 +393,14 @@ public class AvroSchemaConverter implements SchemaConverter { elementSchema.getDoc() != null ? elementSchema.getDoc() : field.doc(), null // TODO: What is the default value for an array element? ); - processField(elementField, fieldPath, defaultNullable, fields, isNullable, arrayDataHubType); + processField( + elementField, + fieldPath, + defaultNullable, + fields, + isNullable, + arrayDataHubType, + visitedRecords); } else { SchemaField arrayField = @@ -377,7 +423,8 @@ public class AvroSchemaConverter implements SchemaConverter { String discriminatedType, boolean defaultNullable, List fields, - boolean isNullable) { + boolean isNullable, + Set visitedRecords) { Schema mapSchema = field.schema(); Schema valueSchema = mapSchema.getValueType(); @@ -404,7 +451,14 @@ public class AvroSchemaConverter implements SchemaConverter { .clonePlus( new FieldElement( Collections.singletonList("map"), new ArrayList<>(), null, null)); - processField(valueField, valueFieldPath, defaultNullable, fields, isNullable, mapDataHubType); + processField( + valueField, + valueFieldPath, + defaultNullable, + fields, + isNullable, + mapDataHubType, + visitedRecords); } else { SchemaField mapField = new SchemaField() @@ -426,7 +480,8 @@ public class AvroSchemaConverter implements SchemaConverter { boolean defaultNullable, List fields, boolean isNullable, - DataHubType typeOverride) { + DataHubType typeOverride, + Set visitedRecords) { List unionTypes = field.schema().getTypes(); @@ -444,7 +499,8 @@ public class AvroSchemaConverter implements SchemaConverter { fieldPath.popLast(), defaultNullable, fields, - true); + true, + visitedRecords); return; } @@ -498,7 +554,7 @@ public class AvroSchemaConverter implements SchemaConverter { typeIndex, unionFieldPath.asString(), unionFieldInner.doc()); - processField(unionFieldInner, indexedFieldPath, defaultNullable, fields); + processField(unionFieldInner, indexedFieldPath, defaultNullable, fields, visitedRecords); } typeIndex++; } diff --git a/metadata-integration/java/datahub-schematron/lib/src/test/java/io/datahubproject/schematron/converters/avro/AvroSchemaConverterTest.java b/metadata-integration/java/datahub-schematron/lib/src/test/java/io/datahubproject/schematron/converters/avro/AvroSchemaConverterTest.java index d6522c2d84..675d5f7e71 100644 --- a/metadata-integration/java/datahub-schematron/lib/src/test/java/io/datahubproject/schematron/converters/avro/AvroSchemaConverterTest.java +++ b/metadata-integration/java/datahub-schematron/lib/src/test/java/io/datahubproject/schematron/converters/avro/AvroSchemaConverterTest.java @@ -899,6 +899,32 @@ class AvroSchemaConverterTest { new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new StringType()))); } + @Test(groups = "basic") + void testCyclicReferences() throws IOException { + SchemaMetadata schema = + avroSchemaConverter.toDataHubSchema( + readAvroSchema("cyclic_references.avsc"), false, false, dataPlatformUrn, null); + + schema.getFields().forEach(System.out::println); + + assertEquals(schema.getFields().size(), 2); + + assertSchemaField( + schema.getFields().get(0), + "[version=2.0].[type=Parent].[type=string].name", + "string", + false, + false, + new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new StringType()))); + assertSchemaField( + schema.getFields().get(1), + "[version=2.0].[type=Parent].[type=Parent].parent", + "Parent", + true, + false, + new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new RecordType()))); + } + private void assertSchemaField( SchemaField field, String expectedPath, diff --git a/metadata-integration/java/datahub-schematron/lib/src/test/resources/cyclic_references.avsc b/metadata-integration/java/datahub-schematron/lib/src/test/resources/cyclic_references.avsc new file mode 100644 index 0000000000..4638474134 --- /dev/null +++ b/metadata-integration/java/datahub-schematron/lib/src/test/resources/cyclic_references.avsc @@ -0,0 +1,18 @@ +{ + "type": "record", + "name": "Parent", + "fields": [ + { + "name": "name", + "type": "string" + }, + { + "name": "parent", + "type": [ + "null", + "Parent" + ], + "default": null + } + ] +}