mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-28 10:28:22 +00:00
fix(mutator): mutator hook fixes (#11140)
This commit is contained in:
parent
946b9f3745
commit
4d2af40465
@ -52,7 +52,7 @@ public class ConfigEntityRegistry implements EntityRegistry {
|
||||
private final DataSchemaFactory dataSchemaFactory;
|
||||
@Getter private final PluginFactory pluginFactory;
|
||||
|
||||
@Nullable
|
||||
@Getter @Nullable
|
||||
private BiFunction<PluginConfiguration, List<ClassLoader>, PluginFactory> pluginFactoryProvider;
|
||||
|
||||
private final Map<String, EntitySpec> entityNameToSpec;
|
||||
|
||||
@ -22,6 +22,8 @@ import com.linkedin.metadata.aspect.patch.template.dataset.EditableSchemaMetadat
|
||||
import com.linkedin.metadata.aspect.patch.template.dataset.UpstreamLineageTemplate;
|
||||
import com.linkedin.metadata.aspect.patch.template.form.FormInfoTemplate;
|
||||
import com.linkedin.metadata.aspect.patch.template.structuredproperty.StructuredPropertyDefinitionTemplate;
|
||||
import com.linkedin.metadata.aspect.plugins.PluginFactory;
|
||||
import com.linkedin.metadata.aspect.plugins.config.PluginConfiguration;
|
||||
import com.linkedin.metadata.models.AspectSpec;
|
||||
import com.linkedin.metadata.models.DefaultEntitySpec;
|
||||
import com.linkedin.metadata.models.EntitySpec;
|
||||
@ -32,8 +34,11 @@ import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.annotation.Nonnull;
|
||||
import javax.annotation.Nullable;
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* Implementation of {@link EntityRegistry} that builds {@link DefaultEntitySpec} objects from the a
|
||||
@ -46,6 +51,9 @@ public class SnapshotEntityRegistry implements EntityRegistry {
|
||||
private final AspectTemplateEngine _aspectTemplateEngine;
|
||||
private final Map<String, AspectSpec> _aspectNameToSpec;
|
||||
|
||||
@Getter @Nullable
|
||||
private BiFunction<PluginConfiguration, List<ClassLoader>, PluginFactory> pluginFactoryProvider;
|
||||
|
||||
private static final SnapshotEntityRegistry INSTANCE = new SnapshotEntityRegistry();
|
||||
|
||||
public SnapshotEntityRegistry() {
|
||||
@ -56,6 +64,19 @@ public class SnapshotEntityRegistry implements EntityRegistry {
|
||||
entitySpecs = new ArrayList<>(entityNameToSpec.values());
|
||||
_aspectNameToSpec = populateAspectMap(entitySpecs);
|
||||
_aspectTemplateEngine = populateTemplateEngine(_aspectNameToSpec);
|
||||
pluginFactoryProvider = null;
|
||||
}
|
||||
|
||||
public SnapshotEntityRegistry(
|
||||
BiFunction<PluginConfiguration, List<ClassLoader>, PluginFactory> pluginFactoryProvider) {
|
||||
entityNameToSpec =
|
||||
new EntitySpecBuilder()
|
||||
.buildEntitySpecs(new Snapshot().schema()).stream()
|
||||
.collect(Collectors.toMap(spec -> spec.getName().toLowerCase(), spec -> spec));
|
||||
entitySpecs = new ArrayList<>(entityNameToSpec.values());
|
||||
_aspectNameToSpec = populateAspectMap(entitySpecs);
|
||||
_aspectTemplateEngine = populateTemplateEngine(_aspectNameToSpec);
|
||||
this.pluginFactoryProvider = pluginFactoryProvider;
|
||||
}
|
||||
|
||||
public SnapshotEntityRegistry(UnionTemplate snapshot) {
|
||||
|
||||
@ -6,6 +6,7 @@ import static org.testng.Assert.assertNotNull;
|
||||
import com.datahub.test.TestEntityProfile;
|
||||
import com.linkedin.data.schema.annotation.PathSpecBasedSchemaAnnotationVisitor;
|
||||
import com.linkedin.events.metadata.ChangeType;
|
||||
import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig;
|
||||
import com.linkedin.metadata.models.EntitySpec;
|
||||
import com.linkedin.metadata.models.EventSpec;
|
||||
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
|
||||
@ -262,23 +263,42 @@ public class PluginsTest {
|
||||
mergedEntityRegistry.apply(configEntityRegistry2);
|
||||
|
||||
assertEquals(
|
||||
mergedEntityRegistry.getAllAspectPayloadValidators().stream()
|
||||
.filter(p -> p.getConfig().getSupportedOperations().contains("DELETE"))
|
||||
mergedEntityRegistry
|
||||
.getPluginFactory()
|
||||
.getPluginConfiguration()
|
||||
.getAspectPayloadValidators()
|
||||
.stream()
|
||||
.filter(AspectPluginConfig::isEnabled)
|
||||
.filter(p -> p.getSupportedOperations().contains("DELETE"))
|
||||
.count(),
|
||||
1);
|
||||
|
||||
assertEquals(
|
||||
mergedEntityRegistry.getAllMutationHooks().stream()
|
||||
.filter(p -> p.getConfig().getSupportedOperations().contains("DELETE"))
|
||||
mergedEntityRegistry.getPluginFactory().getPluginConfiguration().getMutationHooks().stream()
|
||||
.filter(AspectPluginConfig::isEnabled)
|
||||
.filter(p -> p.getSupportedOperations().contains("DELETE"))
|
||||
.count(),
|
||||
1);
|
||||
|
||||
assertEquals(
|
||||
mergedEntityRegistry.getAllMCLSideEffects().stream()
|
||||
.filter(p -> p.getConfig().getSupportedOperations().contains("DELETE"))
|
||||
mergedEntityRegistry
|
||||
.getPluginFactory()
|
||||
.getPluginConfiguration()
|
||||
.getMclSideEffects()
|
||||
.stream()
|
||||
.filter(AspectPluginConfig::isEnabled)
|
||||
.filter(p -> p.getSupportedOperations().contains("DELETE"))
|
||||
.count(),
|
||||
1);
|
||||
|
||||
assertEquals(
|
||||
mergedEntityRegistry.getAllMCPSideEffects().stream()
|
||||
.filter(p -> p.getConfig().getSupportedOperations().contains("DELETE"))
|
||||
mergedEntityRegistry
|
||||
.getPluginFactory()
|
||||
.getPluginConfiguration()
|
||||
.getMcpSideEffects()
|
||||
.stream()
|
||||
.filter(AspectPluginConfig::isEnabled)
|
||||
.filter(p -> p.getSupportedOperations().contains("DELETE"))
|
||||
.count(),
|
||||
1);
|
||||
}
|
||||
|
||||
@ -99,6 +99,18 @@ public class RecordUtils {
|
||||
return toRecordTemplate(type, dataMap);
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
public static DataMap toDataMap(@Nonnull String jsonString) {
|
||||
DataMap dataMap;
|
||||
try {
|
||||
dataMap = DATA_TEMPLATE_CODEC.stringToMap(jsonString);
|
||||
} catch (IOException e) {
|
||||
throw new ModelConversionException("Failed to deserialize DataMap: " + jsonString);
|
||||
}
|
||||
|
||||
return dataMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a {@link RecordTemplate} object from a {@link DataMap}.
|
||||
*
|
||||
|
||||
@ -9,6 +9,7 @@ import com.linkedin.metadata.aspect.batch.AspectsBatch;
|
||||
import com.linkedin.metadata.aspect.batch.BatchItem;
|
||||
import com.linkedin.metadata.aspect.batch.ChangeMCP;
|
||||
import com.linkedin.metadata.aspect.batch.MCPItem;
|
||||
import com.linkedin.metadata.aspect.plugins.hooks.MutationHook;
|
||||
import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection;
|
||||
import com.linkedin.mxe.MetadataChangeProposal;
|
||||
import com.linkedin.util.Pair;
|
||||
@ -47,7 +48,7 @@ public class AspectsBatchImpl implements AspectsBatch {
|
||||
final Map<String, Map<String, SystemAspect>> latestAspects) {
|
||||
|
||||
// Process proposals to change items
|
||||
Stream<ChangeMCP> mutatedProposalsStream =
|
||||
Stream<? extends BatchItem> mutatedProposalsStream =
|
||||
proposedItemsToChangeItemStream(
|
||||
items.stream()
|
||||
.filter(item -> item instanceof ProposedItem)
|
||||
@ -92,21 +93,58 @@ public class AspectsBatchImpl implements AspectsBatch {
|
||||
|
||||
LinkedList<ChangeMCP> newItems =
|
||||
applyMCPSideEffects(upsertBatchItems).collect(Collectors.toCollection(LinkedList::new));
|
||||
Map<String, Set<String>> newUrnAspectNames = getNewUrnAspectsMap(getUrnAspectsMap(), newItems);
|
||||
upsertBatchItems.addAll(newItems);
|
||||
Map<String, Set<String>> newUrnAspectNames =
|
||||
getNewUrnAspectsMap(getUrnAspectsMap(), upsertBatchItems);
|
||||
|
||||
return Pair.of(newUrnAspectNames, upsertBatchItems);
|
||||
}
|
||||
|
||||
private Stream<ChangeMCP> proposedItemsToChangeItemStream(List<MCPItem> proposedItems) {
|
||||
return applyProposalMutationHooks(proposedItems, retrieverContext)
|
||||
.filter(mcpItem -> mcpItem.getMetadataChangeProposal() != null)
|
||||
.map(
|
||||
mcpItem ->
|
||||
ChangeItemImpl.ChangeItemImplBuilder.build(
|
||||
mcpItem.getMetadataChangeProposal(),
|
||||
mcpItem.getAuditStamp(),
|
||||
retrieverContext.getAspectRetriever()));
|
||||
private Stream<? extends BatchItem> proposedItemsToChangeItemStream(List<MCPItem> proposedItems) {
|
||||
List<MutationHook> mutationHooks =
|
||||
retrieverContext.getAspectRetriever().getEntityRegistry().getAllMutationHooks();
|
||||
Stream<? extends BatchItem> unmutatedItems =
|
||||
proposedItems.stream()
|
||||
.filter(
|
||||
proposedItem ->
|
||||
mutationHooks.stream()
|
||||
.noneMatch(
|
||||
mutationHook ->
|
||||
mutationHook.shouldApply(
|
||||
proposedItem.getChangeType(),
|
||||
proposedItem.getUrn(),
|
||||
proposedItem.getAspectName())))
|
||||
.map(
|
||||
mcpItem -> {
|
||||
if (ChangeType.PATCH.equals(mcpItem.getChangeType())) {
|
||||
return PatchItemImpl.PatchItemImplBuilder.build(
|
||||
mcpItem.getMetadataChangeProposal(),
|
||||
mcpItem.getAuditStamp(),
|
||||
retrieverContext.getAspectRetriever().getEntityRegistry());
|
||||
}
|
||||
return ChangeItemImpl.ChangeItemImplBuilder.build(
|
||||
mcpItem.getMetadataChangeProposal(),
|
||||
mcpItem.getAuditStamp(),
|
||||
retrieverContext.getAspectRetriever());
|
||||
});
|
||||
List<MCPItem> mutatedItems =
|
||||
applyProposalMutationHooks(proposedItems, retrieverContext).collect(Collectors.toList());
|
||||
Stream<? extends BatchItem> proposedItemsToChangeItems =
|
||||
mutatedItems.stream()
|
||||
.filter(mcpItem -> mcpItem.getMetadataChangeProposal() != null)
|
||||
// Filter on proposed items again to avoid applying builder to Patch Item side effects
|
||||
.filter(mcpItem -> mcpItem instanceof ProposedItem)
|
||||
.map(
|
||||
mcpItem ->
|
||||
ChangeItemImpl.ChangeItemImplBuilder.build(
|
||||
mcpItem.getMetadataChangeProposal(),
|
||||
mcpItem.getAuditStamp(),
|
||||
retrieverContext.getAspectRetriever()));
|
||||
Stream<? extends BatchItem> sideEffectItems =
|
||||
mutatedItems.stream().filter(mcpItem -> !(mcpItem instanceof ProposedItem));
|
||||
Stream<? extends BatchItem> combinedChangeItems =
|
||||
Stream.concat(proposedItemsToChangeItems, unmutatedItems);
|
||||
return Stream.concat(combinedChangeItems, sideEffectItems);
|
||||
}
|
||||
|
||||
public static class AspectsBatchImplBuilder {
|
||||
|
||||
@ -1,15 +1,22 @@
|
||||
package com.linkedin.gms.factory.entityregistry;
|
||||
|
||||
import com.datahub.plugins.metadata.aspect.SpringPluginFactory;
|
||||
import com.linkedin.gms.factory.plugins.SpringStandardPluginConfiguration;
|
||||
import com.linkedin.metadata.aspect.plugins.PluginFactory;
|
||||
import com.linkedin.metadata.aspect.plugins.config.PluginConfiguration;
|
||||
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
|
||||
import com.linkedin.metadata.models.registry.EntityRegistry;
|
||||
import com.linkedin.metadata.models.registry.EntityRegistryException;
|
||||
import com.linkedin.metadata.models.registry.MergedEntityRegistry;
|
||||
import com.linkedin.metadata.models.registry.PluginEntityRegistryLoader;
|
||||
import com.linkedin.metadata.models.registry.SnapshotEntityRegistry;
|
||||
import java.util.List;
|
||||
import java.util.function.BiFunction;
|
||||
import javax.annotation.Nonnull;
|
||||
import lombok.SneakyThrows;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Import;
|
||||
@ -27,13 +34,20 @@ public class EntityRegistryFactory {
|
||||
@Qualifier("pluginEntityRegistry")
|
||||
private PluginEntityRegistryLoader pluginEntityRegistryLoader;
|
||||
|
||||
@Autowired private ApplicationContext applicationContext;
|
||||
|
||||
@SneakyThrows
|
||||
@Bean("entityRegistry")
|
||||
@Primary
|
||||
@Nonnull
|
||||
protected EntityRegistry getInstance() throws EntityRegistryException {
|
||||
protected EntityRegistry getInstance(
|
||||
SpringStandardPluginConfiguration springStandardPluginConfiguration)
|
||||
throws EntityRegistryException {
|
||||
BiFunction<PluginConfiguration, List<ClassLoader>, PluginFactory> pluginFactoryProvider =
|
||||
(config, loaders) -> new SpringPluginFactory(applicationContext, config, loaders);
|
||||
MergedEntityRegistry baseEntityRegistry =
|
||||
new MergedEntityRegistry(SnapshotEntityRegistry.getInstance()).apply(configEntityRegistry);
|
||||
new MergedEntityRegistry(new SnapshotEntityRegistry(pluginFactoryProvider))
|
||||
.apply(configEntityRegistry);
|
||||
pluginEntityRegistryLoader.withBaseRegistry(baseEntityRegistry).start(true);
|
||||
return baseEntityRegistry;
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user