fix(async): submit additional default aspects only when not in async mode (#8320)

This commit is contained in:
RyanHolstien 2023-06-30 15:56:12 -05:00 committed by GitHub
parent ac4af259f2
commit 10e5ecf6d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 126 additions and 148 deletions

View File

@ -24,6 +24,7 @@ dependencies {
annotationProcessor externalDependency.lombok
testImplementation externalDependency.springBootTest
testImplementation project(':mock-entity-registry')
testCompile externalDependency.springBoot
testCompile externalDependency.testContainers
testCompile externalDependency.springKafka

View File

@ -51,6 +51,7 @@ dependencies {
annotationProcessor externalDependency.lombok
testCompile project(':test-models')
testImplementation project(':mock-entity-registry')
testCompile externalDependency.mockito
testCompile externalDependency.testng

View File

@ -5,6 +5,7 @@ import com.datahub.authentication.Authentication;
import com.datahub.authentication.AuthenticationContext;
import com.datahub.authorization.ResourceSpec;
import com.datahub.plugins.auth.authorization.Authorizer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.linkedin.aspect.GetTimeseriesAspectValuesResponse;
import com.linkedin.metadata.resources.operations.Utils;
@ -78,6 +79,11 @@ public class AspectResource extends CollectionResourceTaskTemplate<String, Versi
@Named("entityService")
private EntityService _entityService;
@VisibleForTesting
void setEntityService(EntityService entityService) {
_entityService = entityService;
}
@Inject
@Named("entitySearchService")
private EntitySearchService _entitySearchService;
@ -90,6 +96,11 @@ public class AspectResource extends CollectionResourceTaskTemplate<String, Versi
@Named("authorizerChain")
private Authorizer _authorizer;
@VisibleForTesting
void setAuthorizer(Authorizer authorizer) {
_authorizer = authorizer;
}
/**
* Retrieves the value for an entity that is made up of latest versions of specified aspects.
* TODO: Get rid of this and migrate to getAspect.
@ -191,8 +202,10 @@ public class AspectResource extends CollectionResourceTaskTemplate<String, Versi
EntityService.IngestProposalResult result = _entityService.ingestProposal(metadataChangeProposal, auditStamp, asyncBool);
Urn responseUrn = result.getUrn();
if (!asyncBool) {
AspectUtils.getAdditionalChanges(metadataChangeProposal, _entityService)
.forEach(proposal -> _entityService.ingestProposal(proposal, auditStamp, asyncBool));
}
if (!result.isQueued()) {
tryIndexRunId(responseUrn, metadataChangeProposal.getSystemMetadata(), _entitySearchService);

View File

@ -0,0 +1,85 @@
package com.linkedin.metadata.resources.entity;
import com.datahub.authentication.Actor;
import com.datahub.authentication.ActorType;
import com.datahub.authentication.Authentication;
import com.datahub.authentication.AuthenticationContext;
import com.datahub.plugins.auth.authorization.Authorizer;
import com.linkedin.common.FabricType;
import com.linkedin.common.urn.DataPlatformUrn;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.common.urn.Urn;
import com.linkedin.dataset.DatasetProperties;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.config.PreProcessHooks;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.event.EventProducer;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.service.UpdateIndicesService;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeLog;
import com.linkedin.mxe.MetadataChangeProposal;
import java.net.URISyntaxException;
import mock.MockEntityRegistry;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import static com.linkedin.metadata.Constants.*;
import static org.mockito.Mockito.*;
public class AspectResourceTest {
private AspectResource _aspectResource;
private EntityService _entityService;
private AspectDao _aspectDao;
private EventProducer _producer;
private EntityRegistry _entityRegistry;
private UpdateIndicesService _updateIndicesService;
private PreProcessHooks _preProcessHooks;
private Authorizer _authorizer;
@BeforeTest
public void setup() {
_aspectResource = new AspectResource();
_aspectDao = mock(AspectDao.class);
_producer = mock(EventProducer.class);
_entityRegistry = new MockEntityRegistry();
_updateIndicesService = mock(UpdateIndicesService.class);
_preProcessHooks = mock(PreProcessHooks.class);
_entityService = new EntityService(_aspectDao, _producer, _entityRegistry, false, _updateIndicesService, _preProcessHooks);
_authorizer = mock(Authorizer.class);
_aspectResource.setAuthorizer(_authorizer);
_aspectResource.setEntityService(_entityService);
}
@Test
public void testAsyncDefaultAspects() throws URISyntaxException {
MetadataChangeProposal mcp = new MetadataChangeProposal();
mcp.setEntityType(DATASET_ENTITY_NAME);
Urn urn = new DatasetUrn(new DataPlatformUrn("platform"), "name", FabricType.PROD);
mcp.setEntityUrn(urn);
DatasetProperties properties = new DatasetProperties().setName("name");
mcp.setAspect(GenericRecordUtils.serializeAspect(properties));
mcp.setAspectName(DATASET_PROPERTIES_ASPECT_NAME);
mcp.setChangeType(ChangeType.UPSERT);
Authentication mockAuthentication = mock(Authentication.class);
AuthenticationContext.setAuthentication(mockAuthentication);
Actor actor = new Actor(ActorType.USER, "user");
when(mockAuthentication.getActor()).thenReturn(actor);
_aspectResource.ingestProposal(mcp, "true");
verify(_producer, times(1)).produceMetadataChangeProposal(urn, mcp);
verifyNoMoreInteractions(_producer);
verifyNoMoreInteractions(_aspectDao);
reset(_producer, _aspectDao);
when(_aspectDao.runInTransactionWithRetry(any(), anyInt()))
.thenReturn(new EntityService.UpdateAspectResult(urn, null, properties, null, null, null, null, 0));
_aspectResource.ingestProposal(mcp, "false");
verify(_producer, times(5)).produceMetadataChangeLog(eq(urn), any(AspectSpec.class), any(MetadataChangeLog.class));
verifyNoMoreInteractions(_producer);
}
}

View File

@ -22,6 +22,7 @@ dependencies {
annotationProcessor externalDependency.lombok
testImplementation externalDependency.springBootTest
testImplementation project(':mock-entity-registry')
testCompile externalDependency.springBoot
testCompile externalDependency.testContainers
testCompile externalDependency.testContainersKafka

View File

@ -1,27 +0,0 @@
package mock;
import com.linkedin.data.schema.RecordDataSchema;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.RelationshipFieldSpec;
import com.linkedin.metadata.models.SearchScoreFieldSpec;
import com.linkedin.metadata.models.SearchableFieldSpec;
import com.linkedin.metadata.models.TimeseriesFieldCollectionSpec;
import com.linkedin.metadata.models.TimeseriesFieldSpec;
import com.linkedin.metadata.models.annotation.AspectAnnotation;
import java.util.List;
import javax.annotation.Nonnull;
public class MockAspectSpec extends AspectSpec {
public MockAspectSpec(@Nonnull AspectAnnotation aspectAnnotation,
@Nonnull List<SearchableFieldSpec> searchableFieldSpecs,
@Nonnull List<SearchScoreFieldSpec> searchScoreFieldSpecs,
@Nonnull List<RelationshipFieldSpec> relationshipFieldSpecs,
@Nonnull List<TimeseriesFieldSpec> timeseriesFieldSpecs,
@Nonnull List<TimeseriesFieldCollectionSpec> timeseriesFieldCollectionSpecs, RecordDataSchema schema,
Class<RecordTemplate> aspectClass) {
super(aspectAnnotation, searchableFieldSpecs, searchScoreFieldSpecs, relationshipFieldSpecs, timeseriesFieldSpecs,
timeseriesFieldCollectionSpecs, schema, aspectClass);
}
}

View File

@ -1,118 +0,0 @@
package mock;
import com.linkedin.common.GlossaryTerms;
import com.linkedin.common.SubTypes;
import com.linkedin.data.schema.RecordDataSchema;
import com.linkedin.data.schema.TyperefDataSchema;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.dataset.DatasetProfile;
import com.linkedin.dataset.ViewProperties;
import com.linkedin.metadata.key.CorpUserKey;
import com.linkedin.metadata.key.DataPlatformKey;
import com.linkedin.metadata.key.DatasetKey;
import com.linkedin.metadata.key.GlossaryTermKey;
import com.linkedin.metadata.key.TagKey;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.models.annotation.AspectAnnotation;
import com.linkedin.metadata.models.annotation.EntityAnnotation;
import com.linkedin.schema.SchemaMetadata;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static com.linkedin.metadata.Constants.*;
public class MockEntitySpec implements EntitySpec {
private String _name;
public MockEntitySpec(String name) {
_name = name;
}
@Override
public String getName() {
return _name;
}
@Override
public EntityAnnotation getEntityAnnotation() {
return null;
}
@Override
public String getKeyAspectName() {
return null;
}
@Override
public AspectSpec getKeyAspectSpec() {
if (DATASET_ENTITY_NAME.equals(_name)) {
DatasetKey datasetKey = new DatasetKey();
return createAspectSpec(datasetKey, DATASET_KEY_ASPECT_NAME);
} else if (DATA_PLATFORM_ENTITY_NAME.equals(_name)) {
DataPlatformKey dataPlatformKey = new DataPlatformKey();
return createAspectSpec(dataPlatformKey, DATA_PLATFORM_KEY_ASPECT_NAME);
} else if (TAG_ENTITY_NAME.equals(_name)) {
TagKey tagKey = new TagKey();
return createAspectSpec(tagKey, TAG_KEY_ASPECT_NAME);
} else if (GLOSSARY_TERM_ENTITY_NAME.equals(_name)) {
GlossaryTermKey glossaryTermKey = new GlossaryTermKey();
return createAspectSpec(glossaryTermKey, GLOSSARY_TERM_KEY_ASPECT_NAME);
} else if (CORP_USER_ENTITY_NAME.equals(_name)) {
CorpUserKey corpUserKey = new CorpUserKey();
return createAspectSpec(corpUserKey, CORP_USER_KEY_ASPECT_NAME);
}
return null;
}
public <T extends RecordTemplate> AspectSpec createAspectSpec(T type, String name) {
return new MockAspectSpec(new AspectAnnotation(name, false, false, null),
Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(),
Collections.emptyList(), type.schema(), (Class<RecordTemplate>) type.getClass().asSubclass(RecordTemplate.class));
}
@Override
public List<AspectSpec> getAspectSpecs() {
return Collections.emptyList();
}
@Override
public Map<String, AspectSpec> getAspectSpecMap() {
return Collections.emptyMap();
}
@Override
public Boolean hasAspect(String name) {
return false;
}
private static final Map<String, RecordTemplate> ASPECT_TYPE_MAP;
static {
ASPECT_TYPE_MAP = new HashMap<>();
ASPECT_TYPE_MAP.put(DATASET_KEY_ASPECT_NAME, new DatasetKey());
ASPECT_TYPE_MAP.put(VIEW_PROPERTIES_ASPECT_NAME, new ViewProperties());
ASPECT_TYPE_MAP.put(SCHEMA_METADATA_ASPECT_NAME, new SchemaMetadata());
ASPECT_TYPE_MAP.put(SUB_TYPES_ASPECT_NAME, new SubTypes());
ASPECT_TYPE_MAP.put("datasetProfile", new DatasetProfile());
ASPECT_TYPE_MAP.put(GLOSSARY_TERMS_ASPECT_NAME, new GlossaryTerms());
}
@Override
public AspectSpec getAspectSpec(String name) {
return createAspectSpec(ASPECT_TYPE_MAP.get(name), name);
}
@Override
public RecordDataSchema getSnapshotSchema() {
return null;
}
@Override
public TyperefDataSchema getAspectTyperefSchema() {
return null;
}
}

View File

@ -0,0 +1,5 @@
apply plugin: 'java'
dependencies {
implementation project(':entity-registry')
}

View File

@ -1,11 +1,15 @@
package mock;
import com.linkedin.common.BrowsePaths;
import com.linkedin.common.BrowsePathsV2;
import com.linkedin.common.DataPlatformInstance;
import com.linkedin.common.GlossaryTerms;
import com.linkedin.common.SubTypes;
import com.linkedin.data.schema.RecordDataSchema;
import com.linkedin.data.schema.TyperefDataSchema;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.dataset.DatasetProfile;
import com.linkedin.dataset.DatasetProperties;
import com.linkedin.dataset.ViewProperties;
import com.linkedin.metadata.key.CorpUserKey;
import com.linkedin.metadata.key.DataPlatformKey;
@ -28,9 +32,16 @@ import static com.linkedin.metadata.Constants.*;
public class MockEntitySpec implements EntitySpec {
private String _name;
private Map<String, AspectSpec> _aspectTypeMap;
public MockEntitySpec(String name) {
_name = name;
_aspectTypeMap = new HashMap<>();
if (DATASET_ENTITY_NAME.equals(name)) {
_aspectTypeMap.put(BROWSE_PATHS_ASPECT_NAME, getAspectSpec(BROWSE_PATHS_ASPECT_NAME));
_aspectTypeMap.put(BROWSE_PATHS_V2_ASPECT_NAME, getAspectSpec(BROWSE_PATHS_V2_ASPECT_NAME));
_aspectTypeMap.put(DATA_PLATFORM_INSTANCE_ASPECT_NAME, getAspectSpec(DATA_PLATFORM_INSTANCE_ASPECT_NAME));
}
}
@Override
@ -82,7 +93,7 @@ public class MockEntitySpec implements EntitySpec {
@Override
public Map<String, AspectSpec> getAspectSpecMap() {
return Collections.emptyMap();
return _aspectTypeMap;
}
@Override
@ -100,6 +111,10 @@ public class MockEntitySpec implements EntitySpec {
ASPECT_TYPE_MAP.put(SUB_TYPES_ASPECT_NAME, new SubTypes());
ASPECT_TYPE_MAP.put("datasetProfile", new DatasetProfile());
ASPECT_TYPE_MAP.put(GLOSSARY_TERMS_ASPECT_NAME, new GlossaryTerms());
ASPECT_TYPE_MAP.put(DATASET_PROPERTIES_ASPECT_NAME, new DatasetProperties());
ASPECT_TYPE_MAP.put(BROWSE_PATHS_ASPECT_NAME, new BrowsePaths());
ASPECT_TYPE_MAP.put(BROWSE_PATHS_V2_ASPECT_NAME, new BrowsePathsV2());
ASPECT_TYPE_MAP.put(DATA_PLATFORM_INSTANCE_ASPECT_NAME, new DataPlatformInstance());
}
@Override
public AspectSpec getAspectSpec(String name) {

View File

@ -59,3 +59,5 @@ include 'metadata-auth:auth-api'
include 'metadata-service:schema-registry-api'
include 'metadata-service:schema-registry-servlet'
include 'metadata-integration:java:examples'
include 'mock-entity-registry'