mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-25 17:08:29 +00:00
feat(fieldpaths): prevent duplicate field paths (#10590)
This commit is contained in:
parent
654653b452
commit
250a22ca07
@ -0,0 +1,142 @@
|
||||
package com.linkedin.metadata.aspect.hooks;
|
||||
|
||||
import static com.linkedin.metadata.Constants.EDITABLE_SCHEMA_METADATA_ASPECT_NAME;
|
||||
import static com.linkedin.metadata.Constants.SCHEMA_METADATA_ASPECT_NAME;
|
||||
|
||||
import com.linkedin.events.metadata.ChangeType;
|
||||
import com.linkedin.metadata.aspect.ReadItem;
|
||||
import com.linkedin.metadata.aspect.RetrieverContext;
|
||||
import com.linkedin.metadata.aspect.batch.BatchItem;
|
||||
import com.linkedin.metadata.aspect.batch.ChangeMCP;
|
||||
import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig;
|
||||
import com.linkedin.metadata.aspect.plugins.hooks.MutationHook;
|
||||
import com.linkedin.schema.EditableSchemaFieldInfo;
|
||||
import com.linkedin.schema.EditableSchemaFieldInfoArray;
|
||||
import com.linkedin.schema.EditableSchemaMetadata;
|
||||
import com.linkedin.schema.SchemaField;
|
||||
import com.linkedin.schema.SchemaFieldArray;
|
||||
import com.linkedin.schema.SchemaMetadata;
|
||||
import com.linkedin.util.Pair;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import javax.annotation.Nonnull;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.experimental.Accessors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
@Setter
|
||||
@Getter
|
||||
@Accessors(chain = true)
|
||||
public class FieldPathMutator extends MutationHook {
|
||||
@Nonnull private AspectPluginConfig config;
|
||||
|
||||
@Override
|
||||
protected Stream<Pair<ChangeMCP, Boolean>> writeMutation(
|
||||
@Nonnull Collection<ChangeMCP> changeMCPS, @Nonnull RetrieverContext retrieverContext) {
|
||||
|
||||
List<Pair<ChangeMCP, Boolean>> results = new LinkedList<>();
|
||||
|
||||
for (ChangeMCP item : changeMCPS) {
|
||||
if (changeTypeFilter(item) && aspectFilter(item)) {
|
||||
if (item.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME)) {
|
||||
results.add(Pair.of(item, processSchemaMetadataAspect(item)));
|
||||
} else {
|
||||
results.add(Pair.of(item, processEditableSchemaMetadataAspect(item)));
|
||||
}
|
||||
} else {
|
||||
// no op
|
||||
results.add(Pair.of(item, false));
|
||||
}
|
||||
}
|
||||
|
||||
return results.stream();
|
||||
}
|
||||
|
||||
/*
|
||||
TODO: After some time, this should no longer be required. Assuming at least 1 write has
|
||||
occurred for all schema aspects.
|
||||
*/
|
||||
@Override
|
||||
protected Stream<Pair<ReadItem, Boolean>> readMutation(
|
||||
@Nonnull Collection<ReadItem> items, @Nonnull RetrieverContext retrieverContext) {
|
||||
List<Pair<ReadItem, Boolean>> results = new LinkedList<>();
|
||||
|
||||
for (ReadItem item : items) {
|
||||
if (aspectFilter(item)) {
|
||||
if (item.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME)) {
|
||||
results.add(Pair.of(item, processSchemaMetadataAspect(item)));
|
||||
} else {
|
||||
results.add(Pair.of(item, processEditableSchemaMetadataAspect(item)));
|
||||
}
|
||||
} else {
|
||||
// no op
|
||||
results.add(Pair.of(item, false));
|
||||
}
|
||||
}
|
||||
|
||||
return results.stream();
|
||||
}
|
||||
|
||||
private static boolean changeTypeFilter(BatchItem item) {
|
||||
return !ChangeType.DELETE.equals(item.getChangeType())
|
||||
&& !ChangeType.PATCH.equals(item.getChangeType());
|
||||
}
|
||||
|
||||
private static boolean aspectFilter(ReadItem item) {
|
||||
return item.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME)
|
||||
|| item.getAspectName().equals(EDITABLE_SCHEMA_METADATA_ASPECT_NAME);
|
||||
}
|
||||
|
||||
private static boolean processEditableSchemaMetadataAspect(ReadItem item) {
|
||||
boolean mutated = false;
|
||||
final EditableSchemaMetadata schemaMetadata = item.getAspect(EditableSchemaMetadata.class);
|
||||
EditableSchemaFieldInfoArray fields = schemaMetadata.getEditableSchemaFieldInfo();
|
||||
List<EditableSchemaFieldInfo> replaceFields =
|
||||
deduplicateFieldPaths(fields, EditableSchemaFieldInfo::getFieldPath);
|
||||
if (!replaceFields.isEmpty()) {
|
||||
schemaMetadata.setEditableSchemaFieldInfo(new EditableSchemaFieldInfoArray(replaceFields));
|
||||
mutated = true;
|
||||
}
|
||||
return mutated;
|
||||
}
|
||||
|
||||
private static boolean processSchemaMetadataAspect(ReadItem item) {
|
||||
boolean mutated = false;
|
||||
final SchemaMetadata schemaMetadata = item.getAspect(SchemaMetadata.class);
|
||||
SchemaFieldArray fields = schemaMetadata.getFields();
|
||||
List<SchemaField> replaceFields = deduplicateFieldPaths(fields, SchemaField::getFieldPath);
|
||||
if (!replaceFields.isEmpty()) {
|
||||
schemaMetadata.setFields(new SchemaFieldArray(replaceFields));
|
||||
mutated = true;
|
||||
}
|
||||
return mutated;
|
||||
}
|
||||
|
||||
private static <T> List<T> deduplicateFieldPaths(
|
||||
Collection<T> fields, Function<T, String> fieldPathExtractor) {
|
||||
|
||||
// preserve order
|
||||
final LinkedHashMap<String, List<T>> grouped =
|
||||
fields.stream()
|
||||
.collect(
|
||||
Collectors.groupingBy(fieldPathExtractor, LinkedHashMap::new, Collectors.toList()));
|
||||
|
||||
if (grouped.values().stream().anyMatch(v -> v.size() > 1)) {
|
||||
log.warn(
|
||||
"Duplicate field path(s) detected. Dropping duplicates: {}",
|
||||
grouped.values().stream().filter(v -> v.size() > 1).collect(Collectors.toList()));
|
||||
// return first
|
||||
return grouped.values().stream().map(l -> l.get(0)).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,249 @@
|
||||
package com.linkedin.metadata.aspect.hooks;
|
||||
|
||||
import static com.linkedin.metadata.Constants.DOMAINS_ASPECT_NAME;
|
||||
import static com.linkedin.metadata.Constants.EDITABLE_SCHEMA_METADATA_ASPECT_NAME;
|
||||
import static com.linkedin.metadata.Constants.SCHEMA_METADATA_ASPECT_NAME;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.testng.Assert.assertEquals;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.linkedin.common.UrnArray;
|
||||
import com.linkedin.common.urn.DatasetUrn;
|
||||
import com.linkedin.common.urn.UrnUtils;
|
||||
import com.linkedin.domain.Domains;
|
||||
import com.linkedin.events.metadata.ChangeType;
|
||||
import com.linkedin.metadata.aspect.AspectRetriever;
|
||||
import com.linkedin.metadata.aspect.GraphRetriever;
|
||||
import com.linkedin.metadata.aspect.RetrieverContext;
|
||||
import com.linkedin.metadata.aspect.batch.ChangeMCP;
|
||||
import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig;
|
||||
import com.linkedin.metadata.models.registry.EntityRegistry;
|
||||
import com.linkedin.schema.EditableSchemaFieldInfo;
|
||||
import com.linkedin.schema.EditableSchemaFieldInfoArray;
|
||||
import com.linkedin.schema.EditableSchemaMetadata;
|
||||
import com.linkedin.schema.SchemaField;
|
||||
import com.linkedin.schema.SchemaFieldArray;
|
||||
import com.linkedin.schema.SchemaFieldDataType;
|
||||
import com.linkedin.schema.SchemaMetadata;
|
||||
import com.linkedin.schema.StringType;
|
||||
import com.linkedin.test.metadata.aspect.TestEntityRegistry;
|
||||
import com.linkedin.test.metadata.aspect.batch.TestMCP;
|
||||
import com.linkedin.util.Pair;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import org.testng.annotations.BeforeTest;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
public class FieldPathMutatorTest {
|
||||
|
||||
private EntityRegistry entityRegistry;
|
||||
private RetrieverContext mockRetrieverContext;
|
||||
private DatasetUrn testDatasetUrn;
|
||||
private final FieldPathMutator test =
|
||||
new FieldPathMutator().setConfig(mock(AspectPluginConfig.class));
|
||||
|
||||
@BeforeTest
|
||||
public void init() throws URISyntaxException {
|
||||
testDatasetUrn =
|
||||
DatasetUrn.createFromUrn(
|
||||
UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,test,PROD)"));
|
||||
|
||||
entityRegistry = new TestEntityRegistry();
|
||||
AspectRetriever mockAspectRetriever = mock(AspectRetriever.class);
|
||||
when(mockAspectRetriever.getEntityRegistry()).thenReturn(entityRegistry);
|
||||
GraphRetriever mockGraphRetriever = mock(GraphRetriever.class);
|
||||
mockRetrieverContext = mock(RetrieverContext.class);
|
||||
when(mockRetrieverContext.getAspectRetriever()).thenReturn(mockAspectRetriever);
|
||||
when(mockRetrieverContext.getGraphRetriever()).thenReturn(mockGraphRetriever);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateIncorrectAspect() {
|
||||
final Domains domains =
|
||||
new Domains()
|
||||
.setDomains(new UrnArray(ImmutableList.of(UrnUtils.getUrn("urn:li:domain:123"))));
|
||||
assertEquals(
|
||||
test.writeMutation(
|
||||
Set.of(
|
||||
TestMCP.builder()
|
||||
.changeType(ChangeType.UPSERT)
|
||||
.urn(testDatasetUrn)
|
||||
.entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType()))
|
||||
.aspectSpec(
|
||||
entityRegistry
|
||||
.getEntitySpec(testDatasetUrn.getEntityType())
|
||||
.getAspectSpec(DOMAINS_ASPECT_NAME))
|
||||
.recordTemplate(domains)
|
||||
.build()),
|
||||
mockRetrieverContext)
|
||||
.filter(Pair::getSecond)
|
||||
.count(),
|
||||
0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateNonDuplicatedSchemaFieldPath() {
|
||||
final SchemaMetadata schema = getMockSchemaMetadataAspect(false);
|
||||
assertEquals(
|
||||
test.writeMutation(
|
||||
Set.of(
|
||||
TestMCP.builder()
|
||||
.changeType(ChangeType.UPSERT)
|
||||
.urn(testDatasetUrn)
|
||||
.entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType()))
|
||||
.aspectSpec(
|
||||
entityRegistry
|
||||
.getEntitySpec(testDatasetUrn.getEntityType())
|
||||
.getAspectSpec(SCHEMA_METADATA_ASPECT_NAME))
|
||||
.recordTemplate(schema)
|
||||
.build()),
|
||||
mockRetrieverContext)
|
||||
.filter(Pair::getSecond)
|
||||
.count(),
|
||||
0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateDuplicatedSchemaFieldPath() {
|
||||
final SchemaMetadata schema = getMockSchemaMetadataAspect(true);
|
||||
|
||||
List<Pair<ChangeMCP, Boolean>> result =
|
||||
test.writeMutation(
|
||||
Set.of(
|
||||
TestMCP.builder()
|
||||
.changeType(ChangeType.UPSERT)
|
||||
.urn(testDatasetUrn)
|
||||
.entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType()))
|
||||
.aspectSpec(
|
||||
entityRegistry
|
||||
.getEntitySpec(testDatasetUrn.getEntityType())
|
||||
.getAspectSpec(SCHEMA_METADATA_ASPECT_NAME))
|
||||
.recordTemplate(schema)
|
||||
.build()),
|
||||
mockRetrieverContext)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
assertEquals(result.stream().filter(Pair::getSecond).count(), 1);
|
||||
assertEquals(result.get(0).getFirst().getAspect(SchemaMetadata.class).getFields().size(), 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateDeleteDuplicatedSchemaFieldPath() {
|
||||
final SchemaMetadata schema = getMockSchemaMetadataAspect(true);
|
||||
|
||||
assertEquals(
|
||||
test.writeMutation(
|
||||
Set.of(
|
||||
TestMCP.builder()
|
||||
.changeType(ChangeType.DELETE)
|
||||
.urn(testDatasetUrn)
|
||||
.entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType()))
|
||||
.aspectSpec(
|
||||
entityRegistry
|
||||
.getEntitySpec(testDatasetUrn.getEntityType())
|
||||
.getAspectSpec(SCHEMA_METADATA_ASPECT_NAME))
|
||||
.recordTemplate(schema)
|
||||
.build()),
|
||||
mockRetrieverContext)
|
||||
.filter(Pair::getSecond)
|
||||
.count(),
|
||||
0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateNonDuplicatedEditableSchemaFieldPath() {
|
||||
final EditableSchemaMetadata schema = getMockEditableSchemaMetadataAspect(false);
|
||||
assertEquals(
|
||||
test.writeMutation(
|
||||
Set.of(
|
||||
TestMCP.builder()
|
||||
.changeType(ChangeType.UPSERT)
|
||||
.urn(testDatasetUrn)
|
||||
.entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType()))
|
||||
.aspectSpec(
|
||||
entityRegistry
|
||||
.getEntitySpec(testDatasetUrn.getEntityType())
|
||||
.getAspectSpec(EDITABLE_SCHEMA_METADATA_ASPECT_NAME))
|
||||
.recordTemplate(schema)
|
||||
.build()),
|
||||
mockRetrieverContext)
|
||||
.filter(Pair::getSecond)
|
||||
.count(),
|
||||
0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateDuplicatedEditableSchemaFieldPath() {
|
||||
final EditableSchemaMetadata schema = getMockEditableSchemaMetadataAspect(true);
|
||||
|
||||
List<Pair<ChangeMCP, Boolean>> result =
|
||||
test.writeMutation(
|
||||
Set.of(
|
||||
TestMCP.builder()
|
||||
.changeType(ChangeType.UPSERT)
|
||||
.urn(testDatasetUrn)
|
||||
.entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType()))
|
||||
.aspectSpec(
|
||||
entityRegistry
|
||||
.getEntitySpec(testDatasetUrn.getEntityType())
|
||||
.getAspectSpec(EDITABLE_SCHEMA_METADATA_ASPECT_NAME))
|
||||
.recordTemplate(schema)
|
||||
.build()),
|
||||
mockRetrieverContext)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
assertEquals(result.stream().filter(Pair::getSecond).count(), 1);
|
||||
assertEquals(
|
||||
result
|
||||
.get(0)
|
||||
.getFirst()
|
||||
.getAspect(EditableSchemaMetadata.class)
|
||||
.getEditableSchemaFieldInfo()
|
||||
.size(),
|
||||
1);
|
||||
}
|
||||
|
||||
private SchemaMetadata getMockSchemaMetadataAspect(boolean duplicateFields) {
|
||||
List<SchemaField> fields = new ArrayList<>();
|
||||
fields.add(
|
||||
new SchemaField()
|
||||
.setType(
|
||||
new SchemaFieldDataType()
|
||||
.setType(SchemaFieldDataType.Type.create(new StringType())))
|
||||
.setNullable(false)
|
||||
.setNativeDataType("string")
|
||||
.setFieldPath("test"));
|
||||
|
||||
if (duplicateFields) {
|
||||
fields.add(
|
||||
new SchemaField()
|
||||
.setType(
|
||||
new SchemaFieldDataType()
|
||||
.setType(SchemaFieldDataType.Type.create(new StringType())))
|
||||
.setNullable(false)
|
||||
.setNativeDataType("string")
|
||||
.setFieldPath("test"));
|
||||
}
|
||||
|
||||
return new SchemaMetadata()
|
||||
.setPlatform(testDatasetUrn.getPlatformEntity())
|
||||
.setFields(new SchemaFieldArray(fields));
|
||||
}
|
||||
|
||||
private EditableSchemaMetadata getMockEditableSchemaMetadataAspect(boolean duplicateFields) {
|
||||
|
||||
List<EditableSchemaFieldInfo> fields = new ArrayList<>();
|
||||
fields.add(new EditableSchemaFieldInfo().setFieldPath("test"));
|
||||
|
||||
if (duplicateFields) {
|
||||
fields.add(new EditableSchemaFieldInfo().setFieldPath("test"));
|
||||
}
|
||||
|
||||
return new EditableSchemaMetadata()
|
||||
.setEditableSchemaFieldInfo(new EditableSchemaFieldInfoArray(fields));
|
||||
}
|
||||
}
|
||||
@ -614,3 +614,16 @@ plugins:
|
||||
supportedEntityAspectNames:
|
||||
- entityName: '*'
|
||||
aspectName: structuredProperties
|
||||
- className: 'com.linkedin.metadata.aspect.hooks.FieldPathMutator'
|
||||
enabled: true
|
||||
supportedOperations:
|
||||
- CREATE
|
||||
- UPSERT
|
||||
- UPDATE
|
||||
- RESTATE
|
||||
- PATCH
|
||||
supportedEntityAspectNames:
|
||||
- entityName: '*'
|
||||
aspectName: 'schemaMetadata'
|
||||
- entityName: '*'
|
||||
aspectName: 'editableSchemaMetadata'
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user