fix(timeline): enhance schema field name change and removal support (#4603)

This commit is contained in:
RyanHolstien 2022-04-15 17:36:04 -05:00 committed by GitHub
parent e572af62ea
commit 15474cdad5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 176 additions and 95 deletions

View File

@ -22,11 +22,12 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.commons.lang.StringUtils;
public class SchemaDiffer implements Differ {
private static final String SCHEMA_METADATA_ASPECT_NAME = "schemaMetadata";
private static final String BACKWARDS_INCOMPATIBLE_DESC = "A backwards incompatible change due to ";
private static final String BACKWARDS_INCOMPATIBLE_DESC = "A backwards incompatible change due to";
private static final String FORWARDS_COMPATIBLE_DESC = "A forwards compatible change due to ";
private static final String BACK_AND_FORWARD_COMPATIBLE_DESC = "A forwards & backwards compatible change due to ";
private static final String FIELD_DESCRIPTION_ADDED_FORMAT =
@ -135,14 +136,11 @@ public class SchemaDiffer implements Differ {
// TODO: This could use some cleanup, lots of repeated logic and tenuous conditionals
private static List<ChangeEvent> computeDiffs(SchemaMetadata baseSchema, SchemaMetadata targetSchema,
Urn datasetUrn, ChangeCategory changeCategory) {
boolean isOrdinalBasedSchema = isSchemaOrdinalBased(targetSchema);
if (!isOrdinalBasedSchema) {
// Sort the fields by their field path.
if (baseSchema != null) {
sortFieldsByPath(baseSchema);
}
sortFieldsByPath(targetSchema);
// Sort the fields by their field path.
if (baseSchema != null) {
sortFieldsByPath(baseSchema);
}
sortFieldsByPath(targetSchema);
// Performs ordinal based diff, primarily based on fixed field ordinals and their types.
SchemaFieldArray baseFields = (baseSchema != null ? baseSchema.getFields() : new SchemaFieldArray());
@ -150,10 +148,18 @@ public class SchemaDiffer implements Differ {
int baseFieldIdx = 0;
int targetFieldIdx = 0;
List<ChangeEvent> changeEvents = new ArrayList<>();
Set<SchemaField> renamedFields = new HashSet<>();
while (baseFieldIdx < baseFields.size() && targetFieldIdx < targetFields.size()) {
SchemaField curBaseField = baseFields.get(baseFieldIdx);
SchemaField curTargetField = targetFields.get(targetFieldIdx);
if (isOrdinalBasedSchema) {
//TODO: Re-evaluate ordinal processing?
int comparison = curBaseField.getFieldPath().compareTo(curTargetField.getFieldPath());
if (renamedFields.contains(curBaseField)) {
baseFieldIdx++;
} else if (renamedFields.contains(curTargetField)) {
targetFieldIdx++;
} else if (comparison == 0) {
// This is the same field. Check for change events from property changes.
if (!curBaseField.getNativeDataType().equals(curTargetField.getNativeDataType())) {
// Non-backward compatible change + Major version bump
if (ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) {
@ -173,109 +179,62 @@ public class SchemaDiffer implements Differ {
changeEvents.addAll(propChangeEvents);
++baseFieldIdx;
++targetFieldIdx;
continue;
}
if (baseFieldIdx == targetFieldIdx && !curBaseField.getFieldPath().equals(curTargetField.getFieldPath())
&& ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) {
// The field got renamed. Forward compatible + Minor version bump.
changeEvents.add(ChangeEvent.builder()
.category(ChangeCategory.TECHNICAL_SCHEMA)
.elementId(getSchemaFieldUrn(datasetUrn, curBaseField))
.target(datasetUrn.toString())
.changeType(ChangeOperation.MODIFY)
.semVerChange(SemanticChangeType.MINOR)
.description(
FORWARDS_COMPATIBLE_DESC + "field name changed from '" + getFieldPathV1(curBaseField) + "' to '"
+ getFieldPathV1(curTargetField) + "'")
.build());
}
// Generate change events from property changes
List<ChangeEvent> propChangeEvents = getFieldPropertyChangeEvents(curBaseField, curTargetField, datasetUrn,
changeCategory);
List<ChangeEvent> propChangeEvents =
getFieldPropertyChangeEvents(curBaseField, curTargetField, datasetUrn, changeCategory);
changeEvents.addAll(propChangeEvents);
++baseFieldIdx;
++targetFieldIdx;
} else {
// Non-ordinal based schemas are pre-sorted by ascending order of fieldPaths.
int comparison = curBaseField.getFieldPath().compareTo(curTargetField.getFieldPath());
if (comparison == 0) {
// This is the same field. Check for change events from property changes.
List<ChangeEvent> propChangeEvents = getFieldPropertyChangeEvents(curBaseField, curTargetField, datasetUrn,
changeCategory);
changeEvents.addAll(propChangeEvents);
++baseFieldIdx;
++targetFieldIdx;
} else if (comparison < 0) {
// BaseFiled got removed. Non-backward compatible change + Major version bump
if (ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) {
changeEvents.add(ChangeEvent.builder()
.category(ChangeCategory.TECHNICAL_SCHEMA)
.elementId(getSchemaFieldUrn(datasetUrn, curBaseField))
.target(datasetUrn.toString())
.changeType(ChangeOperation.REMOVE)
.semVerChange(SemanticChangeType.MAJOR)
.description(BACKWARDS_INCOMPATIBLE_DESC + "removal of the field'" + getFieldPathV1(curBaseField) + "'.")
.build());
}
List<ChangeEvent> propChangeEvents = getFieldPropertyChangeEvents(curBaseField, curTargetField, datasetUrn,
changeCategory);
changeEvents.addAll(propChangeEvents);
} else if (comparison < 0) {
// Base Field was removed or was renamed. Non-backward compatible change + Major version bump
// Check for rename, if rename coincides with other modifications we assume drop/add.
// Assumes that two different fields on the same schema would not have the same description, terms,
// or tags and share the same type
SchemaField renamedField = findRenamedField(curBaseField,
targetFields.subList(targetFieldIdx, targetFields.size()), renamedFields);
if (renamedField == null) {
processRemoval(changeCategory, changeEvents, datasetUrn, curBaseField);
++baseFieldIdx;
} else {
// The targetField got added. Forward & backwards compatible change + minor version bump.
if (ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) {
changeEvents.add(ChangeEvent.builder()
.category(ChangeCategory.TECHNICAL_SCHEMA)
.elementId(getSchemaFieldUrn(datasetUrn, curTargetField))
.target(datasetUrn.toString())
.changeType(ChangeOperation.ADD)
.semVerChange(SemanticChangeType.MINOR)
.description(
BACK_AND_FORWARD_COMPATIBLE_DESC + "the newly added field '" + getFieldPathV1(curTargetField) + "'.")
.build());
}
changeEvents.add(generateRenameEvent(datasetUrn, curBaseField, renamedField));
List<ChangeEvent> propChangeEvents = getFieldPropertyChangeEvents(curBaseField, curTargetField, datasetUrn,
changeCategory);
changeEvents.addAll(propChangeEvents);
++baseFieldIdx;
renamedFields.add(renamedField);
}
} else {
// The targetField got added or a rename occurred. Forward & backwards compatible change + minor version bump.
SchemaField renamedField = findRenamedField(curTargetField,
baseFields.subList(baseFieldIdx, baseFields.size()), renamedFields);
if (renamedField == null) {
processAdd(changeCategory, changeEvents, datasetUrn, curTargetField);
++targetFieldIdx;
} else {
changeEvents.add(generateRenameEvent(datasetUrn, renamedField, curTargetField));
List<ChangeEvent> propChangeEvents = getFieldPropertyChangeEvents(curBaseField, curTargetField, datasetUrn,
changeCategory);
changeEvents.addAll(propChangeEvents);
++targetFieldIdx;
renamedFields.add(renamedField);
}
}
}
while (baseFieldIdx < baseFields.size()) {
// Handle removed fields. Non-backward compatible change + major version bump
SchemaField baseField = baseFields.get(baseFieldIdx);
if (ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) {
changeEvents.add(ChangeEvent.builder()
.elementId(getSchemaFieldUrn(datasetUrn, baseField))
.target(datasetUrn.toString())
.category(ChangeCategory.TECHNICAL_SCHEMA)
.changeType(ChangeOperation.REMOVE)
.semVerChange(SemanticChangeType.MAJOR)
.description(BACKWARDS_INCOMPATIBLE_DESC + "removal of field: '" + getFieldPathV1(baseField) + "'.")
.build());
if (!renamedFields.contains(baseField)) {
processRemoval(changeCategory, changeEvents, datasetUrn, baseField);
}
List<ChangeEvent> propChangeEvents = getFieldPropertyChangeEvents(baseField, null, datasetUrn,
changeCategory);
changeEvents.addAll(propChangeEvents);
++baseFieldIdx;
}
while (targetFieldIdx < targetFields.size()) {
// Newly added fields. Forwards & backwards compatible change + minor version bump.
SchemaField targetField = targetFields.get(targetFieldIdx);
if (ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) {
changeEvents.add(ChangeEvent.builder()
.elementId(getSchemaFieldUrn(datasetUrn, targetField))
.target(datasetUrn.toString())
.category(ChangeCategory.TECHNICAL_SCHEMA)
.changeType(ChangeOperation.ADD)
.semVerChange(SemanticChangeType.MINOR)
.description(BACK_AND_FORWARD_COMPATIBLE_DESC + "the newly added field '" + getFieldPathV1(targetField) + "'.")
.build());
if (!renamedFields.contains(targetField)) {
processAdd(changeCategory, changeEvents, datasetUrn, targetField);
}
List<ChangeEvent> propChangeEvents = getFieldPropertyChangeEvents(null, targetField, datasetUrn,
changeCategory);
changeEvents.addAll(propChangeEvents);
++targetFieldIdx;
targetFieldIdx++;
}
// Handle primary key constraint change events.
@ -304,6 +263,123 @@ public class SchemaDiffer implements Differ {
schemaMetadata.setFields(new SchemaFieldArray(schemaFields));
}
private static SchemaField findRenamedField(SchemaField curField, List<SchemaField> targetFields, Set<SchemaField> renamedFields) {
return targetFields.stream()
.filter(schemaField -> isRenamed(curField, schemaField))
.filter(field -> !renamedFields.contains(field))
.findFirst().orElse(null);
}
private static boolean isRenamed(SchemaField curField, SchemaField schemaField) {
return curField.getNativeDataType().equals(schemaField.getNativeDataType())
&& parentFieldsMatch(curField, schemaField) && descriptionsMatch(curField, schemaField);
}
private static boolean parentFieldsMatch(SchemaField curField, SchemaField schemaField) {
int curFieldIndex = curField.getFieldPath().lastIndexOf(".");
int schemaFieldIndex = schemaField.getFieldPath().lastIndexOf(".");
if (curFieldIndex > 0 && schemaFieldIndex > 0) {
String curFieldParentPath = curField.getFieldPath().substring(0, curFieldIndex);
String schemaFieldParentPath = schemaField.getFieldPath().substring(0, schemaFieldIndex);
return StringUtils.isNotBlank(curFieldParentPath) && curFieldParentPath.equals(schemaFieldParentPath);
}
// No parent field
return curFieldIndex < 0 && schemaFieldIndex < 0;
}
private static boolean descriptionsMatch(SchemaField curField, SchemaField schemaField) {
return StringUtils.isNotBlank(curField.getDescription()) && curField.getDescription().equals(schemaField.getDescription());
}
private static void processRemoval(ChangeCategory changeCategory, List<ChangeEvent> changeEvents, Urn datasetUrn,
SchemaField baseField) {
if (ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) {
changeEvents.add(ChangeEvent.builder()
.elementId(getSchemaFieldUrn(datasetUrn, baseField))
.target(datasetUrn.toString())
.category(ChangeCategory.TECHNICAL_SCHEMA)
.changeType(ChangeOperation.REMOVE)
.semVerChange(SemanticChangeType.MAJOR)
.description(BACKWARDS_INCOMPATIBLE_DESC + " removal of field: '" + getFieldPathV1(baseField) + "'.")
.build());
}
List<ChangeEvent> propChangeEvents = getFieldPropertyChangeEvents(baseField, null, datasetUrn,
changeCategory);
changeEvents.addAll(propChangeEvents);
}
private static void processAdd(ChangeCategory changeCategory, List<ChangeEvent> changeEvents, Urn datasetUrn,
SchemaField targetField) {
if (ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) {
changeEvents.add(ChangeEvent.builder()
.elementId(getSchemaFieldUrn(datasetUrn, targetField))
.target(datasetUrn.toString())
.category(ChangeCategory.TECHNICAL_SCHEMA)
.changeType(ChangeOperation.ADD)
.semVerChange(SemanticChangeType.MINOR)
.description(BACK_AND_FORWARD_COMPATIBLE_DESC + "the newly added field '" + getFieldPathV1(targetField) + "'.")
.build());
}
List<ChangeEvent> propChangeEvents = getFieldPropertyChangeEvents(null, targetField, datasetUrn,
changeCategory);
changeEvents.addAll(propChangeEvents);
}
private static ChangeEvent generateRenameEvent(Urn datasetUrn, SchemaField curBaseField, SchemaField curTargetField) {
return ChangeEvent.builder()
.category(ChangeCategory.TECHNICAL_SCHEMA)
.elementId(getSchemaFieldUrn(datasetUrn, curBaseField))
.target(datasetUrn.toString())
.changeType(ChangeOperation.MODIFY)
.semVerChange(SemanticChangeType.MINOR)
.description(BACK_AND_FORWARD_COMPATIBLE_DESC + "renaming of the field '" + getFieldPathV1(curBaseField)
+ " to " + getFieldPathV1(curTargetField) + "'.")
.build();
}
private static void processOrdinalSchemaFields(SchemaField curBaseField, SchemaField curTargetField,
ChangeCategory changeCategory, List<ChangeEvent> changeEvents, Urn datasetUrn, int baseFieldIdx, int targetFieldIdx) {
if (!curBaseField.getNativeDataType().equals(curTargetField.getNativeDataType())) {
// Non-backward compatible change + Major version bump
if (ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) {
changeEvents.add(ChangeEvent.builder()
.category(ChangeCategory.TECHNICAL_SCHEMA)
.elementId(getSchemaFieldUrn(datasetUrn, curBaseField))
.target(datasetUrn.toString())
.changeType(ChangeOperation.MODIFY)
.semVerChange(SemanticChangeType.MAJOR)
.description(String.format("%s native datatype of the field '%s' changed from '%s' to '%s'.",
BACKWARDS_INCOMPATIBLE_DESC, getFieldPathV1(curTargetField), curBaseField.getNativeDataType(),
curTargetField.getNativeDataType()))
.build());
}
List<ChangeEvent> propChangeEvents = getFieldPropertyChangeEvents(curBaseField, curTargetField, datasetUrn,
changeCategory);
changeEvents.addAll(propChangeEvents);
++baseFieldIdx;
++targetFieldIdx;
} else if (baseFieldIdx == targetFieldIdx && !curBaseField.getFieldPath().equals(curTargetField.getFieldPath())
&& ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) {
// The field got renamed. Forward compatible + Minor version bump.
changeEvents.add(ChangeEvent.builder()
.category(ChangeCategory.TECHNICAL_SCHEMA)
.elementId(getSchemaFieldUrn(datasetUrn, curBaseField))
.target(datasetUrn.toString())
.changeType(ChangeOperation.MODIFY)
.semVerChange(SemanticChangeType.MINOR)
.description(
FORWARDS_COMPATIBLE_DESC + "field name changed from '" + getFieldPathV1(curBaseField) + "' to '"
+ getFieldPathV1(curTargetField) + "'")
.build());
}
// Generate change events from property changes
List<ChangeEvent> propChangeEvents = getFieldPropertyChangeEvents(curBaseField, curTargetField, datasetUrn,
changeCategory);
changeEvents.addAll(propChangeEvents);
++baseFieldIdx;
++targetFieldIdx;
}
private static ChangeEvent getIncompatibleChangeEvent(SchemaMetadata baseSchema, SchemaMetadata targetSchema) {
if (baseSchema != null && targetSchema != null) {
if (!baseSchema.getPlatform().equals(targetSchema.getPlatform())) {
@ -355,7 +431,7 @@ public class SchemaDiffer implements Differ {
.target(datasetUrn.toString())
.changeType(ChangeOperation.MODIFY)
.semVerChange(SemanticChangeType.MAJOR)
.description(BACKWARDS_INCOMPATIBLE_DESC + "removal of the primary key field '" + removedBaseKeyField + "'")
.description(BACKWARDS_INCOMPATIBLE_DESC + " removal of the primary key field '" + removedBaseKeyField + "'")
.build());
}
@ -368,7 +444,7 @@ public class SchemaDiffer implements Differ {
.target(datasetUrn.toString())
.changeType(ChangeOperation.MODIFY)
.semVerChange(SemanticChangeType.MAJOR)
.description(BACKWARDS_INCOMPATIBLE_DESC + "addition of the primary key field '" + addedTargetKeyField + "'")
.description(BACKWARDS_INCOMPATIBLE_DESC + " addition of the primary key field '" + addedTargetKeyField + "'")
.build());
}
return primaryKeyChangeEvents;

View File

@ -85,6 +85,7 @@
{
"fieldPath": "[version=2.0].[type=struct].[type=struct].service.[type=struct].provider.[type=int].id",
"nullable": true,
"description": "Service provider id",
"type": {
"type": {
"com.linkedin.schema.NumberType": {}

View File

@ -70,7 +70,8 @@
"jsonProps": "{\"native_data_type\": \"struct<name:varchar(50),id:tinyint>\"}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=struct].service.[type=struct].provider.[type=int].id",
"fieldPath": "[version=2.0].[type=struct].[type=struct].service.[type=struct].provider.[type=int].id3",
"description": "Service provider name",
"nullable": true,
"type": {
"type": {
@ -85,6 +86,7 @@
{
"fieldPath": "[version=2.0].[type=struct].[type=struct].service.[type=struct].provider.[type=int].id2",
"nullable": true,
"description": "Service provider id",
"type": {
"type": {
"com.linkedin.schema.NumberType": {}

View File

@ -71,6 +71,7 @@
},
{
"fieldPath": "[version=2.0].[type=struct].[type=struct].service.[type=struct].provider.[type=string].name",
"description": "Service provider name",
"nullable": true,
"type": {
"type": {
@ -85,6 +86,7 @@
{
"fieldPath": "[version=2.0].[type=struct].[type=struct].service.[type=struct].provider.[type=int].id",
"nullable": true,
"description": "Service provider id",
"type": {
"type": {
"com.linkedin.schema.NumberType": {}

View File

@ -29,7 +29,7 @@ def test_all():
assert res_data[1]["semVerChange"] == "MAJOR"
assert len(res_data[1]["changeEvents"]) == 9
assert res_data[2]["semVerChange"] == "MAJOR"
assert len(res_data[2]["changeEvents"]) == 7
assert len(res_data[2]["changeEvents"]) == 6
assert res_data[2]["semVer"] == "2.0.0-computed"
@ -53,9 +53,9 @@ def test_schema():
assert res_data[0]["semVerChange"] == "MINOR"
assert len(res_data[0]["changeEvents"]) == 6
assert res_data[1]["semVerChange"] == "MAJOR"
assert len(res_data[1]["changeEvents"]) == 2
assert len(res_data[1]["changeEvents"]) == 3
assert res_data[2]["semVerChange"] == "MAJOR"
assert len(res_data[2]["changeEvents"]) == 2
assert len(res_data[2]["changeEvents"]) == 3
assert res_data[2]["semVer"] == "2.0.0-computed"