fix(changeGenerator): fix logic around descriptions and make execution more efficient (#11539)

This commit is contained in:
RyanHolstien 2024-10-05 06:07:07 -05:00 committed by GitHub
parent 09e9d83f26
commit 5c2c555a84
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 144 additions and 47 deletions

View File

@ -4,6 +4,7 @@ import static com.linkedin.metadata.timeline.eventgenerator.ChangeEventGenerator
import com.datahub.util.RecordUtils;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.common.urn.Urn;
@ -173,7 +174,7 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat
SchemaField baseField,
SchemaField targetField,
Urn datasetUrn,
ChangeCategory changeCategory,
Set<ChangeCategory> changeCategories,
AuditStamp auditStamp) {
List<ChangeEvent> propChangeEvents = new ArrayList<>();
String datasetFieldUrn;
@ -184,7 +185,7 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat
}
// Description Change.
if (ChangeCategory.DOCUMENTATION.equals(changeCategory)) {
if (changeCategories != null && changeCategories.contains(ChangeCategory.DOCUMENTATION)) {
ChangeEvent descriptionChangeEvent =
getDescriptionChange(baseField, targetField, datasetFieldUrn, auditStamp);
if (descriptionChangeEvent != null) {
@ -193,14 +194,14 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat
}
// Global Tags
if (ChangeCategory.TAG.equals(changeCategory)) {
if (changeCategories != null && changeCategories.contains(ChangeCategory.TAG)) {
propChangeEvents.addAll(
getGlobalTagChangeEvents(
baseField, targetField, datasetUrn.toString(), datasetFieldUrn, auditStamp));
}
// Glossary terms.
if (ChangeCategory.GLOSSARY_TERM.equals(changeCategory)) {
if (changeCategories != null && changeCategories.contains(ChangeCategory.GLOSSARY_TERM)) {
propChangeEvents.addAll(
getGlossaryTermsChangeEvents(
baseField, targetField, datasetUrn.toString(), datasetFieldUrn, auditStamp));
@ -213,7 +214,7 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat
SchemaMetadata baseSchema,
SchemaMetadata targetSchema,
Urn datasetUrn,
ChangeCategory changeCategory,
Set<ChangeCategory> changeCategories,
AuditStamp auditStamp) {
// Sort the fields by their field path. This aligns both sets of fields based on field paths for
// comparisons.
@ -247,11 +248,11 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat
// This is the same field. Check for change events from property changes.
if (!curBaseField.getNativeDataType().equals(curTargetField.getNativeDataType())) {
processNativeTypeChange(
changeCategory, changeEvents, datasetUrn, curBaseField, curTargetField, auditStamp);
changeCategories, changeEvents, datasetUrn, curBaseField, curTargetField, auditStamp);
}
List<ChangeEvent> propChangeEvents =
getFieldPropertyChangeEvents(
curBaseField, curTargetField, datasetUrn, changeCategory, auditStamp);
curBaseField, curTargetField, datasetUrn, changeCategories, auditStamp);
changeEvents.addAll(propChangeEvents);
++baseFieldIdx;
++targetFieldIdx;
@ -268,16 +269,17 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat
targetFields.subList(targetFieldIdx, targetFields.size()),
renamedFields);
if (renamedField == null) {
processRemoval(changeCategory, changeEvents, datasetUrn, curBaseField, auditStamp);
processRemoval(changeCategories, changeEvents, datasetUrn, curBaseField, auditStamp);
++baseFieldIdx;
} else {
if (ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) {
if (changeCategories != null
&& changeCategories.contains(ChangeCategory.TECHNICAL_SCHEMA)) {
changeEvents.add(
generateRenameEvent(datasetUrn, curBaseField, renamedField, auditStamp));
}
List<ChangeEvent> propChangeEvents =
getFieldPropertyChangeEvents(
curBaseField, curTargetField, datasetUrn, changeCategory, auditStamp);
curBaseField, renamedField, datasetUrn, changeCategories, auditStamp);
changeEvents.addAll(propChangeEvents);
++baseFieldIdx;
renamedFields.add(renamedField);
@ -289,16 +291,17 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat
findRenamedField(
curTargetField, baseFields.subList(baseFieldIdx, baseFields.size()), renamedFields);
if (renamedField == null) {
processAdd(changeCategory, changeEvents, datasetUrn, curTargetField, auditStamp);
processAdd(changeCategories, changeEvents, datasetUrn, curTargetField, auditStamp);
++targetFieldIdx;
} else {
if (ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) {
if (changeCategories != null
&& changeCategories.contains(ChangeCategory.TECHNICAL_SCHEMA)) {
changeEvents.add(
generateRenameEvent(datasetUrn, renamedField, curTargetField, auditStamp));
}
List<ChangeEvent> propChangeEvents =
getFieldPropertyChangeEvents(
curBaseField, curTargetField, datasetUrn, changeCategory, auditStamp);
renamedField, curTargetField, datasetUrn, changeCategories, auditStamp);
changeEvents.addAll(propChangeEvents);
++targetFieldIdx;
renamedFields.add(renamedField);
@ -309,7 +312,7 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat
// Handle removed fields. Non-backward compatible change + major version bump
SchemaField baseField = baseFields.get(baseFieldIdx);
if (!renamedFields.contains(baseField)) {
processRemoval(changeCategory, changeEvents, datasetUrn, baseField, auditStamp);
processRemoval(changeCategories, changeEvents, datasetUrn, baseField, auditStamp);
}
++baseFieldIdx;
}
@ -317,14 +320,15 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat
// Newly added fields. Forwards & backwards compatible change + minor version bump.
SchemaField targetField = targetFields.get(targetFieldIdx);
if (!renamedFields.contains(targetField)) {
processAdd(changeCategory, changeEvents, datasetUrn, targetField, auditStamp);
processAdd(changeCategories, changeEvents, datasetUrn, targetField, auditStamp);
}
++targetFieldIdx;
}
// Handle primary key constraint change events.
List<ChangeEvent> primaryKeyChangeEvents =
getPrimaryKeyChangeEvents(changeCategory, baseSchema, targetSchema, datasetUrn, auditStamp);
getPrimaryKeyChangeEvents(
changeCategories, baseSchema, targetSchema, datasetUrn, auditStamp);
changeEvents.addAll(primaryKeyChangeEvents);
// Handle foreign key constraint change events, currently no-op due to field not being utilized.
@ -375,12 +379,12 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat
}
private static void processRemoval(
ChangeCategory changeCategory,
Set<ChangeCategory> changeCategories,
List<ChangeEvent> changeEvents,
Urn datasetUrn,
SchemaField baseField,
AuditStamp auditStamp) {
if (ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) {
if (changeCategories != null && changeCategories.contains(ChangeCategory.TECHNICAL_SCHEMA)) {
changeEvents.add(
DatasetSchemaFieldChangeEvent.schemaFieldChangeEventBuilder()
.modifier(getSchemaFieldUrn(datasetUrn, baseField).toString())
@ -401,17 +405,17 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat
.build());
}
List<ChangeEvent> propChangeEvents =
getFieldPropertyChangeEvents(baseField, null, datasetUrn, changeCategory, auditStamp);
getFieldPropertyChangeEvents(baseField, null, datasetUrn, changeCategories, auditStamp);
changeEvents.addAll(propChangeEvents);
}
private static void processAdd(
ChangeCategory changeCategory,
Set<ChangeCategory> changeCategories,
List<ChangeEvent> changeEvents,
Urn datasetUrn,
SchemaField targetField,
AuditStamp auditStamp) {
if (ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) {
if (changeCategories != null && changeCategories.contains(ChangeCategory.TECHNICAL_SCHEMA)) {
changeEvents.add(
DatasetSchemaFieldChangeEvent.schemaFieldChangeEventBuilder()
.modifier(getSchemaFieldUrn(datasetUrn, targetField).toString())
@ -428,22 +432,23 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat
.fieldUrn(getSchemaFieldUrn(datasetUrn, targetField))
.nullable(targetField.isNullable())
.auditStamp(auditStamp)
.modificationCategory(SchemaFieldModificationCategory.OTHER)
.build());
}
List<ChangeEvent> propChangeEvents =
getFieldPropertyChangeEvents(null, targetField, datasetUrn, changeCategory, auditStamp);
getFieldPropertyChangeEvents(null, targetField, datasetUrn, changeCategories, auditStamp);
changeEvents.addAll(propChangeEvents);
}
private static void processNativeTypeChange(
ChangeCategory changeCategory,
Set<ChangeCategory> changeCategories,
List<ChangeEvent> changeEvents,
Urn datasetUrn,
SchemaField curBaseField,
SchemaField curTargetField,
AuditStamp auditStamp) {
// Non-backward compatible change + Major version bump
if (ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) {
if (changeCategories != null && changeCategories.contains(ChangeCategory.TECHNICAL_SCHEMA)) {
changeEvents.add(
DatasetSchemaFieldChangeEvent.schemaFieldChangeEventBuilder()
.category(ChangeCategory.TECHNICAL_SCHEMA)
@ -505,12 +510,12 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat
}
private static List<ChangeEvent> getPrimaryKeyChangeEvents(
ChangeCategory changeCategory,
Set<ChangeCategory> changeCategories,
SchemaMetadata baseSchema,
SchemaMetadata targetSchema,
Urn datasetUrn,
AuditStamp auditStamp) {
if (ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) {
if (changeCategories != null && changeCategories.contains(ChangeCategory.TECHNICAL_SCHEMA)) {
List<ChangeEvent> primaryKeyChangeEvents = new ArrayList<>();
Set<String> basePrimaryKeys =
(baseSchema != null && baseSchema.getPrimaryKeys() != null)
@ -598,7 +603,7 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat
baseSchema,
targetSchema,
DatasetUrn.createFromString(currentValue.getUrn()),
changeCategory,
Collections.singleton(changeCategory),
null));
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Malformed DatasetUrn " + currentValue.getUrn());
@ -632,18 +637,16 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat
@Nonnull Aspect<SchemaMetadata> from,
@Nonnull Aspect<SchemaMetadata> to,
@Nonnull AuditStamp auditStamp) {
final List<ChangeEvent> changeEvents = new ArrayList<>();
changeEvents.addAll(
return new ArrayList<>(
computeDiffs(
from.getValue(), to.getValue(), urn, ChangeCategory.DOCUMENTATION, auditStamp));
changeEvents.addAll(
computeDiffs(from.getValue(), to.getValue(), urn, ChangeCategory.TAG, auditStamp));
changeEvents.addAll(
computeDiffs(
from.getValue(), to.getValue(), urn, ChangeCategory.TECHNICAL_SCHEMA, auditStamp));
changeEvents.addAll(
computeDiffs(
from.getValue(), to.getValue(), urn, ChangeCategory.GLOSSARY_TERM, auditStamp));
return changeEvents;
from.getValue(),
to.getValue(),
urn,
ImmutableSet.of(
ChangeCategory.DOCUMENTATION,
ChangeCategory.TAG,
ChangeCategory.TECHNICAL_SCHEMA,
ChangeCategory.GLOSSARY_TERM),
auditStamp));
}
}

View File

@ -6,6 +6,7 @@ import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.StringArray;
import com.linkedin.metadata.timeline.data.ChangeEvent;
import com.linkedin.metadata.timeline.data.dataset.DatasetSchemaFieldChangeEvent;
import com.linkedin.metadata.timeline.data.dataset.SchemaFieldModificationCategory;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.restli.internal.server.util.DataMapUtils;
@ -18,6 +19,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
import org.testng.annotations.Test;
@ -41,18 +43,17 @@ public class SchemaMetadataChangeEventGeneratorTest extends AbstractTestNGSpring
changeEvent -> {
actualDescriptions.add(changeEvent.getDescription());
});
assertEquals(expectedDescriptions, actualDescriptions);
assertEquals(actualDescriptions, expectedDescriptions);
}
private static void compareModificationCategories(
Set<String> expectedCategories, List<ChangeEvent> actual) {
Set<String> actualModificationCategories = new HashSet<>();
actual.forEach(
changeEvent -> {
actualModificationCategories.add(
changeEvent.getParameters().get("modificationCategory").toString());
});
assertEquals(expectedCategories, actualModificationCategories);
Set<String> actualModificationCategories =
actual.stream()
.filter(changeEvent -> changeEvent instanceof DatasetSchemaFieldChangeEvent)
.map(changeEvent -> changeEvent.getParameters().get("modificationCategory").toString())
.collect(Collectors.toSet());
assertEquals(actualModificationCategories, expectedCategories);
}
private static Aspect<SchemaMetadata> getSchemaMetadata(List<SchemaField> schemaFieldList) {
@ -236,6 +237,99 @@ public class SchemaMetadataChangeEventGeneratorTest extends AbstractTestNGSpring
assertEquals(14, actual.size());
}
/**
 * Walks one two-field schema through three successive revisions and checks the change events
 * emitted for each step: a primary-key switch, a field rename, and a field addition.
 */
@Test
public void testSchemaFieldPrimaryKeyChangeRenameAdd() throws Exception {
  SchemaMetadataChangeEventGenerator test = new SchemaMetadataChangeEventGenerator();

  Urn urn = getTestUrn();
  String entity = "dataset";
  String aspect = "schemaMetadata";
  AuditStamp auditStamp = getTestAuditStamp();

  // Baseline: fields ID and ID2, primary key on ID.
  Aspect<SchemaMetadata> from =
      getSchemaMetadata(
          List.of(
              new SchemaField()
                  .setFieldPath("ID")
                  .setNativeDataType("NUMBER(16,1)")
                  .setDescription("My Description"),
              new SchemaField()
                  .setFieldPath("ID2")
                  .setNativeDataType("NUMBER(16,1)")
                  .setDescription("My Other Description")));
  from.getValue().setPrimaryKeys(new StringArray(List.of("ID")));

  // Step 1: identical fields, but the primary key moves from ID to ID2.
  Aspect<SchemaMetadata> to3 =
      getSchemaMetadata(
          List.of(
              new SchemaField()
                  .setFieldPath("ID")
                  .setNativeDataType("NUMBER(16,1)")
                  .setDescription("My Description"),
              new SchemaField()
                  .setFieldPath("ID2")
                  .setNativeDataType("NUMBER(16,1)")
                  .setDescription("My Other Description")));
  to3.getValue().setPrimaryKeys(new StringArray(List.of("ID2")));
  List<ChangeEvent> actual = test.getChangeEvents(urn, entity, aspect, from, to3, auditStamp);
  compareDescriptions(
      Set.of(
          "A backwards incompatible change due to a primary key constraint change. "
              + "The following fields were removed: 'ID'. The following fields were added: 'ID2'."),
      actual);
  // Actual value first, matching the argument order used by the comparison helpers in this file.
  assertEquals(actual.size(), 1);
  compareModificationCategories(Set.of(SchemaFieldModificationCategory.OTHER.toString()), actual);

  // Step 2: field ID becomes IDZ with the same type and description; primary key is unchanged.
  Aspect<SchemaMetadata> to4 =
      getSchemaMetadata(
          List.of(
              new SchemaField()
                  .setFieldPath("IDZ")
                  .setNativeDataType("NUMBER(16,1)")
                  .setDescription("My Description"),
              new SchemaField()
                  .setFieldPath("ID2")
                  .setNativeDataType("NUMBER(16,1)")
                  .setDescription("My Other Description")));
  to4.getValue().setPrimaryKeys(new StringArray(List.of("ID2")));
  List<ChangeEvent> actual2 = test.getChangeEvents(urn, entity, aspect, to3, to4, auditStamp);
  compareDescriptions(
      Set.of(
          "A forwards & backwards compatible change due to renaming of the field 'ID to IDZ'."),
      actual2);
  assertEquals(actual2.size(), 1);
  compareModificationCategories(
      Set.of(SchemaFieldModificationCategory.RENAME.toString()), actual2);

  // Step 3: a brand-new field ID1 (with a description) is added alongside IDZ and ID2.
  Aspect<SchemaMetadata> to5 =
      getSchemaMetadata(
          List.of(
              new SchemaField()
                  .setFieldPath("IDZ")
                  .setNativeDataType("NUMBER(16,1)")
                  .setDescription("My Description"),
              new SchemaField()
                  .setFieldPath("ID1")
                  .setNativeDataType("NUMBER(16,1)")
                  .setDescription("My Third Description"),
              new SchemaField()
                  .setFieldPath("ID2")
                  .setNativeDataType("NUMBER(16,1)")
                  .setDescription("My Other Description")));
  to5.getValue().setPrimaryKeys(new StringArray(List.of("ID2")));
  List<ChangeEvent> actual3 = test.getChangeEvents(urn, entity, aspect, to4, to5, auditStamp);
  // Expect one event for the added field plus one for its newly added description.
  compareDescriptions(
      Set.of(
          "A forwards & backwards compatible change due to the newly added field 'ID1'.",
          "The description 'My Third Description' for the field 'ID1' has been added."),
      actual3);
  assertEquals(actual3.size(), 2);
  compareModificationCategories(
      Set.of(SchemaFieldModificationCategory.OTHER.toString()), actual3);
}
// CHECKSTYLE:OFF
private static final String TEST_OBJECT =
"{\"platformSchema\":{\"com.linkedin.schema.KafkaSchema\":{\"documentSchema\":\"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"SampleHdfsSchema\\\",\\\"namespace\\\":\\\"com.linkedin.dataset\\\",\\\"doc\\\":\\\"Sample HDFS dataset\\\",\\\"fields\\\":[{\\\"name\\\":\\\"field_foo\\\",\\\"type\\\":[\\\"string\\\"]},{\\\"name\\\":\\\"field_bar\\\",\\\"type\\\":[\\\"boolean\\\"]}]}\"}},\"created\":{\"actor\":\"urn:li:corpuser:jdoe\",\"time\":1674291843000},\"lastModified\":{\"actor\":\"urn:li:corpuser:jdoe\",\"time\":1674291843000},\"fields\":[{\"nullable\":false,\"fieldPath\":\"shipment_info\",\"description\":\"Shipment info description\",\"isPartOfKey\":false,\"type\":{\"type\":{\"com.linkedin.schema.RecordType\":{}}},\"recursive\":false,\"nativeDataType\":\"varchar(100)\"},{\"nullable\":false,\"fieldPath\":\"shipment_info.date\",\"description\":\"Shipment info date description\",\"isPartOfKey\":false,\"type\":{\"type\":{\"com.linkedin.schema.DateType\":{}}},\"recursive\":false,\"nativeDataType\":\"Date\"},{\"nullable\":false,\"fieldPath\":\"shipment_info.target\",\"description\":\"Shipment info target description\",\"isPartOfKey\":false,\"type\":{\"type\":{\"com.linkedin.schema.StringType\":{}}},\"recursive\":false,\"nativeDataType\":\"text\"},{\"nullable\":false,\"fieldPath\":\"shipment_info.destination\",\"description\":\"Shipment info destination description\",\"isPartOfKey\":false,\"type\":{\"type\":{\"com.linkedin.schema.StringType\":{}}},\"recursive\":false,\"nativeDataType\":\"varchar(100)\"},{\"nullable\":false,\"fieldPath\":\"shipment_info.geo_info\",\"description\":\"Shipment info geo_info description\",\"isPartOfKey\":false,\"type\":{\"type\":{\"com.linkedin.schema.RecordType\":{}}},\"recursive\":false,\"nativeDataType\":\"varchar(100)\"},{\"nullable\":false,\"fieldPath\":\"shipment_info.geo_info.lat\",\"description\":\"Shipment info geo_info 
lat\",\"isPartOfKey\":false,\"type\":{\"type\":{\"com.linkedin.schema.NumberType\":{}}},\"recursive\":false,\"nativeDataType\":\"float\"},{\"nullable\":false,\"fieldPath\":\"shipment_info.geo_info.lng\",\"description\":\"Shipment info geo_info lng\",\"isPartOfKey\":false,\"type\":{\"type\":{\"com.linkedin.schema.NumberType\":{}}},\"recursive\":false,\"nativeDataType\":\"float\"}],\"schemaName\":\"SampleHdfsSchema\",\"version\":0,\"hash\":\"\",\"platform\":\"urn:li:dataPlatform:hdfs\"}";