diff --git a/metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/SchemaMetadataChangeEventGenerator.java b/metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/SchemaMetadataChangeEventGenerator.java index c6a48ea27c..53f757d8d6 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/SchemaMetadataChangeEventGenerator.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/SchemaMetadataChangeEventGenerator.java @@ -4,6 +4,7 @@ import static com.linkedin.metadata.timeline.eventgenerator.ChangeEventGenerator import com.datahub.util.RecordUtils; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.linkedin.common.AuditStamp; import com.linkedin.common.urn.DatasetUrn; import com.linkedin.common.urn.Urn; @@ -173,7 +174,7 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat SchemaField baseField, SchemaField targetField, Urn datasetUrn, - ChangeCategory changeCategory, + Set changeCategories, AuditStamp auditStamp) { List propChangeEvents = new ArrayList<>(); String datasetFieldUrn; @@ -184,7 +185,7 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat } // Description Change. - if (ChangeCategory.DOCUMENTATION.equals(changeCategory)) { + if (changeCategories != null && changeCategories.contains(ChangeCategory.DOCUMENTATION)) { ChangeEvent descriptionChangeEvent = getDescriptionChange(baseField, targetField, datasetFieldUrn, auditStamp); if (descriptionChangeEvent != null) { @@ -193,14 +194,14 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat } // Global Tags - if (ChangeCategory.TAG.equals(changeCategory)) { + if (changeCategories != null && changeCategories.contains(ChangeCategory.TAG)) { propChangeEvents.addAll( getGlobalTagChangeEvents( baseField, targetField, datasetUrn.toString(), datasetFieldUrn, auditStamp)); } // Glossary terms. - if (ChangeCategory.GLOSSARY_TERM.equals(changeCategory)) { + if (changeCategories != null && changeCategories.contains(ChangeCategory.GLOSSARY_TERM)) { propChangeEvents.addAll( getGlossaryTermsChangeEvents( baseField, targetField, datasetUrn.toString(), datasetFieldUrn, auditStamp)); @@ -213,7 +214,7 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat SchemaMetadata baseSchema, SchemaMetadata targetSchema, Urn datasetUrn, - ChangeCategory changeCategory, + Set changeCategories, AuditStamp auditStamp) { // Sort the fields by their field path. This aligns both sets of fields based on field paths for // comparisons. @@ -247,11 +248,11 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat // This is the same field. Check for change events from property changes. if (!curBaseField.getNativeDataType().equals(curTargetField.getNativeDataType())) { processNativeTypeChange( - changeCategory, changeEvents, datasetUrn, curBaseField, curTargetField, auditStamp); + changeCategories, changeEvents, datasetUrn, curBaseField, curTargetField, auditStamp); } List propChangeEvents = getFieldPropertyChangeEvents( - curBaseField, curTargetField, datasetUrn, changeCategory, auditStamp); + curBaseField, curTargetField, datasetUrn, changeCategories, auditStamp); changeEvents.addAll(propChangeEvents); ++baseFieldIdx; ++targetFieldIdx; @@ -268,16 +269,17 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat targetFields.subList(targetFieldIdx, targetFields.size()), renamedFields); if (renamedField == null) { - processRemoval(changeCategory, changeEvents, datasetUrn, curBaseField, auditStamp); + processRemoval(changeCategories, changeEvents, datasetUrn, curBaseField, auditStamp); ++baseFieldIdx; } else { - if (ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) { + if (changeCategories != null + && changeCategories.contains(ChangeCategory.TECHNICAL_SCHEMA)) { changeEvents.add( generateRenameEvent(datasetUrn, curBaseField, renamedField, auditStamp)); } List propChangeEvents = getFieldPropertyChangeEvents( - curBaseField, curTargetField, datasetUrn, changeCategory, auditStamp); + curBaseField, renamedField, datasetUrn, changeCategories, auditStamp); changeEvents.addAll(propChangeEvents); ++baseFieldIdx; renamedFields.add(renamedField); @@ -289,16 +291,17 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat findRenamedField( curTargetField, baseFields.subList(baseFieldIdx, baseFields.size()), renamedFields); if (renamedField == null) { - processAdd(changeCategory, changeEvents, datasetUrn, curTargetField, auditStamp); + processAdd(changeCategories, changeEvents, datasetUrn, curTargetField, auditStamp); ++targetFieldIdx; } else { - if (ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) { + if (changeCategories != null + && changeCategories.contains(ChangeCategory.TECHNICAL_SCHEMA)) { changeEvents.add( generateRenameEvent(datasetUrn, renamedField, curTargetField, auditStamp)); } List propChangeEvents = getFieldPropertyChangeEvents( - curBaseField, curTargetField, datasetUrn, changeCategory, auditStamp); + renamedField, curTargetField, datasetUrn, changeCategories, auditStamp); changeEvents.addAll(propChangeEvents); ++targetFieldIdx; renamedFields.add(renamedField); @@ -309,7 +312,7 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat // Handle removed fields. Non-backward compatible change + major version bump SchemaField baseField = baseFields.get(baseFieldIdx); if (!renamedFields.contains(baseField)) { - processRemoval(changeCategory, changeEvents, datasetUrn, baseField, auditStamp); + processRemoval(changeCategories, changeEvents, datasetUrn, baseField, auditStamp); } ++baseFieldIdx; } @@ -317,14 +320,15 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat // Newly added fields. Forwards & backwards compatible change + minor version bump. SchemaField targetField = targetFields.get(targetFieldIdx); if (!renamedFields.contains(targetField)) { - processAdd(changeCategory, changeEvents, datasetUrn, targetField, auditStamp); + processAdd(changeCategories, changeEvents, datasetUrn, targetField, auditStamp); } ++targetFieldIdx; } // Handle primary key constraint change events. List primaryKeyChangeEvents = - getPrimaryKeyChangeEvents(changeCategory, baseSchema, targetSchema, datasetUrn, auditStamp); + getPrimaryKeyChangeEvents( + changeCategories, baseSchema, targetSchema, datasetUrn, auditStamp); changeEvents.addAll(primaryKeyChangeEvents); // Handle foreign key constraint change events, currently no-op due to field not being utilized. @@ -375,12 +379,12 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat } private static void processRemoval( - ChangeCategory changeCategory, + Set changeCategories, List changeEvents, Urn datasetUrn, SchemaField baseField, AuditStamp auditStamp) { - if (ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) { + if (changeCategories != null && changeCategories.contains(ChangeCategory.TECHNICAL_SCHEMA)) { changeEvents.add( DatasetSchemaFieldChangeEvent.schemaFieldChangeEventBuilder() .modifier(getSchemaFieldUrn(datasetUrn, baseField).toString()) @@ -401,17 +405,17 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat .build()); } List propChangeEvents = - getFieldPropertyChangeEvents(baseField, null, datasetUrn, changeCategory, auditStamp); + getFieldPropertyChangeEvents(baseField, null, datasetUrn, changeCategories, auditStamp); changeEvents.addAll(propChangeEvents); } private static void processAdd( - ChangeCategory changeCategory, + Set changeCategories, List changeEvents, Urn datasetUrn, SchemaField targetField, AuditStamp auditStamp) { - if (ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) { + if (changeCategories != null && changeCategories.contains(ChangeCategory.TECHNICAL_SCHEMA)) { changeEvents.add( DatasetSchemaFieldChangeEvent.schemaFieldChangeEventBuilder() .modifier(getSchemaFieldUrn(datasetUrn, targetField).toString()) @@ -428,22 +432,23 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat .fieldUrn(getSchemaFieldUrn(datasetUrn, targetField)) .nullable(targetField.isNullable()) .auditStamp(auditStamp) + .modificationCategory(SchemaFieldModificationCategory.OTHER) .build()); } List propChangeEvents = - getFieldPropertyChangeEvents(null, targetField, datasetUrn, changeCategory, auditStamp); + getFieldPropertyChangeEvents(null, targetField, datasetUrn, changeCategories, auditStamp); changeEvents.addAll(propChangeEvents); } private static void processNativeTypeChange( - ChangeCategory changeCategory, + Set changeCategories, List changeEvents, Urn datasetUrn, SchemaField curBaseField, SchemaField curTargetField, AuditStamp auditStamp) { // Non-backward compatible change + Major version bump - if (ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) { + if (changeCategories != null && changeCategories.contains(ChangeCategory.TECHNICAL_SCHEMA)) { changeEvents.add( DatasetSchemaFieldChangeEvent.schemaFieldChangeEventBuilder() .category(ChangeCategory.TECHNICAL_SCHEMA) @@ -505,12 +510,12 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat } private static List getPrimaryKeyChangeEvents( - ChangeCategory changeCategory, + Set changeCategories, SchemaMetadata baseSchema, SchemaMetadata targetSchema, Urn datasetUrn, AuditStamp auditStamp) { - if (ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) { + if (changeCategories != null && changeCategories.contains(ChangeCategory.TECHNICAL_SCHEMA)) { List primaryKeyChangeEvents = new ArrayList<>(); Set basePrimaryKeys = (baseSchema != null && baseSchema.getPrimaryKeys() != null) @@ -598,7 +603,7 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat baseSchema, targetSchema, DatasetUrn.createFromString(currentValue.getUrn()), - changeCategory, + Collections.singleton(changeCategory), null)); } catch (URISyntaxException e) { throw new IllegalArgumentException("Malformed DatasetUrn " + currentValue.getUrn()); @@ -632,18 +637,16 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat @Nonnull Aspect from, @Nonnull Aspect to, @Nonnull AuditStamp auditStamp) { - final List changeEvents = new ArrayList<>(); - changeEvents.addAll( + return new ArrayList<>( computeDiffs( - from.getValue(), to.getValue(), urn, ChangeCategory.DOCUMENTATION, auditStamp)); - changeEvents.addAll( - computeDiffs(from.getValue(), to.getValue(), urn, ChangeCategory.TAG, auditStamp)); - changeEvents.addAll( - computeDiffs( - from.getValue(), to.getValue(), urn, ChangeCategory.TECHNICAL_SCHEMA, auditStamp)); - changeEvents.addAll( - computeDiffs( - from.getValue(), to.getValue(), urn, ChangeCategory.GLOSSARY_TERM, auditStamp)); - return changeEvents; + from.getValue(), + to.getValue(), + urn, + ImmutableSet.of( + ChangeCategory.DOCUMENTATION, + ChangeCategory.TAG, + ChangeCategory.TECHNICAL_SCHEMA, + ChangeCategory.GLOSSARY_TERM), + auditStamp)); } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/timeline/eventgenerator/SchemaMetadataChangeEventGeneratorTest.java b/metadata-io/src/test/java/com/linkedin/metadata/timeline/eventgenerator/SchemaMetadataChangeEventGeneratorTest.java index 772ef374af..88dd81d953 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/timeline/eventgenerator/SchemaMetadataChangeEventGeneratorTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/timeline/eventgenerator/SchemaMetadataChangeEventGeneratorTest.java @@ -6,6 +6,7 @@ import com.linkedin.common.AuditStamp; import com.linkedin.common.urn.Urn; import com.linkedin.data.template.StringArray; import com.linkedin.metadata.timeline.data.ChangeEvent; +import com.linkedin.metadata.timeline.data.dataset.DatasetSchemaFieldChangeEvent; import com.linkedin.metadata.timeline.data.dataset.SchemaFieldModificationCategory; import com.linkedin.mxe.SystemMetadata; import com.linkedin.restli.internal.server.util.DataMapUtils; @@ -18,6 +19,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; import org.testng.annotations.Test; @@ -41,18 +43,17 @@ public class SchemaMetadataChangeEventGeneratorTest extends AbstractTestNGSpring changeEvent -> { actualDescriptions.add(changeEvent.getDescription()); }); - assertEquals(expectedDescriptions, actualDescriptions); + assertEquals(actualDescriptions, expectedDescriptions); } private static void compareModificationCategories( Set expectedCategories, List actual) { - Set actualModificationCategories = new HashSet<>(); - actual.forEach( - changeEvent -> { - actualModificationCategories.add( - changeEvent.getParameters().get("modificationCategory").toString()); - }); - assertEquals(expectedCategories, actualModificationCategories); + Set actualModificationCategories = + actual.stream() + .filter(changeEvent -> changeEvent instanceof DatasetSchemaFieldChangeEvent) + .map(changeEvent -> changeEvent.getParameters().get("modificationCategory").toString()) + .collect(Collectors.toSet()); + assertEquals(actualModificationCategories, expectedCategories); } private static Aspect getSchemaMetadata(List schemaFieldList) { @@ -236,6 +237,99 @@ public class SchemaMetadataChangeEventGeneratorTest extends AbstractTestNGSpring assertEquals(14, actual.size()); } + @Test + public void testSchemaFieldPrimaryKeyChangeRenameAdd() throws Exception { + // When a rename cannot be detected, treated as drop -> add + SchemaMetadataChangeEventGenerator test = new SchemaMetadataChangeEventGenerator(); + + Urn urn = getTestUrn(); + String entity = "dataset"; + String aspect = "schemaMetadata"; + AuditStamp auditStamp = getTestAuditStamp(); + + Aspect from = + getSchemaMetadata( + List.of( + new SchemaField() + .setFieldPath("ID") + .setNativeDataType("NUMBER(16,1)") + .setDescription("My Description"), + new SchemaField() + .setFieldPath("ID2") + .setNativeDataType("NUMBER(16,1)") + .setDescription("My Other Description"))); + from.getValue().setPrimaryKeys(new StringArray(List.of("ID"))); + Aspect to3 = + getSchemaMetadata( + List.of( + new SchemaField() + .setFieldPath("ID") + .setNativeDataType("NUMBER(16,1)") + .setDescription("My Description"), + new SchemaField() + .setFieldPath("ID2") + .setNativeDataType("NUMBER(16,1)") + .setDescription("My Other Description"))); + to3.getValue().setPrimaryKeys(new StringArray(List.of("ID2"))); + List actual = test.getChangeEvents(urn, entity, aspect, from, to3, auditStamp); + compareDescriptions( + Set.of( + "A backwards incompatible change due to a primary key constraint change. " + + "The following fields were removed: 'ID'. The following fields were added: 'ID2'."), + actual); + assertEquals(1, actual.size()); + compareModificationCategories(Set.of(SchemaFieldModificationCategory.OTHER.toString()), actual); + + Aspect to4 = + getSchemaMetadata( + List.of( + new SchemaField() + .setFieldPath("IDZ") + .setNativeDataType("NUMBER(16,1)") + .setDescription("My Description"), + new SchemaField() + .setFieldPath("ID2") + .setNativeDataType("NUMBER(16,1)") + .setDescription("My Other Description"))); + to4.getValue().setPrimaryKeys(new StringArray(List.of("ID2"))); + + List actual2 = test.getChangeEvents(urn, entity, aspect, to3, to4, auditStamp); + compareDescriptions( + Set.of( + "A forwards & backwards compatible change due to renaming of the field 'ID to IDZ'."), + actual2); + assertEquals(1, actual2.size()); + compareModificationCategories( + Set.of(SchemaFieldModificationCategory.RENAME.toString()), actual2); + + Aspect to5 = + getSchemaMetadata( + List.of( + new SchemaField() + .setFieldPath("IDZ") + .setNativeDataType("NUMBER(16,1)") + .setDescription("My Description"), + new SchemaField() + .setFieldPath("ID1") + .setNativeDataType("NUMBER(16,1)") + .setDescription("My Third Description"), + new SchemaField() + .setFieldPath("ID2") + .setNativeDataType("NUMBER(16,1)") + .setDescription("My Other Description"))); + to5.getValue().setPrimaryKeys(new StringArray(List.of("ID2"))); + + List actual3 = test.getChangeEvents(urn, entity, aspect, to4, to5, auditStamp); + compareDescriptions( + Set.of( + "A forwards & backwards compatible change due to the newly added field 'ID1'.", + "The description 'My Third Description' for the field 'ID1' has been added."), + actual3); + assertEquals(actual3.size(), 2); + compareModificationCategories( + Set.of(SchemaFieldModificationCategory.OTHER.toString()), actual3); + } + // CHECKSTYLE:OFF private static final String TEST_OBJECT = "{\"platformSchema\":{\"com.linkedin.schema.KafkaSchema\":{\"documentSchema\":\"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"SampleHdfsSchema\\\",\\\"namespace\\\":\\\"com.linkedin.dataset\\\",\\\"doc\\\":\\\"Sample HDFS dataset\\\",\\\"fields\\\":[{\\\"name\\\":\\\"field_foo\\\",\\\"type\\\":[\\\"string\\\"]},{\\\"name\\\":\\\"field_bar\\\",\\\"type\\\":[\\\"boolean\\\"]}]}\"}},\"created\":{\"actor\":\"urn:li:corpuser:jdoe\",\"time\":1674291843000},\"lastModified\":{\"actor\":\"urn:li:corpuser:jdoe\",\"time\":1674291843000},\"fields\":[{\"nullable\":false,\"fieldPath\":\"shipment_info\",\"description\":\"Shipment info description\",\"isPartOfKey\":false,\"type\":{\"type\":{\"com.linkedin.schema.RecordType\":{}}},\"recursive\":false,\"nativeDataType\":\"varchar(100)\"},{\"nullable\":false,\"fieldPath\":\"shipment_info.date\",\"description\":\"Shipment info date description\",\"isPartOfKey\":false,\"type\":{\"type\":{\"com.linkedin.schema.DateType\":{}}},\"recursive\":false,\"nativeDataType\":\"Date\"},{\"nullable\":false,\"fieldPath\":\"shipment_info.target\",\"description\":\"Shipment info target description\",\"isPartOfKey\":false,\"type\":{\"type\":{\"com.linkedin.schema.StringType\":{}}},\"recursive\":false,\"nativeDataType\":\"text\"},{\"nullable\":false,\"fieldPath\":\"shipment_info.destination\",\"description\":\"Shipment info destination description\",\"isPartOfKey\":false,\"type\":{\"type\":{\"com.linkedin.schema.StringType\":{}}},\"recursive\":false,\"nativeDataType\":\"varchar(100)\"},{\"nullable\":false,\"fieldPath\":\"shipment_info.geo_info\",\"description\":\"Shipment info geo_info description\",\"isPartOfKey\":false,\"type\":{\"type\":{\"com.linkedin.schema.RecordType\":{}}},\"recursive\":false,\"nativeDataType\":\"varchar(100)\"},{\"nullable\":false,\"fieldPath\":\"shipment_info.geo_info.lat\",\"description\":\"Shipment info geo_info lat\",\"isPartOfKey\":false,\"type\":{\"type\":{\"com.linkedin.schema.NumberType\":{}}},\"recursive\":false,\"nativeDataType\":\"float\"},{\"nullable\":false,\"fieldPath\":\"shipment_info.geo_info.lng\",\"description\":\"Shipment info geo_info lng\",\"isPartOfKey\":false,\"type\":{\"type\":{\"com.linkedin.schema.NumberType\":{}}},\"recursive\":false,\"nativeDataType\":\"float\"}],\"schemaName\":\"SampleHdfsSchema\",\"version\":0,\"hash\":\"\",\"platform\":\"urn:li:dataPlatform:hdfs\"}";